diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index cdba1c44034c0..ce47f3fd32203 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -91,13 +91,11 @@ private[spark] trait ExecutorAllocationClient {
    * @param executorsAndDecomInfo identifiers of executors & decom info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
-   * @param triggeredByExecutor whether the decommission is triggered at executor.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def decommissionExecutors(
-      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean,
-      triggeredByExecutor: Boolean): Seq[String] = {
+    executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+    adjustTargetNumExecutors: Boolean): Seq[String] = {
     killExecutors(executorsAndDecomInfo.map(_._1),
       adjustTargetNumExecutors,
       countFailures = false)
@@ -111,21 +109,14 @@ private[spark] trait ExecutorAllocationClient {
    * @param executorId identifiers of executor to decommission
    * @param decommissionInfo information about the decommission (reason, host loss)
    * @param adjustTargetNumExecutors if we should adjust the target number of executors.
-   * @param triggeredByExecutor whether the decommission is triggered at executor.
-   *                            (TODO: add a new type like `ExecutorDecommissionInfo` for the
-   *                            case where executor is decommissioned at executor first, so we
-   *                            don't need this extra parameter.)
    * @return whether the request is acknowledged by the cluster manager.
    */
-  final def decommissionExecutor(
-      executorId: String,
+  final def decommissionExecutor(executorId: String,
       decommissionInfo: ExecutorDecommissionInfo,
-      adjustTargetNumExecutors: Boolean,
-      triggeredByExecutor: Boolean = false): Boolean = {
+      adjustTargetNumExecutors: Boolean): Boolean = {
     val decommissionedExecutors = decommissionExecutors(
       Array((executorId, decommissionInfo)),
-      adjustTargetNumExecutors = adjustTargetNumExecutors,
-      triggeredByExecutor = triggeredByExecutor)
+      adjustTargetNumExecutors = adjustTargetNumExecutors)
     decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
   }

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1dd64df106bc2..596508a2cf8c8 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -580,10 +580,7 @@ private[spark] class ExecutorAllocationManager(
     if (decommissionEnabled) {
       val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
         id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
-      client.decommissionExecutors(
-        executorIdsWithoutHostLoss,
-        adjustTargetNumExecutors = false,
-        triggeredByExecutor = false)
+      client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
     } else {
       client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
         countFailures = false, force = false)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 8bc909b096e71..83f373d526e90 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -61,34 +61,13 @@ private[deploy] object DeployMessages {
   }

   /**
-   * An internal message that used by Master itself, in order to handle the
-   * `DecommissionWorkersOnHosts` request from `MasterWebUI` asynchronously.
-   * @param ids A collection of Worker ids, which should be decommissioned.
-   */
-  case class DecommissionWorkers(ids: Seq[String]) extends DeployMessage
-
-  /**
-   * A message that sent from Master to Worker to decommission the Worker.
-   * It's used for the case where decommission is triggered at MasterWebUI.
-   *
-   * Note that decommission a Worker will cause all the executors on that Worker
-   * to be decommissioned as well.
-   */
-  object DecommissionWorker extends DeployMessage
-
-  /**
-   * A message that sent to the Worker itself when it receives PWR signal,
-   * indicating the Worker starts to decommission.
-   */
-  object WorkerSigPWRReceived extends DeployMessage
-
-  /**
-   * A message sent from Worker to Master to tell Master that the Worker has started
-   * decommissioning. It's used for the case where decommission is triggered at Worker.
-   *
    * @param id the worker id
+   * @param worker the worker endpoint ref
    */
-  case class WorkerDecommissioning(id: String, workerRef: RpcEndpointRef) extends DeployMessage
+  case class WorkerDecommission(
+      id: String,
+      worker: RpcEndpointRef)
+    extends DeployMessage

   case class ExecutorStateChanged(
       appId: String,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 15f8be69d97bd..48516cdf83291 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -245,27 +245,15 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)

-    case WorkerDecommissioning(id, workerRef) =>
+    case WorkerDecommission(id, workerRef) =>
+      logInfo("Recording worker %s decommissioning".format(id))
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else {
         // We use foreach since get gives us an option and we can skip the failures.
-        idToWorker.get(id).foreach(w => decommissionWorker(w))
+        idToWorker.get(id).foreach(decommissionWorker)
       }

-    case DecommissionWorkers(ids) =>
-      // The caller has already checked the state when handling DecommissionWorkersOnHosts,
-      // so it should not be the STANDBY
-      assert(state != RecoveryState.STANDBY)
-      ids.foreach ( id =>
-        // We use foreach since get gives us an option and we can skip the failures.
-        idToWorker.get(id).foreach { w =>
-          decommissionWorker(w)
-          // Also send a message to the worker node to notify.
-          w.endpoint.send(DecommissionWorker)
-        }
-      )
-
     case RegisterWorker(
       id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
       masterAddress, resources) =>
@@ -903,7 +891,10 @@ private[deploy] class Master(
     logInfo(s"Decommissioning the workers with host:ports ${workersToRemoveHostPorts}")

     // The workers are removed async to avoid blocking the receive loop for the entire batch
-    self.send(DecommissionWorkers(workersToRemove.map(_.id).toSeq))
+    workersToRemove.foreach(wi => {
+      logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
+      self.send(WorkerDecommission(wi.id, wi.endpoint))
+    })

     // Return the count of workers actually removed
     workersToRemove.size
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2e8474e3e3fc2..7649bc37c30b6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -70,10 +70,7 @@ private[deploy] class Worker(
   if (conf.get(config.DECOMMISSION_ENABLED)) {
     logInfo("Registering SIGPWR handler to trigger decommissioning.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-      "disabling worker decommission feature.") {
-      self.send(WorkerSigPWRReceived)
-      true
-    }
+      "disabling worker decommission feature.")(decommissionSelf)
   } else {
     logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
   }
@@ -140,8 +137,7 @@ private[deploy] class Worker(
   private var registered = false
   private var connected = false
   private var decommissioned = false
-  // expose for test
-  private[spark] val workerId = generateWorkerId()
+  private val workerId = generateWorkerId()
   private val sparkHome =
     if (sys.props.contains(IS_TESTING.key)) {
       assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
@@ -672,13 +668,8 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)

-    case DecommissionWorker =>
-      decommissionSelf()
-
-    case WorkerSigPWRReceived =>
+    case WorkerDecommission(_, _) =>
       decommissionSelf()
-      // Tell master we starts decommissioning so it stops trying to launch executor/driver on us
-      sendToMaster(WorkerDecommissioning(workerId, self))
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -777,15 +768,16 @@ private[deploy] class Worker(
     }
   }

-  private[deploy] def decommissionSelf(): Unit = {
-    if (conf.get(config.DECOMMISSION_ENABLED) && !decommissioned) {
+  private[deploy] def decommissionSelf(): Boolean = {
+    if (conf.get(config.DECOMMISSION_ENABLED)) {
+      logDebug("Decommissioning self")
       decommissioned = true
-      logInfo(s"Decommission worker $workerId.")
-    } else if (decommissioned) {
-      logWarning(s"Worker $workerId already started decommissioning.")
+      sendToMaster(WorkerDecommission(workerId, self))
     } else {
-      logWarning(s"Receive decommission request, but decommission feature is disabled.")
+      logWarning("Asked to decommission self, but decommissioning not enabled")
     }
+    // Return true since can be called as a signal handler
+    true
   }

   private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d002f7b407e5e..48045bafe6e3f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -79,17 +79,12 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

-  private var decommissioned = false
+  @volatile private var decommissioned = false

   override def onStart(): Unit = {
-    if (env.conf.get(DECOMMISSION_ENABLED)) {
-      logInfo("Registering PWR handler to trigger decommissioning.")
-      SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-        "disabling executor decommission feature.") {
-        self.send(ExecutorSigPWRReceived)
-        true
-      }
-    }
+    logInfo("Registering PWR handler.")
+    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
+      "disabling decommission feature.")(decommissionSelf)

     logInfo("Connecting to driver: " + driverUrl)
     try {
@@ -171,6 +166,17 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
+        if (decommissioned) {
+          val msg = "Asked to launch a task while decommissioned."
+          logError(msg)
+          driver match {
+            case Some(endpoint) =>
+              logInfo("Sending DecommissionExecutor to driver.")
+              endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
+            case _ =>
+              logError("No registered driver to send Decommission to.")
+          }
+        }
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
         taskResources(taskDesc.taskId) = taskDesc.resources
@@ -207,17 +213,9 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

-    case DecommissionExecutor =>
+    case DecommissionSelf =>
+      logInfo("Received decommission self")
       decommissionSelf()
-
-    case ExecutorSigPWRReceived =>
-      decommissionSelf()
-      if (driver.nonEmpty) {
-        // Tell driver we starts decommissioning so it stops trying to schedule us
-        driver.get.askSync[Boolean](ExecutorDecommissioning(executorId))
-      } else {
-        logError("No driver to message decommissioning.")
-      }
   }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -266,20 +264,17 @@ private[spark] class CoarseGrainedExecutorBackend(
     System.exit(code)
   }

-  private def decommissionSelf(): Unit = {
-    if (!env.conf.get(DECOMMISSION_ENABLED)) {
-      logWarning(s"Receive decommission request, but decommission feature is disabled.")
-      return
-    } else if (decommissioned) {
-      logWarning(s"Executor $executorId already started decommissioning.")
-      return
-    }
-    val msg = s"Decommission executor $executorId."
+  private def decommissionSelf(): Boolean = {
+    val msg = "Decommissioning self w/sync"
     logInfo(msg)
     try {
       decommissioned = true
-      if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-        env.blockManager.decommissionBlockManager()
+      // Tell master we are are decommissioned so it stops trying to schedule us
+      if (driver.nonEmpty) {
+        driver.get.askSync[Boolean](DecommissionExecutor(
+          executorId, ExecutorDecommissionInfo(msg)))
+      } else {
+        logError("No driver to message decommissioning.")
       }
       if (executor != null) {
         executor.decommission()
@@ -338,9 +333,12 @@ private[spark] class CoarseGrainedExecutorBackend(
       shutdownThread.start()

       logInfo("Will exit when finished decommissioning")
+      // Return true since we are handling a signal
+      true
     } catch {
       case e: Exception =>
         logError("Unexpected error while decommissioning self", e)
+        false
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index d1b0e798c51be..7242ab7786061 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -95,17 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage

-  // A message that sent from executor to driver to tell driver that the executor has started
-  // decommissioning. It's used for the case where decommission is triggered at executor (e.g., K8S)
-  case class ExecutorDecommissioning(executorId: String) extends CoarseGrainedClusterMessage
-
-  // A message that sent from driver to executor to decommission that executor.
-  // It's used for Standalone's cases, where decommission is triggered at MasterWebUI or Worker.
-  object DecommissionExecutor extends CoarseGrainedClusterMessage
-
-  // A message that sent to the executor itself when it receives PWR signal,
-  // indicating the executor starts to decommission.
-  object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
+  case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
+    extends CoarseGrainedClusterMessage

   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage
@@ -145,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {

   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission itself. (Can be an internal message)
+  case object DecommissionSelf extends CoarseGrainedClusterMessage
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f6930da96a390..0f144125af7bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -191,6 +191,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
         removeExecutor(executorId, reason)

+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
+        decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false)
+
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)

@@ -268,14 +272,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)

-      case ExecutorDecommissioning(executorId) =>
-        logWarning(s"Received executor $executorId decommissioned message")
-        context.reply(
-          decommissionExecutor(
-            executorId,
-            ExecutorDecommissionInfo(s"Executor $executorId is decommissioned."),
-            adjustTargetNumExecutors = false,
-            triggeredByExecutor = true))
+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
+        context.reply(decommissionExecutor(executorId, decommissionInfo,
+          adjustTargetNumExecutors = false))

       case RetrieveSparkAppConfig(resourceProfileId) =>
         val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
@@ -463,47 +463,71 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param executorsAndDecomInfo Identifiers of executors & decommission info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
-   * @param triggeredByExecutor whether the decommission is triggered at executor.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean,
-      triggeredByExecutor: Boolean): Seq[String] = withLock {
-    val executorsToDecommission = executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
-      // Only bother decommissioning executors which are alive.
-      if (isExecutorActive(executorId)) {
-        scheduler.executorDecommission(executorId, decomInfo)
-        executorsPendingDecommission(executorId) = decomInfo.workerHost
-        Some(executorId)
-      } else {
-        None
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission(executorId) = decomInfo.workerHost
+          true
+        } else {
+          false
+        }
       }
     }

     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission)
+      adjustExecutors(executorsToDecommission.map(_._1))
     }

-    // Mark those corresponding BlockManagers as decommissioned first before we sending
-    // decommission notification to executors. So, it's less likely to lead to the race
-    // condition where `getPeer` request from the decommissioned executor comes first
-    // before the BlockManagers are marked as decommissioned.
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
+    executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Asking executor $executorId to decommissioning.")
+    scheduler.executorDecommission(executorId, decomInfo)
+    // Send decommission message to the executor (it could have originated on the executor
+    // but not necessarily).
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+          logWarning(s"Attempted to decommission unknown executor $executorId.")
+          return false
+      }
     }
+    logInfo(s"Asked executor $executorId to decommission.")

-    if (!triggeredByExecutor) {
-      executorsToDecommission.foreach { executorId =>
-        logInfo(s"Asking executor $executorId to decommissioning.")
-        executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor)
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
+        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+      } catch {
+        case e: Exception =>
+          logError("Unexpected error during block manager " +
+            s"decommissioning for executor $executorId: ${e.toString}", e)
+          return false
       }
+      logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
     }
-    executorsToDecommission
+    true
   }

+
   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b9ac8d2ba2784..34b03dfec9e80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -178,12 +178,9 @@ private[spark] class StandaloneSchedulerBackend(
   }

   override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
-    logInfo(s"Asked to decommission executor $fullId")
+    logInfo("Asked to decommission executor")
     val execId = fullId.split("/")(1)
-    decommissionExecutors(
-      Array((execId, decommissionInfo)),
-      adjustTargetNumExecutors = false,
-      triggeredByExecutor = false)
+    decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false)
     logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e1b4cb82cebf1..ff0f38a2479b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -56,7 +56,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
 import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
-import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.storage.memory._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
@@ -1809,9 +1809,7 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }

-  def decommissionBlockManager(): Unit = storageEndpoint.ask(DecommissionBlockManager)
-
-  private[spark] def decommissionSelf(): Unit = synchronized {
+  def decommissionBlockManager(): Unit = synchronized {
     decommissioner match {
       case None =>
         logInfo("Starting block manager decommissioning process...")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 3fcfca365846e..a3d42348befaa 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -163,7 +163,8 @@ class BlockManagerMasterEndpoint(
       context.reply(true)

     case DecommissionBlockManagers(executorIds) =>
-      decommissioningBlockManagerSet ++= executorIds.flatMap(blockManagerIdByExecutor.get)
+      val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get)
+      decommissionBlockManagers(bmIds)
       context.reply(true)

     case GetReplicateInfoForRDDBlocks(blockManagerId) =>
@@ -358,6 +359,21 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }

+  /**
+   * Decommission the given Seq of blockmanagers
+   *    - Adds these block managers to decommissioningBlockManagerSet Set
+   *    - Sends the DecommissionBlockManager message to each of the [[BlockManagerReplicaEndpoint]]
+   */
+  def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
+    val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
+    val futures = newBlockManagersToDecommission.map { blockManagerId =>
+      decommissioningBlockManagerSet.add(blockManagerId)
+      val info = blockManagerInfo(blockManagerId)
+      info.storageEndpoint.ask[Unit](DecommissionBlockManager)
+    }
+    Future.sequence{ futures.toSeq }
+  }
+
   /**
    * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
    * @param blockManagerId - block manager id for which ReplicateBlock info is needed
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index 54a72568b18fa..a69bebc23c661 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -62,7 +62,7 @@ class BlockManagerStorageEndpoint(
     }

     case DecommissionBlockManager =>
-      context.reply(blockManager.decommissionSelf())
+      context.reply(blockManager.decommissionBlockManager())

     case RemoveBroadcast(broadcastId, _) =>
       doAsync[Int]("removing broadcast " + broadcastId, context) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index abe5b7a71ca63..9c5e460854053 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.concurrent.Eventually._

 import org.apache.spark._
-import org.apache.spark.deploy.DeployMessages.{DecommissionWorkers, MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState, WorkerDecommission}
 import org.apache.spark.deploy.master.{ApplicationInfo, Master, WorkerInfo}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.{config, Logging}
@@ -414,7 +414,7 @@ class DecommissionWorkerSuite

   def decommissionWorkerOnMaster(workerInfo: WorkerInfo, reason: String): Unit = {
     logInfo(s"Trying to decommission worker ${workerInfo.id} for reason `$reason`")
-    master.self.send(DecommissionWorkers(Seq(workerInfo.id)))
+    master.self.send(WorkerDecommission(workerInfo.id, workerInfo.endpoint))
   }

   def killWorkerAfterTimeout(workerInfo: WorkerInfo, secondsToWait: Int): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a3438cab5b0a3..fe88822bb46b5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.concurrent.{Eventually, ScalaFutures}

 import org.apache.spark._
 import org.apache.spark.deploy.{ApplicationDescription, Command}
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState, WorkerDecommissioning}
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, Master}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.{config, Logging}
@@ -122,10 +122,7 @@ class AppClientSuite

       // Send a decommission self to all the workers
       // Note: normally the worker would send this on their own.
-      workers.foreach { worker =>
-        worker.decommissionSelf()
-        master.self.send(WorkerDecommissioning(worker.workerId, worker.self))
-      }
+      workers.foreach(worker => worker.decommissionSelf())

       // Decommissioning is async.
       eventually(timeout(1.seconds), interval(10.millis)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 4a92cbcb85847..83bb66efdac9e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}

 class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
   override def beforeEach(): Unit = {
-    val conf = new SparkConf().setAppName("test")
+    val conf = new SparkConf().setAppName("test").setMaster("local")
       .set(config.DECOMMISSION_ENABLED, true)

     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
@@ -78,10 +78,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     val execs = sched.getExecutorIds()
     // Make the executors decommission, finish, exit, and not be replaced.
     val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", None))).toArray
-    sched.decommissionExecutors(
-      execsAndDecomInfo,
-      adjustTargetNumExecutors = true,
-      triggeredByExecutor = false)
+    sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 293498ae5c37b..f1870718c6730 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.streaming.scheduler

-import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.ArgumentMatchers.{eq => meq}
 import org.mockito.Mockito.{never, reset, times, verify, when}
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually.{eventually, timeout}
@@ -101,12 +101,12 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
           val decomInfo = ExecutorDecommissionInfo("spark scale down", None)
           if (decommissioning) {
             verify(allocationClient, times(1)).decommissionExecutor(
-              meq(expectedExec.get), meq(decomInfo), meq(true), any())
+              meq(expectedExec.get), meq(decomInfo), meq(true))
             verify(allocationClient, never).killExecutor(meq(expectedExec.get))
           } else {
             verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get))
             verify(allocationClient, never).decommissionExecutor(
-              meq(expectedExec.get), meq(decomInfo), meq(true), any())
+              meq(expectedExec.get), meq(decomInfo), meq(true))
           }
         } else {
           if (decommissioning) {