From d5bc7560f32d59861885c6f40ec2597d680e4612 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sat, 29 Aug 2020 23:03:21 +0800 Subject: [PATCH 01/18] impr --- .../spark/ExecutorAllocationManager.scala | 2 +- .../apache/spark/deploy/DeployMessage.scala | 2 +- .../deploy/client/StandaloneAppClient.scala | 6 +- .../client/StandaloneAppClientListener.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 8 +-- .../CoarseGrainedExecutorBackend.scala | 7 +- .../spark/internal/config/package.scala | 10 --- .../apache/spark/scheduler/DAGScheduler.scala | 17 ++--- .../scheduler/ExecutorDecommissionInfo.scala | 10 +-- .../spark/scheduler/ExecutorLossReason.scala | 9 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 67 ++++--------------- .../spark/scheduler/TaskSetManager.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 28 ++++---- .../cluster/StandaloneSchedulerBackend.scala | 4 +- .../spark/deploy/client/AppClientSuite.scala | 4 +- .../spark/scheduler/DAGSchedulerSuite.scala | 6 +- .../scheduler/TaskSchedulerImplSuite.scala | 25 +++---- .../spark/scheduler/TaskSetManagerSuite.scala | 9 ++- .../WorkerDecommissionExtendedSuite.scala | 2 +- .../scheduler/WorkerDecommissionSuite.scala | 2 +- ...kManagerDecommissionIntegrationSuite.scala | 2 +- 21 files changed, 82 insertions(+), 142 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index c2989314262c5..b6e14e8210c86 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager( // when the task backlog decreased. if (decommissionEnabled) { val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( - id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray + id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) } else { client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index b7a64d75a8d47..25ce9566df45c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -188,7 +188,7 @@ private[deploy] object DeployMessages { } case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], - exitStatus: Option[Int], workerLost: Boolean) + exitStatus: Option[Int], hostOpt: Option[String]) case class ApplicationRemoved(message: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index a6da8393bf405..aaf51ba526648 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient( cores)) listener.executorAdded(fullId, workerId, hostPort, cores, memory) - case ExecutorUpdated(id, state, message, exitStatus, workerLost) => + case ExecutorUpdated(id, state, message, exitStatus, hostOpt) => val fullId = appId + "/" + id val messageText = message.map(s => " (" + s + ")").getOrElse("") 
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { - listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) + listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, hostOpt) } else if (state == ExecutorState.DECOMMISSIONED) { listener.executorDecommissioned(fullId, - ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost)) + ExecutorDecommissionInfo(message.getOrElse(""), hostOpt)) } case WorkerRemoved(id, host, message) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index e72f7e976bb0a..981e3adc2a59e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener { fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit def executorRemoved( - fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit + fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 220e1c963d5ea..9558dc35f08cd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -308,7 +308,7 @@ private[deploy] class Master( appInfo.resetRetryCount() } - exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false)) + exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None)) if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app @@ -909,9 +909,9 @@ private[deploy] class Master( exec.application.driver.send(ExecutorUpdated( exec.id, ExecutorState.DECOMMISSIONED, Some("worker decommissioned"), None, - // workerLost is being set to true here to let the driver know that the host (aka. worker) + // worker host is being set here to let the driver know that the host (aka. worker) // is also being decommissioned. 
- workerLost = true)) + Some(worker.host))) exec.state = ExecutorState.DECOMMISSIONED exec.application.removeExecutor(exec) } @@ -932,7 +932,7 @@ private[deploy] class Master( for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( - exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true)) + exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host))) exec.state = ExecutorState.LOST exec.application.removeExecutor(exec) } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 07258f270b458..48045bafe6e3f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend( driver match { case Some(endpoint) => logInfo("Sending DecommissionExecutor to driver.") - endpoint.send( - DecommissionExecutor( - executorId, - ExecutorDecommissionInfo(msg, isHostDecommissioned = false))) + endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg))) case _ => logError("No registered driver to send Decommission to.") } @@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend( // Tell master we are are decommissioned so it stops trying to schedule us if (driver.nonEmpty) { driver.get.askSync[Boolean](DecommissionExecutor( - executorId, ExecutorDecommissionInfo(msg, false))) + executorId, ExecutorDecommissionInfo(msg))) } else { logError("No driver to message decommissioning.") } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b308115935d64..c3482c94761f6 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1890,16 +1890,6 @@ package object config { .timeConf(TimeUnit.SECONDS) .createOptional - private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL = - ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL") - .doc("Duration for which a decommissioned executor's information will be kept after its" + - "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " + - "decommissioning even after the mapper executor has been decommissioned. 
This allows " + - "eager recovery from fetch failures caused by decommissioning, increasing job robustness.") - .version("3.1.0") - .timeConf(TimeUnit.SECONDS) - .createWithDefaultString("5m") - private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") .doc("Staging directory used while submitting applications.") .version("2.0.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 18cd2410c1e4c..b0b66524ce709 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler( val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled val isHostDecommissioned = taskScheduler .getExecutorDecommissionState(bmAddress.executorId) - .exists(_.isHostDecommissioned) + .exists(_.hostOpt.isDefined) // Shuffle output of all executors on host `bmAddress.host` may be lost if: // - External shuffle service is enabled, so we assume that all shuffle data on node is @@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler( */ private[scheduler] def handleExecutorLost( execId: String, - workerLost: Boolean): Unit = { + hostOpt: Option[String]): Unit = { // if the cluster manager explicitly tells us that the entire worker was lost, then // we know to unregister shuffle output. (Note that "worker" specifically refers to the process // from a Standalone cluster, where the shuffle service lives in the Worker.) - val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled + val fileLost = hostOpt.isDefined || !env.blockManager.externalShuffleServiceEnabled removeExecutorAndUnregisterOutputs( execId = execId, fileLost = fileLost, - hostToUnregisterOutputs = None, + hostToUnregisterOutputs = hostOpt, maybeEpoch = None) } @@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => - val workerLost = reason match { - case ExecutorProcessLost(_, true, _) => true - case _ => false + val hostOpt = reason match { + case ExecutorProcessLost(_, host, _) => host + case ExecutorDecommission(host) => host + case _ => None } - dagScheduler.handleExecutorLost(execId, workerLost) + dagScheduler.handleExecutorLost(execId, hostOpt) case WorkerRemoved(workerId, host, message) => dagScheduler.handleWorkerRemoved(workerId, host, message) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala index 48ae879a518ce..1ea9255d3ad13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala @@ -20,12 +20,12 @@ package org.apache.spark.scheduler /** * Message providing more detail when an executor is being decommissioned. * @param message Human readable reason for why the decommissioning is happening. - * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is - * being decommissioned too. Used to infer if the shuffle data might - * be lost even if the external shuffle service is enabled. + * @param hostOpt When hostOpt is defined. It means the host (aka the `node` or `worker` + * in other places) has been decommissioned too. 
Used to infer if the + * shuffle data might be lost even if the external shuffle service is enabled. */ private[spark] -case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean) +case class ExecutorDecommissionInfo(message: String, hostOpt: Option[String] = None) /** * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived @@ -37,4 +37,4 @@ case class ExecutorDecommissionState( // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL // is configured. startTime: Long, - isHostDecommissioned: Boolean) + hostOpt: Option[String] = None) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 671dedaa5a6e8..263bdfa10e0ad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -53,14 +53,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los /** * @param _message human readable loss reason - * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service) + * @param hostOpt it's defined when the host is confirmed lost too (i.e. including shuffle service) * @param causedByApp whether the loss of the executor is the fault of the running app. * (assumed true by default unless known explicitly otherwise) */ private[spark] case class ExecutorProcessLost( _message: String = "Executor Process Lost", - workerLost: Boolean = false, + hostOpt: Option[String] = None, causedByApp: Boolean = true) extends ExecutorLossReason(_message) @@ -69,5 +69,8 @@ case class ExecutorProcessLost( * * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. + * + * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too */ -private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.") +private [spark] case class ExecutorDecommission(var hostOpt: Option[String] = None) + extends ExecutorLossReason("Executor decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d446638107690..ab5338968d6b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -143,18 +143,6 @@ private[spark] class TaskSchedulerImpl( // continue to run even after being asked to decommission, but they will eventually exit. val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState] - // When they exit and we know of that via heartbeat failure, we will add them to this cache. - // This cache is consulted to know if a fetch failure is because a source executor was - // decommissioned. 
- lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder() - .expireAfterWrite( - conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS) - .ticker(new Ticker{ - override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis()) - }) - .build[String, ExecutorDecommissionState]() - .asMap() - def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap } @@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl( synchronized { // Don't bother noting decommissioning for executors that we don't know about if (executorIdToHost.contains(executorId)) { - val oldDecomStateOpt = executorsPendingDecommission.get(executorId) - val newDecomState = if (oldDecomStateOpt.isEmpty) { - // This is the first time we are hearing of decommissioning this executor, - // so create a brand new state. - ExecutorDecommissionState( - clock.getTimeMillis(), - decommissionInfo.isHostDecommissioned) - } else { - val oldDecomState = oldDecomStateOpt.get - if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) { - // Only the cluster manager is allowed to send decommission messages with - // isHostDecommissioned set. So the new decommissionInfo is from the cluster - // manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old - // decommission start time. - ExecutorDecommissionState( - oldDecomState.startTime, - isHostDecommissioned = true) - } else { - oldDecomState - } - } - executorsPendingDecommission(executorId) = newDecomState + executorsPendingDecommission(executorId) = + ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.hostOpt) } } rootPool.executorDecommission(executorId) @@ -952,28 +920,19 @@ private[spark] class TaskSchedulerImpl( override def getExecutorDecommissionState(executorId: String) : Option[ExecutorDecommissionState] = synchronized { - executorsPendingDecommission - .get(executorId) - .orElse(Option(decommissionedExecutorsRemoved.get(executorId))) + executorsPendingDecommission.get(executorId) } - override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = { + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None - val reason = givenReason match { - // Handle executor process loss due to decommissioning - case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) => - val executorDecommissionState = getExecutorDecommissionState(executorId) - ExecutorProcessLost( - message, - // Also mark the worker lost if we know that the host was decommissioned - origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned), - // Executor loss is certainly not caused by app if we knew that this executor is being - // decommissioned - causedByApp = executorDecommissionState.isEmpty && origCausedByApp) - case e => e - } synchronized { + reason match { + case e @ ExecutorDecommission(_) => + e.hostOpt = getExecutorDecommissionState(executorId).map(_.hostOpt).get + case _ => + } + if (executorIdToRunningTaskIds.contains(executorId)) { val hostPort = executorIdToHost(executorId) logExecutorLoss(executorId, hostPort, reason) @@ -1060,9 +1019,7 @@ private[spark] class TaskSchedulerImpl( } } - - val decomState = executorsPendingDecommission.remove(executorId) - decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _)) + executorsPendingDecommission.remove(executorId) if (reason != LossReasonPending) { 
executorIdToHost -= executorId @@ -1104,7 +1061,7 @@ private[spark] class TaskSchedulerImpl( // exposed for test protected final def isHostDecommissioned(host: String): Boolean = { hostToExecutors.get(host).exists { executors => - executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned)) + executors.exists(e => getExecutorDecommissionState(e).exists(_.hostOpt.isDefined)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index ff0387602273d..673fe4fe27519 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -988,7 +988,7 @@ private[spark] class TaskSetManager( for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { val exitCausedByApp: Boolean = reason match { case exited: ExecutorExited => exited.exitCausedByApp - case ExecutorKilled => false + case ExecutorKilled | ExecutorDecommission(_) => false case ExecutorProcessLost(_, _, false) => false case _ => true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ca657313c14f6..d6f3bab45d16a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -390,16 +390,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case Some(executorInfo) => // This must be synchronized because variables mutated // in this block are read when requesting executors - val killed = CoarseGrainedSchedulerBackend.this.synchronized { + val lossReason = CoarseGrainedSchedulerBackend.this.synchronized { addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId - executorsPendingDecommission -= executorId - executorsPendingToRemove.remove(executorId).getOrElse(false) + executorsPendingToRemove + .remove(executorId).map(_ => ExecutorKilled) + .getOrElse { + if (executorsPendingDecommission.remove(executorId)) { + ExecutorDecommission() + } else { + reason + } + } } totalCoreCount.addAndGet(-executorInfo.totalCores) totalRegisteredExecutors.addAndGet(-1) - scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) + scheduler.executorLost(executorId, lossReason) listenerBus.post( SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) case None => @@ -489,17 +496,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp decomInfo: ExecutorDecommissionInfo): Boolean = { logInfo(s"Asking executor $executorId to decommissioning.") - try { - scheduler.executorDecommission(executorId, decomInfo) - if (driverEndpoint != null) { - logInfo("Propagating executor decommission to driver.") - driverEndpoint.send(DecommissionExecutor(executorId, decomInfo)) - } - } catch { - case e: Exception => - logError(s"Unexpected error during decommissioning ${e.toString}", e) - return false - } + scheduler.executorDecommission(executorId, decomInfo) // Send decommission message to the executor (it could have originated on the executor // but not necessarily. 
CoarseGrainedSchedulerBackend.this.synchronized { @@ -656,7 +653,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp !executorsPendingToRemove.contains(id) && !executorsPendingLossReason.contains(id) && !executorsPendingDecommission.contains(id) - } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 3acb6f1088e13..baa6f6db3013e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -165,10 +165,10 @@ private[spark] class StandaloneSchedulerBackend( } override def executorRemoved( - fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = { + fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code, exitCausedByApp = true, message) - case None => ExecutorProcessLost(message, workerLost = workerLost) + case None => ExecutorProcessLost(message, hostOpt) } logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 85ad4bdb3ec10..95b3bc0f67faf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -129,7 +129,7 @@ class AppClientSuite // We only record decommissioning for the executor we've requested assert(ci.listener.execDecommissionedMap.size === 1) val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId) - assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned, + assert(decommissionInfo != null && decommissionInfo.hostOpt.isDefined, s"$executorId should have been decommissioned along with its worker") } @@ -245,7 +245,7 @@ class AppClientSuite } def executorRemoved( - id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = { + id: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = { execRemovedList.add(id) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a7f8affee918c..edc27ed7a2d2d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -848,9 +848,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } private val shuffleFileLossTests = Seq( - ("executor process lost with shuffle service", ExecutorProcessLost("", false), true, false), - ("worker lost with shuffle service", ExecutorProcessLost("", true), true, true), - ("worker lost without shuffle service", ExecutorProcessLost("", true), false, true), + ("executor process lost with shuffle service", ExecutorProcessLost("", None), true, false), + ("worker lost with shuffle service", ExecutorProcessLost("", None), true, true), + ("worker lost without shuffle service", ExecutorProcessLost("", None), false, true), ("executor failure with shuffle service", ExecutorKilled, true, false), ("executor failure without 
shuffle service", ExecutorKilled, false, true)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 26c9d9130e56a..1d340ffe982da 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -158,8 +158,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B .exists(s => s.contains(exec0) && s.contains(exec1))) assert(scheduler.getExecutorsAliveOnHost(host1).exists(_.contains(exec2))) - scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", false)) - scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", true)) + scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", None)) + scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", Some(host1))) assert(scheduler.isExecutorAlive(exec0)) assert(!Seq(exec1, exec2).exists(scheduler.isExecutorAlive)) @@ -1865,17 +1865,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock(10000L) val scheduler = setupSchedulerForDecommissionTests(clock, 2) val oldTime = clock.getTimeMillis() - scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false)) - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true)) + scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None)) + scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1"))) clock.advance(3000L) - scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", false)) - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", false)) + scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", None)) + scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", None)) assert(scheduler.getExecutorDecommissionState("executor0") - === Some(ExecutorDecommissionState(oldTime, false))) + === Some(ExecutorDecommissionState(oldTime, None))) assert(scheduler.getExecutorDecommissionState("executor1") - === Some(ExecutorDecommissionState(oldTime, true))) + === Some(ExecutorDecommissionState(oldTime, Some("host1")))) assert(scheduler.getExecutorDecommissionState("executor2").isEmpty) } @@ -1890,7 +1890,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(scheduler.getExecutorDecommissionState("executor0").isEmpty) scheduler.executorLost("executor0", ExecutorExited(0, false, "normal")) assert(scheduler.getExecutorDecommissionState("executor0").isEmpty) - scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false)) + scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None)) assert(scheduler.getExecutorDecommissionState("executor0").isEmpty) // 0th task just died above @@ -1903,13 +1903,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(scheduler.getExecutorDecommissionState("executor1").isEmpty) // executor 1 is decommissioned before loosing - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false)) + scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None)) assert(scheduler.getExecutorDecommissionState("executor1").isDefined) clock.advance(2000) // executor1 is eventually lost 
scheduler.executorLost("executor1", ExecutorExited(0, false, "normal")) - assert(scheduler.decommissionedExecutorsRemoved.size === 1) assert(scheduler.executorsPendingDecommission.isEmpty) // So now both the tasks are no longer running assert(manager.copiesRunning.take(2) === Array(0, 0)) @@ -1917,16 +1916,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // Decommission state should hang around a bit after removal ... assert(scheduler.getExecutorDecommissionState("executor1").isDefined) - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false)) + scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None)) clock.advance(2000) - assert(scheduler.decommissionedExecutorsRemoved.size === 1) assert(scheduler.getExecutorDecommissionState("executor1").isDefined) // The default timeout for expiry is 300k milliseconds (5 minutes) which completes now, // and the executor1's decommission state should finally be purged. clock.advance(300000) assert(scheduler.getExecutorDecommissionState("executor1").isEmpty) - assert(scheduler.decommissionedExecutorsRemoved.isEmpty) // Now give it some resources and both tasks should be rerun val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq( diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 86d4e92df723b..c389fd2ffa8b1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -681,8 +681,8 @@ class TaskSetManagerSuite assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY)) // Decommission all executors on host0, to mimic CoarseGrainedSchedulerBackend. - sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", true)) - sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", true)) + sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", Some(host0))) + sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", Some(host0))) assert(manager.myLocalityLevels === Array(ANY)) } @@ -707,7 +707,7 @@ class TaskSetManagerSuite assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY)) // Decommission the only executor (without the host) that the task is interested in running on. - sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", false)) + sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", None)) assert(manager.myLocalityLevels === Array(NODE_LOCAL, ANY)) } @@ -2029,8 +2029,7 @@ class TaskSetManagerSuite // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be now // checked if they should be speculated. 
// (TASK 2 -> 15, TASK 3 -> 15) - sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom", - isHostDecommissioned = false)) + sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom", None)) assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) === Some(clock.getTimeMillis())) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala index 4264d45b36f2a..129eb8bf91051 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala @@ -64,7 +64,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] sc.getExecutorIds().tail.foreach { id => - sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false), + sched.decommissionExecutor(id, ExecutorDecommissionInfo("", None), adjustTargetNumExecutors = false) assert(rdd3.sortByKey().collect().length === 100) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 1ccb53f32dc2e..83bb66efdac9e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -77,7 +77,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() // Make the executors decommission, finish, exit, and not be replaced. 
- val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray + val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", None))).toArray sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 37836a9b49042..094b893cdda2e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -192,7 +192,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", isHostDecommissioned = false), + ExecutorDecommissionInfo("", None), adjustTargetNumExecutors = true) val decomTime = new SystemClock().getTimeMillis() From 404f92b85cc6f9e5dadaa8301a4b9dd6024a143c Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sun, 30 Aug 2020 11:24:07 +0800 Subject: [PATCH 02/18] fix compile error --- .../spark/streaming/scheduler/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 8f74d2d9959d1..1037950a4424f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -137,7 +137,7 @@ private[streaming] class ExecutorAllocationManager( val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) if (conf.get(DECOMMISSION_ENABLED)) { client.decommissionExecutor(execIdToRemove, - ExecutorDecommissionInfo("spark scale down", false), + ExecutorDecommissionInfo("spark scale down", None), adjustTargetNumExecutors = true) } else { client.killExecutor(execIdToRemove) From 47da0d7d9f6c76fc452c06f04eb84e595cc32943 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sun, 30 Aug 2020 11:45:25 +0800 Subject: [PATCH 03/18] fix --- .../streaming/scheduler/ExecutorAllocationManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index ec3ff456b8eab..f1870718c6730 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -98,7 +98,7 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase /** Verify that a particular executor was scaled down. 
*/ def verifyScaledDownExec(expectedExec: Option[String]): Unit = { if (expectedExec.nonEmpty) { - val decomInfo = ExecutorDecommissionInfo("spark scale down", false) + val decomInfo = ExecutorDecommissionInfo("spark scale down", None) if (decommissioning) { verify(allocationClient, times(1)).decommissionExecutor( meq(expectedExec.get), meq(decomInfo), meq(true)) From 3152e9b02304126bc189bc58bddf281c96d8ec6b Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:02:49 +0800 Subject: [PATCH 04/18] fix TSM rest --- .../cluster/CoarseGrainedSchedulerBackend.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d6f3bab45d16a..94f426111fc47 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -394,15 +394,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId + val decommissioned = executorsPendingDecommission.remove(executorId) executorsPendingToRemove - .remove(executorId).map(_ => ExecutorKilled) - .getOrElse { - if (executorsPendingDecommission.remove(executorId)) { - ExecutorDecommission() - } else { - reason - } - } + .remove(executorId).filter(killedByDriver => killedByDriver).map(_ => ExecutorKilled) + .getOrElse(if (decommissioned) ExecutorDecommission() else reason) } totalCoreCount.addAndGet(-executorInfo.totalCores) totalRegisteredExecutors.addAndGet(-1) From a0bc4f6db19c9633dd51de3347a3a606b71a3d38 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:25:29 +0800 Subject: [PATCH 05/18] fix tests --- .../spark/scheduler/DAGSchedulerSuite.scala | 14 ++++++++++--- .../scheduler/TaskSchedulerImplSuite.scala | 21 +++---------------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index edc27ed7a2d2d..50c890026d7ba 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -849,8 +849,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi private val shuffleFileLossTests = Seq( ("executor process lost with shuffle service", ExecutorProcessLost("", None), true, false), - ("worker lost with shuffle service", ExecutorProcessLost("", None), true, true), - ("worker lost without shuffle service", ExecutorProcessLost("", None), false, true), + ("worker lost with shuffle service", ExecutorProcessLost("", Some("hostA")), true, true), + ("worker lost without shuffle service", ExecutorProcessLost("", Some("hostA")), false, true), ("executor failure with shuffle service", ExecutorKilled, true, false), ("executor failure without shuffle service", ExecutorKilled, false, true)) @@ -874,10 +874,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) submit(reduceRdd, Array(0)) completeShuffleMapStageSuccessfully(0, 0, 1) + val expectHostFileLoss = event match { + case 
ExecutorProcessLost(_, hostOpt, _) => hostOpt.isDefined + case _ => false + } runEvent(ExecutorLost("hostA-exec", event)) verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec") if (expectFileLoss) { - verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec") + if (expectHostFileLoss) { + verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA") + } else { + verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec") + } intercept[MetadataFetchFailedException] { mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 1d340ffe982da..f29eb70eb3628 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1864,18 +1864,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("scheduler should keep the decommission state where host was decommissioned") { val clock = new ManualClock(10000L) val scheduler = setupSchedulerForDecommissionTests(clock, 2) - val oldTime = clock.getTimeMillis() + val decomTime = clock.getTimeMillis() scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None)) scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1"))) - clock.advance(3000L) - scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", None)) - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", None)) - assert(scheduler.getExecutorDecommissionState("executor0") - === Some(ExecutorDecommissionState(oldTime, None))) + === Some(ExecutorDecommissionState(decomTime, None))) assert(scheduler.getExecutorDecommissionState("executor1") - === Some(ExecutorDecommissionState(oldTime, Some("host1")))) + === Some(ExecutorDecommissionState(decomTime, Some("host1")))) assert(scheduler.getExecutorDecommissionState("executor2").isEmpty) } @@ -1914,17 +1910,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(manager.copiesRunning.take(2) === Array(0, 0)) clock.advance(2000) - // Decommission state should hang around a bit after removal ... - assert(scheduler.getExecutorDecommissionState("executor1").isDefined) - scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None)) - clock.advance(2000) - assert(scheduler.getExecutorDecommissionState("executor1").isDefined) - - // The default timeout for expiry is 300k milliseconds (5 minutes) which completes now, - // and the executor1's decommission state should finally be purged. 
- clock.advance(300000) - assert(scheduler.getExecutorDecommissionState("executor1").isEmpty) - // Now give it some resources and both tasks should be rerun val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq( WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten From 75a14a649d65d2db2da824680a339a9cae5580cf Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 16:58:51 +0800 Subject: [PATCH 06/18] rename to workerHost --- .../org/apache/spark/deploy/DeployMessage.scala | 2 +- .../deploy/client/StandaloneAppClient.scala | 6 +++--- .../client/StandaloneAppClientListener.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 16 ++++++++-------- .../scheduler/ExecutorDecommissionInfo.scala | 6 +++--- .../spark/scheduler/ExecutorLossReason.scala | 8 ++++---- .../spark/scheduler/TaskSchedulerImpl.scala | 6 +++--- .../cluster/StandaloneSchedulerBackend.scala | 7 +++++-- .../spark/deploy/client/AppClientSuite.scala | 4 ++-- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- 10 files changed, 31 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 25ce9566df45c..b9703ade5086e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -188,7 +188,7 @@ private[deploy] object DeployMessages { } case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], - exitStatus: Option[Int], hostOpt: Option[String]) + exitStatus: Option[Int], workerHost: Option[String]) case class ApplicationRemoved(message: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index aaf51ba526648..e5efb15f6bc51 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient( cores)) listener.executorAdded(fullId, workerId, hostPort, cores, memory) - case ExecutorUpdated(id, state, message, exitStatus, hostOpt) => + case ExecutorUpdated(id, state, message, exitStatus, workerHost) => val fullId = appId + "/" + id val messageText = message.map(s => " (" + s + ")").getOrElse("") logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { - listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, hostOpt) + listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost) } else if (state == ExecutorState.DECOMMISSIONED) { listener.executorDecommissioned(fullId, - ExecutorDecommissionInfo(message.getOrElse(""), hostOpt)) + ExecutorDecommissionInfo(message.getOrElse(""), workerHost)) } case WorkerRemoved(id, host, message) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index 981e3adc2a59e..76970ac9829c9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener { fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): 
Unit def executorRemoved( - fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit + fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b0b66524ce709..080e0e7f1552f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler( val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled val isHostDecommissioned = taskScheduler .getExecutorDecommissionState(bmAddress.executorId) - .exists(_.hostOpt.isDefined) + .exists(_.workerHost.isDefined) // Shuffle output of all executors on host `bmAddress.host` may be lost if: // - External shuffle service is enabled, so we assume that all shuffle data on node is @@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler( */ private[scheduler] def handleExecutorLost( execId: String, - hostOpt: Option[String]): Unit = { + workerHost: Option[String]): Unit = { // if the cluster manager explicitly tells us that the entire worker was lost, then // we know to unregister shuffle output. (Note that "worker" specifically refers to the process // from a Standalone cluster, where the shuffle service lives in the Worker.) - val fileLost = hostOpt.isDefined || !env.blockManager.externalShuffleServiceEnabled + val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled removeExecutorAndUnregisterOutputs( execId = execId, fileLost = fileLost, - hostToUnregisterOutputs = hostOpt, + hostToUnregisterOutputs = workerHost, maybeEpoch = None) } @@ -2366,12 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => - val hostOpt = reason match { - case ExecutorProcessLost(_, host, _) => host - case ExecutorDecommission(host) => host + val workerHost = reason match { + case ExecutorProcessLost(_, workerHost, _) => workerHost + case ExecutorDecommission(workerHost) => workerHost case _ => None } - dagScheduler.handleExecutorLost(execId, hostOpt) + dagScheduler.handleExecutorLost(execId, workerHost) case WorkerRemoved(workerId, host, message) => dagScheduler.handleWorkerRemoved(workerId, host, message) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala index 1ea9255d3ad13..82692674220bb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala @@ -20,12 +20,12 @@ package org.apache.spark.scheduler /** * Message providing more detail when an executor is being decommissioned. * @param message Human readable reason for why the decommissioning is happening. - * @param hostOpt When hostOpt is defined. It means the host (aka the `node` or `worker` + * @param workerHost When workerHost is defined. It means the host (aka the `node` or `worker` * in other places) has been decommissioned too. Used to infer if the * shuffle data might be lost even if the external shuffle service is enabled. 
*/ private[spark] -case class ExecutorDecommissionInfo(message: String, hostOpt: Option[String] = None) +case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None) /** * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived @@ -37,4 +37,4 @@ case class ExecutorDecommissionState( // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL // is configured. startTime: Long, - hostOpt: Option[String] = None) + workerHost: Option[String] = None) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 263bdfa10e0ad..8eb9322e8ae7f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -53,14 +53,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los /** * @param _message human readable loss reason - * @param hostOpt it's defined when the host is confirmed lost too (i.e. including shuffle service) + * @param workerHost it's defined when the host is confirmed lost too (i.e. including shuffle service) * @param causedByApp whether the loss of the executor is the fault of the running app. * (assumed true by default unless known explicitly otherwise) */ private[spark] case class ExecutorProcessLost( _message: String = "Executor Process Lost", - hostOpt: Option[String] = None, + workerHost: Option[String] = None, causedByApp: Boolean = true) extends ExecutorLossReason(_message) @@ -70,7 +70,7 @@ case class ExecutorProcessLost( * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. 
* - * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too + * @param workerHost it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too */ -private [spark] case class ExecutorDecommission(var hostOpt: Option[String] = None) +private [spark] case class ExecutorDecommission(var workerHost: Option[String] = None) extends ExecutorLossReason("Executor decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ab5338968d6b4..ca26ac12d036a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -911,7 +911,7 @@ private[spark] class TaskSchedulerImpl( // Don't bother noting decommissioning for executors that we don't know about if (executorIdToHost.contains(executorId)) { executorsPendingDecommission(executorId) = - ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.hostOpt) + ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost) } } rootPool.executorDecommission(executorId) @@ -929,7 +929,7 @@ private[spark] class TaskSchedulerImpl( synchronized { reason match { case e @ ExecutorDecommission(_) => - e.hostOpt = getExecutorDecommissionState(executorId).map(_.hostOpt).get + e.workerHost = getExecutorDecommissionState(executorId).map(_.workerHost).get case _ => } @@ -1061,7 +1061,7 @@ private[spark] class TaskSchedulerImpl( // exposed for test protected final def isHostDecommissioned(host: String): Boolean = { hostToExecutors.get(host).exists { executors => - executors.exists(e => getExecutorDecommissionState(e).exists(_.hostOpt.isDefined)) + executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index baa6f6db3013e..244a365401a66 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -165,10 +165,13 @@ private[spark] class StandaloneSchedulerBackend( } override def executorRemoved( - fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = { + fullId: String, + message: String, + exitStatus: Option[Int], + workerHost: Option[String]): Unit = { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code, exitCausedByApp = true, message) - case None => ExecutorProcessLost(message, hostOpt) + case None => ExecutorProcessLost(message, workerHost) } logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 95b3bc0f67faf..fe88822bb46b5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -129,7 +129,7 @@ class AppClientSuite // We only record decommissioning for the executor we've requested assert(ci.listener.execDecommissionedMap.size === 1) val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId) - assert(decommissionInfo 
!= null && decommissionInfo.hostOpt.isDefined, + assert(decommissionInfo != null && decommissionInfo.workerHost.isDefined, s"$executorId should have been decommissioned along with its worker") } @@ -245,7 +245,7 @@ class AppClientSuite } def executorRemoved( - id: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = { + id: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit = { execRemovedList.add(id) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 50c890026d7ba..436765808e22b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -875,7 +875,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0)) completeShuffleMapStageSuccessfully(0, 0, 1) val expectHostFileLoss = event match { - case ExecutorProcessLost(_, hostOpt, _) => hostOpt.isDefined + case ExecutorProcessLost(_, workerHost, _) => workerHost.isDefined case _ => false } runEvent(ExecutorLost("hostA-exec", event)) From b6490fcbb5349a5c17a5d3ae5df6d1a01a616b45 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 17:01:55 +0800 Subject: [PATCH 07/18] update comment --- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9558dc35f08cd..48516cdf83291 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -910,7 +910,8 @@ private[deploy] class Master( exec.id, ExecutorState.DECOMMISSIONED, Some("worker decommissioned"), None, // worker host is being set here to let the driver know that the host (aka. worker) - // is also being decommissioned. + // is also being decommissioned. So the driver can unregister all the shuffle map + // statues located at this host when it receives the executor lost event. Some(worker.host))) exec.state = ExecutorState.DECOMMISSIONED exec.application.removeExecutor(exec) From bea465b4a4f4e25cdf486e9ffd87473d9bae38a3 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 17:05:13 +0800 Subject: [PATCH 08/18] close the parenthesis --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 94f426111fc47..e868b9b674efa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -493,7 +493,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logInfo(s"Asking executor $executorId to decommissioning.") scheduler.executorDecommission(executorId, decomInfo) // Send decommission message to the executor (it could have originated on the executor - // but not necessarily. + // but not necessarily). 
CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.get(executorId) match { case Some(executorInfo) => From c12c82df1200a04924ff36c4332a10d1a87a7f57 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 17:10:38 +0800 Subject: [PATCH 09/18] update --- .../cluster/CoarseGrainedSchedulerBackend.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e868b9b674efa..2f30dab913458 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -395,9 +395,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap -= executorId executorsPendingLossReason -= executorId val decommissioned = executorsPendingDecommission.remove(executorId) - executorsPendingToRemove - .remove(executorId).filter(killedByDriver => killedByDriver).map(_ => ExecutorKilled) - .getOrElse(if (decommissioned) ExecutorDecommission() else reason) + if (executorsPendingToRemove.remove(executorId).getOrElse(false)) { + ExecutorKilled + } else if (decommissioned) { + ExecutorDecommission() + } else { + reason + } } totalCoreCount.addAndGet(-executorInfo.totalCores) totalRegisteredExecutors.addAndGet(-1) From 90f1fd1558571ac9afafee437d8d2d35266c4ca9 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 17:24:09 +0800 Subject: [PATCH 10/18] avoid var --- .../apache/spark/scheduler/ExecutorLossReason.scala | 4 ++-- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 6 ------ .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +++++++------ 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 8eb9322e8ae7f..314ccb805155d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -70,7 +70,7 @@ case class ExecutorProcessLost( * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. 
* - * @param workerHost it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too + * @param workerHost it's defined when the worker is decommissioned too */ -private [spark] case class ExecutorDecommission(var workerHost: Option[String] = None) +private [spark] case class ExecutorDecommission(workerHost: Option[String] = None) extends ExecutorLossReason("Executor decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ca26ac12d036a..107c517ca06bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -927,12 +927,6 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - reason match { - case e @ ExecutorDecommission(_) => - e.workerHost = getExecutorDecommissionState(executorId).map(_.workerHost).get - case _ => - } - if (executorIdToRunningTaskIds.contains(executorId)) { val hostPort = executorIdToHost(executorId) logExecutorLoss(executorId, hostPort, reason) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2f30dab913458..be5ce0ce73aa0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,8 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. private val executorsPendingLossReason = new HashSet[String] - // Executors which are being decommissioned - protected val executorsPendingDecommission = new HashSet[String] + // Executors which are being decommissioned. Maps from executorId to + // workerHost(it's defined when the worker is also decommissioned) + protected val executorsPendingDecommission = new HashMap[String, Option[String]] // A map of ResourceProfile id to map of hostname with its possible task number running on it @GuardedBy("CoarseGrainedSchedulerBackend.this") @@ -397,8 +398,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val decommissioned = executorsPendingDecommission.remove(executorId) if (executorsPendingToRemove.remove(executorId).getOrElse(false)) { ExecutorKilled - } else if (decommissioned) { - ExecutorDecommission() + } else if (decommissioned.isDefined) { + ExecutorDecommission(decommissioned.get) } else { reason } @@ -468,11 +469,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], adjustTargetNumExecutors: Boolean): Seq[String] = { - val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) => CoarseGrainedSchedulerBackend.this.synchronized { // Only bother decommissioning executors which are alive. 
if (isExecutorActive(executorId)) { - executorsPendingDecommission += executorId + executorsPendingDecommission(executorId) = decomInfo.workerHost true } else { false From 84df7359e7c253220d115756ab144740ee2915cd Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 17:36:50 +0800 Subject: [PATCH 11/18] fix style --- .../scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 314ccb805155d..f7750fb04e263 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -53,7 +53,8 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los /** * @param _message human readable loss reason - * @param workerHost it's defined when the host is confirmed lost too (i.e. including shuffle service) + * @param workerHost it's defined when the host is confirmed lost too (i.e. including + * shuffle service) * @param causedByApp whether the loss of the executor is the fault of the running app. * (assumed true by default unless known explicitly otherwise) */ From 0c0749eb683749f4635b57a15bf15d19eebe34c5 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 3 Sep 2020 17:41:19 +0800 Subject: [PATCH 12/18] update --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index be5ce0ce73aa0..4eb75b1080fbf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -395,8 +395,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId + val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false) val decommissioned = executorsPendingDecommission.remove(executorId) - if (executorsPendingToRemove.remove(executorId).getOrElse(false)) { + if (killedByDriver) { ExecutorKilled } else if (decommissioned.isDefined) { ExecutorDecommission(decommissioned.get) From 6e8b57e4c25fd3da71dee43530e6c04d3f3f7150 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 4 Sep 2020 18:07:27 +0800 Subject: [PATCH 13/18] drop parenthesis --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4eb75b1080fbf..850ddff03fe94 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,8 +92,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. 
private val executorsPendingLossReason = new HashSet[String] - // Executors which are being decommissioned. Maps from executorId to - // workerHost(it's defined when the worker is also decommissioned) + // Executors which are being decommissioned. Maps from executorId to workerHost. protected val executorsPendingDecommission = new HashMap[String, Option[String]] // A map of ResourceProfile id to map of hostname with its possible task number running on it From a39ba8e06b17e9917cfc4ee8c88559f3dff1c73d Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 4 Sep 2020 18:08:56 +0800 Subject: [PATCH 14/18] rename to workerHostOpt --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 850ddff03fe94..0f144125af7bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -395,11 +395,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap -= executorId executorsPendingLossReason -= executorId val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false) - val decommissioned = executorsPendingDecommission.remove(executorId) + val workerHostOpt = executorsPendingDecommission.remove(executorId) if (killedByDriver) { ExecutorKilled - } else if (decommissioned.isDefined) { - ExecutorDecommission(decommissioned.get) + } else if (workerHostOpt.isDefined) { + ExecutorDecommission(workerHostOpt.get) } else { reason } From ff02621b8e86e747f800e8e89563cc02db6fc978 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 4 Sep 2020 18:09:41 +0800 Subject: [PATCH 15/18] remove extra space --- .../spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 244a365401a66..34b03dfec9e80 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -168,7 +168,7 @@ private[spark] class StandaloneSchedulerBackend( fullId: String, message: String, exitStatus: Option[Int], - workerHost: Option[String]): Unit = { + workerHost: Option[String]): Unit = { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code, exitCausedByApp = true, message) case None => ExecutorProcessLost(message, workerHost) From 9096cb9d40767454aac0339f174125a19d2367ec Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 4 Sep 2020 18:10:45 +0800 Subject: [PATCH 16/18] it is --- .../scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index f7750fb04e263..f2eb4a7047b56 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -71,7 +71,7 @@ case class 
ExecutorProcessLost( * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. * - * @param workerHost it's defined when the worker is decommissioned too + * @param workerHost it is defined when the worker is decommissioned too */ private [spark] case class ExecutorDecommission(workerHost: Option[String] = None) extends ExecutorLossReason("Executor decommission.") From 58add670f43818e556b5fbe441274c68c3adefef Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 4 Sep 2020 18:11:36 +0800 Subject: [PATCH 17/18] comma --- .../org/apache/spark/scheduler/ExecutorDecommissionInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala index 82692674220bb..7eec070232c3b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler /** * Message providing more detail when an executor is being decommissioned. * @param message Human readable reason for why the decommissioning is happening. - * @param workerHost When workerHost is defined. It means the host (aka the `node` or `worker` + * @param workerHost When workerHost is defined, it means the host (aka the `node` or `worker` * in other places) has been decommissioned too. Used to infer if the * shuffle data might be lost even if the external shuffle service is enabled. */ From d2468407f3d02e5a191ad373b5f2201a930cbb09 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 7 Sep 2020 21:30:16 +0800 Subject: [PATCH 18/18] add comment --- core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index b9703ade5086e..83f373d526e90 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -187,6 +187,8 @@ private[deploy] object DeployMessages { Utils.checkHostPort(hostPort) } + // When the host of Worker is lost or decommissioned, the `workerHost` is the host address + // of that Worker. Otherwise, it's None. case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], exitStatus: Option[Int], workerHost: Option[String])
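
To see the end-to-end effect of this series in one place: the boolean worker-lost flag is replaced by an optional `workerHost` that travels from the Master (patch 07) through `ExecutorDecommissionInfo` (patch 17) into the scheduler backend's pending-decommission map (patches 10/13/14), so the final `ExecutorDecommission` loss reason can name the decommissioned host. The minimal, self-contained Scala sketch below illustrates only the reason-resolution precedence that patches 09, 10, and 14 converge on in `CoarseGrainedSchedulerBackend.removeExecutor`. Note that `LossReasonSketch`, the simplified `LossReason` hierarchy, and the sample ids ("exec-3", "host-1", "exec-7") are stand-ins invented for illustration, not the actual private[spark] classes.

import scala.collection.mutable.HashMap

// Simplified stand-ins for the private[spark] loss reasons; not the real Spark classes.
object LossReasonSketch {
  sealed trait LossReason
  case object ExecutorKilled extends LossReason
  case class ExecutorDecommission(workerHost: Option[String] = None) extends LossReason
  case class ExecutorProcessLost(message: String) extends LossReason

  // executorId -> workerHost; the value is defined only when the whole worker host is
  // decommissioned too (mirrors executorsPendingDecommission after patch 13).
  private val executorsPendingDecommission = new HashMap[String, Option[String]]
  // executorId -> whether the driver itself asked to kill the executor.
  private val executorsPendingToRemove = new HashMap[String, Boolean]

  // Same precedence as removeExecutor after patches 09/14:
  // driver-requested kill > pending decommission (carrying the worker host) > reported reason.
  def resolveReason(executorId: String, reported: LossReason): LossReason = {
    val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
    val workerHostOpt = executorsPendingDecommission.remove(executorId)
    if (killedByDriver) {
      ExecutorKilled
    } else if (workerHostOpt.isDefined) {
      ExecutorDecommission(workerHostOpt.get)
    } else {
      reported
    }
  }

  def main(args: Array[String]): Unit = {
    // An executor whose whole worker ("host-1") was decommissioned keeps that host in its
    // loss reason, so the driver can also unregister shuffle map statuses on that host.
    executorsPendingDecommission("exec-3") = Some("host-1")
    assert(resolveReason("exec-3", ExecutorProcessLost("disconnected")) ==
      ExecutorDecommission(Some("host-1")))

    // An executor that was neither killed by the driver nor decommissioned just keeps
    // the reason reported by the backend.
    assert(resolveReason("exec-7", ExecutorProcessLost("disconnected")) ==
      ExecutorProcessLost("disconnected"))
  }
}

Keeping `workerHost` as an `Option[String]` rather than the old boolean lets one field answer both questions (was the host lost, and which host it was), which is what allows the driver to invalidate shuffle output for exactly that host when its shuffle data may be gone.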