From 5fc19c7b292365644e8e615227f2cfa0b211d261 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Wed, 10 Feb 2016 21:49:58 +0000
Subject: [PATCH] include changes to commonize TaskEnd call from SPARK-13054
 and rework

---
 .../apache/spark/scheduler/DAGScheduler.scala      | 26 +++++--------------
 .../spark/scheduler/DAGSchedulerSuite.scala        |  2 +-
 2 files changed, 8 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f275e00f065af..ecef083be43fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1144,33 +1144,21 @@ class DAGScheduler(
         null
       }
 
-    // The success case is dealt with separately below.
-    // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
-    if (event.reason != Success) {
-      val attemptId = task.stageAttemptId
-      listenerBus.post(SparkListenerTaskEnd(
-        stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-    }
+    // Note: this stage may already have been cancelled, in which case this task end event
+    // may be posted after the stage-completed event. There's not much we can do here without
+    // introducing additional complexity in the scheduler to wait for all the task end events
+    // before posting the stage-completed event.
+    listenerBus.post(SparkListenerTaskEnd(
+      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
 
     if (!stageIdToStage.contains(task.stageId)) {
-      logInfo("skip normal actions as stage cancelled")
-      // Need to handle tasks coming in late (speculative and jobs killed)
-      // post a task end event so accounting for things manually tracking tasks work.
-      // This really should be something other then success since the other speculative task
-      // finished first.
-      if (event.reason == Success) {
-        val attemptId = task.stageAttemptId
-        listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
-      }
+      // Skip all the actions if the stage has been cancelled.
      return
    }
 
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
         stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4ad44182b1735..0e4c1b264f9d3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -988,7 +988,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
-  test("late task events posted") {
+  test("task events always posted in speculation / when stage is killed") {
     val baseRdd = new MyRDD(sc, 4, Nil)
     val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0, 1, 2, 3))
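
The contract this patch establishes is that exactly one SparkListenerTaskEnd is posted for
every completed task through a single call site, even when the task arrives after its stage
was cancelled (for example, the losing copy of a speculative task, or a task from a killed
job). Listeners that do their own per-task accounting rely on that invariant. Below is a
minimal sketch of such a listener, using only the public SparkListener callbacks; the class
and its field names are illustrative and are not part of this patch.

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

    // Illustrative only: tracks in-flight tasks per stage. Bookkeeping like this
    // is why the task end event must fire exactly once per task, including for
    // late tasks whose stage has already been removed from stageIdToStage.
    class RunningTaskCounter extends SparkListener {
      // stageId -> number of tasks currently in flight
      private val running = mutable.Map.empty[Int, Int].withDefaultValue(0)

      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
        running(taskStart.stageId) += 1
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
        // With the commonized post above, this is called once per task, so the
        // count drains back to zero even for cancelled stages.
        running(taskEnd.stageId) -= 1
        if (running(taskEnd.stageId) <= 0) running.remove(taskEnd.stageId)
      }
    }

A listener like this would be registered with sc.addSparkListener(new RunningTaskCounter).
Routing every completion through the single post also makes the attempt id consistent: the
event now always carries task.stageAttemptId, where the old Success path used
stage.latestInfo.attemptId instead.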