Skip to content

Commit

Permalink
Skip re-computing getMissingParentStages.
Browse files: browse the repository at this point in the history
  • Loading branch information
viirya committed Aug 18, 2015
1 parent dd0614f commit 8416b7f
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -732,7 +732,8 @@ class DAGScheduler(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val missingStages = getMissingParentStages(finalStage)
logInfo("Missing parents: " + missingStages)
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
Expand All @@ -741,18 +742,18 @@ class DAGScheduler(
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitStage(finalStage, Some(missingStages))
}
submitWaitingStages()
}

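/**
 * Submits stage, but first recursively submits any missing parents.
 * When `missingStages` is supplied, it is used as this stage's missing parents instead of
 * calling getMissingParentStages(stage) again.
 */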
private def submitStage(stage: Stage) {
private def submitStage(stage: Stage, missingStages: Option[List[Stage]] = None) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
val missing = missingStages.getOrElse(getMissingParentStages(stage)).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
Expand Down

0 comments on commit 8416b7f

Please sign in to comment.