[SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo #38030

Closed · wants to merge 4 commits
@@ -2899,7 +2899,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorLost(execId, reason) =>
       val workerHost = reason match {
         case ExecutorProcessLost(_, workerHost, _) => workerHost
-        case ExecutorDecommission(workerHost) => workerHost
+        case ExecutorDecommission(workerHost, _) => workerHost
         case _ => None
       }
       dagScheduler.handleExecutorLost(execId, workerHost)
@@ -77,6 +77,13 @@ case class ExecutorProcessLost(
  * If you update this code make sure to re-run the K8s integration tests.
  *
  * @param workerHost it is defined when the worker is decommissioned too
+ * @param reason detailed decommission message
  */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
-  extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(
+    workerHost: Option[String] = None,
+    reason: String = "")
+  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)
+
+private[spark] object ExecutorDecommission {
+  val msgPrefix = "Executor decommission: "
+}
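
The new `reason` field is appended to a fixed prefix, so the human-readable loss message now carries the original decommission message. A minimal sketch of the resulting behaviour (illustrative values, not part of the diff; `message` is the field inherited from `ExecutorLossReason`):

// Sketch only, not part of the patch.
val loss = ExecutorDecommission(workerHost = Some("host-1"), reason = "worker decommissioned")
assert(loss.message == "Executor decommission: worker decommissioned")
// With the defaults the message is just the prefix:
assert(ExecutorDecommission().message == ExecutorDecommission.msgPrefix)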
@@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case ExecutorExited(_, false, _) => false
-        case ExecutorKilled | ExecutorDecommission(_) => false
+        case ExecutorKilled | ExecutorDecommission(_, _) => false
         case ExecutorProcessLost(_, _, false) => false
         // If the task is launching, this indicates that Driver has sent LaunchTask to Executor,
         // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume
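
This call site only needs the pattern widened to the new arity; the decommission message does not affect the decision, so tasks lost to a decommissioned executor still do not count against the application. A reduced sketch of that decision in isolation (hypothetical standalone helper):

// Sketch only: the reason string is irrelevant to exitCausedByApp.
def exitCausedByApp(reason: ExecutorLossReason): Boolean = reason match {
  case ExecutorKilled | ExecutorDecommission(_, _) => false
  case _ => true
}
assert(!exitCausedByApp(ExecutorDecommission(None, "any message")))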
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]

-  // Executors which are being decommissioned. Maps from executorId to workerHost.
-  protected val executorsPendingDecommission = new HashMap[String, Option[String]]
+  // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
+  protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]

   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -444,11 +444,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap -= executorId
         executorsPendingLossReason -= executorId
         val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
-        val workerHostOpt = executorsPendingDecommission.remove(executorId)
+        val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
         if (killedByDriver) {
           ExecutorKilled
-        } else if (workerHostOpt.isDefined) {
-          ExecutorDecommission(workerHostOpt.get)
+        } else if (decommissionInfoOpt.isDefined) {
+          val decommissionInfo = decommissionInfoOpt.get
+          ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
         } else {
           reason
         }
@@ -532,7 +533,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
         scheduler.executorDecommission(executorId, decomInfo)
-        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        executorsPendingDecommission(executorId) = decomInfo
         Some(executorId)
       } else {
         None
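
The map now retains the whole `ExecutorDecommissionInfo` rather than only the worker host, so when the executor is eventually removed both the host and the original message survive into the loss reason. A sketch of the intended flow, with an invented example message:

// Sketch only, not part of the patch.
val decomInfo = ExecutorDecommissionInfo("spot instance reclaimed", None)
// stored at decommission time: executorsPendingDecommission(execId) = decomInfo
// surfaced at removal time:
val lossReason = ExecutorDecommission(decomInfo.workerHost, decomInfo.message)
assert(lossReason.message == "Executor decommission: spot instance reclaimed")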
@@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor(
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
       if (event.reason == ExecutorLossMessage.decommissionFinished ||
-          event.reason == ExecutorDecommission().message) {
+          (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) {
         metrics.gracefullyDecommissioned.inc()
       } else if (removed.decommissioning) {
         metrics.decommissionUnfinished.inc()
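
Because the reason string now ends with a caller-supplied message, an exact comparison against `ExecutorDecommission().message` would only match the empty-message case; the prefix check recognises any decommission reason. A sketch with an invented message:

// Sketch only: why equality was replaced with a prefix check.
val reported = ExecutorDecommission(None, "node drained").message   // "Executor decommission: node drained"
assert(reported != ExecutorDecommission().message)                  // old check no longer matches
assert(reported.startsWith(ExecutorDecommission.msgPrefix))         // new check does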
@@ -183,9 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
     }

-    sc.addSparkListener(new SparkListener {
+    val listener = new SparkListener {
+      var removeReasonValidated = false
+
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
+        if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
+          removeReasonValidated = true
+        }
       }

       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -211,7 +216,8 @@
           }
         }
       }
-    })
+    }
+    sc.addSparkListener(listener)

     // Cache the RDD lazily
     if (persist) {
@@ -247,7 +253,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", None),
+      ExecutorDecommissionInfo("test msg 0", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()

@@ -343,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
+    import scala.language.reflectiveCalls
+    assert(listener.removeReasonValidated)
   }
 }
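
The listener is an anonymous subclass of `SparkListener` with an extra `removeReasonValidated` member, so its static type is a structural refinement and reading the flag from outside is a reflective call; that is what the `scala.language.reflectiveCalls` import before the final assertion enables. A self-contained sketch of the same mechanism (hypothetical `Base` class, not from the patch):

// Sketch only: structural-type member access requires the language import.
import scala.language.reflectiveCalls

class Base
val obj = new Base { var flag = false }   // obj's type is a structural refinement of Base
obj.flag = true                           // compiled as a reflective call
assert(obj.flag)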