[SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl #29579
Conversation
Test build #128019 has finished for PR 29579 at commit
Test build #128024 has finished for PR 29579 at commit
Test build #128025 has finished for PR 29579 at commit
@agrawaldevesh @cloud-fan @holdenk Please take a look, thanks!
There is a semantic change here:
Earlier, the shuffle status was removed both when the downstream triggered a fetch failure AND when the executor was lost (heartbeat failure), whichever came first.
However, with this PR, it seems you are removing the "clear shuffle on fetch failure" part. It seems that you will wait for the heartbeat failure to occur and the host to be lost, even if the downstream has signaled a fetch failure. Can you confirm whether this understanding is right?
The memory used by the cache is trivially small, and the code simplification is also not a whole lot, so it seems that I am missing the bigger motivation for this change.
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
   }

   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int], workerLost: Boolean)
+    exitStatus: Option[Int], hostOpt: Option[String])
I think we need a better name than hostOpt. How about just "hostname"? The type already conveys that this is optional.
How about hostLost?
We can also leave the name as workerLost and just make it an Option[String], in the spirit of minimal code change?
After a second thought, I changed it to workerHost. We need the keyword worker because it's specific to the Standalone Worker, and Host gives the direct meaning of the value. And workerLost sounds more appropriate for a Boolean type. WDYT?
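For illustration, a minimal sketch of the shape finally agreed upon. The ExecutorState and ExecutorUpdated definitions below are simplified stand-ins for the real Spark DeployMessages types, not the actual implementation:

```scala
// Simplified stand-ins for Spark's deploy-message types (hypothetical demo code).
sealed trait ExecutorState
case object DECOMMISSIONED extends ExecutorState

// workerHost is Some(host) when the Standalone Worker itself is decommissioned
// or lost, and None when only the executor is affected. The Option type carries
// the "lost or not" signal that the old workerLost: Boolean used to carry.
case class ExecutorUpdated(
    id: Int,
    state: ExecutorState,
    message: Option[String],
    exitStatus: Option[Int],
    workerHost: Option[String])

object ExecutorUpdatedDemo extends App {
  val workerGone = ExecutorUpdated(1, DECOMMISSIONED,
    Some("worker decommissioned"), None, Some("host-1"))
  val execOnly = ExecutorUpdated(2, DECOMMISSIONED, None, None, None)
  assert(workerGone.workerHost.isDefined) // host known: worker-level cleanup possible
  assert(execOnly.workerHost.isEmpty)     // executor-only decommission
}
```

The single Option[String] field replaces both the Boolean flag and the separate host lookup.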
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (outdated; resolved)
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (outdated; resolved)
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (resolved)
Thank you for the quick response @agrawaldevesh.
I think this PR doesn't change the semantics. We still clear the shuffle status on fetch failure; the only change for fetch failure in DAGScheduler is:
- .exists(_.isHostDecommissioned)
+ .exists(_.hostOpt.isDefined)
If the fetch failure comes before the executor loss, DAGScheduler will still ask TaskSchedulerImpl for the decommission state and unregister the shuffle status then, while if the executor loss comes first, the fetch failure becomes a no-op for shuffle status unregistration. I think the only difference is that, before this PR, if the executor-lost event came first, we could only unregister the shuffle map status on that executor, even if we knew the host was also decommissioned. But now we can unregister the host's shuffle status because we pass in the host info directly.
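The ordering argument above can be sketched with a toy tracker. This is illustrative only: ShuffleStatusTracker and its methods are hypothetical stand-ins for MapOutputTracker-style bookkeeping, not Spark's API.

```scala
// Toy model of shuffle-status bookkeeping keyed by executor, with host info.
class ShuffleStatusTracker {
  private var byExecutor = Map("exec-1" -> "host-1", "exec-2" -> "host-1")

  // Executor-lost path when only the executor is gone.
  def unregisterExecutor(execId: String): Unit = byExecutor -= execId

  // With the decommissioned host passed along in the loss event, we can drop
  // every executor on that host at once, without consulting any cached state.
  def unregisterHost(host: String): Unit =
    byExecutor = byExecutor.filter { case (_, h) => h != host }

  def registered: Set[String] = byExecutor.keySet
}

object OrderingDemo extends App {
  val hostLost = new ShuffleStatusTracker
  hostLost.unregisterHost("host-1") // executor-lost event carried the host
  assert(hostLost.registered.isEmpty)

  val execLost = new ShuffleStatusTracker
  execLost.unregisterExecutor("exec-1") // host unknown: executor-level only
  assert(execLost.registered == Set("exec-2"))
}
```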
Thanks for the explanations! I will get back to you in like 2-3 days after playing with it locally. (I am on PTO tomorrow.)
Test build #128064 has finished for PR 29579 at commit
- I think we need a better name than hostOpt. Consider keeping the name as workerLost.
- Consider making ExecutorDecommission immutable.
I couldn't reproduce the failing GH test in ExecutorAllocationManagerSuite, but interestingly a different one fails for me on upstream itself. It might be a good idea to rebase and get a green GH run.
@@ -909,9 +909,9 @@ private[deploy] class Master(
   exec.application.driver.send(ExecutorUpdated(
     exec.id, ExecutorState.DECOMMISSIONED,
     Some("worker decommissioned"), None,
-    // workerLost is being set to true here to let the driver know that the host (aka. worker)
+    // worker host is being set here to let the driver know that the host (aka. worker)
nit: can you reword the comment to be more accurate now :-)
Updated a little bit.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (outdated; resolved)
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (outdated; resolved)
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (resolved)
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
   */
  private[scheduler] def handleExecutorLost(
      execId: String,
-     workerLost: Boolean): Unit = {
+     hostOpt: Option[String]): Unit = {
Can you change this method's comment as well if you decide to go with hostOpt instead of workerLost (perhaps you ought to consider my comment on making workerLost itself an Option[String])? The comment still refers to "standalone worker".
I changed it to workerHost, so I guess we can keep the comment?
   */
-private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(var hostOpt: Option[String] = None)
I am not a fan of this change of making hostOpt a var instead of a val. I think you only need this for line 932 in TaskSchedulerImpl, and I am sure you would be able to accommodate that use case in a different way.
The reason I don't like it is that other ExecutorLossReasons are "messages" (for example ExecutorProcessLost), and these messages tend to be immutable. I think it's a bit hacky to have ExecutorDecommission masquerading as a message but then make it mutable.
ExecutorDecommission, too, is a message that the TaskSchedulerImpl enqueues into the event loop of the DAGScheduler.
TBH, I don't like this way myself either. I tried another way to get rid of the problem here, but it requires storing the redundant workerHost info in CoarseGrainedSchedulerBackend.
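One immutable alternative, sketched here with simplified stand-in types (not the actual Spark classes): keep the field a val and derive an enriched message with copy once the host becomes known, instead of mutating in place.

```scala
// Hypothetical stand-ins for ExecutorLossReason / ExecutorDecommission.
sealed abstract class ExecutorLossReason(val message: String)

// workerHost stays a val, so the message remains an immutable value.
case class ExecutorDecommission(workerHost: Option[String] = None)
  extends ExecutorLossReason("Executor decommission.")

object ImmutableDemo extends App {
  val initial = ExecutorDecommission()
  // Produce a new message rather than mutating the old one.
  val enriched = initial.copy(workerHost = Some("host-1"))
  assert(initial.workerHost.isEmpty)
  assert(enriched.workerHost.contains("host-1"))
}
```

The cost is that the caller must hold the enriched copy, which is the state-keeping trade-off discussed above.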
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (resolved)
@cloud-fan @holdenk Could you also take a look?
Test build #128251 has finished for PR 29579 at commit
LGTM... just a few last comments, and I will accept right afterwards.
- // Executors which are being decommissioned
- protected val executorsPendingDecommission = new HashSet[String]
+ // Executors which are being decommissioned. Maps from executorId to
+ // workerHost(it's defined when the worker is also decommissioned)
super nit: space after workerHost.
I think workerHost is already an Option and thus already matches the value type of the executorsPendingDecommission map, so we can perhaps drop the parenthetical clause entirely?
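For illustration, a sketch of what the HashSet-to-HashMap change shown in the diff above could look like. Names follow the review discussion; this is not the actual Spark code:

```scala
import scala.collection.mutable

object PendingDecommissionDemo extends App {
  // Executors which are being decommissioned, keyed by executorId; the value is
  // the worker host, defined when the worker itself is also decommissioned.
  val executorsPendingDecommission = new mutable.HashMap[String, Option[String]]

  executorsPendingDecommission("exec-1") = Some("host-1") // worker decommissioned too
  executorsPendingDecommission("exec-2") = None           // executor-only decommission

  // remove yields Option[Option[String]]: the outer Option says whether the
  // executor was pending decommission at all; the inner one carries the host.
  val removed: Option[Option[String]] = executorsPendingDecommission.remove("exec-1")
  assert(removed == Some(Some("host-1")))
  assert(executorsPendingDecommission.remove("exec-3").isEmpty)
}
```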
  if (killedByDriver) {
    ExecutorKilled
  } else if (decommissioned.isDefined) {
    ExecutorDecommission(decommissioned.get)
  } else {
    reason
  }
:-) I can read !.
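An equivalent pattern-match formulation of the selection above, sketched with hypothetical stand-in types for the loss reasons:

```scala
// Hypothetical stand-ins for the loss-reason types in the snippet above.
sealed trait ExecutorLossReason
case object ExecutorKilled extends ExecutorLossReason
case class ExecutorDecommission(workerHost: Option[String]) extends ExecutorLossReason
case class ExecutorProcessLost(msg: String) extends ExecutorLossReason

object LossReasonDemo extends App {
  // Same precedence as the if/else chain: driver kill wins, then decommission,
  // then the original reason.
  def pickReason(
      killedByDriver: Boolean,
      decommissioned: Option[Option[String]],
      reason: ExecutorLossReason): ExecutorLossReason =
    (killedByDriver, decommissioned) match {
      case (true, _)           => ExecutorKilled
      case (false, Some(host)) => ExecutorDecommission(host)
      case _                   => reason
    }

  assert(pickReason(true, None, ExecutorProcessLost("x")) == ExecutorKilled)
  assert(pickReason(false, Some(Some("host-1")), ExecutorProcessLost("x")) ==
    ExecutorDecommission(Some("host-1")))
}
```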
@@ -394,10 +395,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   addressToExecutorId -= executorInfo.executorAddress
   executorDataMap -= executorId
   executorsPendingLossReason -= executorId
   val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
+  val decommissioned = executorsPendingDecommission.remove(executorId)
Rename decommissioned to workerHostOpt and perhaps give it an explicit type: Option[Option[String]]. It's no longer a simple Boolean.
Renaming to workerHostOpt makes sense to me, but I don't feel strongly about adding the explicit type. It also breaks the line-length limit; I'd like to keep the line unbroken when it's not necessary to break it.
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala (outdated; resolved)
@@ -70,7 +71,7 @@ case class ExecutorProcessLost(
  * This is used by the task scheduler to remove state associated with the executor, but
  * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
  *
- * @param hostOpt it will be set by [[TaskSchedulerImpl]] when the host is decommissioned too
+ * @param workerHost it's defined when the worker is decommissioned too
nit: "it's" -> "it is"
Also, should we explicitly bring out the word "host" here? "It is defined when the worker host is decommissioned too."
I think "worker" should be enough.
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
       cores))
     listener.executorAdded(fullId, workerId, hostPort, cores, memory)

-  case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+  case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
Personally, I would still be okay with workerLost being an Option[String] instead of a Boolean. Obviously, had it been called "workerIsLost", we would have had to rename it. But I am also fine with the new name workerHost; I don't particularly think that the name workerLost must connote a Boolean.
This ExecutorUpdated message is a case in point where the "lost" part is meaningful, because it refers to the worker that is lost, as opposed to some random worker host.
But no strong feelings on this, and I am happy with the choice of workerHost.
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala (outdated; resolved)
Test build #128297 has finished for PR 29579 at commit
retest this please.
Test build #128305 has finished for PR 29579 at commit
LGTM! Thanks for simplifying and thinning out the logic. I think the changes are more direct and easier to read.
I confirm that there are no semantic changes introduced.
Cc: @holdenk, @cloud-fan, please do review this PR.
@cloud-fan @holdenk Could you take a look?
Test build #128356 has finished for PR 29579 at commit
thanks, merging to master!
thanks all!
What changes were proposed in this pull request?
The motivation of this PR is to avoid caching the removed decommissioned executors in TaskSchedulerImpl. The cache was introduced in #29422. It holds the isHostDecommissioned info for a while, so that if the task's FetchFailure event comes after the executor-loss event, DAGScheduler can still get isHostDecommissioned from the cache and unregister the host's shuffle map status when the host is decommissioned too.
This PR tries to achieve the same goal without the cache. Instead of saving workerLost in ExecutorUpdated / ExecutorDecommissionInfo / ExecutorDecommissionState, we save hostOpt directly. When the host is decommissioned or lost too, hostOpt holds the specific host address; otherwise it is None, indicating that only the executor is decommissioned or lost.
Now that we have the host info, we can also unregister the host's shuffle map status when executorLost is triggered for a decommissioned executor.
Besides, this PR also includes a few cleanups around the touched code.
Why are the changes needed?
It helps to unregister the shuffle map status earlier for both the decommission and the normal executor-lost cases.
It also saves memory in TaskSchedulerImpl and simplifies the code a little bit.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR only refactors the code. The original behaviour should be covered by DecommissionWorkerSuite.