From 0a3bb74b4658c2ee35696732be2a2c48ee1ef06a Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 18 Jan 2017 00:49:02 +0800 Subject: [PATCH 01/20] [SPARK-19263] DAGScheduler should avoid sending conflicting task set. --- .../apache/spark/scheduler/DAGScheduler.scala | 23 ++++- .../org/apache/spark/scheduler/Stage.scala | 2 +- .../scheduler/SchedulerIntegrationSuite.scala | 84 ++++++++++++++++++- 3 files changed, 102 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 69101acb3ac8b..a6508bf22b635 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1193,7 +1193,23 @@ class DAGScheduler( } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { - markStageAsFinished(shuffleStage) + // Check if there is active tasks running for other partitions. + val noActiveTasksForOtherPartitions = taskScheduler.rootPool == null || + !taskScheduler.rootPool.getSortedTaskSetQueue.exists { + tsm => + tsm.stageId == stageId && !tsm.isZombie && tsm.copiesRunning.exists { + i => + tsm.tasks(i).partitionId != smt.partitionId && tsm.copiesRunning(i) > 0 + } + } + if (noActiveTasksForOtherPartitions) { + markStageAsFinished(shuffleStage) + } else { + // There can be tasks running for other partitions at this point + // for reasons like fetch failed. If so, should not mark the stage + // as finished. Thus the running tasks' results will be added to + // mapOutputTracker when succeed. + } logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1212,8 +1228,9 @@ class DAGScheduler( clearCacheLocs() - if (!shuffleStage.isAvailable) { - // Some tasks had failed; let's resubmit this shuffleStage + if (!shuffleStage.isAvailable && noActiveTasksForOtherPartitions) { + // Some tasks had failed; let's resubmit this shuffleStage. + // Do not resubmit shuffleStage if there is active tasks running. // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index c6fc03812921e..32e5df6d75f4f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -74,7 +74,7 @@ private[scheduler] abstract class Stage( val details: String = callSite.longForm /** - * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized + * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts * have been created). 
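Taken in isolation, the guard this first patch adds to handleTaskCompletion scans the scheduling pool for a non-zombie TaskSetManager of the same stage that still has a running copy for some partition other than the one that just finished. A minimal sketch of that predicate follows, with hypothetical stand-ins (MockTask, MockTsm) for Spark's real Task and TaskSetManager; note that it walks the indices of copiesRunning, whereas the hunk above appears to iterate the element values and reuse them as indices.

// Sketch of the patch-01 guard. MockTask and MockTsm are illustrative
// stand-ins, not Spark's real classes.
case class MockTask(partitionId: Int)
case class MockTsm(
    stageId: Int,
    var isZombie: Boolean,
    tasks: IndexedSeq[MockTask],
    copiesRunning: IndexedSeq[Int])

// True when no non-zombie taskset for `stageId` still has a copy running
// for a partition other than `finishedPartition`.
def noActiveTasksForOtherPartitions(
    sortedTaskSetQueue: Seq[MockTsm],
    stageId: Int,
    finishedPartition: Int): Boolean = {
  !sortedTaskSetQueue.exists { tsm =>
    tsm.stageId == stageId && !tsm.isZombie &&
      tsm.copiesRunning.indices.exists { i =>
        tsm.tasks(i).partitionId != finishedPartition && tsm.copiesRunning(i) > 0
      }
  }
}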
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 398ac3d6202db..2a5d793068149 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -38,7 +38,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.util.{CallSite, ThreadUtils, Utils} /** - * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets, + * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets, * TaskSetManagers. * * Test cases are configured by providing a set of jobs to submit, and then simulating interaction @@ -49,7 +49,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa with LocalSparkContext { var taskScheduler: TestTaskScheduler = null - var scheduler: DAGScheduler = null + var scheduler: TestDAGScheduler = null var backend: T = _ override def beforeEach(): Unit = { @@ -77,7 +77,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa backend = sc.schedulerBackend.asInstanceOf[T] taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler] taskScheduler.initialize(sc.schedulerBackend) - scheduler = new DAGScheduler(sc, taskScheduler) + scheduler = new TestDAGScheduler(sc, taskScheduler) taskScheduler.setDAGScheduler(scheduler) } @@ -519,6 +519,18 @@ class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { } } +/** DAGScheduler that just tracks a tiny bit more state to enable checks in tests. */ +class TestDAGScheduler(sc: SparkContext, taskScheduler: TaskScheduler) + extends DAGScheduler(sc, taskScheduler) { + + val handledCompletionEvent = scala.collection.mutable.ListBuffer[CompletionEvent]() + + override private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = { + super.handleTaskCompletion(event) + handledCompletionEvent += event + } +} + /** * Some very basic tests just to demonstrate the use of the test framework (and verify that it * works). @@ -648,4 +660,70 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor } assertDataStructuresEmpty(noFailure = false) } + + testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") { + val a = new MockRDD(sc, 2, Nil) + val b = shuffle(2, a) + val shuffleId = b.shuffleDeps.head.shuffleId + + def runBackend(): Unit = { + val (taskDescription, task) = backend.beginTask() + task.stageId match { + // ShuffleMapTask + case 0 => + val stageAttempt = task.stageAttemptId + val partitionId = task.partitionId + (stageAttempt, partitionId) match { + case (0, 0) => + val fetchFailed = FetchFailed( + DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored") + backend.taskFailed(taskDescription, fetchFailed) + case (0, 1) => + // Wait until stage resubmission caused by FetchFailed is finished. + waitForCondition(taskScheduler.runningTaskSets.size==2, 5000, + "Wait until stage is resubmitted caused by fetch failed") + + // Task(stageAttempt=0, partition=1) will be bogus, because both two + // tasks(stageAttempt=0, partition=0, 1) run on hostA. + // Pending partitions are (0, 1) after stage resubmission, + // then change to be 0 after this bogus task. 
+ backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2)) + case (1, 1) => + waitForCondition(scheduler.handledCompletionEvent.map(_.task).exists { + task => task.stageAttemptId == 1 && task.partitionId == 0 + }, 5000, "Wait until Success of task(stageAttempt=1 and partition=0)" + + " is handled by DAGScheduler.") + // Task(stageAttempt=1 and partition=0) should not cause stage resubmission, + // even though shuffleStage.pendingPartitions.isEmpty and + // shuffleStage.isAvailable is false. Because the stageAttempt=0 is not finished yet. + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) + case _ => + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) + } + // ResultTask + case 1 => backend.taskSuccess(taskDescription, 10) + } + } + + withBackend(runBackend _) { + val jobFuture = submit(b, (0 until 2).toArray) + val duration = Duration(15, SECONDS) + awaitJobTermination(jobFuture, duration) + } + assert(results === (0 until 2).map { _ -> 10}.toMap) + } + + def waitForCondition(condition: => Boolean, timeout: Long, msg: String): Unit = { + val finishTime = System.currentTimeMillis() + timeout + while (System.currentTimeMillis() < finishTime) { + if (condition) { + return + } + // Sleep rather than using wait/notify, because this is used only for testing and wait/notify + // add overhead in the general case. + Thread.sleep(10) + } + throw new TimeoutException( + s"Condition '$msg' failed to become true before $timeout milliseconds elapsed") + } } From 3d6283ab5f7c4695b988fc37338d634477817098 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 26 Jan 2017 22:28:33 +0800 Subject: [PATCH 02/20] fix --- .../apache/spark/scheduler/DAGScheduler.scala | 12 +++----- .../spark/scheduler/TaskSetManager.scala | 30 +++++++++---------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a6508bf22b635..5d8983cef4d22 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1191,18 +1191,14 @@ class DAGScheduler( } else { shuffleStage.addOutputLoc(smt.partitionId, status) } - if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { // Check if there is active tasks running for other partitions. - val noActiveTasksForOtherPartitions = taskScheduler.rootPool == null || + val noActiveTaskSetManager = taskScheduler.rootPool == null || !taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm => - tsm.stageId == stageId && !tsm.isZombie && tsm.copiesRunning.exists { - i => - tsm.tasks(i).partitionId != smt.partitionId && tsm.copiesRunning(i) > 0 - } + tsm.stageId == stageId && !tsm.isZombie } - if (noActiveTasksForOtherPartitions) { + if (noActiveTaskSetManager) { markStageAsFinished(shuffleStage) } else { // There can be tasks running for other partitions at this point @@ -1228,7 +1224,7 @@ class DAGScheduler( clearCacheLocs() - if (!shuffleStage.isAvailable && noActiveTasksForOtherPartitions) { + if (!shuffleStage.isAvailable && noActiveTaskSetManager) { // Some tasks had failed; let's resubmit this shuffleStage. // Do not resubmit shuffleStage if there is active tasks running. 
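Relative to patch 01, this revision coarsens the guard: instead of inspecting per-partition running copies, it only asks whether any non-zombie TaskSetManager is still registered for the stage. In terms of the hypothetical MockTsm stand-in from the earlier sketch, the predicate reduces to:

// The patch-02 form of the guard: any live (non-zombie) taskset for the
// stage blocks both markStageAsFinished and the resubmission path.
def noActiveTaskSetManager(sortedTaskSetQueue: Seq[MockTsm], stageId: Int): Boolean =
  !sortedTaskSetQueue.exists(tsm => tsm.stageId == stageId && !tsm.isZombie)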
// TODO: Lower-level scheduler should also deal with this diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3b25513bea057..e05da9b461a54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -697,21 +697,6 @@ private[spark] class TaskSetManager( val index = info.index info.markFinished(TaskState.FINISHED) removeRunningTask(tid) - // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the - // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not - // "deserialize" the value when holding a lock to avoid blocking other threads. So we call - // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. - // Note: "result.value()" only deserializes the value when it's called at the first time, so - // here "result.value()" just returns the value and won't block other threads. - sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) - // Kill any other attempts for the same task (since those are unnecessary now that one - // attempt completed successfully). - for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { - logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + - s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + - s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") - sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) - } if (!successful(index)) { tasksSuccessful += 1 logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" + @@ -727,6 +712,21 @@ private[spark] class TaskSetManager( " because task " + index + " has already completed successfully") } maybeFinishTaskSet() + // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the + // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not + // "deserialize" the value when holding a lock to avoid blocking other threads. So we call + // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. + // Note: "result.value()" only deserializes the value when it's called at the first time, so + // here "result.value()" just returns the value and won't block other threads. + sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) + // Kill any other attempts for the same task (since those are unnecessary now that one + // attempt completed successfully). 
+ for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + + s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + + s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") + sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) + } } /** From 1ee5a50fc5cd05887b7a97820291bf7bf1cf1806 Mon Sep 17 00:00:00 2001 From: jinxing Date: Fri, 27 Jan 2017 01:20:21 +0800 Subject: [PATCH 03/20] fix --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5d8983cef4d22..898c5a1003cae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1192,7 +1192,7 @@ class DAGScheduler( shuffleStage.addOutputLoc(smt.partitionId, status) } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { - // Check if there is active tasks running for other partitions. + // Check if there is active TaskSetManager. val noActiveTaskSetManager = taskScheduler.rootPool == null || !taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm => @@ -1201,10 +1201,9 @@ class DAGScheduler( if (noActiveTaskSetManager) { markStageAsFinished(shuffleStage) } else { - // There can be tasks running for other partitions at this point - // for reasons like fetch failed. If so, should not mark the stage - // as finished. Thus the running tasks' results will be added to - // mapOutputTracker when succeed. + // There can be tasks running at this point for reasons like fetch failed. + // If so, should not mark the stage as finished. Thus the running tasks' results + // will be added to mapOutputTracker when succeed. } logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) From e652ad73257da777ea82a224e4c24f1d79b97c60 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 31 Jan 2017 01:09:43 +0800 Subject: [PATCH 04/20] Avoid impacting performance where possible. --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e05da9b461a54..18eda30e99c69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -711,7 +711,6 @@ private[spark] class TaskSetManager( logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } - maybeFinishTaskSet() // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. 
So we call @@ -727,6 +726,7 @@ private[spark] class TaskSetManager( s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) } + maybeFinishTaskSet() } /** From b1abf353e50ebefce9f44e33b0683c700c9853f2 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Feb 2017 10:48:29 +0800 Subject: [PATCH 05/20] Always mark active tasksets as zombie when there are no pending partitions. --- .../apache/spark/scheduler/DAGScheduler.scala | 32 ++++++++++++------- .../scheduler/SchedulerIntegrationSuite.scala | 2 +- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 898c5a1003cae..516763d13e71c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1193,18 +1193,28 @@ class DAGScheduler( } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { // Check if there is active TaskSetManager. - val noActiveTaskSetManager = taskScheduler.rootPool == null || - !taskScheduler.rootPool.getSortedTaskSetQueue.exists { - tsm => - tsm.stageId == stageId && !tsm.isZombie + val activeTaskSetManagerOpt = Option(taskScheduler.rootPool).flatMap { rootPool => + rootPool.getSortedTaskSetQueue.find { tsm => + tsm.stageId == stageId && !tsm.isZombie } - if (noActiveTaskSetManager) { - markStageAsFinished(shuffleStage) - } else { - // There can be tasks running at this point for reasons like fetch failed. - // If so, should not mark the stage as finished. Thus the running tasks' results - // will be added to mapOutputTracker when succeed. } + activeTaskSetManagerOpt.foreach { activeTsm => + // The scheduler thinks we don't need any more partitions for this stage, but there + // is still an active taskset for the stage. This can happen when there are stage + // retries, and we get late task completions from earlier stages. Note that all of + // the map output may or may not be available -- some of those map outputs may have + // been lost. But the most consistent way to make that determination is to end + // the running taskset, and mark the stage as finished. The DAGScheduler will + // automatically determine whether there are still partitions missing that need to + // be resubmitted. + // NOTE: this will get a lock on the TaskScheduler + // TODO yet another instance we should probably kill all running tasks when we abort + // the taskset + logInfo(s"Marking ${activeTsm.name} as zombie, as all map output may already be " + + s"available for this stage (from previous attempts)") + taskScheduler.synchronized { activeTsm.isZombie = true } + } + markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1223,7 +1233,7 @@ class DAGScheduler( clearCacheLocs() - if (!shuffleStage.isAvailable && noActiveTaskSetManager) { + if (!shuffleStage.isAvailable) { // Some tasks had failed; let's resubmit this shuffleStage. // Do not resubmit shuffleStage if there is active tasks running. 
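The recovery step patch 05 introduces above can be read as a small, self-contained routine: when the DAGScheduler believes no partitions are pending but a live taskset remains (late completions from an earlier stage attempt), retire that taskset under the TaskScheduler lock and let the normal missing-partition check drive any resubmission. A sketch, reusing the hypothetical MockTsm stand-in from the patch-01 sketch:

// Sketch of the patch-05 recovery step. taskSchedulerLock stands in for the
// TaskSchedulerImpl monitor; TaskSetManager state (including isZombie) is
// only safe to mutate while holding it.
def retireLiveTaskset(
    sortedTaskSetQueue: Seq[MockTsm],
    stageId: Int,
    taskSchedulerLock: AnyRef): Unit = {
  sortedTaskSetQueue.find(tsm => tsm.stageId == stageId && !tsm.isZombie).foreach { tsm =>
    taskSchedulerLock.synchronized {
      tsm.isZombie = true // stop launching new tasks; running ones drain out
    }
  }
}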
// TODO: Lower-level scheduler should also deal with this diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 2a5d793068149..e1fa054a15193 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -680,7 +680,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor backend.taskFailed(taskDescription, fetchFailed) case (0, 1) => // Wait until stage resubmission caused by FetchFailed is finished. - waitForCondition(taskScheduler.runningTaskSets.size==2, 5000, + waitForCondition(taskScheduler.runningTaskSets.size == 2, 5000, "Wait until stage is resubmitted caused by fetch failed") // Task(stageAttempt=0, partition=1) will be bogus, because both two From 5e7f00a1e01d8b6ce4190801be913aaa990ade63 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Feb 2017 10:55:53 +0800 Subject: [PATCH 06/20] fix race in SchedulerIntegrationSuite --- .../org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index e1fa054a15193..5fd6b1db1d21f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -402,6 +402,7 @@ private[spark] abstract class MockBackend( val task = taskSet.tasks(taskDescription.index) (taskDescription, task) } + newTasks.foreach { case (taskDescription, _) => executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK } From cec7cc29df5f90fd37538030e1123bbd60d94ccb Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Feb 2017 11:38:50 +0800 Subject: [PATCH 07/20] Update comment for pendingPartitions. --- .../main/scala/org/apache/spark/scheduler/Stage.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 32e5df6d75f4f..3a520143aae51 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -67,6 +67,16 @@ private[scheduler] abstract class Stage( /** Set of jobs that this stage belongs to. */ val jobIds = new HashSet[Int] + /** + * Partitions the [[DAGScheduler]] is waiting on before it tries to mark the stage / job as + * completed and continue. Tasks' successes in both the active taskset or earlier attempts + * for this stage can cause partition ids get removed from pendingPartitions. Finally, note + * that when this is empty, it does not necessarily mean that stage is completed -- Some of + * the map output from that stage may have been lost. But the [[DAGScheduler]] will check for + * this condition and resubmit the stage if necessary. + */ + val pendingPartitions = new HashSet[Int] + /** The ID to use for the next new attempt for this stage. 
*/ private var nextAttemptId: Int = 0 From b02d24835311c63ddfbbacd170d14c7299754314 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Feb 2017 11:41:59 +0800 Subject: [PATCH 08/20] small fix --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 516763d13e71c..8c6a8a6c92a8e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1235,7 +1235,6 @@ class DAGScheduler( if (!shuffleStage.isAvailable) { // Some tasks had failed; let's resubmit this shuffleStage. - // Do not resubmit shuffleStage if there is active tasks running. // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + From 6009ce86c3a117d2093833cf900aa4a742a0d489 Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 4 Feb 2017 17:39:22 +0800 Subject: [PATCH 09/20] Add a unit test in DAGSchedulerSuite --- .../spark/scheduler/DAGSchedulerSuite.scala | 95 ++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4e5f267e237c0..0f833722012f6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -27,8 +27,9 @@ import scala.util.control.NonFatal import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ - import org.apache.spark._ + +import org.apache.spark.TaskState.TaskState import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -2161,6 +2162,98 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } + test("[SPARK-19263] DAGScheduler should avoid sending conflicting task set") { + val mockTaskSchedulerImpl = new TaskSchedulerImpl(sc) { + override def submitTasks(taskSet: TaskSet): Unit = { + super.submitTasks(taskSet) + taskSets += taskSet + } + } + val mockDAGScheduler = new DAGScheduler( + sc, + mockTaskSchedulerImpl, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env + ) { + override def taskEnded( + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + taskInfo: TaskInfo): Unit = { + dagEventProcessLoopTester.post( + CompletionEvent(task, reason, result, accumUpdates, taskInfo)) + } + } + + val mockSchedulerBackend = new SchedulerBackend { + override def stop(): Unit = {} + + override def defaultParallelism(): Int = 2 + + override def reviveOffers(): Unit = {} + + override def start(): Unit = {} + } + + def getTaskSetManagerByTask(task: Task[_]): TaskSetManager = { + val taskSetManagerOpt = mockTaskSchedulerImpl + .taskSetManagerForAttempt(task.stageId, task.stageAttemptId) + assert(taskSetManagerOpt.isDefined) + taskSetManagerOpt.get + } + + def resourceOffer(taskSetManager: TaskSetManager, host: String, execId: String): Unit = { + taskSetManager.resourceOffer(execId, host, TaskLocality.ANY) + } + + def taskSuccessful(tsm: TaskSetManager, task: Task[_], result: Any): Unit = { + val taskIdOpt = tsm.taskInfos.find(_._2.index == 
task.partitionId) + assert(taskIdOpt.isDefined) + val ser = sc.env.closureSerializer.newInstance() + val directResult = new DirectTaskResult(ser.serialize(result), Seq.empty) + mockTaskSchedulerImpl.handleSuccessfulTask(tsm, taskIdOpt.get._1, directResult) + } + + def taskFailed(tsm: TaskSetManager, task: Task[_], taskFailedReason: TaskFailedReason): Unit = { + val taskIdOpt = tsm.taskInfos.find(_._2.index == task.partitionId) + assert(taskIdOpt.isDefined) + val ser = sc.env.closureSerializer.newInstance() + val data = ser.serialize(taskFailedReason) + mockTaskSchedulerImpl.handleFailedTask(tsm, taskIdOpt.get._1, + TaskState.FAILED, taskFailedReason) + } + + mockTaskSchedulerImpl.initialize(mockSchedulerBackend) + dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(mockDAGScheduler) + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + + resourceOffer(getTaskSetManagerByTask(taskSets(0).tasks(0)), "hostA", "0") + resourceOffer(getTaskSetManagerByTask(taskSets(0).tasks(1)), "hostA", "0") + + taskFailed(getTaskSetManagerByTask(taskSets(0).tasks(0)), taskSets(0).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")) + mockDAGScheduler.resubmitFailedStages() + + taskSuccessful(getTaskSetManagerByTask(taskSets(0).tasks(1)), + taskSets(0).tasks(1), makeMapStatus("hostA", 2)) + + resourceOffer(getTaskSetManagerByTask(taskSets(1).tasks(0)), "hostB", "1") + resourceOffer(getTaskSetManagerByTask(taskSets(1).tasks(1)), "hostB", "1") + + taskSuccessful(getTaskSetManagerByTask(taskSets(1).tasks(0)), + taskSets(1).tasks(0), makeMapStatus("hostB", 2)) + + taskSuccessful(getTaskSetManagerByTask(taskSets(1).tasks(1)), + taskSets(1).tasks(1), makeMapStatus("hostB", 2)) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. 
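The unit test above drives the DAGScheduler through the exact event order behind SPARK-19263. A toy replay of the Stage.pendingPartitions bookkeeping (pre-fix semantics, two map partitions) makes the failure mode concrete:

// Toy replay of pendingPartitions for the two-partition map stage,
// following the event order of the test above.
val pending = scala.collection.mutable.HashSet(0, 1) // attempt 0 launched
// task(partition = 0) fails with FetchFailed: the stage is marked as failed
// and resubmitted, so attempt 1 repopulates the missing partitions.
pending.clear()
pending ++= Seq(0, 1)
// Late Success of task(partition = 1) from attempt 0: the partition leaves
// the pending set, but its map output is ignored (it ran on the failed
// hostA, so its epoch is too old).
pending -= 1
// Success of task(partition = 0) from attempt 1.
pending -= 0
// pending is now empty while partition 1's output is still missing and
// attempt 1's task for partition 1 is still running: the pre-fix
// DAGScheduler resubmitted the stage here, producing a conflicting taskset.
assert(pending.isEmpty)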
From 9578383d428578fb6db029429bf7b92805fe4005 Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 4 Feb 2017 23:10:42 +0800 Subject: [PATCH 10/20] small fix --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0f833722012f6..8649b98416f34 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -27,9 +27,8 @@ import scala.util.control.NonFatal import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark._ -import org.apache.spark.TaskState.TaskState +import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode From da7d18518b106b305afaa4935745f284bd36ace6 Mon Sep 17 00:00:00 2001 From: jinxing Date: Sun, 5 Feb 2017 11:40:24 +0800 Subject: [PATCH 11/20] call TaskSchedulerImpl's resourceOffers --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8649b98416f34..8f17fb2bcc9be 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +// scalastyle:off package org.apache.spark.scheduler import java.util.Properties @@ -2233,8 +2233,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) submit(reduceRdd, Array(0, 1)) - resourceOffer(getTaskSetManagerByTask(taskSets(0).tasks(0)), "hostA", "0") - resourceOffer(getTaskSetManagerByTask(taskSets(0).tasks(1)), "hostA", "0") + mockTaskSchedulerImpl.resourceOffers(Seq(new WorkerOffer("0", "hostA", 2)).toIndexedSeq) taskFailed(getTaskSetManagerByTask(taskSets(0).tasks(0)), taskSets(0).tasks(0), FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")) @@ -2243,8 +2242,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou taskSuccessful(getTaskSetManagerByTask(taskSets(0).tasks(1)), taskSets(0).tasks(1), makeMapStatus("hostA", 2)) - resourceOffer(getTaskSetManagerByTask(taskSets(1).tasks(0)), "hostB", "1") - resourceOffer(getTaskSetManagerByTask(taskSets(1).tasks(1)), "hostB", "1") + mockTaskSchedulerImpl.resourceOffers(Seq(new WorkerOffer("0", "hostB", 2)).toIndexedSeq) taskSuccessful(getTaskSetManagerByTask(taskSets(1).tasks(0)), taskSets(1).tasks(0), makeMapStatus("hostB", 2)) From 5dce18e8b155eace3e9090226c09f3c1ee2e6573 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 7 Feb 2017 18:28:49 +0800 Subject: [PATCH 12/20] Refined unit test and some comments. 
--- .../apache/spark/scheduler/DAGScheduler.scala | 12 +- .../scheduler/BlacklistIntegrationSuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 92 +------------ .../scheduler/SchedulerIntegrationSuite.scala | 128 +++++++++++------- 4 files changed, 84 insertions(+), 150 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8c6a8a6c92a8e..63e6a391ef4d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1199,15 +1199,9 @@ class DAGScheduler( } } activeTaskSetManagerOpt.foreach { activeTsm => - // The scheduler thinks we don't need any more partitions for this stage, but there - // is still an active taskset for the stage. This can happen when there are stage - // retries, and we get late task completions from earlier stages. Note that all of - // the map output may or may not be available -- some of those map outputs may have - // been lost. But the most consistent way to make that determination is to end - // the running taskset, and mark the stage as finished. The DAGScheduler will - // automatically determine whether there are still partitions missing that need to - // be resubmitted. - // NOTE: this will get a lock on the TaskScheduler + // We need a lock on the taskScheduler because tsm is not thread-safe, + // it assumes that all interactions have a lock on the taskScheduler, + // even just setting isZombie. // TODO yet another instance we should probably kill all running tasks when we abort // the taskset logInfo(s"Marking ${activeTsm.name} as zombie, as all map output may already be " + diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index f6015cd51c2bd..6019a5e3665cc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -132,7 +132,7 @@ class MultiExecutorMockBackend( val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4) val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2) - override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = { + executorIdToExecutor = { (0 until nHosts).flatMap { hostIdx => val hostName = "host-" + hostIdx (0 until nExecutorsPerHost).map { subIdx => diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8f17fb2bcc9be..4e5f267e237c0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -// scalastyle:off + package org.apache.spark.scheduler import java.util.Properties @@ -2161,96 +2161,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } - test("[SPARK-19263] DAGScheduler should avoid sending conflicting task set") { - val mockTaskSchedulerImpl = new TaskSchedulerImpl(sc) { - override def submitTasks(taskSet: TaskSet): Unit = { - super.submitTasks(taskSet) - taskSets += taskSet - } - } - val mockDAGScheduler = new DAGScheduler( - sc, - mockTaskSchedulerImpl, - sc.listenerBus, - mapOutputTracker, - blockManagerMaster, - sc.env - ) { - override def taskEnded( - task: Task[_], - reason: TaskEndReason, - result: Any, - accumUpdates: Seq[AccumulatorV2[_, _]], - taskInfo: TaskInfo): Unit = { - dagEventProcessLoopTester.post( - CompletionEvent(task, reason, result, accumUpdates, taskInfo)) - } - } - - val mockSchedulerBackend = new SchedulerBackend { - override def stop(): Unit = {} - - override def defaultParallelism(): Int = 2 - - override def reviveOffers(): Unit = {} - - override def start(): Unit = {} - } - - def getTaskSetManagerByTask(task: Task[_]): TaskSetManager = { - val taskSetManagerOpt = mockTaskSchedulerImpl - .taskSetManagerForAttempt(task.stageId, task.stageAttemptId) - assert(taskSetManagerOpt.isDefined) - taskSetManagerOpt.get - } - - def resourceOffer(taskSetManager: TaskSetManager, host: String, execId: String): Unit = { - taskSetManager.resourceOffer(execId, host, TaskLocality.ANY) - } - - def taskSuccessful(tsm: TaskSetManager, task: Task[_], result: Any): Unit = { - val taskIdOpt = tsm.taskInfos.find(_._2.index == task.partitionId) - assert(taskIdOpt.isDefined) - val ser = sc.env.closureSerializer.newInstance() - val directResult = new DirectTaskResult(ser.serialize(result), Seq.empty) - mockTaskSchedulerImpl.handleSuccessfulTask(tsm, taskIdOpt.get._1, directResult) - } - - def taskFailed(tsm: TaskSetManager, task: Task[_], taskFailedReason: TaskFailedReason): Unit = { - val taskIdOpt = tsm.taskInfos.find(_._2.index == task.partitionId) - assert(taskIdOpt.isDefined) - val ser = sc.env.closureSerializer.newInstance() - val data = ser.serialize(taskFailedReason) - mockTaskSchedulerImpl.handleFailedTask(tsm, taskIdOpt.get._1, - TaskState.FAILED, taskFailedReason) - } - - mockTaskSchedulerImpl.initialize(mockSchedulerBackend) - dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(mockDAGScheduler) - val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - val shuffleId = shuffleDep.shuffleId - val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) - submit(reduceRdd, Array(0, 1)) - - mockTaskSchedulerImpl.resourceOffers(Seq(new WorkerOffer("0", "hostA", 2)).toIndexedSeq) - - taskFailed(getTaskSetManagerByTask(taskSets(0).tasks(0)), taskSets(0).tasks(0), - FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")) - mockDAGScheduler.resubmitFailedStages() - - taskSuccessful(getTaskSetManagerByTask(taskSets(0).tasks(1)), - taskSets(0).tasks(1), makeMapStatus("hostA", 2)) - - mockTaskSchedulerImpl.resourceOffers(Seq(new WorkerOffer("0", "hostB", 2)).toIndexedSeq) - - taskSuccessful(getTaskSetManagerByTask(taskSets(1).tasks(0)), - taskSets(1).tasks(0), makeMapStatus("hostB", 2)) - - taskSuccessful(getTaskSetManagerByTask(taskSets(1).tasks(1)), - taskSets(1).tasks(1), makeMapStatus("hostB", 2)) - } - /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. 
* Note that this checks only the host and not the executor ID. diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 5fd6b1db1d21f..f494b0306a777 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -48,8 +48,8 @@ import org.apache.spark.util.{CallSite, ThreadUtils, Utils} abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends SparkFunSuite with LocalSparkContext { - var taskScheduler: TestTaskScheduler = null - var scheduler: TestDAGScheduler = null + var taskScheduler: TaskSchedulerImplWithTracking = null + var scheduler: DAGSchedulerWithTracking = null var backend: T = _ override def beforeEach(): Unit = { @@ -75,9 +75,9 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa conf.setMaster(s"mock[${backendClassName}]") sc = new SparkContext(conf) backend = sc.schedulerBackend.asInstanceOf[T] - taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler] + taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImplWithTracking] taskScheduler.initialize(sc.schedulerBackend) - scheduler = new TestDAGScheduler(sc, taskScheduler) + scheduler = new DAGSchedulerWithTracking(sc, taskScheduler) taskScheduler.setDAGScheduler(scheduler) } @@ -374,7 +374,7 @@ private[spark] abstract class MockBackend( * Most likely the only thing that needs to be protected are the inidividual ExecutorTaskStatus, * but for simplicity in this mock just lock the whole backend. */ - def executorIdToExecutor: Map[String, ExecutorTaskStatus] + var executorIdToExecutor: Map[String, ExecutorTaskStatus] = null private def generateOffers(): IndexedSeq[WorkerOffer] = { executorIdToExecutor.values.filter { exec => @@ -435,8 +435,8 @@ private[spark] class SingleCoreMockBackend( val localExecutorId = SparkContext.DRIVER_IDENTIFIER val localExecutorHostname = "localhost" - override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = Map( - localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores) + executorIdToExecutor = + Map(localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores) ) } @@ -484,7 +484,7 @@ private class MockExternalClusterManager extends ExternalClusterManager { def createTaskScheduler( sc: SparkContext, masterURL: String): TaskScheduler = { - new TestTaskScheduler(sc) + new TaskSchedulerImplWithTracking(sc) } def createSchedulerBackend( @@ -505,7 +505,7 @@ private class MockExternalClusterManager extends ExternalClusterManager { } /** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */ -class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { +class TaskSchedulerImplWithTracking(sc: SparkContext) extends TaskSchedulerImpl(sc) { /** Set of TaskSets the DAGScheduler has requested executed. */ val runningTaskSets = HashSet[TaskSet]() @@ -521,7 +521,7 @@ class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { } /** DAGScheduler that just tracks a tiny bit more state to enable checks in tests. 
*/ -class TestDAGScheduler(sc: SparkContext, taskScheduler: TaskScheduler) +class DAGSchedulerWithTracking(sc: SparkContext, taskScheduler: TaskScheduler) extends DAGScheduler(sc, taskScheduler) { val handledCompletionEvent = scala.collection.mutable.ListBuffer[CompletionEvent]() @@ -661,53 +661,82 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor } assertDataStructuresEmpty(noFailure = false) } +} - testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") { - val a = new MockRDD(sc, 2, Nil) - val b = shuffle(2, a) - val shuffleId = b.shuffleDeps.head.shuffleId +private[spark] class SimpleMockBackend( + conf: SparkConf, + taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) { + + freeCores = Int.MaxValue + + override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", 2) +} + +class SimpleSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SimpleMockBackend] { + testScheduler("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," + + " even with late completions from earlier stage attempts.") { + // Set executor's cores large enough. Since executors are randomly shuffled + // when offer resource in TaskSchedulerImpl, Thus it's hard to predict where + // tasks run on for this integration test. Here we just provide sufficient + // cores and set returning result in backend thread when tasks finish. + backend.executorIdToExecutor = + Map("0" -> ExecutorTaskStatus("hostX", "0", 10)) + val rddA = new MockRDD(sc, 2, Nil) + val rddB = shuffle(2, rddA) + val rddC = shuffle(2, rddB) def runBackend(): Unit = { val (taskDescription, task) = backend.beginTask() - task.stageId match { - // ShuffleMapTask - case 0 => - val stageAttempt = task.stageAttemptId - val partitionId = task.partitionId - (stageAttempt, partitionId) match { - case (0, 0) => - val fetchFailed = FetchFailed( - DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored") - backend.taskFailed(taskDescription, fetchFailed) - case (0, 1) => - // Wait until stage resubmission caused by FetchFailed is finished. - waitForCondition(taskScheduler.runningTaskSets.size == 2, 5000, - "Wait until stage is resubmitted caused by fetch failed") - - // Task(stageAttempt=0, partition=1) will be bogus, because both two - // tasks(stageAttempt=0, partition=0, 1) run on hostA. - // Pending partitions are (0, 1) after stage resubmission, - // then change to be 0 after this bogus task. - backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2)) - case (1, 1) => - waitForCondition(scheduler.handledCompletionEvent.map(_.task).exists { - task => task.stageAttemptId == 1 && task.partitionId == 0 - }, 5000, "Wait until Success of task(stageAttempt=1 and partition=0)" + - " is handled by DAGScheduler.") - // Task(stageAttempt=1 and partition=0) should not cause stage resubmission, - // even though shuffleStage.pendingPartitions.isEmpty and - // shuffleStage.isAvailable is false. Because the stageAttempt=0 is not finished yet. - backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) - case _ => - backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) + + // Start blocking operations below (i.e. waitForCondition) in a new thread, + // thus do not block backend thread. Backend thread acts as two roles: + // running tasks concurrently and delivering result from executor to scheduler. 
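In isolation, the pattern this comment describes is just a thread-per-completion helper, applied inline in the hunk that continues below: each mock task's completion logic runs off the backend thread, so a blocking waitForCondition inside one task cannot stall result delivery for the others. A sketch (completeAsync is an illustrative name, not part of the patch):

// Run a mock task's completion logic on its own thread so it cannot block
// the single backend thread that delivers all task results.
def completeAsync(body: => Unit): Unit = {
  val t = new Thread {
    override def run(): Unit = body
  }
  t.setDaemon(true) // a timed-out wait should not keep the test JVM alive
  t.start()
}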
+ new Thread { + override def run(): Unit = { + task.stageId match { + case 0 => + task.stageAttemptId match { + case 0 => + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2)) + case 1 => + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) + } + case 1 => + val stageAttempt = task.stageAttemptId + val partitionId = task.partitionId + (stageAttempt, partitionId) match { + case (0, 0) => + val fetchFailed = FetchFailed(DAGSchedulerSuite.makeBlockManagerId("hostA"), + rddC.shuffleDeps.head.shuffleId, 0, 0, "ignored") + backend.taskFailed(taskDescription, fetchFailed) + case (0, 1) => + // Wait until stage resubmission caused by FetchFailed is finished. + waitForCondition( + taskScheduler.runningTaskSets.exists { + ts => ts.stageId == 1 && ts.stageAttemptId == 1 + }, + 5000, "Wait until stage is resubmitted caused by fetch failed") + + // The completion of this task is ignored, because it's from an old stage. + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2)) + case (1, 1) => + waitForCondition(scheduler.handledCompletionEvent.map(_.task).exists { + task => task.stageId == 1 && task.stageAttemptId == 1 && task.partitionId == 0 + }, 5000, "Wait until Success of task(stageId=1, stageAttempt=1, partition=0)" + + " is handled by DAGScheduler.") + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) + case _ => + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) + } + case 2 => + backend.taskSuccess(taskDescription, 10) } - // ResultTask - case 1 => backend.taskSuccess(taskDescription, 10) - } + } + }.start() } withBackend(runBackend _) { - val jobFuture = submit(b, (0 until 2).toArray) + val jobFuture = submit(rddC, (0 until 2).toArray) val duration = Duration(15, SECONDS) awaitJobTermination(jobFuture, duration) } @@ -727,4 +756,5 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor throw new TimeoutException( s"Condition '$msg' failed to become true before $timeout milliseconds elapsed") } + } From ddab65e95b7c10165a8f5bb9070357ab5e941df7 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 7 Feb 2017 18:33:42 +0800 Subject: [PATCH 13/20] small fix --- .../org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index f494b0306a777..69951f74605d2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -402,7 +402,6 @@ private[spark] abstract class MockBackend( val task = taskSet.tasks(taskDescription.index) (taskDescription, task) } - newTasks.foreach { case (taskDescription, _) => executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK } From 4a7cadc78e1bf621e234be4ffb39c2ab38c26060 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 7 Feb 2017 18:49:42 +0800 Subject: [PATCH 14/20] small fix. 
--- .../apache/spark/scheduler/SchedulerIntegrationSuite.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 69951f74605d2..90a6593d940dd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -694,12 +694,7 @@ class SimpleSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SimpleMo override def run(): Unit = { task.stageId match { case 0 => - task.stageAttemptId match { - case 0 => - backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2)) - case 1 => - backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2)) - } + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2)) case 1 => val stageAttempt = task.stageAttemptId val partitionId = task.partitionId From 3a5d60d74b8e37966a859d5d02b74aefb7cbee4f Mon Sep 17 00:00:00 2001 From: jinxing Date: Sun, 12 Feb 2017 12:56:17 +0800 Subject: [PATCH 15/20] Solve this bug by processing pendingPartitions correctly. --- .../apache/spark/scheduler/DAGScheduler.scala | 36 ++--- .../org/apache/spark/scheduler/Stage.scala | 10 -- .../spark/scheduler/TaskSetManager.scala | 28 ++-- .../scheduler/BlacklistIntegrationSuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 52 ++++++++ .../scheduler/SchedulerIntegrationSuite.scala | 123 ++---------------- 6 files changed, 95 insertions(+), 156 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 63e6a391ef4d3..520c934717fc9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1181,33 +1181,33 @@ class DAGScheduler( case smt: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] - shuffleStage.pendingPartitions -= task.partitionId updateAccumulators(event) val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId - logDebug("ShuffleMapTask finished on " + execId) + if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { + // This task was for the currently running attempt of the stage. Since the task + // completed successfully from the perspective of the TaskSetManager, mark it as + // no longer pending (the TaskSetManager may consider the task complete even + // when the output needs to be ignored because the task's epoch is too small below). + shuffleStage.pendingPartitions -= task.partitionId + } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { + // The epoch of the task is acceptable (i.e., the task was launched after the most + // recent failure we're aware of for the executor), so mark the task's output as + // available. shuffleStage.addOutputLoc(smt.partitionId, status) + // Remove the task's partition from pending partitions. This may have already been + // done above, but will not have been done yet in cases where the task attempt was + // from an earlier attempt of the stage (i.e., not the attempt that's currently + // running). 
This allows the DAGScheduler to mark the stage as complete when one + // copy of each task has finished successfully, even if the currently active stage + // still has tasks running. + shuffleStage.pendingPartitions -= task.partitionId } + if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { - // Check if there is active TaskSetManager. - val activeTaskSetManagerOpt = Option(taskScheduler.rootPool).flatMap { rootPool => - rootPool.getSortedTaskSetQueue.find { tsm => - tsm.stageId == stageId && !tsm.isZombie - } - } - activeTaskSetManagerOpt.foreach { activeTsm => - // We need a lock on the taskScheduler because tsm is not thread-safe, - // it assumes that all interactions have a lock on the taskScheduler, - // even just setting isZombie. - // TODO yet another instance we should probably kill all running tasks when we abort - // the taskset - logInfo(s"Marking ${activeTsm.name} as zombie, as all map output may already be " + - s"available for this stage (from previous attempts)") - taskScheduler.synchronized { activeTsm.isZombie = true } - } markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 3a520143aae51..32e5df6d75f4f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -67,16 +67,6 @@ private[scheduler] abstract class Stage( /** Set of jobs that this stage belongs to. */ val jobIds = new HashSet[Int] - /** - * Partitions the [[DAGScheduler]] is waiting on before it tries to mark the stage / job as - * completed and continue. Tasks' successes in both the active taskset or earlier attempts - * for this stage can cause partition ids get removed from pendingPartitions. Finally, note - * that when this is empty, it does not necessarily mean that stage is completed -- Some of - * the map output from that stage may have been lost. But the [[DAGScheduler]] will check for - * this condition and resubmit the stage if necessary. - */ - val pendingPartitions = new HashSet[Int] - /** The ID to use for the next new attempt for this stage. */ private var nextAttemptId: Int = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 18eda30e99c69..3b25513bea057 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -697,20 +697,6 @@ private[spark] class TaskSetManager( val index = info.index info.markFinished(TaskState.FINISHED) removeRunningTask(tid) - if (!successful(index)) { - tasksSuccessful += 1 - logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" + - s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" + - s" ($tasksSuccessful/$numTasks)") - // Mark successful and stop if all the tasks have succeeded. - successful(index) = true - if (tasksSuccessful == numTasks) { - isZombie = true - } - } else { - logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + - " because task " + index + " has already completed successfully") - } // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. 
To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. So we call @@ -726,6 +712,20 @@ private[spark] class TaskSetManager( s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) } + if (!successful(index)) { + tasksSuccessful += 1 + logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" + + s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" + + s" ($tasksSuccessful/$numTasks)") + // Mark successful and stop if all the tasks have succeeded. + successful(index) = true + if (tasksSuccessful == numTasks) { + isZombie = true + } + } else { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + + " because task " + index + " has already completed successfully") + } maybeFinishTaskSet() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 6019a5e3665cc..f6015cd51c2bd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -132,7 +132,7 @@ class MultiExecutorMockBackend( val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4) val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2) - executorIdToExecutor = { + override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = { (0 until nHosts).flatMap { hostIdx => val hostName = "host-" + hostIdx (0 until nExecutorsPerHost).map { subIdx => diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4e5f267e237c0..4799115efb670 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } + test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," + + " even with late completions from earlier stage attempts") { + val rddA = new MyRDD(sc, 2, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) + val shuffleIdA = shuffleDepA.shuffleId + + val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + + val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + + submit(rddC, Array(0, 1)) + + assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostA", 2)))) + + // Fetch failed on hostA. + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, + "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), + null)) + + scheduler.resubmitFailedStages() + + assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostB", 2)), + (Success, makeMapStatus("hostB", 2)))) + + // Task succeeds on a failed executor. The success is bogus. 
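The "bogus" success posted next is filtered by the epoch check in the patch-15 handleTaskCompletion hunk: because hostA's executor failed after the task launched, its map output is discarded, and since the event also comes from a stale stage attempt it does not touch pendingPartitions either. The acceptance decision reduces to a sketch like the following (failedEpoch mirrors the DAGScheduler field of the same name):

// Sketch of the epoch filter behind the bogus success above.
def acceptMapOutput(
    failedEpoch: Map[String, Long],
    execId: String,
    taskEpoch: Long): Boolean = {
  // Mirrors `failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)`
  // in the hunk above: output is accepted only if the executor has no
  // recorded failure at or after the task's launch epoch.
  !failedEpoch.get(execId).exists(lastFailure => taskEpoch <= lastFailure)
}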
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // There should be no new attempt of stage submitted.
+    assert(taskSets.size === 4)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // ResultStage submitted.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
+      (Success, 1),
+      (Success, 1)))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 90a6593d940dd..398ac3d6202db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{CallSite, ThreadUtils, Utils}

 /**
- * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
+ * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
  * TaskSetManagers.
  *
  * Test cases are configured by providing a set of jobs to submit, and then simulating interaction
@@ -48,8 +48,8 @@ import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
 abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends SparkFunSuite
   with LocalSparkContext {

-  var taskScheduler: TaskSchedulerImplWithTracking = null
-  var scheduler: DAGSchedulerWithTracking = null
+  var taskScheduler: TestTaskScheduler = null
+  var scheduler: DAGScheduler = null
   var backend: T = _

   override def beforeEach(): Unit = {
@@ -75,9 +75,9 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     conf.setMaster(s"mock[${backendClassName}]")
     sc = new SparkContext(conf)
     backend = sc.schedulerBackend.asInstanceOf[T]
-    taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImplWithTracking]
+    taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler]
     taskScheduler.initialize(sc.schedulerBackend)
-    scheduler = new DAGSchedulerWithTracking(sc, taskScheduler)
+    scheduler = new DAGScheduler(sc, taskScheduler)
     taskScheduler.setDAGScheduler(scheduler)
   }

@@ -374,7 +374,7 @@ private[spark] abstract class MockBackend(
  * Most likely the only thing that needs to be protected are the individual ExecutorTaskStatus,
  * but for simplicity in this mock just lock the whole backend.
  */
-  var executorIdToExecutor: Map[String, ExecutorTaskStatus] = null
+  def executorIdToExecutor: Map[String, ExecutorTaskStatus]

   private def generateOffers(): IndexedSeq[WorkerOffer] = {
     executorIdToExecutor.values.filter { exec =>
@@ -434,8 +434,8 @@ private[spark] class SingleCoreMockBackend(
   val localExecutorId = SparkContext.DRIVER_IDENTIFIER
   val localExecutorHostname = "localhost"

-  executorIdToExecutor =
-    Map(localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores)
+  override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = Map(
+    localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores)
   )
 }

@@ -483,7 +483,7 @@ private class MockExternalClusterManager extends ExternalClusterManager {
   def createTaskScheduler(
       sc: SparkContext,
       masterURL: String): TaskScheduler = {
-    new TaskSchedulerImplWithTracking(sc)
+    new TestTaskScheduler(sc)
   }

   def createSchedulerBackend(
@@ -504,7 +504,7 @@ private class MockExternalClusterManager extends ExternalClusterManager {
 }

 /** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */
-class TaskSchedulerImplWithTracking(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

   /** Set of TaskSets the DAGScheduler has requested executed. */
   val runningTaskSets = HashSet[TaskSet]()
@@ -519,18 +519,6 @@ class TaskSchedulerImplWithTracking(sc: SparkContext) extends TaskSchedulerImpl(
   }
 }

-/** DAGScheduler that just tracks a tiny bit more state to enable checks in tests. */
-class DAGSchedulerWithTracking(sc: SparkContext, taskScheduler: TaskScheduler)
-  extends DAGScheduler(sc, taskScheduler) {
-
-  val handledCompletionEvent = scala.collection.mutable.ListBuffer[CompletionEvent]()
-
-  override private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
-    super.handleTaskCompletion(event)
-    handledCompletionEvent += event
-  }
-}
-
 /**
  * Some very basic tests just to demonstrate the use of the test framework (and verify that it
  * works).
@@ -661,94 +649,3 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
     assertDataStructuresEmpty(noFailure = false)
   }
 }
-
-private[spark] class SimpleMockBackend(
-    conf: SparkConf,
-    taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {
-
-  freeCores = Int.MaxValue
-
-  override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", 2)
-}
-
-class SimpleSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SimpleMockBackend] {
-  testScheduler("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
-    " even with late completions from earlier stage attempts.") {
-    // Set executor's cores large enough. Since executors are randomly shuffled
-    // when offer resource in TaskSchedulerImpl, Thus it's hard to predict where
-    // tasks run on for this integration test. Here we just provide sufficient
-    // cores and set returning result in backend thread when tasks finish.
-    backend.executorIdToExecutor =
-      Map("0" -> ExecutorTaskStatus("hostX", "0", 10))
-    val rddA = new MockRDD(sc, 2, Nil)
-    val rddB = shuffle(2, rddA)
-    val rddC = shuffle(2, rddB)
-
-    def runBackend(): Unit = {
-      val (taskDescription, task) = backend.beginTask()
-
-      // Start blocking operations below (i.e. waitForCondition) in a new thread,
-      // thus do not block backend thread. Backend thread acts as two roles:
-      // running tasks concurrently and delivering result from executor to scheduler.
-      new Thread {
-        override def run(): Unit = {
-          task.stageId match {
-            case 0 =>
-              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
-            case 1 =>
-              val stageAttempt = task.stageAttemptId
-              val partitionId = task.partitionId
-              (stageAttempt, partitionId) match {
-                case (0, 0) =>
-                  val fetchFailed = FetchFailed(DAGSchedulerSuite.makeBlockManagerId("hostA"),
-                    rddC.shuffleDeps.head.shuffleId, 0, 0, "ignored")
-                  backend.taskFailed(taskDescription, fetchFailed)
-                case (0, 1) =>
-                  // Wait until stage resubmission caused by FetchFailed is finished.
-                  waitForCondition(
-                    taskScheduler.runningTaskSets.exists {
-                      ts => ts.stageId == 1 && ts.stageAttemptId == 1
-                    },
-                    5000, "Wait until stage is resubmitted caused by fetch failed")
-
-                  // The completion of this task is ignored, because it's from an old stage.
-                  backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
-                case (1, 1) =>
-                  waitForCondition(scheduler.handledCompletionEvent.map(_.task).exists {
-                    task => task.stageId == 1 && task.stageAttemptId == 1 && task.partitionId == 0
-                  }, 5000, "Wait until Success of task(stageId=1, stageAttempt=1, partition=0)" +
-                    " is handled by DAGScheduler.")
-                  backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2))
-                case _ =>
-                  backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2))
-              }
-            case 2 =>
-              backend.taskSuccess(taskDescription, 10)
-          }
-        }
-      }.start()
-    }
-
-    withBackend(runBackend _) {
-      val jobFuture = submit(rddC, (0 until 2).toArray)
-      val duration = Duration(15, SECONDS)
-      awaitJobTermination(jobFuture, duration)
-    }
-    assert(results === (0 until 2).map { _ -> 10}.toMap)
-  }
-
-  def waitForCondition(condition: => Boolean, timeout: Long, msg: String): Unit = {
-    val finishTime = System.currentTimeMillis() + timeout
-    while (System.currentTimeMillis() < finishTime) {
-      if (condition) {
-        return
-      }
-      // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
-      // add overhead in the general case.
-      Thread.sleep(10)
-    }
-    throw new TimeoutException(
-      s"Condition '$msg' failed to become true before $timeout milliseconds elapsed")
-  }
-
-}

From 46ef5a369902ce2ca8c0dfde64b973647f5fffeb Mon Sep 17 00:00:00 2001
From: jinxing
Date: Mon, 13 Feb 2017 10:26:06 +0800
Subject: [PATCH 16/20] small fix

---
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 520c934717fc9..ad3b87b88e655 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1184,6 +1184,7 @@ class DAGScheduler(
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
+            logDebug("ShuffleMapTask finished on " + execId)
             if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
               // This task was for the currently running attempt of the stage. Since the task
              // completed successfully from the perspective of the TaskSetManager, mark it as

From ab8d13efaf12182517d3b311d74b2f0a8d2fbef8 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Tue, 14 Feb 2017 11:53:13 +0800
Subject: [PATCH 17/20] fix

---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 36 ++++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4799115efb670..c735220da2e15 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2162,7 +2162,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }

   test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
-      " even with late completions from earlier stage attempts") {
+    " even with late completions from earlier stage attempts") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
     val shuffleIdA = shuffleDepA.shuffleId
@@ -2174,39 +2175,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou

     submit(rddC, Array(0, 1))

+    // Complete both tasks in rddA.
     assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 2)),
       (Success, makeMapStatus("hostA", 2))))

-    // Fetch failed on hostA.
+    // Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA
+    // and task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
+    assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
         "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
-      null))
+      result = null))

+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them on hostB successfully.
     scheduler.resubmitFailedStages()
-
-    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
     complete(taskSets(2), Seq(
       (Success, makeMapStatus("hostB", 2)),
       (Success, makeMapStatus("hostB", 2))))

-    // Task succeeds on a failed executor. The success is bogus.
+    // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
+    // successfully. The success should be ignored because the task started before the
+    // executor failed, so the output may have been lost.
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

-    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    // Both tasks in rddB should be resubmitted, because neither of them has truly succeeded.
+    // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
+    // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
+    // is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
+      && taskSets(3).tasks.size === 2)
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

-    // There should be no new attempt of stage submitted.
+    // There should be no new attempt of the stage submitted,
+    // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
+    // the current attempt (and hasn't completed successfully in any earlier attempts).
     assert(taskSets.size === 4)
+
+    // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

-    // ResultStage submitted.
+    // Now the ResultStage should be submitted, because all of the tasks of rddB have
+    // completed successfully on alive executors.
     assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
     complete(taskSets(4), Seq(
       (Success, 1),

From e34cd85424d88daf96d0252d34f2ce28b956ddde Mon Sep 17 00:00:00 2001
From: jinxing
Date: Wed, 15 Feb 2017 10:15:03 +0800
Subject: [PATCH 18/20] Refine comment.

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ad3b87b88e655..839a98d003154 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1190,6 +1190,9 @@ class DAGScheduler(
               // completed successfully from the perspective of the TaskSetManager, mark it as
               // no longer pending (the TaskSetManager may consider the task complete even
               // when the output needs to be ignored because the task's epoch is too small below).
+              // This can result in inconsistency between pending partitions and output locations
+              // of stage. When pending partitions is empty, the scheduler will check output
+              // locations, if there are missing, the stage will be resubmitted.
               shuffleStage.pendingPartitions -= task.partitionId
             }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {

From d2255654b1f6ae43ba47c0ffcec0e6adc4beed82 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Wed, 15 Feb 2017 10:21:44 +0800
Subject: [PATCH 19/20] refine comment

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 839a98d003154..c0f2d675de38a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1189,10 +1189,10 @@ class DAGScheduler(
               // This task was for the currently running attempt of the stage. Since the task
               // completed successfully from the perspective of the TaskSetManager, mark it as
               // no longer pending (the TaskSetManager may consider the task complete even
-              // when the output needs to be ignored because the task's epoch is too small below).
-              // This can result in inconsistency between pending partitions and output locations
-              // of stage. When pending partitions is empty, the scheduler will check output
-              // locations, if there are missing, the stage will be resubmitted.
+              // when the output needs to be ignored because the task's epoch is too small below,
+              // if so, this can result in inconsistency between pending partitions and output
+              // locations of stage. When pending partitions is empty, the scheduler will check
+              // output locations, if there is missing, the stage will be resubmitted.
              shuffleStage.pendingPartitions -= task.partitionId
             }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {

From 6809d1ff5d09693e961087da35c8f6b3b50fe53c Mon Sep 17 00:00:00 2001
From: jinxing
Date: Thu, 16 Feb 2017 10:07:39 +0800
Subject: [PATCH 20/20] refine comments.

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c0f2d675de38a..0b7d3716c19da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1189,10 +1189,9 @@ class DAGScheduler(
               // This task was for the currently running attempt of the stage. Since the task
               // completed successfully from the perspective of the TaskSetManager, mark it as
               // no longer pending (the TaskSetManager may consider the task complete even
-              // when the output needs to be ignored because the task's epoch is too small below,
-              // if so, this can result in inconsistency between pending partitions and output
-              // locations of stage. When pending partitions is empty, the scheduler will check
-              // output locations, if there is missing, the stage will be resubmitted.
+              // when the output needs to be ignored because the task's epoch is too small below.
+              // In this case, when pending partitions is empty, there will still be missing
+              // output locations, which will cause the DAGScheduler to resubmit the stage below.)
               shuffleStage.pendingPartitions -= task.partitionId
             }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
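
The invariant that patches 18-20 converge on can be sketched independently of Spark's
internals: a late completion still drains pendingPartitions, but when the task's epoch is
no greater than the epoch at which its executor failed, the output is not registered, so
an empty pending set with missing output locations must trigger a resubmit rather than
stage completion. The following self-contained Scala sketch models only that interaction;
it is illustrative, and every name in it (PendingPartitionsModel, Completion,
handleShuffleMapTaskCompletion) is hypothetical rather than code from this patch.

import scala.collection.mutable

object PendingPartitionsModel {
  val numPartitions = 2
  val pendingPartitions = mutable.HashSet[Int]()        // partitions the stage still waits on
  val outputLocations = mutable.HashMap[Int, String]()  // partitionId -> host with map output
  val failedEpoch = mutable.HashMap[String, Long]()     // executorId -> epoch when it failed

  case class Completion(partitionId: Int, execId: String, host: String, epoch: Long)

  def handleShuffleMapTaskCompletion(c: Completion): Unit = {
    // The task finished from the TaskSetManager's perspective, so it is no longer pending.
    pendingPartitions -= c.partitionId
    // But ignore the output if the task's epoch is at most the executor's failure epoch,
    // i.e. the task started before the executor was lost.
    val stale = failedEpoch.get(c.execId).exists(c.epoch <= _)
    if (!stale) {
      outputLocations(c.partitionId) = c.host
    }
    // An empty pending set does NOT mean the stage is done: check for missing outputs.
    if (pendingPartitions.isEmpty) {
      val missing = (0 until numPartitions).filterNot(outputLocations.contains)
      if (missing.isEmpty) println("stage finished")
      else println(s"resubmitting stage for missing partitions $missing")
    }
  }

  def main(args: Array[String]): Unit = {
    pendingPartitions ++= (0 until numPartitions)
    failedEpoch("exec-A") = 1L                          // exec-A was lost at epoch 1
    handleShuffleMapTaskCompletion(Completion(0, "exec-B", "hostB", epoch = 2L))
    handleShuffleMapTaskCompletion(Completion(1, "exec-A", "hostA", epoch = 0L)) // late, stale
    // Prints: resubmitting stage for missing partitions Vector(1)
  }
}

Running the sketch replays the SPARK-19263 scenario in miniature: the stale completion
from exec-A empties the pending set without registering output for partition 1, so the
stage is resubmitted instead of being wrongly marked finished.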
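
For readers adapting the integration test that patch 15 dropped, the polling helper it
relied on is the one piece no longer in the suite. A standalone version of that pattern
might look like the sketch below; it mirrors the removed waitForCondition, and
TestWaitUtils is a hypothetical home for it, not a class in this patch.

import java.util.concurrent.TimeoutException

object TestWaitUtils {
  /** Polls `condition` every 10 ms until it holds or `timeoutMs` elapses. */
  def waitForCondition(condition: => Boolean, timeoutMs: Long, msg: String): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (condition) {
        return
      }
      // Sleep rather than wait/notify: this is test-only code, so the simplicity of
      // polling is worth its (bounded) latency.
      Thread.sleep(10)
    }
    throw new TimeoutException(
      s"Condition '$msg' failed to become true within $timeoutMs ms")
  }
}

A caller blocks only its own helper thread, never the scheduler, e.g.
waitForCondition(taskScheduler.runningTaskSets.size == 2, 5000,
"stage resubmitted after fetch failure").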