[SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks

### What changes were proposed in this pull request?

When a job finishes, its running (re-submitted) map stages should be marked as finished if they are not used by other jobs, and the running tasks of these stages should be cancelled.

The ListenerBus should be notified as well; otherwise these map stage entries stay on the "Active Stages" page of the web UI and never go away.
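In code terms, the heart of the change (see the ```DAGScheduler``` diff below) is one extra call when the last result task of a job completes; the snippet is only an excerpt of that diff, with the surrounding code elided:

```
// DAGScheduler#handleTaskCompletion, after the last partition of the ResultStage finishes.
if (job.numFinished == job.numPartitions) {
  markStageAsFinished(resultStage)
  // New in this patch: cancel and finish still-running stages that only this job
  // uses. markStageAsFinished posts SparkListenerStageCompleted, so the web UI
  // drops these stages from the "Active Stages" page.
  cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
  cleanupStateForJobAndIndependentStages(job)
}
```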

For example:

Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 has two partitions, and result stage 1 also has two partitions.
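A hypothetical driver that produces this stage layout could look like the sketch below (the application name and data are illustrative; ```spark.speculation``` must be enabled so the speculative task in step 4 can be launched):

```
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reproduction driver: a 2-partition shuffle map stage feeding a
// 2-partition result stage, with speculation enabled so a speculative copy of
// a result task can be launched.
object Spark30388Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-30388-repro")
      .set("spark.speculation", "true")
    val sc = new SparkContext(conf)

    val result = sc.parallelize(1 to 100, 2)  // ShuffleMapStage 0: 2 partitions
      .map(i => (i % 2, i))
      .reduceByKey(_ + _, 2)                  // shuffled into ResultStage 1: 2 partitions
      .collect()                              // job 0

    println(result.toSeq)
    sc.stop()
  }
}
```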

**Steps to reproduce the bug:**
1. map stage 0: task 0 (```TID 0```) and task 1 (```TID 1```) start, and both finish successfully.
2. result stage 1: task 0 (```TID 2```) and task 1 (```TID 3```) start.
3. result stage 1: task 0 (```TID 2```) finishes successfully.
4. result stage 1: speculative task 1.1 (```TID 4```) is launched, but then fails with a FetchFailedException.
5. The driver re-submits map stage 0 and result stage 1.
6. map stage 0 (retry 1): task 0 (```TID 5```) is launched.
7. result stage 1: task 1 (```TID 3```) finishes successfully, so job 0 finishes.
8. map stage 0 is removed from ```runningStages``` and ```stageIdToStage```, because it no longer belongs to any active job:
```
  private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: ActiveJob): HashSet[Stage] = {
    ...
    stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
      case (stageId, stage) =>
        ...
        def removeStage(stageId: Int): Unit = {
          for (stage <- stageIdToStage.get(stageId)) {
            if (runningStages.contains(stage)) {
              logDebug("Removing running stage %d".format(stageId))
              runningStages -= stage
            }
            ...
          }
          stageIdToStage -= stageId
        }

        jobSet -= job.jobId
        if (jobSet.isEmpty) { // no other job needs this stage
          removeStage(stageId)
        }
    }
    ...
  }

```
9. map stage 0 (retry 1): task 0 (```TID 5```) finishes successfully, but stage 0 is no longer in ```stageIdToStage```, so ```markStageAsFinished``` is never called for it:
```
  private[scheduler] def DAGScheduler#handleTaskCompletion(event: CompletionEvent): Unit = {
    val task = event.task
    val stageId = task.stageId
    ...
    if (!stageIdToStage.contains(task.stageId)) {
      postTaskEnd(event)
      // Skip all the actions if the stage has been cancelled.
      return
    }
    ...
```
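Because the retried stage never goes through ```markStageAsFinished```, no ```SparkListenerStageCompleted``` event is ever posted for it. The simplified listener below (an illustration only, not Spark's actual UI bookkeeping) shows why such a stage then stays in "Active Stages" forever:

```
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Illustration only: the "Active Stages" view is driven by stage submitted/completed
// events. A re-submitted stage that never receives SparkListenerStageCompleted is
// never removed from `active`.
class ActiveStagesTracker extends SparkListener {
  private val active = mutable.Set[(Int, Int)]() // (stageId, attemptNumber)

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    active += ((event.stageInfo.stageId, event.stageInfo.attemptNumber()))
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    active -= ((event.stageInfo.stageId, event.stageInfo.attemptNumber()))
  }

  def activeStages: Set[(Int, Int)] = active.toSet
}
```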

#### Relevant Spark driver logs:

```
20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s
20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool

20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 79 ms on 9.179.143.4 (executor 1) (1/2)

20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.76.13.26) as speculatable because it ran more than 158 ms
20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.261 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages

20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)

// NOTE: Here should be "INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
// and this bug is being fixed in https://issues.apache.org/jira/browse/SPARK-30404

20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 in stage 1.0 because task 1 has already completed successfully

20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.131 s
20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.031212 s

20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
```

### Why are the changes needed?

The web UI is incorrect: ```stage 0 (retry 1)``` has finished, but it stays on the ```Active Stages``` page.

![active_stage](https://user-images.githubusercontent.com/4401756/71656718-71185680-2d77-11ea-8dbc-fd8085ab3dfb.png)

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
A new test case is added.

It was also tested manually on a cluster. The result is as follows:
![cancel_stage](https://user-images.githubusercontent.com/4401756/71658434-04a15580-2d7f-11ea-952b-dd8dd685f37d.png)

Closes #27050 from liangxs/master.

Authored-by: xuesenliang <xuesenliang@tencent.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
xuesenliang authored and tgravescs committed Mar 3, 2020
1 parent 97d9a22 commit 7a4cf33
Showing 2 changed files with 59 additions and 12 deletions.
27 changes: 15 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1434,6 +1434,7 @@ private[spark] class DAGScheduler(
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
cleanupStateForJobAndIndependentStages(job)
try {
// killAllTaskAttempts will fail if a SchedulerBackend does not implement
@@ -1978,18 +1979,12 @@ private[spark] class DAGScheduler(
}
}

/** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
private def failJobAndIndependentStages(
job: ActiveJob,
failureReason: String,
exception: Option[Throwable] = None): Unit = {
val error = new SparkException(failureReason, exception.orNull)
/** Cancel all independent, running stages that are only used by this job. */
private def cancelRunningIndependentStages(job: ActiveJob, reason: String): Boolean = {
var ableToCancelStages = true

// Cancel all independent, running stages.
val stages = jobIdToStageIds(job.jobId)
if (stages.isEmpty) {
logError("No stages registered for job " + job.jobId)
logError(s"No stages registered for job ${job.jobId}")
}
stages.foreach { stageId =>
val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
@@ -2001,12 +1996,12 @@
if (!stageIdToStage.contains(stageId)) {
logError(s"Missing Stage for stage with id $stageId")
} else {
// This is the only job that uses this stage, so fail the stage if it is running.
// This stage is only used by the job, so finish the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
markStageAsFinished(stage, Some(failureReason))
markStageAsFinished(stage, Some(reason))
} catch {
case e: UnsupportedOperationException =>
logWarning(s"Could not cancel tasks for stage $stageId", e)
Expand All @@ -2016,11 +2011,19 @@ private[spark] class DAGScheduler(
}
}
}
ableToCancelStages
}

if (ableToCancelStages) {
/** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
private def failJobAndIndependentStages(
job: ActiveJob,
failureReason: String,
exception: Option[Throwable] = None): Unit = {
if (cancelRunningIndependentStages(job, failureReason)) {
// SPARK-15783 important to cleanup state first, just for tests where we have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in the tests.
cleanupStateForJobAndIndependentStages(job)
val error = new SparkException(failureReason, exception.orNull)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
44 changes: 44 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1933,6 +1933,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assertDataStructuresEmpty()
}

test("shuffle fetch failed on speculative task, but original task succeed (SPARK-30388)") {
var completedStage: List[Int] = Nil
val listener = new SparkListener() {
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
completedStage = completedStage :+ event.stageInfo.stageId
}
}
sc.addSparkListener(listener)

val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
completeShuffleMapStageSuccessfully(0, 0, 2)
assert(completedStage === List(0))

// result task 0.0 succeed
runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, 42))
// speculative result task 1.1 fetch failed
val info = new TaskInfo(4, index = 1, attemptNumber = 1, 0L, "", "", TaskLocality.ANY, true)
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 1, "ignored"),
null,
Seq.empty,
Array.empty,
info
)
)
assert(completedStage === List(0, 1))

Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
// map stage resubmitted
assert(scheduler.runningStages.size === 1)
val mapStage = scheduler.runningStages.head
assert(mapStage.id === 0)
assert(mapStage.latestInfo.failureReason.isEmpty)

// original result task 1.0 succeed
runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42))
assert(completedStage === List(0, 1, 1, 0))
assert(scheduler.activeJobs.isEmpty)
}

test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
val acc = new LongAccumulator {
override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException