[SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"

### What changes were proposed in this pull request?

This reinstates the old option `spark.sql.sources.writeJobUUID` to set a unique job ID in the job configuration, so that Hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not brittle the way generated-timestamp job IDs are. Timestamp-based IDs match the format `JobID` requires, but because they are generated per-thread, they may not always be unique within a cluster.
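For illustration of the consumer side (this sketch is not part of the patch): a committer could pick the UUID up from the job configuration along these lines. The `WriteUuidSupport` helper and its fallback to the MR job ID are hypothetical.

```scala
import org.apache.hadoop.mapreduce.JobContext

object WriteUuidSupport {
  // Hypothetical helper: prefer the UUID Spark publishes in the job
  // configuration; fall back to the MR job ID, which is only
  // timestamp-based and so can repeat across concurrent jobs.
  def resolveWriteUuid(context: JobContext): String =
    Option(context.getConfiguration.get("spark.sql.sources.writeJobUUID"))
      .getOrElse(context.getJobID.toString)
}
```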

### Why are the changes needed?

If a committer (e.g. the S3A staging committer) uses the job attempt ID as a unique ID, then any two jobs started within the same second have the same ID and so can clash.
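As a minimal sketch of the clash, assuming the job's identifier is derived from a second-granularity `yyyyMMddHHmmss` timestamp as in Spark's write path: two jobs started in the same second yield equal Hadoop `JobID`s.

```scala
import org.apache.hadoop.mapreduce.JobID

object JobIdClashDemo extends App {
  // Two jobs whose IDs come from the same second-granularity timestamp
  // collide: both print as job_20201026112233_0001, so any committer
  // keying its staging directory on the job ID would mix their output.
  val stamp = "20201026112233"
  val first = new JobID(stamp, 1)
  val second = new JobID(stamp, 1)
  assert(first == second)
  println(first) // job_20201026112233_0001
}
```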

### Does this PR introduce _any_ user-facing change?

Good question. It is "developer-facing" in the context of anyone writing a committer. But it reinstates a property which was present in Spark 1.x and then went away.

### How was this patch tested?

Testing: no test here. You'd have to create a new committer which extracted the value in both the job and the task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and carry a high maintenance cost.
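For reference, a skeleton of such a verification committer might look like the following; the `UuidCheckingCommitter` class is hypothetical, sketched from the idea above rather than taken from this patch.

```scala
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer which asserts that the job and every task
// attempt see a well-formed write UUID in their configuration.
class UuidCheckingCommitter(output: Path, context: TaskAttemptContext)
    extends FileOutputCommitter(output, context) {

  private def writeUuid(ctx: JobContext): String = {
    val id = ctx.getConfiguration.get("spark.sql.sources.writeJobUUID")
    require(id != null, "spark.sql.sources.writeJobUUID is unset")
    UUID.fromString(id) // throws IllegalArgumentException if not a UUID
    id
  }

  override def setupJob(jobContext: JobContext): Unit = {
    writeUuid(jobContext)
    super.setupJob(jobContext)
  }

  override def setupTask(taskContext: TaskAttemptContext): Unit = {
    // The task side rebuilds its configuration from the serialized job
    // conf, so a valid value here shows the UUID propagated consistently.
    writeUuid(taskContext)
    super.setupTask(taskContext)
  }
}
```

Verifying cross-task consistency properly would still need each task to emit the observed UUID into its output records, which is the convoluted part.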

Because it's trying to address a race condition, it's hard to reproduce the problem downstream and so verify a fix in a test run. Instead, I'll look at the logs to see what temporary directory is being used in the cluster filesystem and verify that it's a UUID.

Closes #30141 from steveloughran/SPARK-33230-jobId.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 02fa19f)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
steveloughran authored and dongjoon-hyun committed Oct 26, 2020
Showing 1 changed file with 5 additions and 1 deletion.
```diff
@@ -103,7 +103,7 @@ object FileFormatWriter extends Logging {
       fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema)
 
     val description = new WriteJobDescription(
-      uuid = UUID.randomUUID().toString,
+      uuid = UUID.randomUUID.toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
       allColumns = outputSpec.outputColumns,
@@ -134,6 +134,10 @@ object FileFormatWriter extends Logging {
 
     SQLExecution.checkSQLExecutionId(sparkSession)
 
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
     // This call shouldn't be put into the `try` block below because it only initializes and
     // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
     committer.setupJob(job)
```
