[SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo

### What changes were proposed in this pull request?

This change populates `ExecutorDecommission` with messages in `ExecutorDecommissionInfo`.

### Why are the changes needed?

Currently the message in `ExecutorDecommission` is a fixed string ("Executor decommission."), so every decommission case (e.g. a spot instance interruption or an auto-scaling scale-down) reports the same reason. With this change the more specific message from `ExecutorDecommissionInfo` is carried along, so those cases can be told apart.
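
As a rough illustration of the new behavior (a sketch only, not part of the patch: `ExecutorDecommission` is a Spark-internal class, so this only compiles inside the `org.apache.spark` package, and the reason strings here are made up), the loss reason's `message` now embeds whatever message was supplied in `ExecutorDecommissionInfo`:

```scala
import org.apache.spark.scheduler.ExecutorDecommission

// Hypothetical reason strings; in practice they come from ExecutorDecommissionInfo.message.
val spotLoss  = ExecutorDecommission(workerHost = None, reason = "spot instance interruption")
val scaleDown = ExecutorDecommission(workerHost = Some("host-1"), reason = "auto-scaling down")

// Previously both would report the fixed "Executor decommission.";
// now the messages are distinguishable:
println(spotLoss.message)   // Executor decommission: spot instance interruption
println(scaleDown.message)  // Executor decommission: auto-scaling down
```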

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test.
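
A condensed sketch of what that test checks (see the full `BlockManagerDecommissionIntegrationSuite` diff below; `sc`, `sched` and `execToDecommission` come from the suite's existing fixture, and the APIs are Spark-internal): decommission an executor with a custom message and assert that the executor-removed event surfaces it.

```scala
import scala.language.reflectiveCalls  // listener.removeReasonValidated is a structural-type member

import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorDecommissionInfo}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

val listener = new SparkListener {
  var removeReasonValidated = false
  override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
    // The removal reason now starts with "Executor decommission: " followed by
    // the message that was passed in ExecutorDecommissionInfo.
    if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
      removeReasonValidated = true
    }
  }
}
sc.addSparkListener(listener)

// Trigger the decommission with a distinctive message.
sched.decommissionExecutor(
  execToDecommission,
  ExecutorDecommissionInfo("test msg 0", None),
  adjustTargetNumExecutors = true)

// ...after the executor has actually been removed...
assert(listener.removeReasonValidated)
```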

Closes #38030 from bozhang2820/spark-40596.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
bozhang2820 authored and Ngone51 committed Oct 11, 2022
1 parent f8d68b0 commit 4eb0edf
Showing 6 changed files with 30 additions and 14 deletions.
@@ -2949,7 +2949,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorLost(execId, reason) =>
       val workerHost = reason match {
         case ExecutorProcessLost(_, workerHost, _) => workerHost
-        case ExecutorDecommission(workerHost) => workerHost
+        case ExecutorDecommission(workerHost, _) => workerHost
         case _ => None
       }
       dagScheduler.handleExecutorLost(execId, workerHost)
@@ -77,6 +77,13 @@ case class ExecutorProcessLost(
  * If you update this code make sure to re-run the K8s integration tests.
  *
  * @param workerHost it is defined when the worker is decommissioned too
+ * @param reason detailed decommission message
  */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
-  extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(
+    workerHost: Option[String] = None,
+    reason: String = "")
+  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)
+
+private[spark] object ExecutorDecommission {
+  val msgPrefix = "Executor decommission: "
+}
@@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case ExecutorExited(_, false, _) => false
-        case ExecutorKilled | ExecutorDecommission(_) => false
+        case ExecutorKilled | ExecutorDecommission(_, _) => false
         case ExecutorProcessLost(_, _, false) => false
         // If the task is launching, this indicates that Driver has sent LaunchTask to Executor,
         // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned. Maps from executorId to workerHost.
-  protected val executorsPendingDecommission = new HashMap[String, Option[String]]
+  // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
+  protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -447,11 +447,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap -= executorId
         executorsPendingLossReason -= executorId
         val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
-        val workerHostOpt = executorsPendingDecommission.remove(executorId)
+        val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
         if (killedByDriver) {
           ExecutorKilled
-        } else if (workerHostOpt.isDefined) {
-          ExecutorDecommission(workerHostOpt.get)
+        } else if (decommissionInfoOpt.isDefined) {
+          val decommissionInfo = decommissionInfoOpt.get
+          ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
         } else {
           reason
         }
@@ -535,7 +536,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
         scheduler.executorDecommission(executorId, decomInfo)
-        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        executorsPendingDecommission(executorId) = decomInfo
         Some(executorId)
       } else {
         None
@@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor(
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
       if (event.reason == ExecutorLossMessage.decommissionFinished ||
-          event.reason == ExecutorDecommission().message) {
+          (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) {
         metrics.gracefullyDecommissioned.inc()
       } else if (removed.decommissioning) {
         metrics.decommissionUnfinished.inc()
@@ -183,9 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
     }
 
-    sc.addSparkListener(new SparkListener {
+    val listener = new SparkListener {
+      var removeReasonValidated = false
+
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
+        if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
+          removeReasonValidated = true
+        }
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -211,7 +216,8 @@
           }
         }
       }
-    })
+    }
+    sc.addSparkListener(listener)
 
     // Cache the RDD lazily
     if (persist) {
@@ -247,7 +253,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", None),
+      ExecutorDecommissionInfo("test msg 0", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
 
@@ -343,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
+    import scala.language.reflectiveCalls
+    assert(listener.removeReasonValidated)
   }
 }
