
Commit

Include changes to commonize the TaskEnd call from SPARK-13054 and rework the cancelled-stage handling
Thomas Graves committed Feb 10, 2016
1 parent 249fc78 commit 5fc19c7
Showing 2 changed files with 8 additions and 20 deletions.
26 changes: 7 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1144,33 +1144,21 @@ class DAGScheduler(
         null
       }
 
-    // The success case is dealt with separately below.
-    // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
-    if (event.reason != Success) {
-      val attemptId = task.stageAttemptId
-      listenerBus.post(SparkListenerTaskEnd(
-        stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-    }
+    // Note: this stage may already have been canceled, in which case this task end event
+    // maybe posted after the stage completed event. There's not much we can do here without
+    // introducing additional complexity in the scheduler to wait for all the task end events
+    // before posting the stage completed event.
+    listenerBus.post(SparkListenerTaskEnd(
+      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
 
     if (!stageIdToStage.contains(task.stageId)) {
-      logInfo("skip normal actions as stage cancelled")
-      // Need to handle tasks coming in late (speculative and jobs killed)
-      // post a task end event so accounting for things manually tracking tasks work.
-      // This really should be something other then success since the other speculative task
-      // finished first.
-      if (event.reason == Success) {
-        val attemptId = task.stageAttemptId
-        listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
-      }
+      // Skip all the actions if the stage has been cancelled.
      return
    }
 
    val stage = stageIdToStage(task.stageId)
    event.reason match {
      case Success =>
-        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
        stage.pendingPartitions -= task.partitionId
        task match {
          case rt: ResultTask[_, _] =>
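The rework means a SparkListenerTaskEnd is now posted exactly once for every completed task, including successful tasks that finish after their stage has been cancelled, instead of only for failures plus a special-cased late-success path. Below is a minimal sketch of the kind of external accounting this guarantee protects; the InFlightTaskTracker class and its method names are hypothetical and not part of this commit, while SparkListener and its events are standard Spark APIs.

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Hypothetical listener: it assumes every task-start event is eventually
// matched by a task-end event. Before this commit, a task that finished
// successfully after its stage was cancelled produced no SparkListenerTaskEnd,
// so its stage's counter never drained to zero.
class InFlightTaskTracker extends SparkListener {
  // Spark delivers listener events on a single bus thread, so an
  // unsynchronized map is sufficient here.
  private val inFlight = mutable.Map.empty[Int, Int].withDefaultValue(0)

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    inFlight(taskStart.stageId) += 1

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    inFlight(taskEnd.stageId) -= 1

  // Stages that still have task-start events without a matching task-end.
  def stagesWithInFlightTasks: Seq[Int] =
    inFlight.collect { case (stageId, n) if n > 0 => stageId }.toSeq
}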
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -988,7 +988,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
-  test("late task events posted") {
+  test("task events always posted in speculation / when stage is killed") {
     val baseRdd = new MyRDD(sc, 4, Nil)
     val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0, 1, 2, 3))
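The renamed test pins the guarantee down inside DAGSchedulerSuite for speculative duplicates and killed stages. As a rough end-to-end illustration outside the suite (the TaskEndAccounting object and its threshold are hypothetical; addSparkListener and the listener events are standard Spark APIs), one can count task-end events against launched partitions:

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical end-to-end check in the spirit of the renamed test: every
// launched task must eventually produce a SparkListenerTaskEnd. On a real
// cluster with spark.speculation enabled, duplicate (speculative) attempts
// can only add extra end events, never remove any.
object TaskEndAccounting {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("task-end-accounting"))

    val ended = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        ended.incrementAndGet()
      }
    })

    sc.parallelize(1 to 100, 4).map(_ + 1).count()
    sc.stop() // stop() drains the listener bus before we read the counter

    // Four partitions were launched, so at least four end events must arrive.
    assert(ended.get() >= 4, s"expected >= 4 task-end events, got ${ended.get()}")
  }
}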
