
Commit

Revert "[SPARK-32850][CORE] Simplify the RPC message flow of decommis…
Browse files Browse the repository at this point in the history
…sion"

This reverts commit 56ae950.
cloud-fan committed Sep 21, 2020
1 parent 9c653c9 commit 0c66813
Showing 16 changed files with 148 additions and 177 deletions.
@@ -91,13 +91,11 @@ private[spark] trait ExecutorAllocationClient {
* @param executorsAndDecomInfo identifiers of executors & decom info.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean): Seq[String] = {
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean): Seq[String] = {
killExecutors(executorsAndDecomInfo.map(_._1),
adjustTargetNumExecutors,
countFailures = false)
@@ -111,21 +109,14 @@ private[spark] trait ExecutorAllocationClient {
* @param executorId identifiers of executor to decommission
* @param decommissionInfo information about the decommission (reason, host loss)
* @param adjustTargetNumExecutors if we should adjust the target number of executors.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* (TODO: add a new type like `ExecutorDecommissionInfo` for the
* case where executor is decommissioned at executor first, so we
* don't need this extra parameter.)
* @return whether the request is acknowledged by the cluster manager.
*/
final def decommissionExecutor(
executorId: String,
final def decommissionExecutor(executorId: String,
decommissionInfo: ExecutorDecommissionInfo,
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean = false): Boolean = {
adjustTargetNumExecutors: Boolean): Boolean = {
val decommissionedExecutors = decommissionExecutors(
Array((executorId, decommissionInfo)),
adjustTargetNumExecutors = adjustTargetNumExecutors,
triggeredByExecutor = triggeredByExecutor)
adjustTargetNumExecutors = adjustTargetNumExecutors)
decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
}
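For reference, a minimal sketch of the API this revert restores: a caller passes only the executor/decommission-info pairs and the adjustTargetNumExecutors flag, with no triggeredByExecutor parameter. The scaleDownExecutors helper below is hypothetical (not part of this commit) and assumes it lives inside Spark's own source tree, since ExecutorAllocationClient and ExecutorDecommissionInfo are private[spark]; the one-argument ExecutorDecommissionInfo constructor mirrors the ExecutorAllocationManager hunk further down.

import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.scheduler.ExecutorDecommissionInfo

// Hypothetical Spark-internal helper: decommission a batch of executors using the
// restored two-argument decommissionExecutors signature.
def scaleDownExecutors(client: ExecutorAllocationClient, ids: Seq[String]): Seq[String] = {
  val executorsAndDecomInfo =
    ids.map(id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
  // Returns the ids that the cluster manager acknowledged for removal.
  client.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors = false)
}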

@@ -580,10 +580,7 @@ private[spark] class ExecutorAllocationManager(
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
client.decommissionExecutors(
executorIdsWithoutHostLoss,
adjustTargetNumExecutors = false,
triggeredByExecutor = false)
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
31 changes: 5 additions & 26 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -61,34 +61,13 @@ private[deploy] object DeployMessages {
}

/**
* An internal message that used by Master itself, in order to handle the
* `DecommissionWorkersOnHosts` request from `MasterWebUI` asynchronously.
* @param ids A collection of Worker ids, which should be decommissioned.
*/
case class DecommissionWorkers(ids: Seq[String]) extends DeployMessage

/**
* A message sent from the Master to a Worker to decommission that Worker.
* It's used for the case where decommissioning is triggered at the MasterWebUI.
*
* Note that decommissioning a Worker will cause all the executors on that Worker
* to be decommissioned as well.
*/
object DecommissionWorker extends DeployMessage

/**
* A message sent to the Worker itself when it receives the PWR signal,
* indicating that the Worker is starting to decommission.
*/
object WorkerSigPWRReceived extends DeployMessage

/**
* A message sent from Worker to Master to tell Master that the Worker has started
* decommissioning. It's used for the case where decommissioning is triggered at the Worker.
*
* @param id the worker id
* @param worker the worker endpoint ref
*/
case class WorkerDecommissioning(id: String, workerRef: RpcEndpointRef) extends DeployMessage
case class WorkerDecommission(
id: String,
worker: RpcEndpointRef)
extends DeployMessage

case class ExecutorStateChanged(
appId: String,
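A rough sketch of the message flow this file goes back to: the Worker reports its own decommissioning to the Master with the single restored WorkerDecommission(id, workerRef) message, instead of the WorkerDecommissioning / DecommissionWorker / WorkerSigPWRReceived trio removed above. The notifyMaster helper is hypothetical and assumes Spark-internal visibility of these private[deploy] types.

import org.apache.spark.deploy.DeployMessages.WorkerDecommission
import org.apache.spark.rpc.RpcEndpointRef

// Hypothetical helper: what a Worker does once it starts decommissioning itself.
// Fire-and-forget; the Master records the decommissioning worker when it receives this.
def notifyMaster(master: RpcEndpointRef, workerId: String, workerEndpoint: RpcEndpointRef): Unit = {
  master.send(WorkerDecommission(workerId, workerEndpoint))
}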
23 changes: 7 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -245,27 +245,15 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)

case WorkerDecommissioning(id, workerRef) =>
case WorkerDecommission(id, workerRef) =>
logInfo("Recording worker %s decommissioning".format(id))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else {
// We use foreach since get gives us an option and we can skip the failures.
idToWorker.get(id).foreach(w => decommissionWorker(w))
idToWorker.get(id).foreach(decommissionWorker)
}

case DecommissionWorkers(ids) =>
// The caller has already checked the state when handling DecommissionWorkersOnHosts,
// so it should not be the STANDBY
assert(state != RecoveryState.STANDBY)
ids.foreach ( id =>
// We use foreach since get gives us an option and we can skip the failures.
idToWorker.get(id).foreach { w =>
decommissionWorker(w)
// Also send a message to the worker node to notify.
w.endpoint.send(DecommissionWorker)
}
)

case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
@@ -903,7 +891,10 @@ private[deploy] class Master(
logInfo(s"Decommissioning the workers with host:ports ${workersToRemoveHostPorts}")

// The workers are removed async to avoid blocking the receive loop for the entire batch
self.send(DecommissionWorkers(workersToRemove.map(_.id).toSeq))
workersToRemove.foreach(wi => {
logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
self.send(WorkerDecommission(wi.id, wi.endpoint))
})

// Return the count of workers actually removed
workersToRemove.size
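The comment about removing workers asynchronously "to avoid blocking the receive loop" refers to the pattern kept by this hunk: the web-UI-triggered path posts messages back to the Master's own endpoint rather than mutating state on the caller's thread. A toy endpoint illustrating the same pattern (hypothetical, not Spark's Master, and assuming access to the private[spark] rpc package):

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Toy endpoint: callers on other threads post a message to the endpoint itself, and
// the single-threaded receive loop performs the actual state change.
class DecommissionTracker(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  private var decommissioning = Set.empty[String]

  override def receive: PartialFunction[Any, Unit] = {
    // State is only touched here, on the receive loop.
    case ("decommission", id: String) => decommissioning += id
  }

  // Safe to call from, e.g., a web UI handler thread.
  def requestDecommission(id: String): Unit = self.send(("decommission", id))
}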
28 changes: 10 additions & 18 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -70,10 +70,7 @@ private[deploy] class Worker(
if (conf.get(config.DECOMMISSION_ENABLED)) {
logInfo("Registering SIGPWR handler to trigger decommissioning.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
"disabling worker decommission feature.") {
self.send(WorkerSigPWRReceived)
true
}
"disabling worker decommission feature.")(decommissionSelf)
} else {
logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
}
@@ -140,8 +137,7 @@ private[deploy] class Worker(
private var registered = false
private var connected = false
private var decommissioned = false
// expose for test
private[spark] val workerId = generateWorkerId()
private val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
@@ -672,13 +668,8 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionWorker =>
decommissionSelf()

case WorkerSigPWRReceived =>
case WorkerDecommission(_, _) =>
decommissionSelf()
// Tell the master we have started decommissioning so it stops trying to launch executors/drivers on us
sendToMaster(WorkerDecommissioning(workerId, self))
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -777,15 +768,16 @@ private[deploy] class Worker(
}
}

private[deploy] def decommissionSelf(): Unit = {
if (conf.get(config.DECOMMISSION_ENABLED) && !decommissioned) {
private[deploy] def decommissionSelf(): Boolean = {
if (conf.get(config.DECOMMISSION_ENABLED)) {
logDebug("Decommissioning self")
decommissioned = true
logInfo(s"Decommission worker $workerId.")
} else if (decommissioned) {
logWarning(s"Worker $workerId already started decommissioning.")
sendToMaster(WorkerDecommission(workerId, self))
} else {
logWarning(s"Receive decommission request, but decommission feature is disabled.")
logWarning("Asked to decommission self, but decommissioning not enabled")
}
// Return true since can be called as a signal handler
true
}

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
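One detail of the restored Worker code: decommissionSelf() returns Boolean again, so it can be passed straight to SignalUtils.register as the SIGPWR action, as the onStart hunk above does. A small sketch of that shape; the installHandler wrapper is hypothetical and only reuses the two-argument register overload that already appears in the diff.

import org.apache.spark.util.SignalUtils

// Hypothetical wrapper: register a PWR handler whose action is any () => Boolean,
// mirroring how the Worker passes decommissionSelf directly. Per the comment in the
// diff, the action returns true because it is invoked as a signal handler.
def installHandler(decommissionSelf: () => Boolean): Unit = {
  SignalUtils.register("PWR",
    "Failed to register SIGPWR handler - disabling worker decommission feature.") {
    decommissionSelf()
  }
}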
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -79,17 +79,12 @@ private[spark] class CoarseGrainedExecutorBackend(
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

private var decommissioned = false
@volatile private var decommissioned = false

override def onStart(): Unit = {
if (env.conf.get(DECOMMISSION_ENABLED)) {
logInfo("Registering PWR handler to trigger decommissioning.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
"disabling executor decommission feature.") {
self.send(ExecutorSigPWRReceived)
true
}
}
logInfo("Registering PWR handler.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
"disabling decommission feature.")(decommissionSelf)

logInfo("Connecting to driver: " + driverUrl)
try {
@@ -171,6 +166,17 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
if (decommissioned) {
val msg = "Asked to launch a task while decommissioned."
logError(msg)
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
case _ =>
logError("No registered driver to send Decommission to.")
}
}
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
@@ -207,17 +213,9 @@ private[spark] class CoarseGrainedExecutorBackend(
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

case DecommissionExecutor =>
case DecommissionSelf =>
logInfo("Received decommission self")
decommissionSelf()

case ExecutorSigPWRReceived =>
decommissionSelf()
if (driver.nonEmpty) {
// Tell the driver we have started decommissioning so it stops trying to schedule us
driver.get.askSync[Boolean](ExecutorDecommissioning(executorId))
} else {
logError("No driver to message decommissioning.")
}
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -266,20 +264,17 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}

private def decommissionSelf(): Unit = {
if (!env.conf.get(DECOMMISSION_ENABLED)) {
logWarning(s"Receive decommission request, but decommission feature is disabled.")
return
} else if (decommissioned) {
logWarning(s"Executor $executorId already started decommissioning.")
return
}
val msg = s"Decommission executor $executorId."
private def decommissionSelf(): Boolean = {
val msg = "Decommissioning self w/sync"
logInfo(msg)
try {
decommissioned = true
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
env.blockManager.decommissionBlockManager()
// Tell the master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(
executorId, ExecutorDecommissionInfo(msg)))
} else {
logError("No driver to message decommissioning.")
}
if (executor != null) {
executor.decommission()
@@ -338,9 +333,12 @@ private[spark] class CoarseGrainedExecutorBackend(
shutdownThread.start()

logInfo("Will exit when finished decommissioning")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError("Unexpected error while decommissioning self", e)
false
}
}
}
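In the restored executor-side flow, decommissionSelf() itself notifies the driver using the DecommissionExecutor(executorId, info) case class over askSync, rather than a signal handler sending a separate ExecutorDecommissioning message. A minimal sketch of that notification step; the tellDriver helper is hypothetical and assumes Spark-internal visibility of these message types.

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.DecommissionExecutor

// Hypothetical helper: tell the driver this executor is decommissioning, blocking until
// the driver acknowledges. Returns false when no driver is registered yet.
def tellDriver(driver: Option[RpcEndpointRef], executorId: String, reason: String): Boolean = {
  driver match {
    case Some(ref) =>
      ref.askSync[Boolean](DecommissionExecutor(executorId, ExecutorDecommissionInfo(reason)))
    case None =>
      false
  }
}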
@@ -95,17 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage

// A message sent from an executor to the driver to tell the driver that the executor has started
// decommissioning. It's used for the case where decommissioning is triggered at the executor (e.g., K8S).
case class ExecutorDecommissioning(executorId: String) extends CoarseGrainedClusterMessage

// A message sent from the driver to an executor to decommission that executor.
// It's used for the Standalone cases, where decommissioning is triggered at the MasterWebUI or the Worker.
object DecommissionExecutor extends CoarseGrainedClusterMessage

// A message sent to the executor itself when it receives the PWR signal,
// indicating that the executor is starting to decommission.
object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
extends CoarseGrainedClusterMessage

case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage
@@ -145,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {

// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage

// Used to ask an executor to decommission itself. (Can be an internal message)
case object DecommissionSelf extends CoarseGrainedClusterMessage
}
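With DecommissionSelf restored as a message, the driver side can ask a specific executor to begin decommissioning by sending it to that executor's endpoint; the executor handles it in the case DecommissionSelf branch shown earlier. A one-line sketch, with a hypothetical helper name and assuming Spark-internal visibility:

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.DecommissionSelf

// Hypothetical driver-side helper: fire-and-forget request for an executor to
// decommission itself.
def askExecutorToDecommission(executorEndpoint: RpcEndpointRef): Unit =
  executorEndpoint.send(DecommissionSelf)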

