Skip to content

Commit

Permalink
Fixed typos in DAGScheduler and reformatted method docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzvara committed Aug 24, 2015
1 parent 053d94f commit f835ae8
Showing 1 changed file with 44 additions and 15 deletions.
59 changes: 44 additions & 15 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,24 @@ class DAGScheduler(
// may lead to more delay in scheduling if those locations are busy.
private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2

// Called by TaskScheduler to report task's starting.
/**
* Called by the TaskSetManager to report task's starting.
*/
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
}

// Called to report that a task has completed and results are being fetched remotely.
/**
* Called by the TaskSetManager to report that a task has completed
* and results are being fetched remotely.
*/
def taskGettingResult(taskInfo: TaskInfo) {
eventProcessLoop.post(GettingResultEvent(taskInfo))
}

// Called by TaskScheduler to report task completions or failures.
/**
* Called by the TaskSetManager to report task completions or failures.
*/
def taskEnded(
task: Task[_],
reason: TaskEndReason,
Expand All @@ -188,18 +195,24 @@ class DAGScheduler(
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}

// Called by TaskScheduler when an executor fails.
/**
* Called by TaskScheduler implementation when an executor fails.
*/
def executorLost(execId: String): Unit = {
eventProcessLoop.post(ExecutorLost(execId))
}

// Called by TaskScheduler when a host is added
/**
* Called by TaskScheduler implementation when a host is added.
*/
def executorAdded(execId: String, host: String): Unit = {
eventProcessLoop.post(ExecutorAdded(execId, host))
}

// Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
// cancellation of the job itself.
/**
* Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or
* cancellation of the job itself.
*/
def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = {
eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
}
Expand Down Expand Up @@ -352,7 +365,9 @@ class DAGScheduler(
parents.toList
}

/** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
/**
* Find ancestor missing shuffle dependencies and register into shuffleToMapStage.
*/
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (parentsWithNoMapStage.nonEmpty) {
Expand All @@ -362,7 +377,9 @@ class DAGScheduler(
}
}

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
/**
* Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet.
*/
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
Expand Down Expand Up @@ -646,7 +663,9 @@ class DAGScheduler(
}
}

/** Finds the earliest-created active job that needs the stage */
/**
* Finds the earliest-created active job that needs the stage.
*/
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
// That should take care of at least part of the priority inversion problem with
Expand Down Expand Up @@ -746,7 +765,9 @@ class DAGScheduler(
submitWaitingStages()
}

/** Submits stage, but first recursively submits any missing parents. */
/**
* Submits stage, but first recursively submits any missing parents.
*/
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
Expand All @@ -769,7 +790,9 @@ class DAGScheduler(
}
}

/** Called when stage's parents are available and we can now do its task. */
/**
* Called when stage's parents are available and we can now do its task.
*/
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
Expand Down Expand Up @@ -912,7 +935,9 @@ class DAGScheduler(
}
}

/** Merge updates from a task to our local accumulator values */
/**
* Merge updates from a task to our local accumulator values.
*/
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
Expand Down Expand Up @@ -1259,7 +1284,9 @@ class DAGScheduler(
}
}

/** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
/**
* Fails a job and all stages that are only used by that job, and cleans up relevant state.
*/
private def failJobAndIndependentStages(
job: ActiveJob,
failureReason: String,
Expand Down Expand Up @@ -1309,7 +1336,9 @@ class DAGScheduler(
}
}

/** Return true if one of stage's ancestors is target. */
/**
* Return true if one of stage's ancestors is target.
*/
private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
if (stage == target) {
return true
Expand Down

0 comments on commit f835ae8

Please sign in to comment.