Skip to content

Commit

Permalink
Mark unknown stage attempt with id -1 and drop that in JobProgressLis…
Browse files Browse the repository at this point in the history
…tener.
  • Loading branch information
rxin committed Aug 19, 2014
1 parent 6c08b07 commit 0f36075
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,13 @@ class DAGScheduler(
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stageId = task.stageId
val stageInfo = stageIdToStage(task.stageId).latestInfo
val taskType = Utils.getFormattedClassName(task)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason,
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).orElse(-1)
listenerBus.post(SparkListenerTaskEnd(stageId, stageAttemptId, taskType, event.reason,
event.taskInfo, event.taskMetrics))
}

Expand All @@ -921,7 +921,6 @@ class DAGScheduler(
if (isSuccessful) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
} else {

logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
}
stage.latestInfo.completionTime = Some(clock.getTime())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
// compeletion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that's never marked as complete.
if (info != null && taskEnd.stageAttemptId != -1) {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
Expand Down

0 comments on commit 0f36075

Please sign in to comment.