[SPARK-22162][BRANCH-2.2] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopMapReduceWriter so that executors and the driver always use consistent job IDs during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; the variable was just incorrectly named stageId. After SPARK-18191, the driver used the rddId as the jobId while executors used the stageId, so the two sides could disagree. With this change, executors and the driver consistently use the rddId as the jobId. Also, during the Hadoop commit protocol, Spark now uses the actual stageId to check whether a stage can be committed, instead of the executors' jobId as before.
In addition to the existing unit tests, a test has been added to check that executors and the driver use the same jobId. The test failed before this change and passes with the fix applied.
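For illustration, here is a minimal, self-contained Scala sketch of the mismatch the fix removes. It is not part of the commit: the object name JobIdMismatchSketch and the sample ids (rddId = 2, stageId = 1) are hypothetical, while TaskAttemptID, TaskType, and the rddId-vs-stageId roles come from the diff below.

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

object JobIdMismatchSketch extends App {
  val jobTrackerId = "20180731120000" // stand-in for createJobTrackerID(new Date())
  val rddId = 2   // id of the RDD handed to the writer; used by the driver
  val stageId = 1 // id of the stage running the write tasks; used by executors pre-fix

  // Before the fix: the driver derived its Hadoop job id from rdd.id while
  // executors derived theirs from context.stageId, yielding two different jobs.
  val driverJobId = new TaskAttemptID(jobTrackerId, rddId, TaskType.MAP, 0, 0).getJobID
  val executorJobId = new TaskAttemptID(jobTrackerId, stageId, TaskType.REDUCE, 0, 0).getJobID
  assert(driverJobId != executorJobId) // committers disagree on which job they serve

  // After the fix: both sides build the id from the same commitJobId (rdd.id),
  // so setupJob on the driver and setupTask on executors target one job.
  val fixedExecutorJobId = new TaskAttemptID(jobTrackerId, rddId, TaskType.REDUCE, 0, 0).getJobID
  assert(driverJobId == fixedExecutorJobId)
}

A multi-stage lineage (for example, a map followed by a filter, as in the new test) is exactly the situation where rdd.id and the write stage's id diverge.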

Author: Reza Safi <rezasafi@cloudera.com>

Closes apache#19886 from rezasafi/stagerdd22.
Reza Safi authored and MatthewRBruce committed Jul 31, 2018
1 parent e4432be commit ce086d7
Showing 3 changed files with 53 additions and 8 deletions.
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -60,13 +60,13 @@ object SparkHadoopMapReduceWriter extends Logging {
       hadoopConf: Configuration): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
     val sparkConf = rdd.conf
     val conf = new SerializableConfiguration(hadoopConf)
 
     // Set up a job.
     val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
-    val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
+    val jobAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.MAP, 0, 0)
     val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
     val format = jobContext.getOutputFormatClass
@@ -78,7 +78,7 @@ object SparkHadoopMapReduceWriter extends Logging {
 
     val committer = FileCommitProtocol.instantiate(
       className = classOf[HadoopMapReduceCommitProtocol].getName,
-      jobId = stageId.toString,
+      jobId = commitJobId.toString,
       outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
       isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
     committer.setupJob(jobContext)
@@ -89,7 +89,7 @@ object SparkHadoopMapReduceWriter extends Logging {
         executeTask(
           context = context,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -112,15 +112,15 @@ object SparkHadoopMapReduceWriter extends Logging {
   private def executeTask[K, V: ClassTag](
       context: TaskContext,
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       hadoopConf: Configuration,
       outputFormat: Class[_ <: OutputFormat[K, V]],
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
-    val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
+    val attemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE,
       sparkPartitionId, sparkAttemptNumber)
     val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     committer.setupTask(taskContext)
core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
     if (shouldCoordinateWithDriver) {
       val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
       val taskAttemptNumber = TaskContext.get().attemptNumber()
-      val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+      val stageId = TaskContext.get().stageId()
+      val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
 
       if (canCommit) {
         performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
         logInfo(message)
         // We need to abort the task so that the driver can reschedule new attempts, if necessary
         committer.abortTask(mrTaskContext)
-        throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+        throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
       }
     } else {
       // Speculation is disabled or a user has chosen to manually bypass the commit coordination
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on the driver and executors should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false
