Skip to content

Commit

Permalink
rollback some changes
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Nov 24, 2014
1 parent 6b0aff9 commit 02261b8
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -409,31 +409,6 @@ class DAGScheduler(
updateJobIdStageIdMapsList(List(stage))
}

def removeStage(stageId: Int) {
// data structures based on Stage
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
shuffleToMapStage.remove(k)
}
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
// data structures based on StageId
stageIdToStage -= stageId
logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}

/**
* Removes state for job and any stages that are not needed by any other job. Does not
* handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks.
Expand All @@ -453,6 +428,31 @@ class DAGScheduler(
"Job %d not registered for stage %d even though that stage was registered for the job"
.format(job.jobId, stageId))
} else {
def removeStage(stageId: Int) {
// data structures based on Stage
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
shuffleToMapStage.remove(k)
}
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
// data structures based on StageId
stageIdToStage -= stageId
logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}

jobSet -= job.jobId
if (jobSet.isEmpty) { // no other job needs this stage
removeStage(stageId)
Expand Down

0 comments on commit 02261b8

Please sign in to comment.