Skip to content

Commit

Permalink
- Rename attempt to retry in UI.
Browse files Browse the repository at this point in the history
- Properly report stage failure in FetchFailed.
  • Loading branch information
rxin committed Aug 20, 2014
1 parent 40a6bd5 commit 3ee1d2a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,18 @@ class DAGScheduler(
}
val stage = stageIdToStage(task.stageId)

def markStageAsFinished(stage: Stage, isSuccessful: Boolean) = {
def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
case _ => "Unknown"
}
if (isSuccessful) {
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTime())
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
}
stage.latestInfo.completionTime = Some(clock.getTime())
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
}
Expand Down Expand Up @@ -964,7 +965,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(stage, isSuccessful = true)
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
Expand Down Expand Up @@ -993,7 +994,7 @@ class DAGScheduler(
stage.addOutputLoc(smt.partitionId, status)
}
if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
markStageAsFinished(stage, isSuccessful = true)
markStageAsFinished(stage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
Expand Down Expand Up @@ -1046,7 +1047,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
markStageAsFinished(failedStage, isSuccessful = false)
markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private[ui] class StageTableBase(
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""

{if (s.attemptId > 0) {
<td>{s.stageId} (attempt {s.attemptId})</td>
<td>{s.stageId} (retry {s.attemptId})</td>
} else {
<td>{s.stageId}</td>
}} ++
Expand Down

0 comments on commit 3ee1d2a

Please sign in to comment.