[SPARK-13054] Always post TaskEnd event for tasks #10951

Closed · wants to merge 8 commits
Changes from 4 commits
SparkHadoopMapRedUtil.scala
@@ -90,8 +90,13 @@ object SparkHadoopMapRedUtil extends Logging {
         performCommit()
       }
     } else {
-      // Some other attempt committed the output, so we do nothing and signal success
-      logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
+      // Some other attempt committed the output; this generally means speculation. We need to mark
+      // this task as failed so that accounting works correctly.
+      val taskAttemptNumber = TaskContext.get().attemptNumber()
+      val message =
+        s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID"
+      logInfo(message)
+      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
     }
   }
 }
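
For context, not part of the patch: with this change the attempt that loses the commit race is reported as a commit-denied failure instead of a silent success, so code listening for task end events can account for it. Below is a minimal sketch of such accounting, assuming the standard TaskCommitDenied end reason; the listener class name is made up for illustration.

import org.apache.spark.{Success, TaskCommitDenied}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener that does its own commit accounting. It relies on a
// task end event being delivered for every attempt, including the speculative
// attempt whose commit was denied.
class CommitAccountingListener extends SparkListener {
  // Listener callbacks run on a single bus thread, so plain vars are sufficient here.
  private var committed = 0
  private var denied = 0

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case Success => committed += 1
    case _: TaskCommitDenied => denied += 1
    case _ => // other failure reasons are not commit-related
  }

  def counts: (Int, Int) = (committed, denied)
}

// Usage: sc.addSparkListener(new CommitAccountingListener)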
DAGScheduler.scala
@@ -1153,7 +1153,16 @@ class DAGScheduler(
}

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
logInfo("skip normal actions as stage cancelled")
// Need to handle tasks coming in late (speculative tasks and killed jobs):
// post a task end event so that accounting for anything that manually tracks tasks still works.
// This really should be something other than Success, since the other speculative task
// finished first.
Contributor:

I think there's a better way to always post this event. I have some changes in #10958 to do this in a cleaner way: https://github.com/apache/spark/pull/10958/files#r51510044. I believe the semantics there are the same as the ones here.

Contributor Author:

I think that is fine for now, since it's combining failed vs. successful tasks. I do think it's a bit weird that Spark marks all speculative tasks as Success even when both obviously don't commit. That is part of the other JIRA I was going to file, though, and if needed it can be split back apart at that point.

It does seem a bit odd to throw SPARK-13054 in with the other changes in the same PR, though.

Contributor:

By the way, you should put SPARK-13054 in the title of this patch if you plan to do that here.

Contributor Author:

Updating now; I need to test and then will post an updated version.

if (event.reason == Success) {
val attemptId = task.stageAttemptId
listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType,
event.reason, event.taskInfo, taskMetrics))
}
return
}

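The new comment above refers to accounting for code that manually tracks tasks. Here is a minimal sketch of what such tracking might look like, not taken from this patch and with a made-up class name; it only stays correct if a SparkListenerTaskEnd event is posted for every attempt, including late speculative ones.

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Hypothetical listener that tracks in-flight task attempts by task id.
// If the scheduler drops the TaskEnd event for a late (e.g. speculative) attempt,
// that attempt's entry is never removed and the running count drifts upward.
class RunningTaskTracker extends SparkListener {
  private val running = mutable.Set[Long]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    running += taskStart.taskInfo.taskId
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    running -= taskEnd.taskInfo.taskId
  }

  def runningCount: Int = running.size
}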
DAGSchedulerSuite.scala
@@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
var endedTasks = new HashSet[Long]
Contributor:

Can be a val; it's a mutable HashSet.


override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
failedStages += stageInfo.stageId
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
endedTasks += taskEnd.taskInfo.taskId
}
}

var mapOutputTracker: MapOutputTrackerMaster = null
@@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
sparkListener.endedTasks.clear()
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
@@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
assert(countSubmittedMapStageAttempts() === 2)
}

test("late task events posted") {
Contributor:

Can you call this "task events always posted in speculation / when stage is killed"?

Contributor Author:

Sure, even though late events could happen for lots of reasons, including things like network issues.

val baseRdd = new MyRDD(sc, 4, Nil)
val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0, 1, 2, 3))

// complete two tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
assert(sparkListener.endedTasks.size == 2)

// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)

// verify the stage is done
assert(!scheduler.stageIdToStage.contains(0))

// stage should be complete finish one other Successful task to simulate what can happen
Contributor:

Super nit: "stage should be complete. ..." (or whatever other punctuation you prefer).

// with a speculative task and make sure the event is sent out
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 5)

// make sure non-successful tasks also send out the event
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), UnknownReason, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 6)
}

test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1947,6 +1999,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
info
}

private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
info.finishTime = 1 // to prevent spurious errors in JobProgressListener
info
}

private def makeCompletionEvent(
task: Task[_],
reason: TaskEndReason,