[SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID" #30141
Conversation
…JobUUID" to description.uuid. Change-Id: I5f45f58be17c809f7eb15dc1ab5840eeabb9f955
@dongjoon-hyun FYI
Moving the generation down into the HadoopMapReduceCommitProtocol so that wherever a job is set up (SQL, RDD) it gets a consistent UUID. I'm going to modify the S3A staging committer to have an option which requires the UUID to be set. This can be used as a way to verify that the property is propagating correctly. Consistent settings across jobs and tasks will be inferred simply from whether jobs complete with the expected set of files.
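For context, here is a minimal sketch of how a committer could consume the property, falling back to generating its own UUID. The object name and the `requireUuid` flag are hypothetical, for illustration only; they are not the actual S3A committer API.

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration

// Sketch only: how a committer might pick up the job UUID that Spark sets,
// falling back to a self-generated one. The requireUuid flag is hypothetical.
object JobUuidSketch {
  val WriteJobUuid = "spark.sql.sources.writeJobUUID"

  def resolveJobUuid(conf: Configuration, requireUuid: Boolean): String = {
    val uuid = conf.get(WriteJobUuid, "")
    if (uuid.nonEmpty) {
      uuid
    } else if (requireUuid) {
      // Failing fast here is one way to verify the property is propagating.
      throw new IllegalStateException(s"$WriteJobUuid is not set in the job configuration")
    } else {
      UUID.randomUUID().toString
    }
  }
}
```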
Kubernetes integration test starting
Kubernetes integration test status success
Thank you, @steveloughran !
Do you think you can add a test case, @steveloughran ?
@@ -164,6 +164,10 @@ object FileFormatWriter extends Logging {
     SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
Do you mean the Apache Hadoop S3A committer uses this? If not, `spark.sql.sources.writeJobUUID`
is not used inside the Spark 3.x/2.x code base.
As you wrote, in Spark 1.6 it was explicitly part of the file name:
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
...
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
It picked it up if set, so yes, it was being used. We've hit a problem where, if more than one job kicks off in the same second for that user, the generated app ID is the same for both, so the staging committers end up using the same directory in HDFS. The committers already use the writeJobUUID property if set: restoring the original config option will mean that the shipping artifacts will work.
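To illustrate the collision: a sketch assuming a second-granularity, timestamp-derived job tracker ID (the exact format string Spark uses may differ; this is not the PR's code).

```scala
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapreduce.JobID

// Two jobs created within the same second get identical JobIDs, so a
// committer keying its staging directory off the JobID will clash.
object JobIdCollisionSketch extends App {
  def makeJobId(time: Date, jobNumber: Int): JobID = {
    val trackerId = new SimpleDateFormat("yyyyMMddHHmmss").format(time)
    new JobID(trackerId, jobNumber)
  }

  val now = new Date()
  val jobA = makeJobId(now, 0)
  val jobB = makeJobId(now, 0)
  println(s"$jobA == $jobB: ${jobA == jobB}") // prints true: same second, same ID
}
```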
I think what we ended up doing was to generate a UUID in … I'm okay adding this. It matches what we do in v2 writes. The new API also passes a UUID in, so I think there is a reasonable precedent for it, even if the committer or writer could generate one itself.
Test build #130204 has finished for PR 30141 at commit
@rdblue I am going to add two things to the committers
Not easily. It would need a new Hadoop committer (a subclass of FileOutputCommitter would be easiest) which then failed if the option wasn't set on Spark queries, and somewhere to put that. If you've got suggestions as to where I could put it and point to a test I could work off, I'll do my best. I like ScalaTest.
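A hypothetical sketch of such a test-only committer (class name and wording are illustrative, not part of this PR):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical test committer: refuses to set up a job unless the
// writeJobUUID property has been propagated into the job configuration.
class UuidRequiringCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def setupJob(jobContext: JobContext): Unit = {
    val uuid = jobContext.getConfiguration.get("spark.sql.sources.writeJobUUID")
    require(uuid != null && uuid.nonEmpty,
      "spark.sql.sources.writeJobUUID was not set; UUID propagation is broken")
    super.setupJob(jobContext)
  }
}
```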
cc @sunchao
+1, LGTM. Thank you, @steveloughran and @rdblue.
I agree that this is helpful for the Hadoop `StagingCommitter`.
Merged to master/3.0/2.4.
…ql.sources.writeJobUUID" ### What changes were proposed in this pull request? This reinstates the old option `spark.sql.sources.write.jobUUID` to set a unique jobId in the jobconf so that hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not brittle compared to generated-timestamp job IDs. The latter matches that of what JobID requires, but as they are generated per-thread, may not always be unique within a cluster. ### Why are the changes needed? If a committer (e.g s3a staging committer) uses job-attempt-ID as a unique ID then any two jobs started within the same second have the same ID, so can clash. ### Does this PR introduce _any_ user-facing change? Good Q. It is "developer-facing" in the context of anyone writing a committer. But it reinstates a property which was in Spark 1.x and "went away" ### How was this patch tested? Testing: no test here. You'd have to create a new committer which extracted the value in both job and task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and a high maintenance cost. Because it's trying to address a race condition, it's hard to regenerate the problem downstream and so verify a fix in a test run...I'll just look at the logs to see what temporary dir is being used in the cluster FS and verify it's a UUID Closes #30141 from steveloughran/SPARK-33230-jobId. Authored-by: Steve Loughran <stevel@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 02fa19f) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ql.sources.writeJobUUID" ### What changes were proposed in this pull request? This reinstates the old option `spark.sql.sources.write.jobUUID` to set a unique jobId in the jobconf so that hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not brittle compared to generated-timestamp job IDs. The latter matches that of what JobID requires, but as they are generated per-thread, may not always be unique within a cluster. ### Why are the changes needed? If a committer (e.g s3a staging committer) uses job-attempt-ID as a unique ID then any two jobs started within the same second have the same ID, so can clash. ### Does this PR introduce _any_ user-facing change? Good Q. It is "developer-facing" in the context of anyone writing a committer. But it reinstates a property which was in Spark 1.x and "went away" ### How was this patch tested? Testing: no test here. You'd have to create a new committer which extracted the value in both job and task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and a high maintenance cost. Because it's trying to address a race condition, it's hard to regenerate the problem downstream and so verify a fix in a test run...I'll just look at the logs to see what temporary dir is being used in the cluster FS and verify it's a UUID Closes #30141 from steveloughran/SPARK-33230-jobId. Authored-by: Steve Loughran <stevel@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 02fa19f) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cc @cloud-fan
…ql.sources.writeJobUUID" ### What changes were proposed in this pull request? This reinstates the old option `spark.sql.sources.write.jobUUID` to set a unique jobId in the jobconf so that hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not brittle compared to generated-timestamp job IDs. The latter matches that of what JobID requires, but as they are generated per-thread, may not always be unique within a cluster. ### Why are the changes needed? If a committer (e.g s3a staging committer) uses job-attempt-ID as a unique ID then any two jobs started within the same second have the same ID, so can clash. ### Does this PR introduce _any_ user-facing change? Good Q. It is "developer-facing" in the context of anyone writing a committer. But it reinstates a property which was in Spark 1.x and "went away" ### How was this patch tested? Testing: no test here. You'd have to create a new committer which extracted the value in both job and task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and a high maintenance cost. Because it's trying to address a race condition, it's hard to regenerate the problem downstream and so verify a fix in a test run...I'll just look at the logs to see what temporary dir is being used in the cluster FS and verify it's a UUID Closes apache#30141 from steveloughran/SPARK-33230-jobId. Authored-by: Steve Loughran <stevel@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Thanks!
What changes were proposed in this pull request?
This reinstates the old option `spark.sql.sources.writeJobUUID` to set a unique jobId in the jobconf so that Hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not brittle compared to generated-timestamp job IDs. The latter matches what JobID requires, but as they are generated per-thread, they may not always be unique within a cluster.
Why are the changes needed?
If a committer (e.g. the S3A staging committer) uses the job-attempt-ID as a unique ID, then any two jobs started within the same second have the same ID and so can clash.
Does this PR introduce any user-facing change?
Good Q. It is "developer-facing" in the context of anyone writing a committer. But it reinstates a property which was in Spark 1.x and "went away".
How was this patch tested?
Testing: no test here. You'd have to create a new committer which extracted the value in both job and task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and a high maintenance cost.
Because it's trying to address a race condition, it's hard to regenerate the problem downstream and so verify a fix in a test run. I'll just look at the logs to see what temporary dir is being used in the cluster FS and verify it's a UUID.