
[SPARK-13054] Always post TaskEnd event for tasks #10951

Closed · wants to merge 8 commits

Changes from 6 commits
@@ -1144,13 +1144,12 @@ class DAGScheduler(
null
}

- // The success case is dealt with separately below.
- // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
- if (event.reason != Success) {
-   val attemptId = task.stageAttemptId
-   listenerBus.post(SparkListenerTaskEnd(
-     stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
- }
+ // Note: this stage may already have been canceled, in which case this task end event
+ // may be posted after the stage completed event. There's not much we can do here without
+ // introducing additional complexity in the scheduler to wait for all the task end events
+ // before posting the stage completed event.
Contributor:
I feel like this comment does not fully explain why you want this. Also, the part at the end makes it sound like it would be better if there was a larger refactoring, so there was no stage end until we heard back from all the tasks -- but I don't even think we'd want that, especially with speculation. How about something like:

The stage may have already finished when we get this event -- e.g. maybe it was a speculative task. It's important that we send the TaskEnd event in any case, so listeners know how many tasks executors are currently running. In particular, it's important for DynamicAllocation to know if an executor is busy, and it's also needed for the UI to update correctly. See SPARK-11701 / SPARK-13054.
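To make the motivation in that suggested comment concrete, here is a minimal sketch (illustrative only, not part of this PR; the class name and idle check are invented) of the kind of per-executor bookkeeping a listener such as the dynamic-allocation one performs. Its accounting only stays correct if the scheduler posts a TaskEnd for every task that started:

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Hypothetical listener: counts how many tasks each executor is running.
// If the scheduler ever drops a TaskEnd (e.g. for a late speculative task
// or a task from a cancelled stage), the count never returns to zero and
// the executor looks permanently busy.
class RunningTaskCounter extends SparkListener {
  private val runningTasks = mutable.Map.empty[String, Int].withDefaultValue(0)

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    runningTasks(taskStart.taskInfo.executorId) += 1
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    runningTasks(taskEnd.taskInfo.executorId) -= 1
  }

  // Invented helper, e.g. for deciding whether an executor can be released.
  def isIdle(executorId: String): Boolean = synchronized {
    runningTasks(executorId) <= 0
  }
}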

Contributor (author):

It doesn't seem like we need to be that specific here. I don't see the benefit of adding the Spark JIRA IDs -- you can go back through the history to find the change if needed, and I don't think the information there adds much over the rest of the comment.

I'll update shortly; let me know what you think.

+ listenerBus.post(SparkListenerTaskEnd(
+   stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
@@ -1160,8 +1159,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
- listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-   event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
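For readers skimming the diff, the net effect on the surrounding DAGScheduler method (handleTaskCompletion) is roughly the following -- a condensed sketch with unrelated handling elided, not the verbatim file. The TaskEnd event is now posted unconditionally, before the stage-existence check, instead of only for failed tasks there plus separately in the Success branch:

// Condensed sketch of handleTaskCompletion after this change (elided, not verbatim)
listenerBus.post(SparkListenerTaskEnd(
  stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

if (!stageIdToStage.contains(task.stageId)) {
  return  // stage was cancelled; skip all further bookkeeping
}
val stage = stageIdToStage(task.stageId)
event.reason match {
  case Success =>
    stage.pendingPartitions -= task.partitionId
    // ... ResultTask / ShuffleMapTask handling elided ...
  case _ =>
    // ... failure and resubmission handling elided ...
}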
@@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
+ var endedTasks = new HashSet[Long]
Contributor:
can be a val -- it's a mutable HashSet.
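As an aside, the reviewer's point is that mutability of the collection is independent of mutability of the reference; a minimal illustration:

import scala.collection.mutable.HashSet

// `val` only fixes the reference; the HashSet itself is mutable,
// so += and clear() still work without declaring the field as `var`.
val endedTasks = new HashSet[Long]
endedTasks += 42L
endedTasks.clear()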


override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
failedStages += stageInfo.stageId
}
}

+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+   endedTasks += taskEnd.taskInfo.taskId
+ }
}

var mapOutputTracker: MapOutputTrackerMaster = null
@@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
+ sparkListener.endedTasks.clear()
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
@@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
assert(countSubmittedMapStageAttempts() === 2)
}

test("task events always posted in speculation / when stage is killed") {
val baseRdd = new MyRDD(sc, 4, Nil)
val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0, 1, 2, 3))

// complete two tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
assert(sparkListener.endedTasks.size == 2)

// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)

// verify the stage is done
assert(!scheduler.stageIdToStage.contains(0))

// stage should be complete finish one other Successful task to simulate what can happen
Contributor:
super nit: "stage should be complete. ..." (or whatever other punctuation you prefer)

+   // with a speculative task and make sure the event is sent out
+   runEvent(makeCompletionEvent(
+     taskSets(0).tasks(3), Success, 42,
+     Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+   sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+   assert(sparkListener.endedTasks.size == 5)
+
+   // make sure non successful tasks also send out event
+   runEvent(makeCompletionEvent(
+     taskSets(0).tasks(3), UnknownReason, 42,
+     Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+   sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+   assert(sparkListener.endedTasks.size == 6)
+ }

test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1947,6 +1999,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
info
}

+ private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
+   val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+   info.finishTime = 1  // to prevent spurious errors in JobProgressListener
+   info
+ }

private def makeCompletionEvent(
task: Task[_],
reason: TaskEndReason,