diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 40ea5c59ec04e..df757d8f5b852 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -219,9 +219,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         // Asynchronously kill the executor to avoid blocking the current thread
         killExecutorThread.submit(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
+            val heartbeatTimeoutReason = ExecutorProcessLost(
+              s"Executor heartbeat timed out after ${now - lastSeenMs} ms")
             // Note: we want to get an executor back after expiring this one,
             // so do not simply call `sc.killExecutor` here (SPARK-8119)
-            sc.killAndReplaceExecutor(executorId)
+            sc.schedulerBackend match {
+              case backend: CoarseGrainedSchedulerBackend =>
+                backend.killAndReplaceExecutor(executorId, heartbeatTimeoutReason)
+              case _ =>
+                sc.killAndReplaceExecutor(executorId)
+            }
             // SPARK-27348: in case of the executors which are not gracefully shut down,
             // we should remove lost executors from CoarseGrainedSchedulerBackend manually
             // here to guarantee two things:
@@ -231,9 +238,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
             //    those executors to avoid app hang
             sc.schedulerBackend match {
               case backend: CoarseGrainedSchedulerBackend =>
-                backend.driverEndpoint.send(RemoveExecutor(executorId,
-                  ExecutorProcessLost(
-                    s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+                backend.driverEndpoint.send(RemoveExecutor(executorId, heartbeatTimeoutReason))
 
               // LocalSchedulerBackend is used locally and only has one single executor
               case _: LocalSchedulerBackend =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ad92e22424c77..2503d3ccd9528 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -100,6 +100,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]
 
+  // Loss reasons recorded by driver-initiated kills, keyed by executor ID. An entry is reported
+  // in place of a generic ExecutorKilled when the executor's removal is processed.
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private val pendingRemovalLossReasons = new HashMap[String, ExecutorLossReason]
+
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
@@ -467,12 +472,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
             val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
+            val pendingRemovalLossReason = pendingRemovalLossReasons.remove(executorId)
             val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
             if (killedByDriver) {
               ExecutorKilled
             } else if (decommissionInfoOpt.isDefined) {
               val decommissionInfo = decommissionInfoOpt.get
               ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
+            } else if (reason == ExecutorKilled) {
+              pendingRemovalLossReason.getOrElse(reason)
             } else {
               reason
             }
@@ -938,6 +946,38 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       adjustTargetNumExecutors: Boolean,
       countFailures: Boolean,
       force: Boolean): Seq[String] = {
+    killExecutors(executorIds, adjustTargetNumExecutors, countFailures, force, None)
+  }
+
+  /**
+   * Force-kill one executor without lowering the executor target, so a replacement can be
+   * requested, and remember `lossReason` for when the executor's removal is processed.
+   *
+   * @param executorId ID of the executor to kill
+   * @param lossReason reason to report if the removal would otherwise be an [[ExecutorKilled]]
+   * @return true if the executor was eligible to be killed and the kill request succeeded
+   */
+  private[spark] def killAndReplaceExecutor(
+      executorId: String,
+      lossReason: ExecutorLossReason): Boolean = {
+    killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
+      force = true, lossReason = Some(lossReason)).nonEmpty
+  }
+
+  /**
+   * Same as the four-argument `killExecutors`, but additionally records `lossReason` for every
+   * executor selected to be killed. A recorded reason is discarded if the kill request does not
+   * succeed. It has no effect when `countFailures` is false, because those executors are always
+   * reported as [[ExecutorKilled]].
+   *
+   * @param lossReason reason to record for the executors being killed, if any
+   */
+  private[spark] def killExecutors(
+      executorIds: Seq[String],
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
+      force: Boolean,
+      lossReason: Option[ExecutorLossReason]): Seq[String] = {
     logInfo(
       log"Requesting to kill executor(s) ${MDC(LogKeys.EXECUTOR_IDS, executorIds.mkString(", "))}")
 
@@ -952,7 +992,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val executorsToKill = knownExecutors
         .filter { id => !executorsPendingToRemove.contains(id) }
         .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
+      executorsToKill.foreach { id =>
+        executorsPendingToRemove(id) = !countFailures
+        lossReason.foreach { reason =>
+          pendingRemovalLossReasons(id) = reason
+        }
+      }
 
       logInfo(log"Actual list of executor(s) to be killed is " +
         log"${MDC(LogKeys.EXECUTOR_IDS, executorsToKill.mkString(", "))}")
@@ -975,8 +1020,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+      val cleanedKillResponse = killResponse.andThen {
+        case scala.util.Success(true) =>
+          // The kill was accepted; keep the reason until the executor's removal is processed.
+        case _ =>
+          // The kill was rejected or failed, so drop the recorded reasons rather than let them
+          // be attributed to a later, unrelated removal of these executors.
+          CoarseGrainedSchedulerBackend.this.synchronized {
+            pendingRemovalLossReasons --= executorsToKill
+          }
+      }(ThreadUtils.sameThread)
 
-      killResponse.flatMap(killSuccessful =>
+      cleanedKillResponse.flatMap(killSuccessful =>
         Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
       )(ThreadUtils.sameThread)
     }
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 7d1b8f8ea508b..c285bad12895e 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -167,7 +167,12 @@ class HeartbeatReceiverSuite
     val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
     val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
     val fakeSchedulerBackend =
-      new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager)
+      new FakeSchedulerBackend(
+        scheduler,
+        rpcEnv,
+        fakeClusterManagerRef,
+        sc.resourceProfileManager,
+        removeExecutorsOnKillWithReason = Some(ExecutorKilled))
     when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
 
     // Register fake executors with our fake scheduler backend
@@ -215,6 +220,51 @@ class HeartbeatReceiverSuite
     eventually(timeout(5.seconds)) {
       assert(!fakeSchedulerBackend.getExecutorIds().contains(executorId1))
       assert(!fakeSchedulerBackend.getExecutorIds().contains(executorId2))
+      verify(scheduler).executorLost(executorId1,
+        ExecutorProcessLost(s"Executor heartbeat timed out after ${executorTimeout * 2} ms"))
+      verify(scheduler).executorLost(executorId2,
+        ExecutorProcessLost(s"Executor heartbeat timed out after ${executorTimeout * 2} ms"))
+    }
+    fakeSchedulerBackend.stop()
+  }
+
+  test("heartbeat timeout should not override a concrete backend loss reason") {
+    val rpcEnv = sc.env.rpcEnv
+    val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
+    val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm-concrete-loss", fakeClusterManager)
+    val concreteLossReason = ExecutorExited(
+      1,
+      exitCausedByApp = false,
+      "Executor pod was deleted by the cluster manager.")
+    val fakeSchedulerBackend =
+      new FakeSchedulerBackend(
+        scheduler,
+        rpcEnv,
+        fakeClusterManagerRef,
+        sc.resourceProfileManager,
+        removeExecutorsOnKillWithReason = Some(concreteLossReason))
+    when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+    fakeSchedulerBackend.start()
+    val dummyExecutorEndpoint = new FakeExecutorEndpoint(rpcEnv)
+    val dummyExecutorEndpointRef =
+      rpcEnv.setupEndpoint("fake-executor-concrete-loss", dummyExecutorEndpoint)
+    fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef, "1.2.3.4", 0, Map.empty,
+        Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+    addExecutorAndVerify(executorId1)
+    triggerHeartbeat(executorId1, executorShouldReregister = false)
+
+    val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
+    heartbeatReceiverClock.advance(executorTimeout * 2)
+    heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
+    val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
+    killThread.shutdown()
+    killThread.awaitTermination(10L, TimeUnit.SECONDS)
+
+    eventually(timeout(5.seconds)) {
+      verify(scheduler).executorLost(executorId1, concreteLossReason)
     }
     fakeSchedulerBackend.stop()
   }
@@ -328,19 +378,23 @@ private class FakeSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     rpcEnv: RpcEnv,
     clusterManagerEndpoint: RpcEndpointRef,
-    resourceProfileManager: ResourceProfileManager)
+    resourceProfileManager: ResourceProfileManager,
+    removeExecutorsOnKillWithReason: Option[ExecutorLossReason] = None)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
-  def this() = this(null, null, null, null)
+  def this() = this(null, null, null, null, removeExecutorsOnKillWithReason = None)
 
   protected override def doRequestTotalExecutors(
       resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](
       RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
         rpHostToLocalTaskCount, Set.empty))
-}
+  }
 
   protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    removeExecutorsOnKillWithReason.foreach { reason =>
+      executorIds.foreach(removeExecutor(_, reason))
+    }
     clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 127e4b2e413ce..0bbc9bd220569 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -27,7 +27,7 @@ import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.when
+import org.mockito.Mockito.{verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.concurrent.Eventually
 import org.scalatestplus.mockito.MockitoSugar._
@@ -606,6 +606,36 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     assert(mockEndpointRef.decommissionReceived)
   }
 
+  test("pending replacement loss reason should be cleared when executor kill is rejected") {
+    val conf = new SparkConf()
+      .set(SCHEDULER_REVIVE_INTERVAL.key, "1m")
+      .set(EXECUTOR_INSTANCES, 0)
+      .setMaster(
+        "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
+      .setAppName("test")
+
+    sc = new SparkContext(conf)
+    val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+    val scheduler = backend.getTaskSchedulerImpl()
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty,
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+
+    assert(backend.killExecutors(
+      Seq("1"),
+      adjustTargetNumExecutors = false,
+      countFailures = true,
+      force = true,
+      lossReason = Some(ExecutorProcessLost("Executor heartbeat timed out"))).isEmpty)
+
+    backend.driverEndpoint.send(RemoveExecutor("1", ExecutorKilled))
+    eventually(timeout(5.seconds)) {
+      verify(scheduler).executorLost("1", ExecutorKilled)
+    }
+  }
+
   private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
     sc.submitJob(
       rdd,