Skip to content

Commit

Permalink
change function name; comply with code style.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jul 31, 2014
1 parent 5f072a7 commit 6b2c615
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,9 @@ class DAGScheduler(
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
val parentsWithNoMapStage = getParentShuffleDependencies(shuffleDep.rdd)
while (!parentsWithNoMapStage.isEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage =
newOrUsedStage(
currentShufDep.rdd, currentShufDep.rdd.partitions.size, currentShufDep, jobId,
currentShufDep.rdd.creationSite)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
// Then register current shuffleDep
val stage =
newOrUsedStage(
shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
Expand Down Expand Up @@ -275,7 +269,8 @@ class DAGScheduler(
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError caused by recursively visiting
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
Expand All @@ -299,10 +294,25 @@ class DAGScheduler(
parents.toList
}

private def getParentShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
// Find ancestor missing shuffle dependencies and register into shuffleToMapStage
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (!parentsWithNoMapStage.isEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage =
newOrUsedStage(
currentShufDep.rdd, currentShufDep.rdd.partitions.size, currentShufDep, jobId,
currentShufDep.rdd.creationSite)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}

// Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError caused by recursively visiting
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
Expand Down Expand Up @@ -332,7 +342,8 @@ class DAGScheduler(
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError caused by recursively visiting
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
Expand Down Expand Up @@ -1149,7 +1160,8 @@ class DAGScheduler(
}
val visitedRdds = new HashSet[RDD[_]]
val visitedStages = new HashSet[Stage]
// We are manually maintaining a stack here to prevent StackOverflowError caused by recursively visiting
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visitedRdds(rdd)) {
Expand Down

0 comments on commit 6b2c615

Please sign in to comment.