Commit 53d44f4

roll back stageDependsOn
cloud-fan committed Jan 23, 2015
1 parent ebc35e1 commit 53d44f4
Showing 1 changed file with 21 additions and 12 deletions.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (+21 -12)
@@ -196,30 +196,30 @@ class DAGScheduler(
   }
 
   /**
-   * iterate RDD graph through lineage, and process ShuffleDependency by user function
+   * Iterate the RDD graph through lineage, stopping early wherever the user's
+   * keepGoing functions return false.
    */
   private def iterateRDDGraph(
       rdd: RDD[_],
-      continueAfterShuffle: Boolean,
-      fun: ShuffleDependency[_, _, _] => Unit,
-      filter: RDD[_] => Boolean = _ => true): Unit = {
+      keepGoingByShuf: ShuffleDependency[_, _, _] => Boolean,
+      keepGoingByRDD: RDD[_] => Boolean = _ => true): collection.Set[RDD[_]] = {
     val visited = new HashSet[RDD[_]]
     val waitingForVisit = new Stack[RDD[_]]
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
       val r = waitingForVisit.pop()
-      if (visited.add(r) && filter(r)) {
+      if (visited.add(r) && keepGoingByRDD(r)) {
         for (dep <- r.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              fun(shufDep)
-              if (continueAfterShuffle) waitingForVisit.push(shufDep.rdd)
+              if (keepGoingByShuf(shufDep)) waitingForVisit.push(shufDep.rdd)
             case _ =>
               waitingForVisit.push(dep.rdd)
           }
         }
       }
     }
+    visited
   }
 
   /**
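
The new contract is easier to see in isolation: the traversal returns every RDD it visits, keepGoingByRDD prunes whole subtrees, and keepGoingByShuf decides per shuffle dependency whether to cross the boundary. Below is a minimal, self-contained sketch of the same pattern; it is not Spark code, and Node, ShuffleEdge, and iterateGraph are hypothetical toy stand-ins for RDD, ShuffleDependency, and iterateRDDGraph:

    import scala.collection.mutable.{HashSet, Stack}

    sealed trait Edge { def child: Node }
    case class NarrowEdge(child: Node) extends Edge
    case class ShuffleEdge(child: Node, shuffleId: Int) extends Edge
    case class Node(name: String, deps: List[Edge] = Nil)

    // Same shape as the new iterateRDDGraph: DFS with two pruning
    // callbacks, returning the set of visited nodes.
    def iterateGraph(
        root: Node,
        keepGoingByShuf: ShuffleEdge => Boolean,
        keepGoingByNode: Node => Boolean = _ => true): collection.Set[Node] = {
      val visited = new HashSet[Node]
      val waitingForVisit = new Stack[Node]
      waitingForVisit.push(root)
      while (waitingForVisit.nonEmpty) {
        val n = waitingForVisit.pop()
        if (visited.add(n) && keepGoingByNode(n)) {
          n.deps.foreach {
            case s: ShuffleEdge => if (keepGoingByShuf(s)) waitingForVisit.push(s.child)
            case e => waitingForVisit.push(e.child)
          }
        }
      }
      visited
    }

    // Toy lineage with two shuffle boundaries: c <-shuffle 1- b <-shuffle 0- a
    val a = Node("a")
    val b = Node("b", List(ShuffleEdge(a, shuffleId = 0)))
    val c = Node("c", List(ShuffleEdge(b, shuffleId = 1)))

    // Returning false stops at the first shuffle; true walks the whole lineage.
    assert(iterateGraph(c, _ => false) == Set(c))
    assert(iterateGraph(c, _ => true) == Set(a, b, c))
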
@@ -303,7 +303,10 @@ class DAGScheduler(
    */
   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
     val parents = new HashSet[Stage]
-    iterateRDDGraph(rdd, false, shufDep => parents += getShuffleMapStage(shufDep, jobId))
+    iterateRDDGraph(rdd, shufDep => {
+      parents += getShuffleMapStage(shufDep, jobId)
+      false
+    })
     parents.toList
   }

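getParentStages keeps its old behavior under the new API: the callback registers the shuffle stage and then returns false, so the walk never crosses a shuffle boundary when computing direct parent stages. Continuing the toy sketch above (hypothetical types):

    // Only the nearest layer of shuffle edges is seen; shuffle 0 sits
    // behind shuffle 1 and is never reached.
    val direct = scala.collection.mutable.ArrayBuffer[ShuffleEdge]()
    iterateGraph(c, { s => direct += s; false })
    assert(direct.toList == List(ShuffleEdge(b, 1)))
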
@@ -323,21 +326,23 @@ class DAGScheduler(
   // Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet
   private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
     val parents = new Stack[ShuffleDependency[_, _, _]]
-    iterateRDDGraph(rdd, true, shufDep => {
+    iterateRDDGraph(rdd, shufDep => {
       if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
         parents.push(shufDep)
       }
+      true
     })
     parents
   }
 
   private def getMissingParentStages(stage: Stage): List[Stage] = {
     val missing = new HashSet[Stage]
-    iterateRDDGraph(stage.rdd, false, shufDep => {
+    iterateRDDGraph(stage.rdd, shufDep => {
       val mapStage = getShuffleMapStage(shufDep, stage.jobId)
       if (!mapStage.isAvailable) {
         missing += mapStage
       }
+      false
     }, rdd => getCacheLocs(rdd).contains(Nil))
     missing.toList
   }
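
These two callers pick the opposite poles of the same contract: getAncestorShuffleDependencies returns true to walk past every shuffle, while getMissingParentStages returns false and additionally prunes fully cached RDDs through keepGoingByRDD. In the toy model (hypothetical types, continuing the sketch above):

    // Returning true registers every shuffle in the lineage, near to far.
    val all = scala.collection.mutable.ArrayBuffer[ShuffleEdge]()
    iterateGraph(c, { s => all += s; true })
    assert(all.map(_.shuffleId).toList == List(1, 0))

    // keepGoingByNode prunes independently: treat b as fully cached here.
    // b is still visited, but its ancestors are not expanded.
    assert(iterateGraph(c, _ => true, keepGoingByNode = (n: Node) => n != b) == Set(b, c))
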
@@ -352,7 +357,8 @@ class DAGScheduler(
         val s = stages.head
         s.jobIds += jobId
         jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
-        val parentsWithoutThisJobId = stage.parents.filterNot(_.jobIds.contains(jobId))
+        val parents: List[Stage] = getParentStages(s.rdd, jobId)
+        val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
         updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
       }
     }
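
The bookkeeping change recomputes parents with getParentStages(s.rdd, jobId) instead of reading the possibly stale stage.parents; the jobIds filter is what keeps the recursion from revisiting shared ancestors. A stripped-down sketch of that recursion, where ToyStage is hypothetical and parent links are stored directly rather than derived from an RDD:

    import scala.collection.mutable

    case class ToyStage(id: Int, jobIds: mutable.Set[Int], parents: List[ToyStage])

    @annotation.tailrec
    def updateList(jobId: Int, stages: List[ToyStage]): Unit = stages match {
      case Nil =>
      case s :: tail =>
        s.jobIds += jobId
        // Skip parents already tagged with this job id: on a diamond-shaped
        // graph each shared ancestor is processed once, so the loop terminates.
        val parentsWithoutThisJobId = s.parents.filter(!_.jobIds.contains(jobId))
        updateList(jobId, parentsWithoutThisJobId ++ tail)
    }
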
@@ -1197,7 +1203,10 @@ class DAGScheduler(
     if (stage == target) {
       return true
     }
-    stage.parents.exists(_.rdd == target.rdd)
+    val visitedRdds = iterateRDDGraph(stage.rdd, shufDep => {
+      !getShuffleMapStage(shufDep, stage.jobId).isAvailable
+    })
+    visitedRdds.contains(target.rdd)
   }
 
   /**
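
This is the rollback the commit title refers to: stageDependsOn again walks stage.rdd's full lineage, crossing a shuffle boundary only when its map stage is not yet available, and checks whether target.rdd was reached. The removed one-level check, stage.parents.exists(_.rdd == target.rdd), misses transitive dependencies. The toy model above makes that concrete (hypothetical types; availability is simulated with a plain set of shuffle ids):

    // No shuffle output is available yet, so every boundary is crossed.
    val availableShuffles = Set.empty[Int]

    def dependsOn(stageRoot: Node, target: Node): Boolean =
      iterateGraph(stageRoot, s => !availableShuffles.contains(s.shuffleId)).contains(target)

    // c reaches the grandparent a through two shuffles; an immediate-parents
    // check would only ever see b.
    assert(dependsOn(c, a))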