diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index cdba1c44034c0..3c54f820c1313 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -17,7 +17,7 @@ package org.apache.spark -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import org.apache.spark.scheduler.ExecutorDecommissionReason /** * A client that communicates with the cluster manager to request or kill executors. @@ -88,44 +88,35 @@ private[spark] trait ExecutorAllocationClient { * Default implementation delegates to kill, scheduler must override * if it supports graceful decommissioning. * - * @param executorsAndDecomInfo identifiers of executors & decom info. + * @param executorsAndDecomReason identifiers of executors & decom reason. * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down * after these executors have been decommissioned. - * @param triggeredByExecutor whether the decommission is triggered at executor. * @return the ids of the executors acknowledged by the cluster manager to be removed. */ def decommissionExecutors( - executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], - adjustTargetNumExecutors: Boolean, - triggeredByExecutor: Boolean): Seq[String] = { - killExecutors(executorsAndDecomInfo.map(_._1), + executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + killExecutors(executorsAndDecomReason.map(_._1), adjustTargetNumExecutors, countFailures = false) } - /** * Request that the cluster manager decommission the specified executor. * Delegates to decommissionExecutors. * * @param executorId identifiers of executor to decommission - * @param decommissionInfo information about the decommission (reason, host loss) + * @param decomReason the decommission reason of the executor * @param adjustTargetNumExecutors if we should adjust the target number of executors. - * @param triggeredByExecutor whether the decommission is triggered at executor. - * (TODO: add a new type like `ExecutorDecommissionInfo` for the - * case where executor is decommissioned at executor first, so we - * don't need this extra parameter.) * @return whether the request is acknowledged by the cluster manager. 
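A minimal sketch, not part of the patch, of how a caller inside the org.apache.spark package would drive the reworked two-argument API; the object DecommissionSketch, the helper decommissionForScaleDown, and the choice of DynamicAllocationDecommission as the reason are illustrative assumptions only:

package org.apache.spark

import org.apache.spark.scheduler.{DynamicAllocationDecommission, ExecutorDecommissionReason}

private[spark] object DecommissionSketch {
  // The reason subclass now carries what the removed `triggeredByExecutor` flag and the old
  // `ExecutorDecommissionInfo` message/workerHost pair used to encode.
  def decommissionForScaleDown(
      client: ExecutorAllocationClient,
      executorIds: Seq[String]): Seq[String] = {
    val withReason: Array[(String, ExecutorDecommissionReason)] =
      executorIds.map(id => (id, DynamicAllocationDecommission(): ExecutorDecommissionReason)).toArray
    client.decommissionExecutors(withReason, adjustTargetNumExecutors = false)
  }
}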
*/ final def decommissionExecutor( executorId: String, - decommissionInfo: ExecutorDecommissionInfo, - adjustTargetNumExecutors: Boolean, - triggeredByExecutor: Boolean = false): Boolean = { + decomReason: ExecutorDecommissionReason, + adjustTargetNumExecutors: Boolean): Boolean = { val decommissionedExecutors = decommissionExecutors( - Array((executorId, decommissionInfo)), - adjustTargetNumExecutors = adjustTargetNumExecutors, - triggeredByExecutor = triggeredByExecutor) + Array((executorId, decomReason)), + adjustTargetNumExecutors = adjustTargetNumExecutors) decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) } diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 341334c8a29c4..45a96bc8b6884 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -579,12 +579,11 @@ private[spark] class ExecutorAllocationManager( // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. if (decommissionEnabled) { - val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( - id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray + val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(id => + (id, DynamicAllocationDecommission())).toArray[(String, ExecutorDecommissionReason)] client.decommissionExecutors( executorIdsWithoutHostLoss, - adjustTargetNumExecutors = false, - triggeredByExecutor = false) + adjustTargetNumExecutors = false) } else { client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, countFailures = false, force = false) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index e5efb15f6bc51..efa6a2ca12b3e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -31,7 +31,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.internal.Logging import org.apache.spark.rpc._ -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import org.apache.spark.scheduler.StandaloneDecommission import org.apache.spark.util.{RpcUtils, ThreadUtils} /** @@ -182,8 +182,7 @@ private[spark] class StandaloneAppClient( if (ExecutorState.isFinished(state)) { listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost) } else if (state == ExecutorState.DECOMMISSIONED) { - listener.executorDecommissioned(fullId, - ExecutorDecommissionInfo(message.getOrElse(""), workerHost)) + listener.executorDecommissioned(fullId, StandaloneDecommission(workerHost)) } case WorkerRemoved(id, host, message) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index 76970ac9829c9..7876bffda1a94 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.client -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import 
org.apache.spark.scheduler.ExecutorDecommissionReason /** * Callbacks invoked by deploy client when various events happen. There are currently five events: @@ -41,7 +41,7 @@ private[spark] trait StandaloneAppClientListener { def executorRemoved( fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit - def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit + def executorDecommissioned(fullId: String, decomReason: ExecutorDecommissionReason): Unit def workerRemoved(workerId: String, host: String, message: String): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 080e0e7f1552f..61a96cbadfe87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler( val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled val isHostDecommissioned = taskScheduler .getExecutorDecommissionState(bmAddress.executorId) - .exists(_.workerHost.isDefined) + .exists(_.isHostDecommissioned) // Shuffle output of all executors on host `bmAddress.host` may be lost if: // - External shuffle service is enabled, so we assume that all shuffle data on node is @@ -2368,7 +2368,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ExecutorLost(execId, reason) => val workerHost = reason match { case ExecutorProcessLost(_, workerHost, _) => workerHost - case ExecutorDecommission(workerHost) => workerHost + case ExecutorDecommission(_, host) => host case _ => None } dagScheduler.handleExecutorLost(execId, workerHost) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala deleted file mode 100644 index 7eec070232c3b..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -/** - * Message providing more detail when an executor is being decommissioned. - * @param message Human readable reason for why the decommissioning is happening. - * @param workerHost When workerHost is defined, it means the host (aka the `node` or `worker` - * in other places) has been decommissioned too. Used to infer if the - * shuffle data might be lost even if the external shuffle service is enabled. 
- */ -private[spark] -case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None) - -/** - * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived - * from the info message above but it is kept distinct to allow the state to evolve independently - * from the message. - */ -case class ExecutorDecommissionState( - // Timestamp the decommissioning commenced as per the Driver's clock, - // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL - // is configured. - startTime: Long, - workerHost: Option[String] = None) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala new file mode 100644 index 0000000000000..1a7ab0a927c99 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +private[spark] sealed trait ExecutorDecommissionReason { + val reason: String = "decommissioned" + override def toString: String = reason +} + +/** + * For the case where the decommission is triggered by dynamic allocation. + */ +case class DynamicAllocationDecommission() extends ExecutorDecommissionReason { + override val reason: String = "decommissioned by dynamic allocation" +} + +/** + * For the case where the decommission is triggered at the executor first. + */ +class ExecutorTriggeredDecommission extends ExecutorDecommissionReason + +/** + * For Kubernetes workloads. + */ +case class K8SDecommission() extends ExecutorTriggeredDecommission + +/** + * For Standalone workloads. + * @param workerHost When workerHost is defined, it means the worker has been decommissioned too. + * Used to infer if the shuffle data might be lost even if the external shuffle + * service is enabled. + */ +case class StandaloneDecommission(workerHost: Option[String] = None) + extends ExecutorDecommissionReason { + override val reason: String = if (workerHost.isDefined) { + s"Worker ${workerHost.get} decommissioned" + } else { + "decommissioned" + } +} + +/** + * For test only. + */ +case class TestExecutorDecommission(host: Option[String] = None) + extends ExecutorDecommissionReason { + override val reason: String = if (host.isDefined) { + s"Host ${host.get} decommissioned(test)" + } else { + "decommissioned(test)" + } +} + +/** + * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived + * from the ExecutorDecommissionReason above but it is kept distinct to allow the state to evolve + * independently from the message.
+ */ +case class ExecutorDecommissionState( + // Timestamp the decommissioning commenced as per the Driver's clock, + // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL + // is configured. + startTime: Long, + reason: ExecutorDecommissionReason) { + + def isHostDecommissioned: Boolean = reason match { + case StandaloneDecommission(workerHost) => workerHost.isDefined + case _ => false + } + + def host: Option[String] = reason match { + case StandaloneDecommission(workerHost) => workerHost + case _ => None + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index f2eb4a7047b56..70fc49ed29208 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -71,7 +71,8 @@ case class ExecutorProcessLost( * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. * - * @param workerHost it is defined when the worker is decommissioned too + * @param reason the reason why the executor is decommissioned + * @param host it is defined when the host where the executor is located is decommissioned too */ -private [spark] case class ExecutorDecommission(workerHost: Option[String] = None) - extends ExecutorLossReason("Executor decommission.") +private [spark] case class ExecutorDecommission(reason: String, host: Option[String] = None) - extends ExecutorLossReason(reason) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 0fa80bbafdedd..cb29b3a9f6d01 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -101,7 +101,7 @@ private[spark] trait TaskScheduler { /** * Process a decommissioning executor.
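A small illustrative sketch, assumed rather than taken from the patch, of how code in the org.apache.spark.scheduler package can branch on the sealed reason hierarchy instead of parsing the old free-form message and workerHost pair; the object and the describe helper are hypothetical:

package org.apache.spark.scheduler

private[spark] object DecommissionReasonSketch {
  // Map a reason to a log-friendly description by matching on its type.
  def describe(reason: ExecutorDecommissionReason): String = reason match {
    case StandaloneDecommission(Some(host)) => s"standalone worker $host was decommissioned"
    case StandaloneDecommission(None) => "standalone executor was decommissioned"
    case DynamicAllocationDecommission() => "scaled down by dynamic allocation"
    case _: ExecutorTriggeredDecommission => "decommission was initiated on the executor (e.g. K8SDecommission)"
    case other => other.toString
  }
}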
*/ - def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit + def executorDecommission(executorId: String, reason: ExecutorDecommissionReason): Unit /** * If an executor is decommissioned, return its corresponding decommission info diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2fcf13d5268f8..bdb458f3be79e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -906,12 +906,12 @@ private[spark] class TaskSchedulerImpl( } override def executorDecommission( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { + executorId: String, reason: ExecutorDecommissionReason): Unit = { synchronized { // Don't bother noting decommissioning for executors that we don't know about if (executorIdToHost.contains(executorId)) { executorsPendingDecommission(executorId) = - ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost) + ExecutorDecommissionState(clock.getTimeMillis(), reason) } } rootPool.executorDecommission(executorId) @@ -970,6 +970,9 @@ private[spark] class TaskSchedulerImpl( logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.") case ExecutorKilled => logInfo(s"Executor $executorId on $hostPort killed by driver.") + case ExecutorDecommission(reason, _) => + // Use logInfo instead of logError since the loss of a decommissioned executor is expected. + logInfo(s"Decommissioned executor $executorId on $hostPort shut down: $reason") case _ => logError(s"Lost executor $executorId on $hostPort: $reason") } @@ -1055,7 +1058,7 @@ private[spark] class TaskSchedulerImpl( // exposed for test protected final def isHostDecommissioned(host: String): Boolean = { hostToExecutors.get(host).exists { executors => - executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined)) + executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 78fd412ef154c..9e0c6651dbc66 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -991,7 +991,7 @@ private[spark] class TaskSetManager( for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { val exitCausedByApp: Boolean = reason match { case exited: ExecutorExited => exited.exitCausedByApp - case ExecutorKilled | ExecutorDecommission(_) => false + case ExecutorKilled | ExecutorDecommission(_, _) => false case ExecutorProcessLost(_, _, false) => false case _ => true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index d1b0e798c51be..e162ce5cd8dc0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.RpcEndpointRef -import
org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.scheduler.ExecutorLossReason import org.apache.spark.util.SerializableBuffer diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f6930da96a390..c879d8c018f97 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,8 +92,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. private val executorsPendingLossReason = new HashSet[String] - // Executors which are being decommissioned. Maps from executorId to workerHost. - protected val executorsPendingDecommission = new HashMap[String, Option[String]] + // Executors which are being decommissioned. + protected val executorsPendingDecommission = new HashSet[String] // A map of ResourceProfile id to map of hostname with its possible task number running on it @GuardedBy("CoarseGrainedSchedulerBackend.this") @@ -273,9 +273,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply( decommissionExecutor( executorId, - ExecutorDecommissionInfo(s"Executor $executorId is decommissioned."), - adjustTargetNumExecutors = false, - triggeredByExecutor = true)) + new ExecutorTriggeredDecommission(), + adjustTargetNumExecutors = false)) case RetrieveSparkAppConfig(resourceProfileId) => val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId) @@ -390,16 +389,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case Some(executorInfo) => // This must be synchronized because variables mutated // in this block are read when requesting executors - val lossReason = CoarseGrainedSchedulerBackend.this.synchronized { + val lossReason = withLock { addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false) - val workerHostOpt = executorsPendingDecommission.remove(executorId) + val decommissioned = executorsPendingDecommission.remove(executorId) if (killedByDriver) { ExecutorKilled - } else if (workerHostOpt.isDefined) { - ExecutorDecommission(workerHostOpt.get) + } else if (decommissioned) { + val decomState = scheduler.getExecutorDecommissionState(executorId) + assert(decomState.isDefined, s"DecommissionState is missing for executor $executorId") + ExecutorDecommission(decomState.get.reason.toString, decomState.get.host) } else { reason } @@ -460,25 +461,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Request that the cluster manager decommission the specified executors. * - * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param executorsAndDecomReason Identifiers of executors & decommission reason. * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down * after these executors have been decommissioned. - * @param triggeredByExecutor whether the decommission is triggered at executor. * @return the ids of the executors acknowledged by the cluster manager to be removed.
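Before the override that follows, a hedged sketch of the dispatch that replaces the removed triggeredByExecutor flag; the shouldNotifyExecutor helper and the enclosing object are hypothetical and only spell out the reason-type check used below:

package org.apache.spark.scheduler.cluster

import org.apache.spark.scheduler.{ExecutorDecommissionReason, ExecutorTriggeredDecommission}

private[spark] object DecommissionDispatchSketch {
  // The driver only sends DecommissionExecutor when the decommission did not originate on the
  // executor itself; an ExecutorTriggeredDecommission means the executor already knows.
  def shouldNotifyExecutor(reason: ExecutorDecommissionReason): Boolean = reason match {
    case _: ExecutorTriggeredDecommission => false
    case _ => true
  }
}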
*/ override def decommissionExecutors( - executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], - adjustTargetNumExecutors: Boolean, - triggeredByExecutor: Boolean): Seq[String] = withLock { - val executorsToDecommission = executorsAndDecomInfo.flatMap { case (executorId, decomInfo) => - // Only bother decommissioning executors which are alive. - if (isExecutorActive(executorId)) { - scheduler.executorDecommission(executorId, decomInfo) - executorsPendingDecommission(executorId) = decomInfo.workerHost - Some(executorId) - } else { - None - } + executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)], + adjustTargetNumExecutors: Boolean): Seq[String] = withLock { + val executorsToDecommission = executorsAndDecomReason.flatMap { + case (executorId, decomReason) => + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + scheduler.executorDecommission(executorId, decomReason) + executorsPendingDecommission.add(executorId) + Some(executorId) + } else { + None + } } // If we don't want to replace the executors we are decommissioning @@ -494,10 +494,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission) } - if (!triggeredByExecutor) { - executorsToDecommission.foreach { executorId => - logInfo(s"Asking executor $executorId to decommissioning.") - executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor) + // The caller should ensure that there's only one distinct decommission reason. + executorsAndDecomReason.headOption.foreach { case (_, reason) => + reason match { + case _: ExecutorTriggeredDecommission => + // Driver doesn't need to notify the executor since decommission notification + // already comes from the executor + case _ => + executorsToDecommission.foreach { executorId => + logInfo(s"Asking executor $executorId to decommission.") + executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor) + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index b9ac8d2ba2784..09fedfb5ec9c3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -177,14 +177,11 @@ private[spark] class StandaloneSchedulerBackend( removeExecutor(fullId.split("/")(1), reason) } - override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { + override def executorDecommissioned(fullId: String, decomReason: ExecutorDecommissionReason) { logInfo(s"Asked to decommission executor $fullId") val execId = fullId.split("/")(1) - decommissionExecutors( - Array((execId, decommissionInfo)), - adjustTargetNumExecutors = false, - triggeredByExecutor = false) - logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) + decommissionExecutors(Array((execId, decomReason)), adjustTargetNumExecutors = false) + logInfo("Executor %s decommissioned: %s".format(fullId, decomReason)) } override def workerRemoved(workerId: String, host: String, message: String): Unit = { diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index a3438cab5b0a3..6993f66f98480 100644 ---
a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, Master} import org.apache.spark.deploy.worker.Worker import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.RpcEnv -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import org.apache.spark.scheduler.{ExecutorDecommissionReason, StandaloneDecommission} import org.apache.spark.util.Utils /** @@ -131,8 +131,9 @@ class AppClientSuite eventually(timeout(1.seconds), interval(10.millis)) { // We only record decommissioning for the executor we've requested assert(ci.listener.execDecommissionedMap.size === 1) - val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId) - assert(decommissionInfo != null && decommissionInfo.workerHost.isDefined, + val decomReason = ci.listener.execDecommissionedMap.get(executorId) + .asInstanceOf[StandaloneDecommission] + assert(decomReason != null && decomReason.workerHost.isDefined, s"$executorId should have been decommissioned along with its worker") } @@ -222,7 +223,7 @@ class AppClientSuite val deadReasonList = new ConcurrentLinkedQueue[String]() val execAddedList = new ConcurrentLinkedQueue[String]() val execRemovedList = new ConcurrentLinkedQueue[String]() - val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionInfo]() + val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionReason]() def connected(id: String): Unit = { connectedIdList.add(id) @@ -252,9 +253,10 @@ class AppClientSuite execRemovedList.add(id) } - def executorDecommissioned(id: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - val previousDecommissionInfo = execDecommissionedMap.putIfAbsent(id, decommissionInfo) - assert(previousDecommissionInfo === null, s"Expected no previous decommission info for $id") + def executorDecommissioned(id: String, decomReason: ExecutorDecommissionReason): Unit = { + val previousDecomReason = execDecommissionedMap.putIfAbsent(id, decomReason) + assert(previousDecomReason === null, + s"Expected no previous decommission reason for executor $id") } def workerRemoved(workerId: String, host: String, message: String): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 99be1faab8b85..4fe0b7759d19b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -177,7 +177,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def applicationAttemptId(): Option[String] = None override def executorDecommission( executorId: String, - decommissionInfo: ExecutorDecommissionInfo): Unit = {} + reason: ExecutorDecommissionReason): Unit = {} override def getExecutorDecommissionState( executorId: String): Option[ExecutorDecommissionState] = None } @@ -788,7 +788,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def applicationAttemptId(): Option[String] = None override def executorDecommission( executorId: String, - decommissionInfo: ExecutorDecommissionInfo): Unit = {} + reason: ExecutorDecommissionReason): Unit = {} override def getExecutorDecommissionState( executorId: String): Option[ExecutorDecommissionState] = None } diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 08191d09a9f2d..ca4432f002381 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -100,7 +100,7 @@ private class DummyTaskScheduler extends TaskScheduler { executorMetrics: Map[(Int, Int), ExecutorMetrics]): Boolean = true override def executorDecommission( executorId: String, - decommissionInfo: ExecutorDecommissionInfo): Unit = {} + reason: ExecutorDecommissionReason): Unit = {} override def getExecutorDecommissionState( executorId: String): Option[ExecutorDecommissionState] = None } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f29eb70eb3628..34925a4a3c281 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -158,8 +158,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B .exists(s => s.contains(exec0) && s.contains(exec1))) assert(scheduler.getExecutorsAliveOnHost(host1).exists(_.contains(exec2))) - scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", None)) - scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", Some(host1))) + scheduler.executorDecommission(exec1, TestExecutorDecommission()) + scheduler.executorDecommission(exec2, TestExecutorDecommission(Some(host1))) assert(scheduler.isExecutorAlive(exec0)) assert(!Seq(exec1, exec2).exists(scheduler.isExecutorAlive)) @@ -1865,13 +1865,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock(10000L) val scheduler = setupSchedulerForDecommissionTests(clock, 2) val decomTime = clock.getTimeMillis() - scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None)) - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1"))) + scheduler.executorDecommission("executor0", TestExecutorDecommission()) + scheduler.executorDecommission("executor1", TestExecutorDecommission(Some("host1"))) assert(scheduler.getExecutorDecommissionState("executor0") - === Some(ExecutorDecommissionState(decomTime, None))) + === Some(ExecutorDecommissionState(decomTime, TestExecutorDecommission()))) assert(scheduler.getExecutorDecommissionState("executor1") - === Some(ExecutorDecommissionState(decomTime, Some("host1")))) + === Some(ExecutorDecommissionState(decomTime, TestExecutorDecommission(Some("host1"))))) assert(scheduler.getExecutorDecommissionState("executor2").isEmpty) } @@ -1886,7 +1886,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(scheduler.getExecutorDecommissionState("executor0").isEmpty) scheduler.executorLost("executor0", ExecutorExited(0, false, "normal")) assert(scheduler.getExecutorDecommissionState("executor0").isEmpty) - scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None)) + scheduler.executorDecommission("executor0", TestExecutorDecommission()) assert(scheduler.getExecutorDecommissionState("executor0").isEmpty) // 0th task just died above @@ -1899,7 +1899,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B 
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty) // executor 1 is decommissioned before loosing - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None)) + scheduler.executorDecommission("executor1", TestExecutorDecommission()) assert(scheduler.getExecutorDecommissionState("executor1").isDefined) clock.advance(2000) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c389fd2ffa8b1..8b2ba0d4d6ef6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -681,8 +681,8 @@ class TaskSetManagerSuite assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY)) // Decommission all executors on host0, to mimic CoarseGrainedSchedulerBackend. - sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", Some(host0))) - sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", Some(host0))) + sched.executorDecommission(exec0, TestExecutorDecommission(Some(host0))) + sched.executorDecommission(exec1, TestExecutorDecommission(Some(host0))) assert(manager.myLocalityLevels === Array(ANY)) } @@ -707,7 +707,7 @@ class TaskSetManagerSuite assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY)) // Decommission the only executor (without the host) that the task is interested in running on. - sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", None)) + sched.executorDecommission(exec0, TestExecutorDecommission()) assert(manager.myLocalityLevels === Array(NODE_LOCAL, ANY)) } @@ -2029,7 +2029,7 @@ class TaskSetManagerSuite // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be now // checked if they should be speculated. 
// (TASK 2 -> 15, TASK 3 -> 15) - sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom", None)) + sched.executorDecommission("exec2", TestExecutorDecommission()) assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) === Some(clock.getTimeMillis())) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala index 129eb8bf91051..f8f1d5f3a8b53 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala @@ -64,7 +64,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] sc.getExecutorIds().tail.foreach { id => - sched.decommissionExecutor(id, ExecutorDecommissionInfo("", None), + sched.decommissionExecutor(id, StandaloneDecommission(), adjustTargetNumExecutors = false) assert(rdd3.sortByKey().collect().length === 100) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 4a92cbcb85847..7bd0161573dd1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -77,11 +77,11 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() // Make the executors decommission, finish, exit, and not be replaced. - val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", None))).toArray + val execsAndDecomReason = execs.map((_, StandaloneDecommission())) + .toArray[(String, ExecutorDecommissionReason)] sched.decommissionExecutors( - execsAndDecomInfo, - adjustTargetNumExecutors = true, - triggeredByExecutor = false) + execsAndDecomReason, + adjustTargetNumExecutors = true) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 094b893cdda2e..512660f498089 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -192,7 +192,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", None), + TestExecutorDecommission(), adjustTargetNumExecutors = true) val decomTime = new SystemClock().getTimeMillis() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 5655ef50d214f..bd1abdde7bc4b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -22,15 +22,16 @@ import scala.concurrent.Future import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO import org.apache.spark.resource.ResourceProfile -import org.apache.spark.rpc.RpcAddress -import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl} +import org.apache.spark.rpc.{RpcAddress, RpcCallContext} +import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, K8SDecommission, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.ExecutorDecommissioning import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( @@ -191,6 +192,18 @@ private[spark] class KubernetesClusterSchedulerBackend( private class KubernetesDriverEndpoint extends DriverEndpoint { + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case ExecutorDecommissioning(executorId) => + context.reply( + decommissionExecutor( + executorId, + K8SDecommission(), + adjustTargetNumExecutors = false)) + case message => super.receiveAndReply(context).applyOrElse(message, + (_: Any) => throw new SparkException(s"Unsupported message $message") + ) + } + override def onDisconnected(rpcAddress: RpcAddress): Unit = { // Don't do anything besides disabling the executor - allow the Kubernetes API events to // drive the rest of the lifecycle decisions diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 1037950a4424f..4652f2fa64345 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.DECOMMISSION_ENABLED import org.apache.spark.internal.config.Streaming._ import org.apache.spark.resource.ResourceProfile -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import org.apache.spark.scheduler.DynamicAllocationDecommission import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -137,7 +137,7 @@ private[streaming] class ExecutorAllocationManager( val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) if (conf.get(DECOMMISSION_ENABLED)) { client.decommissionExecutor(execIdToRemove, - ExecutorDecommissionInfo("spark scale down", None), + DynamicAllocationDecommission(), adjustTargetNumExecutors = true) } else { client.killExecutor(execIdToRemove) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 293498ae5c37b..c2e7fd49f3a17 100644 ---
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DECOMMISSION_ENABLED, DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ import org.apache.spark.resource.ResourceProfile -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import org.apache.spark.scheduler.DynamicAllocationDecommission import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{ManualClock, Utils} @@ -98,15 +98,15 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase /** Verify that a particular executor was scaled down. */ def verifyScaledDownExec(expectedExec: Option[String]): Unit = { if (expectedExec.nonEmpty) { - val decomInfo = ExecutorDecommissionInfo("spark scale down", None) + val decomReason = DynamicAllocationDecommission() if (decommissioning) { verify(allocationClient, times(1)).decommissionExecutor( - meq(expectedExec.get), meq(decomInfo), meq(true), any()) + meq(expectedExec.get), meq(decomReason), meq(true)) verify(allocationClient, never).killExecutor(meq(expectedExec.get)) } else { verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get)) verify(allocationClient, never).decommissionExecutor( - meq(expectedExec.get), meq(decomInfo), meq(true), any()) + meq(expectedExec.get), meq(decomReason), meq(true)) } } else { if (decommissioning) {