Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24238][SQL] HadoopFsRelation can't append the same table with multi job at the same time #21286

Closed
wants to merge 2 commits into from

Conversation

zheh12
Copy link

@zheh12 zheh12 commented May 10, 2018

What changes were proposed in this pull request?

When there are multiple jobs at the same time append a HadoopFsRelation, there will be an error, there are the following two errors:

  1. A job will succeed, but the data will be wrong and more data than excepted will appear
  2. Other jobs will fail with java.io.FileNotFoundException: Failed to get file status skip_dir/_temporary/0

The main reason for this problem is because multiple job will use the same _temporary directory.

So the core idea of this PR is to create a different temporary directory with jobId for the different Job in the output folder , so that conflicts can be avoided.

How was this patch tested?

I manually tested.

@zheh12 zheh12 changed the title [SPARK-24194] HadoopFsRelation cannot overwrite a path that is also b… [SPARK-24238][SQL] HadoopFsRelation can't append the same table with multi job in the same time May 10, 2018
@zheh12 zheh12 changed the title [SPARK-24238][SQL] HadoopFsRelation can't append the same table with multi job in the same time [SPARK-24238][SQL] HadoopFsRelation can't append the same table with multi job at the same time May 10, 2018
@zheh12
Copy link
Author

zheh12 commented May 10, 2018

cc @cloud-fan @jiangxb1987
Is there some drawbacks for this idea? Please give some advice when you have time.

@zheh12
Copy link
Author

zheh12 commented May 10, 2018

relates to #21257

@cloud-fan
Copy link
Contributor

ok to test

@cloud-fan
Copy link
Contributor

It seems like we somehow set the job id wrong, and caused different jobs share the same working directory. I don't believe HDFS has this issue by design. Can you look into why jobs share the same working directory in Spark?

@SparkQA
Copy link

SparkQA commented May 10, 2018

Test build #90463 has finished for PR 21286 at commit b676a36.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zheh12
Copy link
Author

zheh12 commented May 11, 2018

I think the Hadoop design does not allow two jobs to share the same output folder.

Hadoop has a related patch that can partially solve this problem. You can configure the parameters to not clean up the _temporary directory. But I think this is not a good solution.

MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup failure during commitJob.

For this problem, we'd better use different temporary output directories for different jobs, and then copy the files.

However, the current implementation breaks some unit tests. There are two ways to fix it.

  1. Add the check of presence of tempDir in HadoopMapReduceCommitProtocal.commitJob, but this requires an external set FileOutputFormat.setOutputPath(job, s".temp-${commiter.getJobId()}")

  2. Another approach is that we enable the tempDir directory for all HadoopMapReduceCommitProtocal.
      The shield tempDir setting problem, but for all jobs will be one more files move.

cc @cloud-fan. Which do you think is better? Please give me some advice?

@cloud-fan
Copy link
Contributor

cc @steveloughran who I believe is the expert in this area.

@steveloughran
Copy link
Contributor

steveloughran commented May 14, 2018

cc @steveloughran who I believe is the expert in this area.

I suppose "Stepped through the FileOutputCommit operations with a debugger and a pen and paper" counts, given the complexity there. There's still a lot of corner cases which I'm not 100% sure on (or confident that the expectations of the job coordinators are met). Otherwise its more folklore "we did this because of job A failed with error...", plus some experiments with fault injection. I'd point to @rdblue as having put in this work too.

  • Hadoop MR uses the jobID for unique temp paths, which comes from yarn and guaranteed to be unique within the cluster, at least until everything is restarted. See Hadoop committer architecture
  • And to handle job restart, has a temp ID too.

Using a temp dir and then renaming in is ~what the FileOutputCommitter v1 algorithm does

  1. task commit: _temporary/$jobAttemptId/_temporary/$taskID_$taskAttemptID -> _temporary/$jobAttemptId/$taskID
  2. Job commit: list _temporary/$jobAttemptId, move over. This is sequential renaming, slow on very large jobs on HDFS &c, where it's O(files), performance killer on any object store where it's O(data)
  3. The "v2" algorithm avoids this job commit overhead by incrementally committing tasks as they complete, so breaking fundamental assumptions about observability of output and the ability to recover from failure of tasks and jobs.

Adding an extra directory with another rename has some serious issues

  • Completely breaks all the work Ryan and I have done with committers which PUT directly into place in S3, where "place" can include specific partitions with specific conflict resolution
  • Adds another O(files) or O(data) rename process. So doubles the commit time of V1, and for v2 restores the v1 commit overhead, while at least fixing the task commit semantics. Essentially: it reinstates v1, just less efficiently.
  • still has that problem of how to handle failure in object stores (s3, GCS) which don't do atomic directory rename.

Which is why I think it's the wrong solution

Normally Spark rejects work to the destination if it's already there, so only one job will have a temp dir. This conflict will only be an issue if overwrite is allowed, which is going to have other adverse consequences if files with the same name are ever created. If the two jobs commit simultaneously, you'll get a mixture of results. This is partly why the S3A committers insert UUIDs into their filenames by default, the other being S3's lack of update consistency.

Ignoring that little issue, @cloud-fan is right: giving jobs a unique ID should be enough to ensure that FileOutputCommitter does all it's work in isolation.

Any ID known to be unique to all work actively potentially able to write to the same dest dir. Hadoop MR has a strict ordering requirement so that it can attempt to recover from job attempt failures (it looks for temporary/$job_id($job-attempt-id-1)/ to find committed work from the previous attempt). Spark should be able to just create a UUID.

@zheh12 : welcome to the world of distributed commit protocols. My writeup is here. Also check out Gil Vernik's Stocator Paper. Start with those and the source and assume we've all made mistakes...

finally, regarding MAPREDUCE cleanup JIRAs, the most recent is MAPREDUCE-7029. That includes comments from the google team on their store's behaviour.

@zheh12
Copy link
Author

zheh12 commented May 14, 2018

Thanks @cloud-fan @steveloughran for your reply, I will look more detail on this problem.

@cloud-fan
Copy link
Contributor

Thanks @steveloughran for your deep explanation!

Spark does have a unique job id, but it's only unique within a SparkContext, we may have 2 different spark applications writing to the same directory. I think timestamp+uuid should be good enough as a job id. Spark doesn't retry jobs so we can always set job attempt id to 0.

@steveloughran
Copy link
Contributor

that would work. Like you say, no need to worry about job attempt IDs, just uniqueness. If you put the timestamp first, you could still sort the listing by time, which might be good for diagnostics.

Some org.apache.hadoop code snippets do attempt to parse the yarn app attempt strings into numeric job & task IDs in exactly the way they shouldn't. It should already have surfaced if it was a problem in the committer codepaths, but it's worth remembering & maybe replicate in the new IDs.

@steveloughran
Copy link
Contributor

steveloughran commented May 14, 2018

...this makes me think that the FileOutputCommitter actually has an assumption that nobody has called out before, specifically "only one application will be writing data to the target FS with the same job id". It's probably been implicit in MR with a local HDFS for a long time, first on the assumption of all jobs getting unique job Ids from the same central source and nothing outside the cluster writing to the same destinations. With cloud stores, that doesn't hold; it's conceivable that >1 YARN cluster could start jobs with the same dest. As the timestamp of YARN launch is used as the initial part of the identifier, if >1 cluster was launched in the same minute, things are lined up to collide. Oops.

FWIW, the parsing code I mentioned is org.apache.hadoop.mapreduce.JobID.forName(): any numbering scheme spark uses should be able to map from a string to a job ID through that & back again.

@zheh12
Copy link
Author

zheh12 commented May 15, 2018

I think I may not have described this issue clearly.

First of all,the scene of the problem is this.

When multiple applications simultaneously append data to the same parquet datasource table.

They will run simultaneously and share the same output directory.

FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

ouputSepc is the output table directory skip_dir/tab1/

skip_dir/tab1/_temporary will be created as temporary dir.

But once one Job is successfully committed, it will run cleanupJob

Path pendingJobAttemptsPath = getPendingJobAttemptsPath();

fs.delete(pendingJobAttemptsPath, true);

The pendingJobAttemptsPath is skip_dir/tab1/_temporary

Private Path getPendingJobAttemptsPath() {
    Return getPendingJobAttemptsPath(getOutputPath());
}

Private static Path getPendingJobAttemptsPath(Path out) {
    Return new Path(out, PENDING_DIR_NAME);
}

Public static final String PENDING_DIR_NAME = "_temporary";

After the job is committed, skip_dir/tab1/_temporary will be deleted. Then when other jobs attempt to commit, an error will be reported.

Meanwhile, due to all applications share the same app appempt id, they write temporary data to the same temporary dir skip_dir/tab1/_temporary/0. Data committed by the successful application is also corrupted.

@steveloughran
Copy link
Contributor

steveloughran commented May 15, 2018

After the job is committed, skip_dir/tab1/_temporary will be deleted. Then when other jobs attempt to commit, an error will be reported.

I see. Yes, that's org.apache.hadoop.mapreduce.OutputCommitter.cleanupJob() doing the work. It does this as it wants to cleanup all attempts, including predecessors which have failed, and expects only one job to be writing at a time.

Like I said, this proposed patch breaks all the blobstore-specific committer work, causes problems at scale with HDFS alone, and adds a new problem: how do you clean up from failed jobs writing to the same destination?

It's causing these problems because it's using another layer of temp dir and then the rename.

Assuming you only want to work with org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter and subclasses thereof (like the Parquet one), why not

  1. Pick up my SPARK-23977 patch and Hadoop 3.1. There are some problems with hive versioning there, but that is a WiP of mine.
  2. make your own subclass of FileOutputCommitter whose cleanupJob() method doesn't do that full $dest/_temporary dir cleanup, just deletes the current job ID's subdir
  3. Configure the jobs (new) committer factory underneath the FileOutputFormat to return your committer; do the same for parquet via the BindingParquetOutputCommitter.

That way, you get to choose cleanup policy, don't create conflict, don't need to rename things.

There's also the option of providing a MAPREDUCE- patch to add a switch to change cleanup to only purge that job's data...you'd need to make sure all attempts of that job get cleaned up, as MR can make multiple attempts. There's a general fear of going near that class as its such a critical piece of code, but cleanup is not the bit everyone is scared of. Get a change in there and all the file output committer subclasses get it. That'd be for Hadoop 3.2 & 2.10; no need to change anything in spark other than the job ID problem.

No, that's not needed. Once spark has unique job IDs across apps, all that's needed is to set spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped to true, and set up your own workflow to cleanup temp dirs somehow.

Meanwhile, due to all applications share the same app appempt id, they write temporary data to the same temporary dir skip_dir/tab1/_temporary/0. Data committed by the successful application is also corrupted.

that's the issue we've been discussing related to job IDs. If each spark driver comes up with a unique job ID, that conflict will go away. So turn off the full directory cleanup and you will get that multiple job feature. Up to you to make sure that the files generated are unique across jobs though, as job commit is not atomic.

@jinxing64
Copy link

Does Spark have a jobID in writing path? Below path is an example in my debugging log:

parquettest2/_temporary/0/_temporary/attempt_20180515215310_0000_m_000000_0/part-00000-9104445e-e54a-4e3f-9ba4-e624d60e6247-c000.snappy.parquet

parquettest2 is a non-partitioned table. Seems that the jobAttemptId in _temporary/$jobAttemptId/_temporary/$taskID_$taskAttemptID is always 0 if no retry.
If no unique jobID is provided in the writing path, think about below scenario:

1. JobA started and writes data to dir/tab/_temporary/0/_temporary/$taskID_$taskAttemptID
2. JobB started and writes data to dir/tab/_temporary/0/_temporary/$taskID_$taskAttemptID
3. Note that JobA and JobB write data to dir/tab/_temporary/0/_temporary at the same time
4. When JobA commits, all data under dir/tab/_temporary/0/ are commited as the output -- Yes, it's a mixture from both JobA and jobB, the generated data to the target table is incorrect.
5. When JobA commits and cleanup, dir/tab/_temporary/ will be deleted. But at this moment, JobB is not finisehd yet and cann
ot find dir/tab/_temporary/0/ and failed.

If I understand correctly, this pr proposes to add a jobId outside the _temporary and the writing path format is like below:
$jobID/_temporary/$jobAttemptId/_temporary/$taskID_$taskAttemptID.
Thus the change outside committer and doesn't break commiterr's logic.
Did I understand correctly ?

@steveloughran
Copy link
Contributor

steveloughran commented May 16, 2018

@jinxing64 from my reading of the code, the original patch proposed creating a temp dir for every query, which could then do its own work & cleanup in parallel, with a new meta-commit on each job commit, moving stuff from this per-job temp dir into the final dest.

This is to address

  • conflict of work in the _temporary/0 path
  • rm of _temporary in job abort, post-commit cleanup

And the reason for that '0' is that spark's job id is just a counter of queries done from app start, whereas on hadoop MR it's unique for across a live YARN cluster. Spark deploys in different ways, and can't rely on that value.

The job id discussion proposes generating unique job IDs for every spark app, so allowing _temporary/$jobID1 to work alongside _temporary/$jobID2. With that and disabling cleanup in the FileOutputCommitter (mapreduce.fileoutputcommitter.cleanup.skipped), @zheh12 should get what they need: parallel queries to same dest using FileOutputCommitter without conflict of temp data

Thus the change outside committer and doesn't break commiterr's logic. Did I understand correctly ?

Exactly. It also makes it a simpler change, which is good as the commit algorithms are pretty complex and its hard to test all the failure modes.

@jinxing64
Copy link

jinxing64 commented May 16, 2018

@steveloughran Thanks a lot for explanation.
So do I understand correctly that there are only two things need to do:

  1. create a unique jobID, permaps timestamp+uuid?
  2. disable cleanup in FileOutputCommitter mapreduce.fileoutputcommitter.cleanup.skipped

@steveloughran
Copy link
Contributor

@jinxing64 yes, with the detail that the way some bits of hadoop parse a jobattempt, they like it to be an integer. Some random number used as the upper digits of counter could work; it'd still give meaningful job IDs like "45630001" for the first, "45630002", for the process which came up with "4563" as its prefix. Yes, eventually it'll wrap, but that's integers for you.

BTW, the newFileAbsPath code creates the staging dir ".spark-staging-" + jobId. Again, a jobID unique across all processes is enough

… job at the same time

fix unit test error, add some check
@SparkQA
Copy link

SparkQA commented May 19, 2018

Test build #90827 has finished for PR 21286 at commit 49532fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@medb
Copy link
Contributor

medb commented Feb 22, 2019

ping

@weixiuli
Copy link
Contributor

@zheh12 is there some in progress? thanks.

@cloud-fan
Copy link
Contributor

there is a new proposal in #25863

@weixiuli
Copy link
Contributor

@cloud-fan thank you.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 31, 2020
@github-actions github-actions bot closed this Jun 1, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
9 participants