diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 684db6646765f..11bd2d3b939de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -152,17 +152,24 @@ class DAGScheduler( // may lead to more delay in scheduling if those locations are busy. private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2 - // Called by TaskScheduler to report task's starting. + /** + * Called by the TaskSetManager to report that a task has started. + */ def taskStarted(task: Task[_], taskInfo: TaskInfo) { eventProcessLoop.post(BeginEvent(task, taskInfo)) } - // Called to report that a task has completed and results are being fetched remotely. + /** + * Called by the TaskSetManager to report that a task has completed + * and results are being fetched remotely. + */ def taskGettingResult(taskInfo: TaskInfo) { eventProcessLoop.post(GettingResultEvent(taskInfo)) } - // Called by TaskScheduler to report task completions or failures. + /** + * Called by the TaskSetManager to report task completions or failures. + */ def taskEnded( task: Task[_], reason: TaskEndReason, @@ -188,18 +195,24 @@ class DAGScheduler( BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) } - // Called by TaskScheduler when an executor fails. + /** + * Called by the TaskScheduler implementation when an executor fails. + */ def executorLost(execId: String): Unit = { eventProcessLoop.post(ExecutorLost(execId)) } - // Called by TaskScheduler when a host is added + /** + * Called by the TaskScheduler implementation when a host is added. + */ def executorAdded(execId: String, host: String): Unit = { eventProcessLoop.post(ExecutorAdded(execId, host)) } - // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or - // cancellation of the job itself.
+ /** + * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or + * cancellation of the job itself. + */ def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = { eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception)) } @@ -352,7 +365,9 @@ class DAGScheduler( parents.toList } - /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */ + /** + * Find missing ancestor shuffle dependencies and register them in shuffleToMapStage. + */ private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) { val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) while (parentsWithNoMapStage.nonEmpty) { @@ -362,7 +377,9 @@ class DAGScheduler( } } - /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ + /** + * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet. + */ private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { val parents = new Stack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] @@ -646,7 +663,9 @@ class DAGScheduler( } } - /** Finds the earliest-created active job that needs the stage */ + /** + * Finds the earliest-created active job that needs the stage. + */ // TODO: Probably should actually find among the active jobs that need this // stage the one with the highest priority (highest-priority pool, earliest created). // That should take care of at least part of the priority inversion problem with @@ -746,7 +765,9 @@ class DAGScheduler( submitWaitingStages() } - /** Submits stage, but first recursively submits any missing parents. */ + /** + * Submits stage, but first recursively submits any missing parents.
+ */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { @@ -769,7 +790,9 @@ class DAGScheduler( } } - /** Called when stage's parents are available and we can now do its task. */ + /** + * Called when stage's parents are available and we can now do its task. + */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry @@ -912,7 +935,9 @@ class DAGScheduler( } } - /** Merge updates from a task to our local accumulator values */ + /** + * Merge updates from a task to our local accumulator values. + */ private def updateAccumulators(event: CompletionEvent): Unit = { val task = event.task val stage = stageIdToStage(task.stageId) @@ -1259,7 +1284,9 @@ class DAGScheduler( } } - /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */ + /** + * Fails a job and all stages that are only used by that job, and cleans up relevant state. + */ private def failJobAndIndependentStages( job: ActiveJob, failureReason: String, @@ -1309,7 +1336,9 @@ class DAGScheduler( } } - /** Return true if one of stage's ancestors is target. */ + /** + * Return true if one of stage's ancestors is target. + */ private def stageDependsOn(stage: Stage, target: Stage): Boolean = { if (stage == target) { return true