From ce086d79fdbf99ea3c83f2a329a2ac831fefb8b9 Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Tue, 5 Dec 2017 09:16:22 -0800
Subject: [PATCH] [SPARK-22162][BRANCH-2.2] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopMapReduceWriter so that executors and the driver
always use consistent JobIds during the Hadoop commit. Before SPARK-18191,
Spark always used the rddId; the variable was just incorrectly named stageId.
After SPARK-18191, it used the rddId as the jobId on the driver's side and the
stageId as the jobId on the executors' side. With this change, executors and
the driver consistently use the rddId as the jobId. Also with this change,
Spark now uses the actual stageId during the Hadoop commit protocol to check
whether a stage can be committed, whereas before it used the executors' jobId
for that check.

In addition to the existing unit tests, a test has been added to check whether
executors and the driver use the same JobId. The test failed before this
change and passes after applying this fix.

Author: Reza Safi

Closes #19886 from rezasafi/stagerdd22.
---
 .../io/SparkHadoopMapReduceWriter.scala      | 12 ++---
 .../spark/mapred/SparkHadoopMapRedUtil.scala |  5 ++-
 .../spark/rdd/PairRDDFunctionsSuite.scala    | 44 +++++++++++++++++++
 3 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 376ff9bb19f74..3b0a15848cd3b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -60,13 +60,13 @@ object SparkHadoopMapReduceWriter extends Logging {
       hadoopConf: Configuration): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
     val sparkConf = rdd.conf
     val conf = new SerializableConfiguration(hadoopConf)
 
     // Set up a job.
     val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
-    val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
+    val jobAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.MAP, 0, 0)
     val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
     val format = jobContext.getOutputFormatClass
 
@@ -78,7 +78,7 @@ object SparkHadoopMapReduceWriter extends Logging {
 
     val committer = FileCommitProtocol.instantiate(
       className = classOf[HadoopMapReduceCommitProtocol].getName,
-      jobId = stageId.toString,
+      jobId = commitJobId.toString,
       outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
       isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
     committer.setupJob(jobContext)
@@ -89,7 +89,7 @@ object SparkHadoopMapReduceWriter extends Logging {
         executeTask(
           context = context,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -112,7 +112,7 @@ object SparkHadoopMapReduceWriter extends Logging {
   private def executeTask[K, V: ClassTag](
       context: TaskContext,
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
@@ -120,7 +120,7 @@ object SparkHadoopMapReduceWriter extends Logging {
       outputFormat: Class[_ <: OutputFormat[K, V]],
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
-    val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
+    val attemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE,
       sparkPartitionId, sparkAttemptNumber)
     val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     committer.setupTask(taskContext)
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 607283a306b8f..764735dc4eae7 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
     if (shouldCoordinateWithDriver) {
       val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
       val taskAttemptNumber = TaskContext.get().attemptNumber()
-      val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+      val stageId = TaskContext.get().stageId()
+      val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
 
       if (canCommit) {
         performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
         logInfo(message)
         // We need to abort the task so that the driver can reschedule new attempts, if necessary
         committer.abortTask(mrTaskContext)
-        throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+        throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
       }
     } else {
       // Speculation is disabled or a user has chosen to manually bypass the commit coordination
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 2820c15c67191..5d4d1cebe997d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on the driver and executors should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false
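
For reference, below is a minimal standalone sketch (not part of the patch) of the
invariant the new test exercises: when the driver and the executors both derive their
Hadoop attempt IDs from the same commitJobId (rdd.id in this patch), the job-level
attempt ID built on the driver and the task-level attempt ID built on an executor carry
the same JobID. The sketch assumes only Hadoop's org.apache.hadoop.mapreduce.TaskAttemptID
API on the classpath; the ConsistentJobIdSketch object and the literal jobTrackerId,
partition, and attempt values are made up for illustration.

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

object ConsistentJobIdSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins: the driver builds jobTrackerId from a timestamp and uses rdd.id as commitJobId.
    val jobTrackerId = "20171205091622"
    val commitJobId = 42

    // Driver side: job-level attempt ID passed to the committer's setupJob/commitJob.
    val jobAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.MAP, 0, 0)

    // Executor side: per-partition task attempt ID passed to setupTask/commitTask.
    val sparkPartitionId = 3
    val sparkAttemptNumber = 0
    val taskAttemptId = new TaskAttemptID(
      jobTrackerId, commitJobId, TaskType.REDUCE, sparkPartitionId, sparkAttemptNumber)

    // Because both sides start from commitJobId, the committer sees one consistent JobID.
    assert(jobAttemptId.getJobID == taskAttemptId.getJobID)
    println(s"driver-side JobID = ${jobAttemptId.getJobID}, " +
      s"executor-side JobID = ${taskAttemptId.getJobID}")
  }
}

Before this fix the executor side effectively used the stage ID in place of commitJobId,
so the two JobIDs above could differ once the RDD ID and the stage ID diverged, which the
new test forces by chaining several RDDs before the save.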