[SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl #29579

Closed · wants to merge 18 commits
@@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager(
// when the task backlog decreased.
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
-        id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
+        id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
}

case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-      exitStatus: Option[Int], workerLost: Boolean)
+      exitStatus: Option[Int], hostOpt: Option[String])
Contributor: I think we need a better name than hostOpt? How about just "hostname"? The type already conveys that this is optional.

Contributor: How about hostLost?

Contributor: We could also leave the name as workerLost and just make it an Option[String], in the spirit of minimal code change?

Author: After a second thought, I changed it to workerHost. We need the keyword "worker" because it's specific to the Standalone Worker, and "Host" gives the direct meaning of the value. workerLost sounds more appropriate for a Boolean type. WDYT?
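For reference, here is a self-contained sketch of the message shape this thread converges on. The field name workerHost comes from the author's reply above; the other fields are copied from the diff, and the ExecutorState stand-in exists only so the snippet compiles on its own.

```scala
object DeployMessageSketch {
  // Stand-in so the sketch compiles on its own; in Spark this is
  // org.apache.spark.deploy.ExecutorState.
  type ExecutorState = String

  // The Standalone ExecutorUpdated message with the Option[String] field named
  // workerHost, as settled in the thread above.
  // None       => the worker's host is not known to be lost or decommissioned.
  // Some(host) => the whole host (and any shuffle data it serves) is affected too.
  case class ExecutorUpdated(
      id: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int],
      workerHost: Option[String])
}
```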


case class ApplicationRemoved(message: String)

@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

-      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+      case ExecutorUpdated(id, state, message, exitStatus, hostOpt) =>
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, hostOpt)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId,
-            ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
+            ExecutorDecommissionInfo(message.getOrElse(""), hostOpt))
}

case WorkerRemoved(id, host, message) =>
@@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+      fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit

def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

@@ -308,7 +308,7 @@ private[deploy] class Master(
appInfo.resetRetryCount()
}

-          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
+          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
@@ -909,9 +909,9 @@ private[deploy] class Master(
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None,
-            // workerLost is being set to true here to let the driver know that the host (aka. worker)
+            // worker host is being set here to let the driver know that the host (aka. worker)
Contributor: nit: can you reword the comment to be more accurate now :-)

Author: Updated a little bit.

// is also being decommissioned.
-            workerLost = true))
+            Some(worker.host)))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
@@ -932,7 +932,7 @@ private[deploy] class Master(
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
-          exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
+          exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host)))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
@@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend(
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
-        endpoint.send(
-          DecommissionExecutor(
-            executorId,
-            ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
+        endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
case _ =>
logError("No registered driver to send Decommission to.")
}
@@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// Tell master we are are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(
-        executorId, ExecutorDecommissionInfo(msg, false)))
+        executorId, ExecutorDecommissionInfo(msg)))
} else {
logError("No driver to message decommissioning.")
}
10 changes: 0 additions & 10 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1890,16 +1890,6 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createOptional

-  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
-    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
-      .doc("Duration for which a decommissioned executor's information will be kept after its" +
-        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
-        "decommissioning even after the mapper executor has been decommissioned. This allows " +
-        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
-      .version("3.1.0")
-      .timeConf(TimeUnit.SECONDS)
-      .createWithDefaultString("5m")

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
val isHostDecommissioned = taskScheduler
.getExecutorDecommissionState(bmAddress.executorId)
-            .exists(_.isHostDecommissioned)
+            .exists(_.hostOpt.isDefined)

// Shuffle output of all executors on host `bmAddress.host` may be lost if:
// - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -1989,15 +1989,15 @@
*/
private[scheduler] def handleExecutorLost(
execId: String,
-      workerLost: Boolean): Unit = {
+      hostOpt: Option[String]): Unit = {
Contributor: Can you also update this method's comment if you decide to go with hostOpt instead of workerLost (perhaps you ought to consider my comment on making workerLost itself an Option[String])? The comment still refers to "standalone worker".

Author: I changed it to workerHost, so I guess we can keep the comment?

// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+    val fileLost = hostOpt.isDefined || !env.blockManager.externalShuffleServiceEnabled
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
-      hostToUnregisterOutputs = None,
+      hostToUnregisterOutputs = hostOpt,
maybeEpoch = None)
}
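To make the fileLost condition in the hunk above concrete, here is a small self-contained sketch of the same boolean logic; the object and helper names, and the host string, are illustrative only.

```scala
object FileLostCheck {
  // fileLost semantics from handleExecutorLost above: shuffle files are considered
  // lost when the whole worker/host is gone, or when there is no external shuffle
  // service that could outlive the executor process.
  def fileLost(workerHost: Option[String], externalShuffleServiceEnabled: Boolean): Boolean =
    workerHost.isDefined || !externalShuffleServiceEnabled

  def main(args: Array[String]): Unit = {
    assert(fileLost(Some("host-1.example"), externalShuffleServiceEnabled = true))  // whole host lost
    assert(fileLost(None, externalShuffleServiceEnabled = false))                   // no external shuffle service
    assert(!fileLost(None, externalShuffleServiceEnabled = true))                   // shuffle files survive the executor
  }
}
```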

@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId, reason) =>
-      val workerLost = reason match {
-        case ExecutorProcessLost(_, true, _) => true
-        case _ => false
+      val hostOpt = reason match {
+        case ExecutorProcessLost(_, host, _) => host
+        case ExecutorDecommission(host) => host
+        case _ => None
       }
-      dagScheduler.handleExecutorLost(execId, workerLost)
+      dagScheduler.handleExecutorLost(execId, hostOpt)

case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
/**
* Message providing more detail when an executor is being decommissioned.
* @param message Human readable reason for why the decommissioning is happening.
- * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
- *                             being decommissioned too. Used to infer if the shuffle data might
- *                             be lost even if the external shuffle service is enabled.
+ * @param hostOpt When hostOpt is defined, it means the host (aka the `node` or `worker`
+ *                in other places) has been decommissioned too. Used to infer if the
+ *                shuffle data might be lost even if the external shuffle service is enabled.
*/
private[spark]
-case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+case class ExecutorDecommissionInfo(message: String, hostOpt: Option[String] = None)

/**
* State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
@@ -37,4 +37,4 @@ case class ExecutorDecommissionState(
// to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
// is configured.
startTime: Long,
-  isHostDecommissioned: Boolean)
+  hostOpt: Option[String] = None)
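For illustration, a minimal sketch of how the reworked message is built under the new signature. The message strings reuse wording that appears elsewhere in this diff, while the stand-in class, the object name, and the host value are assumptions made only so the example compiles.

```scala
// Stand-in matching the case class in the diff above, so the sketch is self-contained.
case class ExecutorDecommissionInfo(message: String, hostOpt: Option[String] = None)

object ExecutorDecommissionInfoExamples {
  // Executor-only decommission (e.g. a dynamic-allocation scale-down): the host is
  // not known to be going away, so hostOpt keeps its None default.
  val executorOnly = ExecutorDecommissionInfo("spark scale down")

  // Cluster-manager-initiated decommission of a whole Standalone worker: the worker's
  // host is attached so the driver can treat shuffle data on that host as lost even
  // when the external shuffle service is enabled. The host name here is made up.
  val wholeWorker = ExecutorDecommissionInfo("worker decommissioned", Some("host-1.example"))
}
```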
@@ -53,14 +53,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los

/**
* @param _message human readable loss reason
- * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param hostOpt it's defined when the host is confirmed lost too (i.e. including shuffle service)
* @param causedByApp whether the loss of the executor is the fault of the running app.
* (assumed true by default unless known explicitly otherwise)
*/
private[spark]
case class ExecutorProcessLost(
_message: String = "Executor Process Lost",
-    workerLost: Boolean = false,
+    hostOpt: Option[String] = None,
causedByApp: Boolean = true)
extends ExecutorLossReason(_message)

@@ -69,5 +69,8 @@ case class ExecutorProcessLost(
*
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
+ *
+ * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too
*/
-private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(var hostOpt: Option[String] = None)
+  extends ExecutorLossReason("Executor decommission.")
Contributor: I am not a fan of this change making hostOpt a var instead of a val. I think you only need this for line 932 in TaskSchedulerImpl, and I am sure you could accommodate that use case in a different way. The reason I don't like it is that the other ExecutorLossReasons are "messages" (for example ExecutorProcessLost), and these messages tend to be immutable. It's a bit hacky to have ExecutorDecommission masquerading as a message but then make it mutable. Even ExecutorDecommission is a message that the TaskSchedulerImpl enqueues into the event loop of the DAGScheduler.

Author: TBH I don't like this way myself either. I tried another approach to get rid of the problem, but it requires storing the redundant workerHost info in CoarseGrainedSchedulerBackend.

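For context, one immutable shape that would address the reviewer's concern. This is only a sketch of the alternative being discussed, not what the PR does: it assumes the host is already known wherever the loss reason gets constructed, which is exactly the extra bookkeeping the author mentions.

```scala
// Sketch of the immutable alternative: keep hostOpt a val and fill it in when the
// loss reason is created, instead of mutating it afterwards in
// TaskSchedulerImpl.executorLost.
class ExecutorLossReason(val message: String)   // stand-in for Spark's class

case class ExecutorDecommission(hostOpt: Option[String] = None)
  extends ExecutorLossReason("Executor decommission.")

object Example {
  // The creating side (e.g. CoarseGrainedSchedulerBackend) would then have to track
  // the decommissioned worker's host itself and pass it in up front, which is the
  // redundant bookkeeping the author mentions above.
  def lossReasonFor(decommissionedHost: Option[String]): ExecutorLossReason =
    ExecutorDecommission(decommissionedHost)
}
```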
@@ -143,18 +143,6 @@ private[spark] class TaskSchedulerImpl(
// continue to run even after being asked to decommission, but they will eventually exit.
val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]

-  // When they exit and we know of that via heartbeat failure, we will add them to this cache.
-  // This cache is consulted to know if a fetch failure is because a source executor was
-  // decommissioned.
-  lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
-    .expireAfterWrite(
-      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
-    .ticker(new Ticker{
-      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
-    })
-    .build[String, ExecutorDecommissionState]()
-    .asMap()

def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
}
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
synchronized {
// Don't bother noting decommissioning for executors that we don't know about
if (executorIdToHost.contains(executorId)) {
-        val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
-        val newDecomState = if (oldDecomStateOpt.isEmpty) {
-          // This is the first time we are hearing of decommissioning this executor,
-          // so create a brand new state.
-          ExecutorDecommissionState(
-            clock.getTimeMillis(),
-            decommissionInfo.isHostDecommissioned)
-        } else {
-          val oldDecomState = oldDecomStateOpt.get
-          if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
-            // Only the cluster manager is allowed to send decommission messages with
-            // isHostDecommissioned set. So the new decommissionInfo is from the cluster
-            // manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
-            // decommission start time.
-            ExecutorDecommissionState(
-              oldDecomState.startTime,
-              isHostDecommissioned = true)
-          } else {
-            oldDecomState
-          }
-        }
-        executorsPendingDecommission(executorId) = newDecomState
+        executorsPendingDecommission(executorId) =
+          ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.hostOpt)
}
}
rootPool.executorDecommission(executorId)
@@ -952,28 +920,19 @@ private[spark] class TaskSchedulerImpl(

override def getExecutorDecommissionState(executorId: String)
: Option[ExecutorDecommissionState] = synchronized {
-    executorsPendingDecommission
-      .get(executorId)
-      .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
+    executorsPendingDecommission.get(executorId)
}

-  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None
-    val reason = givenReason match {
-      // Handle executor process loss due to decommissioning
-      case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
-        val executorDecommissionState = getExecutorDecommissionState(executorId)
-        ExecutorProcessLost(
-          message,
-          // Also mark the worker lost if we know that the host was decommissioned
-          origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned),
-          // Executor loss is certainly not caused by app if we knew that this executor is being
-          // decommissioned
-          causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
-      case e => e
-    }

synchronized {
+      reason match {
+        case e @ ExecutorDecommission(_) =>
+          e.hostOpt = getExecutorDecommissionState(executorId).map(_.hostOpt).get
+        case _ =>
+      }

if (executorIdToRunningTaskIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logExecutorLoss(executorId, hostPort, reason)
@@ -1060,9 +1019,7 @@ private[spark] class TaskSchedulerImpl(
}
}


-    val decomState = executorsPendingDecommission.remove(executorId)
-    decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _))
+    executorsPendingDecommission.remove(executorId)

if (reason != LossReasonPending) {
executorIdToHost -= executorId
@@ -1104,7 +1061,7 @@ private[spark] class TaskSchedulerImpl(
// exposed for test
protected final def isHostDecommissioned(host: String): Boolean = {
hostToExecutors.get(host).exists { executors =>
-      executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
+      executors.exists(e => getExecutorDecommissionState(e).exists(_.hostOpt.isDefined))
}
}

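A compressed model of the bookkeeping these TaskSchedulerImpl hunks leave behind, sketched under the assumption of a single-threaded caller and with the surrounding scheduler machinery stripped away; only the field names and the drop-on-removal behavior come from the diff.

```scala
import scala.collection.mutable

// Stand-in mirroring the case class in the diff above.
case class ExecutorDecommissionState(startTime: Long, hostOpt: Option[String] = None)

object DecommissionBookkeeping {
  // After this PR, decommission info lives only while the executor is still known to
  // the scheduler; once the executor is removed, its entry is simply dropped instead
  // of being parked in the TTL-based decommissionedExecutorsRemoved cache deleted above.
  private val executorsPendingDecommission =
    mutable.HashMap.empty[String, ExecutorDecommissionState]

  def onDecommission(execId: String, hostOpt: Option[String], nowMs: Long): Unit =
    executorsPendingDecommission(execId) = ExecutorDecommissionState(nowMs, hostOpt)

  def getExecutorDecommissionState(execId: String): Option[ExecutorDecommissionState] =
    executorsPendingDecommission.get(execId)

  def onExecutorRemoved(execId: String): Unit =
    executorsPendingDecommission.remove(execId)  // no post-removal cache any more
}
```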
@@ -988,7 +988,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case exited: ExecutorExited => exited.exitCausedByApp
-        case ExecutorKilled => false
+        case ExecutorKilled | ExecutorDecommission(_) => false
case ExecutorProcessLost(_, _, false) => false
case _ => true
}
@@ -390,16 +390,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case Some(executorInfo) =>
// This must be synchronized because variables mutated
// in this block are read when requesting executors
-        val killed = CoarseGrainedSchedulerBackend.this.synchronized {
+        val lossReason = CoarseGrainedSchedulerBackend.this.synchronized {
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
-          executorsPendingDecommission -= executorId
-          executorsPendingToRemove.remove(executorId).getOrElse(false)
+          val decommissioned = executorsPendingDecommission.remove(executorId)
Contributor: Rename decommissioned to workerHostOpt and perhaps give it an explicit type: Option[Option[String]]. It's no longer a simple boolean.

Author: Renaming to workerHostOpt makes sense to me, but I don't feel strongly about adding the explicit type. It also breaks the line-length limit, and I'd like to keep it on one line when it's not necessary to break it.

+          executorsPendingToRemove
+            .remove(executorId).filter(killedByDriver => killedByDriver).map(_ => ExecutorKilled)
+            .getOrElse(if (decommissioned) ExecutorDecommission() else reason)
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
-        scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
+        scheduler.executorLost(executorId, lossReason)
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None =>
@@ -489,17 +491,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
decomInfo: ExecutorDecommissionInfo): Boolean = {

logInfo(s"Asking executor $executorId to decommissioning.")
-    try {
-      scheduler.executorDecommission(executorId, decomInfo)
-      if (driverEndpoint != null) {
-        logInfo("Propagating executor decommission to driver.")
-        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
-      }
-    } catch {
-      case e: Exception =>
-        logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        return false
-    }
+    scheduler.executorDecommission(executorId, decomInfo)
// Send decommission message to the executor (it could have originated on the executor
// but not necessarily.
CoarseGrainedSchedulerBackend.this.synchronized {
@@ -656,7 +648,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
!executorsPendingToRemove.contains(id) &&
!executorsPendingLossReason.contains(id) &&
!executorsPendingDecommission.contains(id)

}

/**
@@ -165,10 +165,10 @@ private[spark] class StandaloneSchedulerBackend(
}

override def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
+      fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit = {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => ExecutorProcessLost(message, workerLost = workerLost)
+      case None => ExecutorProcessLost(message, hostOpt)
}
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason)
Expand Down
Loading