From 8f7d98177816e11659cf79a2b28f96bd4b7173d5 Mon Sep 17 00:00:00 2001 From: Hieu Huynh <“Hieu.huynh@oath.com”> Date: Wed, 27 Jun 2018 23:19:14 -0500 Subject: [PATCH 1/6] Fixed issue and added unit test --- .../spark/scheduler/TaskSetManager.scala | 6 ++ .../spark/scheduler/TaskSetManagerSuite.scala | 60 +++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a18c66596852a..415af893cbc49 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -723,6 +723,12 @@ private[spark] class TaskSetManager( def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { val info = taskInfos(tid) val index = info.index + // Check if any other attempt succeeded before this and this attempt has not been handled + if (successful(index) && killedByOtherAttempt(index)) { + handleFailedTask(tid, TaskState.KILLED, TaskKilled("another attempt succeeded")) + return + } + info.markFinished(TaskState.FINISHED, clock.getTimeMillis()) if (speculationEnabled) { successfulTaskDurations.insert(info.duration) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ca6a7e5db3b17..c0b418bd219a9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } + + test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success"){ + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set("spark.speculation.multiplier", "0.0") + sc.conf.set("spark.speculation", "true") + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + + // Offer resources for 4 tasks to start + for ((k, v) <- List( + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { + val taskOption = manager.resourceOffer(k, v, NO_PREF) + assert(taskOption.isDefined) + val task = taskOption.get + assert(task.executorId === k) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + clock.advance(1) + // Complete the 3 tasks and leave 1 task in running + for (id <- Set(0, 1, 2)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for + // > 0ms, so advance the clock by 1ms here. + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(3)) + + // Offer resource to start the speculative attempt for the running task + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + assert(taskOption5.isDefined) + val task5 = taskOption5.get + assert(task5.index === 3) + assert(task5.taskId === 4) + assert(task5.executorId === "exec1") + assert(task5.attemptNumber === 1) + sched.backend = mock(classOf[SchedulerBackend]) + + // Complete one attempt for the running task + manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3))) + // Verify that it kills other running attempt + verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded") + // Complete another attempt for the running task + manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3))) + + assert(manager.taskInfos(3).successful == true) + assert(manager.taskInfos(4).killed == true) + } } From 980a9333e4c9ed8752ec4c3f12d8f5407f84da5d Mon Sep 17 00:00:00 2001 From: hthuynh2 Date: Thu, 28 Jun 2018 09:16:45 -0500 Subject: [PATCH 2/6] Fixed Scala Style Error --- .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c0b418bd219a9..6b39d7eb5ac64 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1372,7 +1372,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } - test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success"){ + test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) From 7db26f2791edc9af27111371cd8578828c3b372c Mon Sep 17 00:00:00 2001 From: Hieu Huynh <“Hieu.huynh@oath.com”> Date: Fri, 6 Jul 2018 09:47:32 -0500 Subject: [PATCH 3/6] Modified Unit Test and Kill Message --- .../spark/scheduler/TaskSetManager.scala | 2 +- .../spark/scheduler/TaskSetManagerSuite.scala | 40 ++++++++++++------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 415af893cbc49..abf384df662ac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -725,7 +725,7 @@ private[spark] class TaskSetManager( val index = info.index // Check if any other attempt succeeded before this and this attempt has not been handled if (successful(index) && killedByOtherAttempt(index)) { - handleFailedTask(tid, TaskState.KILLED, TaskKilled("another attempt succeeded")) + handleFailedTask(tid, TaskState.KILLED, TaskKilled("Finish but did not commit due to another attempt succeeded")) return } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c0b418bd219a9..0db680cae1223 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1372,7 +1372,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } - test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success"){ + test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) @@ -1384,13 +1384,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } - // Offer resources for 4 tasks to start for ((k, v) <- List( - "exec1" -> "host1", - "exec1" -> "host1", - "exec2" -> "host2", - "exec2" -> "host2")) { + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { val taskOption = manager.resourceOffer(k, v, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get @@ -1403,7 +1402,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) assert(sched.endedTasks(id) === Success) } - // checkSpeculatableTasks checks that the task runtime is greater than the threshold for // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for // > 0ms, so advance the clock by 1ms here. @@ -1420,15 +1418,27 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(task5.executorId === "exec1") assert(task5.attemptNumber === 1) sched.backend = mock(classOf[SchedulerBackend]) - + sched.dagScheduler.stop() + sched.dagScheduler = mock(classOf[DAGScheduler]) // Complete one attempt for the running task - manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3))) - // Verify that it kills other running attempt + val result = createTaskResult(3, accumUpdatesByTask(3)) + manager.handleSuccessfulTask(3, result) + // There is a race between the scheduler asking to kill the other task, and that task + // actually finishing. We simulate what happens if the other task finishes before we kill it. verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded") - // Complete another attempt for the running task - manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3))) - - assert(manager.taskInfos(3).successful == true) - assert(manager.taskInfos(4).killed == true) + manager.handleSuccessfulTask(4, result) + + val info3 = manager.taskInfos(3) + val info4 = manager.taskInfos(4) + assert(info3.successful) + assert(info4.killed) + verify(sched.dagScheduler).taskEnded( + manager.tasks(3), + TaskKilled("Finish but did not commit due to another attempt succeeded"), + null, + Seq.empty, + info4) + verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, result.value(), + result.accumUpdates, info3) } } From 3c655f2b24e65602bb889b512a68fc598662a898 Mon Sep 17 00:00:00 2001 From: Hieu Huynh <“Hieu.huynh@oath.com”> Date: Fri, 6 Jul 2018 09:56:54 -0500 Subject: [PATCH 4/6] Fix scala Style --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index abf384df662ac..ec2028c00c3ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -725,7 +725,8 @@ private[spark] class TaskSetManager( val index = info.index // Check if any other attempt succeeded before this and this attempt has not been handled if (successful(index) && killedByOtherAttempt(index)) { - handleFailedTask(tid, TaskState.KILLED, TaskKilled("Finish but did not commit due to another attempt succeeded")) + handleFailedTask(tid, TaskState.KILLED, + TaskKilled("Finish but did not commit due to another attempt succeeded")) return } From 2c7d33d88a9023604ab6343988f3263ceea333ca Mon Sep 17 00:00:00 2001 From: Hieu Huynh <“Hieu.huynh@oath.com”> Date: Thu, 19 Jul 2018 18:17:33 -0500 Subject: [PATCH 5/6] undo effect on totalResultSize and calculatedTasks --- .../org/apache/spark/scheduler/TaskSetManager.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8136d8b3aac55..c1bbb237eec7b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -29,7 +29,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.internal.{config, Logging} import org.apache.spark.scheduler.SchedulingMode._ -import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils} +import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils} import org.apache.spark.util.collection.MedianHeap /** @@ -724,7 +724,15 @@ private[spark] class TaskSetManager( val info = taskInfos(tid) val index = info.index // Check if any other attempt succeeded before this and this attempt has not been handled - if (successful(index) && killedByOtherAttempt(index)) { + if (successful(index) && killedByOtherAttempt.contains(tid)) { + calculatedTasks -= 1 + + val resultSizeAcc = result.accumUpdates.find(a => + a.name == Some(InternalAccumulator.RESULT_SIZE)) + if (resultSizeAcc.isDefined) { + totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value + } + handleFailedTask(tid, TaskState.KILLED, TaskKilled("Finish but did not commit due to another attempt succeeded")) return From b6585da0f137d3d3675925368c4668c884de900c Mon Sep 17 00:00:00 2001 From: Hieu Huynh <“Hieu.huynh@oath.com”> Date: Sun, 22 Jul 2018 19:13:53 -0500 Subject: [PATCH 6/6] add more comments --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c1bbb237eec7b..972abf273c6aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -725,14 +725,16 @@ private[spark] class TaskSetManager( val index = info.index // Check if any other attempt succeeded before this and this attempt has not been handled if (successful(index) && killedByOtherAttempt.contains(tid)) { + // Undo the effect on calculatedTasks and totalResultSize made earlier when + // checking if can fetch more results calculatedTasks -= 1 - val resultSizeAcc = result.accumUpdates.find(a => a.name == Some(InternalAccumulator.RESULT_SIZE)) if (resultSizeAcc.isDefined) { totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value } + // Handle this task as a killed task handleFailedTask(tid, TaskState.KILLED, TaskKilled("Finish but did not commit due to another attempt succeeded")) return