From 711a3cba917523fba919bacdbdb05726fab5075b Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Tue, 2 Aug 2022 16:40:23 -0700 Subject: [PATCH 1/3] improve LaunchTask process --- .../org/apache/spark/scheduler/TaskInfo.scala | 6 +++ .../spark/scheduler/TaskSchedulerImpl.scala | 3 ++ .../spark/scheduler/TaskSetManager.scala | 7 ++- .../scheduler/TaskSchedulerImplSuite.scala | 47 +++++++++++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 15 +++++- 5 files changed, 74 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index b135a82145dcd..4a0f774c06a6b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -93,6 +93,8 @@ class TaskInfo( var killed = false + var launching = true + private[spark] def markGettingResult(time: Long): Unit = { gettingResultTime = time } @@ -108,6 +110,10 @@ class TaskInfo( } } + private[spark] def launchSucceeded: Unit = { + launching = false + } + def gettingResult: Boolean = gettingResultTime != 0 def finished: Boolean = finishTime != 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 9bd6b976f40b4..fa01e82013fd4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -825,6 +825,9 @@ private[spark] class TaskSchedulerImpl( taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } } + if (state == TaskState.RUNNING) { + taskSet.taskInfos(tid).launchSucceeded + } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1636587e9dd96..dc558a5184249 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1067,11 +1067,14 @@ private[spark] class TaskSetManager( } } for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { + // If the task is launching, this indicates that Driver has sent LaunchTask to Executor, + // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume that + // the task is not running, and it is NetworkFailure rather than TaskFailure. val exitCausedByApp: Boolean = reason match { - case exited: ExecutorExited => exited.exitCausedByApp + case ExecutorExited(_, false, _) => false case ExecutorKilled | ExecutorDecommission(_) => false case ExecutorProcessLost(_, _, false) => false - case _ => true + case _ => !info.launching } handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp, Some(reason.toString))) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 6b71bc8a6ede3..869a72324376b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -2085,6 +2085,53 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext assert(!taskSetManager.successful(taskDescriptions(0).index)) } + Seq(true, false).foreach { hasLaunched => + val testName = if (hasLaunched) { + "executor lost could fail task set if task is running" + } else { + "executor lost should not fail task set if task is launching" + } + test(s"SPARK-39955: $testName") { + val taskCpus = 2 + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", + config.TASK_MAX_FAILURES.key -> "1") + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {} + override def executorAdded(execId: String, host: String): Unit = {} + } + + val workerOffer = IndexedSeq( + WorkerOffer("executor0", "host0", 1)) + val taskSet = FakeTask.createTaskSet(1) + // submit tasks, offer resources, task gets scheduled + taskScheduler.submitTasks(taskSet) + var tsm: Option[TaskSetManager] = None + eventually(timeout(10.seconds)) { + tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId) + assert(tsm.isDefined && !tsm.get.isZombie) + } + val taskDescriptions = taskScheduler.resourceOffers(workerOffer) + assert(1 === taskDescriptions.length) + assert(taskScheduler.runningTasksByExecutors("executor0") === 1) + if (hasLaunched) { + taskScheduler.statusUpdate( + 0, + TaskState.RUNNING, + ByteBuffer.allocate(0)) + eventually(timeout(10.seconds)) { + assert(!tsm.get.taskInfos(0).launching) + } + } + taskScheduler.executorLost("executor0", ExecutorProcessLost()) + eventually(timeout(10.seconds)) { + assert(tsm.get.isZombie === hasLaunched) + } + } + } + /** * Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure * that all the state is updated when this method returns. Otherwise, there's no way to know when diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c1c62ea4b85d3..efb64a6f1bb34 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -639,6 +639,11 @@ class TaskSetManagerSuite ExecutorExited(143, false, "Terminated for reason unrelated to running tasks")) assert(!sched.taskSetsFailed.contains(taskSet.id)) assert(manager.resourceOffer("execC", "host2", ANY)._1.isDefined) + + // Driver receives StatusUpdate(RUNNING) from Executors + for ((tid, info) <- manager.taskInfos if info.running) { + manager.taskInfos(tid).launchSucceeded + } sched.removeExecutor("execC") manager.executorLost( "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks")) @@ -2203,14 +2208,20 @@ class TaskSetManagerSuite val (taskId0, index0, exec0) = (task0.taskId, task0.index, task0.executorId) val (taskId1, index1, exec1) = (task1.taskId, task1.index, task1.executorId) // set up two running tasks - assert(manager.taskInfos(taskId0).running) - assert(manager.taskInfos(taskId1).running) + assert(manager.taskInfos(taskId0).running && manager.taskInfos(taskId0).launching) + assert(manager.taskInfos(taskId1).running && manager.taskInfos(taskId1).launching) val numFailures = PrivateMethod[Array[Int]](Symbol("numFailures")) // no task failures yet assert(manager.invokePrivate(numFailures())(index0) === 0) assert(manager.invokePrivate(numFailures())(index1) === 0) + sched.asInstanceOf[TaskSchedulerImpl] + .statusUpdate(taskId1, TaskState.RUNNING, ByteBuffer.allocate(0)) + eventually(timeout(10.seconds), interval(100.milliseconds)) { + assert(manager.taskInfos(taskId0).running && manager.taskInfos(taskId0).launching) + assert(manager.taskInfos(taskId1).running && !manager.taskInfos(taskId1).launching) + } // let exec1 count task failures but exec0 doesn't backend.executorsPendingToRemove(exec0) = true backend.executorsPendingToRemove(exec1) = false From 112db52a3201e303c5f72d5b5ab6bc0ce990218e Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Wed, 3 Aug 2022 10:33:06 -0700 Subject: [PATCH 2/3] fix flaky --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index efb64a6f1bb34..f6d3572e620d2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2207,9 +2207,10 @@ class TaskSetManagerSuite val (taskId0, index0, exec0) = (task0.taskId, task0.index, task0.executorId) val (taskId1, index1, exec1) = (task1.taskId, task1.index, task1.executorId) + // set up two running tasks - assert(manager.taskInfos(taskId0).running && manager.taskInfos(taskId0).launching) - assert(manager.taskInfos(taskId1).running && manager.taskInfos(taskId1).launching) + assert(manager.taskInfos(taskId0).running) + assert(manager.taskInfos(taskId1).running) val numFailures = PrivateMethod[Array[Int]](Symbol("numFailures")) // no task failures yet @@ -2219,7 +2220,7 @@ class TaskSetManagerSuite sched.asInstanceOf[TaskSchedulerImpl] .statusUpdate(taskId1, TaskState.RUNNING, ByteBuffer.allocate(0)) eventually(timeout(10.seconds), interval(100.milliseconds)) { - assert(manager.taskInfos(taskId0).running && manager.taskInfos(taskId0).launching) + assert(manager.taskInfos(taskId0).running) assert(manager.taskInfos(taskId1).running && !manager.taskInfos(taskId1).launching) } // let exec1 count task failures but exec0 doesn't From 5f267372f72bf5ca49162655ab70183497415294 Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Thu, 11 Aug 2022 02:39:05 -0400 Subject: [PATCH 3/3] update --- .../main/scala/org/apache/spark/scheduler/TaskInfo.scala | 2 +- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++--- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 4a0f774c06a6b..0ee0dc6ae6016 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -110,7 +110,7 @@ class TaskInfo( } } - private[spark] def launchSucceeded: Unit = { + private[spark] def launchSucceeded(): Unit = { launching = false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index fa01e82013fd4..d3e27a94e2944 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -826,7 +826,7 @@ private[spark] class TaskSchedulerImpl( } } if (state == TaskState.RUNNING) { - taskSet.taskInfos(tid).launchSucceeded + taskSet.taskInfos(tid).launchSucceeded() } case None => logError( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index dc558a5184249..d3560f61e9dec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1067,13 +1067,13 @@ private[spark] class TaskSetManager( } } for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - // If the task is launching, this indicates that Driver has sent LaunchTask to Executor, - // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume that - // the task is not running, and it is NetworkFailure rather than TaskFailure. val exitCausedByApp: Boolean = reason match { case ExecutorExited(_, false, _) => false case ExecutorKilled | ExecutorDecommission(_) => false case ExecutorProcessLost(_, _, false) => false + // If the task is launching, this indicates that Driver has sent LaunchTask to Executor, + // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume + // that the task is not running, and it is NetworkFailure rather than TaskFailure. case _ => !info.launching } handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp, diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f6d3572e620d2..23aa396a4b268 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -642,7 +642,7 @@ class TaskSetManagerSuite // Driver receives StatusUpdate(RUNNING) from Executors for ((tid, info) <- manager.taskInfos if info.running) { - manager.taskInfos(tid).launchSucceeded + manager.taskInfos(tid).launchSucceeded() } sched.removeExecutor("execC") manager.executorLost(