From 093e39cf76378821284ef7d771e819afb69930ae Mon Sep 17 00:00:00 2001
From: Hieu Huynh <“Hieu.huynh@oath.com”>
Date: Sun, 8 Jul 2018 13:20:26 -0500
Subject: [PATCH 1/6] SPARK-24755 Executor loss can cause task to not be resubmitted

---
 .../spark/scheduler/TaskSetManager.scala      |   6 +-
 .../spark/scheduler/TaskSetManagerSuite.scala | 107 ++++++++++++++++++
 2 files changed, 110 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a18c66596852a..895fa35c16a4d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
   // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
   // this happened while we set the `spark.speculation` to true. The task killed by others
   // should not resubmit while executor lost.
-  private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+  private val killedByOtherAttempt = new HashSet[Long]
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   private[scheduler] var tasksSuccessful = 0
@@ -735,7 +735,7 @@ private[spark] class TaskSetManager(
       logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
         s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
         s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
-      killedByOtherAttempt(index) = true
+      killedByOtherAttempt += attemptInfo.taskId
       sched.backend.killTask(
         attemptInfo.taskId,
         attemptInfo.executorId,
@@ -944,7 +944,7 @@ private[spark] class TaskSetManager(
         && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
-        if (successful(index) && !killedByOtherAttempt(index)) {
+        if (successful(index) && !killedByOtherAttempt.contains(tid)) {
           successful(index) = false
           copiesRunning(index) -= 1
           tasksSuccessful -= 1
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ca6a7e5db3b17..0fa6c1143d909 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.5")
+    sc.conf.set("spark.speculation", "true")
+
+    var killTaskCalled = false
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(taskId: Long,
+        executorId: String,
+        interruptThread: Boolean,
+        reason: String): Unit = {
+        // Check that the only killTask event in this case is triggered by
+        // the completion of task 2.1
+        assert(taskId === 2)
+        assert(executorId === "exec3")
+        assert(interruptThread)
+        assert(reason === "another attempt succeeded")
+        killTaskCalled = true
+      }
+    })
+
+    // Keep track of the index of tasks that are resubmitted,
+    // so that the test can check that the task is resubmitted correctly
+    var resubmittedTasks = new mutable.HashSet[Int]
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(task: Task[_],
+        reason: TaskEndReason,
+        result: Any,
+        accumUpdates: Seq[AccumulatorV2[_, _]],
+        taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += taskInfo.index
+          case _ =>
+        }
+      }
+    }
+    sched.dagScheduler.stop()
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host3", "exec3")),
+      Seq(TaskLocation("host2", "exec2")))
+
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((exec, host) <- Seq(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === exec)
+      // Add an extra assert to make sure task 2.0 is running on exec3
+      if (task.index == 2) {
+        assert(task.attemptNumber === 0)
+        assert(task.executorId === "exec3")
+      }
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete two of the tasks and leave the other two running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+    // Offer a resource to start the speculative attempt for the running task 2.0
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    assert(taskOption.isDefined)
+    val task4 = taskOption.get
+    assert(task4.index === 2)
+    assert(task4.taskId === 4)
+    assert(task4.executorId === "exec2")
+    assert(task4.attemptNumber === 1)
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
+    // Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
+    assert(killTaskCalled)
+
+    assert(resubmittedTasks.isEmpty)
+    // host2 is lost, meaning we lose the map output produced there by task4
+    manager.executorLost("exec2", "host2", SlaveLost())
+    // Make sure the task with index 2 is re-submitted
+    assert(resubmittedTasks.contains(2))
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
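Why the change from Array[Boolean] to HashSet[Long] matters: every attempt of a task shares the same task index, so an index-keyed flag set when the original attempt is killed also covers the successful speculative attempt. When the executor holding that attempt's map output is later lost, executorLost sees the flag and wrongly skips resubmission. Keying by the attempt-specific tid keeps the two attempts apart. Below is a self-contained sketch of the two bookkeeping schemes (hypothetical names, not Spark's API) that replays the test scenario in which index 2 has attempts tid 2 and tid 4:

object KilledByOtherAttemptSketch extends App {
  // Task index 2 has two attempts: tid 2 (original, on exec3) and tid 4 (speculative, on exec2).
  val indexOfTid = Map(2L -> 2, 4L -> 2)

  // Old bookkeeping: one Boolean per task index.
  val killedByIndex = new Array[Boolean](4)
  killedByIndex(indexOfTid(2L)) = true // tid 2 is killed when tid 4 succeeds

  // exec2 is lost and tid 4's map output is gone. The index-based check cannot
  // tell the killed attempt from the successful one and skips resubmission.
  val oldSchemeSkipsResubmit = killedByIndex(indexOfTid(4L)) // true -> bug

  // New bookkeeping: a set of killed tids distinguishes the attempts.
  val killedByOtherAttempt = scala.collection.mutable.HashSet[Long](2L)
  val newSchemeSkipsResubmit = killedByOtherAttempt.contains(4L) // false -> resubmit

  assert(oldSchemeSkipsResubmit && !newSchemeSkipsResubmit)
}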
From 9f0e0aeae4af79a59a6d2bf2b6f84c06fd57e0d5 Mon Sep 17 00:00:00 2001
From: Hieu Huynh <“Hieu.huynh@oath.com”>
Date: Mon, 16 Jul 2018 12:27:16 -0500
Subject: [PATCH 2/6] Fix indentation and comments

---
 .../spark/scheduler/TaskSetManager.scala      |  5 +++--
 .../spark/scheduler/TaskSetManagerSuite.scala | 22 +++++++++----------
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 895fa35c16a4d..8a6649a5b2c54 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -84,8 +84,9 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
-  // this happened while we set the `spark.speculation` to true. The task killed by others
+
+  // Add the tid of a task to this HashSet when the task is killed by another attempt.
+  // This can happen when `spark.speculation` is set to true. A task killed by another attempt
   // should not resubmit while executor lost.
   private val killedByOtherAttempt = new HashSet[Long]
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 0fa6c1143d909..6837210d56ad4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1378,9 +1378,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       ("exec2", "host2"), ("exec3", "host3"))
     sched.initialize(new FakeSchedulerBackend() {
       override def killTask(taskId: Long,
-        executorId: String,
-        interruptThread: Boolean,
-        reason: String): Unit = {
+          executorId: String,
+          interruptThread: Boolean,
+          reason: String): Unit = {
         // Check that the only killTask event in this case is triggered by
         // the completion of task 2.1
         assert(taskId === 2)
@@ -1396,10 +1396,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     var resubmittedTasks = new mutable.HashSet[Int]
     val dagScheduler = new FakeDAGScheduler(sc, sched) {
       override def taskEnded(task: Task[_],
-        reason: TaskEndReason,
-        result: Any,
-        accumUpdates: Seq[AccumulatorV2[_, _]],
-        taskInfo: TaskInfo): Unit = {
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
         super.taskEnded(task, reason, result, accumUpdates, taskInfo)
         reason match {
           case Resubmitted => resubmittedTasks += taskInfo.index
@@ -1423,10 +1423,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     }
     // Offer resources for 4 tasks to start
     for ((exec, host) <- Seq(
-        "exec1" -> "host1",
-        "exec1" -> "host1",
-        "exec3" -> "host3",
-        "exec2" -> "host2")) {
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec3" -> "host3",
+      "exec2" -> "host2")) {
       val taskOption = manager.resourceOffer(exec, host, NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get
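For readers following the test, the three `spark.speculation*` settings interact as follows: speculation is only considered once the fraction of finished tasks reaches the quantile (0.5 of 4 tasks, which is why the test completes tasks 0 and 1 first), and a running task then becomes speculatable once its runtime exceeds roughly multiplier times the median successful runtime, floored by the `minTimeToSpeculation` argument of `checkSpeculatableTasks` (0 in the test). The sketch below is a simplified model of that decision for illustration, not Spark's exact code (the real TaskSetManager also checks locality, zombie state, and already-speculated tasks):

object SpeculationThresholdSketch extends App {
  def speculatableTasks(
      successfulDurations: Seq[Long], // runtimes of finished tasks, in ms
      runningElapsed: Map[Int, Long], // task index -> elapsed runtime, in ms
      numTasks: Int,
      quantile: Double,
      multiplier: Double,
      minTimeToSpeculation: Long): Set[Int] = {
    // Speculation starts only after `quantile` of the tasks have finished.
    if (successfulDurations.size < quantile * numTasks) return Set.empty
    val sorted = successfulDurations.sorted
    val median = sorted(sorted.size / 2)
    val threshold = math.max((multiplier * median).toLong, minTimeToSpeculation)
    // A task qualifies once it has been running strictly longer than the threshold.
    runningElapsed.collect { case (index, elapsed) if elapsed > threshold => index }.toSet
  }

  // The test's situation: 2 of 4 tasks done (quantile 0.5 met), multiplier 0.0
  // gives a threshold of 0 ms, and the clock was advanced by 1 more ms, so both
  // running tasks (indices 2 and 3) become speculatable.
  val result = speculatableTasks(
    successfulDurations = Seq(1L, 1L),
    runningElapsed = Map(2 -> 1L, 3 -> 1L),
    numTasks = 4, quantile = 0.5, multiplier = 0.0, minTimeToSpeculation = 0L)
  assert(result == Set(2, 3))
}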
From b2affd221a509557cf616ba5facbe05cbd8a144f Mon Sep 17 00:00:00 2001
From: Hieu Huynh <“Hieu.huynh@oath.com”>
Date: Mon, 16 Jul 2018 12:28:38 -0500
Subject: [PATCH 3/6] Remove extra empty line

---
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8a6649a5b2c54..13e3bbd5311b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,8 +83,7 @@ private[spark] class TaskSetManager(
   // be re-run because the missing map data needs to be regenerated first.
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
-
-
+
   // Add the tid of a task to this HashSet when the task is killed by another attempt.
   // This can happen when `spark.speculation` is set to true. A task killed by another attempt
   // should not resubmit while executor lost.
From 20a032c114d8208591d214f7deadd7d0b8b518ca Mon Sep 17 00:00:00 2001
From: Hieu Huynh <“Hieu.huynh@oath.com”>
Date: Mon, 16 Jul 2018 13:01:43 -0500
Subject: [PATCH 4/6] Fix another indentation

---
 .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 6837210d56ad4..f12e45fcb599e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1377,7 +1377,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
       ("exec2", "host2"), ("exec3", "host3"))
     sched.initialize(new FakeSchedulerBackend() {
-      override def killTask(taskId: Long,
+      override def killTask(
+          taskId: Long,
           executorId: String,
           interruptThread: Boolean,
           reason: String): Unit = {
@@ -1396,7 +1396,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // so that the test can check that the task is resubmitted correctly
     var resubmittedTasks = new mutable.HashSet[Int]
     val dagScheduler = new FakeDAGScheduler(sc, sched) {
-      override def taskEnded(task: Task[_],
+      override def taskEnded(
+          task: Task[_],
           reason: TaskEndReason,
           result: Any,
           accumUpdates: Seq[AccumulatorV2[_, _]],
From a67bebcf304a7f0129f44586152490c7192efbe3 Mon Sep 17 00:00:00 2001
From: Hieu Huynh <“Hieu.huynh@oath.com”>
Date: Mon, 16 Jul 2018 13:38:04 -0500
Subject: [PATCH 5/6] fix scala style error

---
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 13e3bbd5311b8..07a174c1dce07 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
   // be re-run because the missing map data needs to be regenerated first.
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
-
+
   // Add the tid of a task to this HashSet when the task is killed by another attempt.
   // This can happen when `spark.speculation` is set to true. A task killed by another attempt
   // should not resubmit while executor lost.
From 6316e5b9823dfe0b04b4521a9ef7d9516e9b8289 Mon Sep 17 00:00:00 2001
From: Hieu Huynh <“Hieu.huynh@oath.com”>
Date: Wed, 18 Jul 2018 15:46:31 -0500
Subject: [PATCH 6/6] fix indentation

---
 .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 041a80445784e..206b9f47eed4f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1476,10 +1476,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     }
     // Offer resources for 4 tasks to start
     for ((exec, host) <- Seq(
-      "exec1" -> "host1",
-      "exec1" -> "host1",
-      "exec3" -> "host3",
-      "exec2" -> "host2")) {
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
       val taskOption = manager.resourceOffer(exec, host, NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get