Skip to content

Commit

Permalink
[SPARK-10003] Improve readability of DAGScheduler
Browse files Browse the repository at this point in the history
Note: this is not intended to be in Spark 1.5!

This patch rewrites some code in the `DAGScheduler` to make it more readable. In particular
- there were blocks of code that are unnecessary and removed for simplicity
- there were abstractions that are unnecessary and made the code hard to navigate
- other minor changes

Author: Andrew Or <andrew@databricks.com>

Closes #8217 from andrewor14/dag-scheduler-readability and squashes the following commits:

57abca3 [Andrew Or] Move comment back into if case
574fb1e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-scheduler-readability
64a9ed2 [Andrew Or] Remove unnecessary code + minor code rewrites
  • Loading branch information
Andrew Or authored and kayousterhout committed Sep 4, 2015
1 parent 208fbca commit cf42138
Showing 1 changed file with 9 additions and 37 deletions.
46 changes: 9 additions & 37 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ class DAGScheduler(
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, firstJobId)
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
}
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage

stage
}
}
Expand Down Expand Up @@ -365,16 +366,6 @@ class DAGScheduler(
parents.toList
}

/** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
Expand All @@ -391,11 +382,9 @@ class DAGScheduler(
if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
parents.push(shufDep)
}

waitingForVisit.push(shufDep.rdd)
case _ =>
waitingForVisit.push(dep.rdd)
}
waitingForVisit.push(dep.rdd)
}
}
}
Expand Down Expand Up @@ -1052,10 +1041,11 @@ class DAGScheduler(
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head),
shuffleStage.outputLocs.map(_.headOption.orNull),
changeEpoch = true)

clearCacheLocs()

if (shuffleStage.outputLocs.contains(Nil)) {
// Some tasks had failed; let's resubmit this shuffleStage
// TODO: Lower-level scheduler should also deal with this
Expand All @@ -1064,27 +1054,9 @@ class DAGScheduler(
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
.map(_._2).mkString(", "))
submitStage(shuffleStage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
for (shuffleStage <- waitingStages) {
logInfo("Missing parents for " + shuffleStage + ": " +
getMissingParentStages(shuffleStage))
}
for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
{
newlyRunnable += shuffleStage
}
waitingStages --= newlyRunnable
runningStages ++= newlyRunnable
for {
shuffleStage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(shuffleStage)
} {
logInfo("Submitting " + shuffleStage + " (" +
shuffleStage.rdd + "), which is now runnable")
submitMissingTasks(shuffleStage, jobId)
}
}

// Note: newly runnable stages will be submitted below when we submit waiting stages
}
}

Expand Down Expand Up @@ -1186,7 +1158,7 @@ class DAGScheduler(
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head)
val locs = stage.outputLocs.map(_.headOption.orNull)
mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {
Expand Down

0 comments on commit cf42138

Please sign in to comment.