[WIP][SPARK-32913][CORE][K8S] Improve ExecutorDecommissionInfo and ExecutorDecommissionState for different use cases #29788
Conversation
@holdenk @cloud-fan @agrawaldevesh Please take a look, thanks!
Please tag this PR as WIP until it is tested, thanks for working on improving the code though @Ngone51 :)
Kubernetes integration test starting
So I'm personally against this refactoring, but if folks working on making this work for other cluster backends say this refactoring would make it easier for them, I'm fine to proceed (although I'd appreciate a chance to do a more detailed review beforehand).
 package org.apache.spark

-import org.apache.spark.scheduler.ExecutorDecommissionInfo
+import org.apache.spark.scheduler.ExecutorDecommissionReason
Why are we renaming this?
+1, I would argue against unnecessary renaming even if it seems a bit "unnatural". It creates unnecessary diff noise.
To me "Info" and "Reason" are both similar: they both portend "additional information".
I guess @Ngone51 is trying to follow the style of TaskEndReason.
Thanks @dongjoon-hyun for the clarification. In addition to following the style of TaskEndReason, I actually also want to handle the decommission info/reason in a similar way to TaskEndReason.
package org.apache.spark.scheduler

private[spark] sealed trait ExecutorDecommissionReason {
I get why this is a sealed trait, namely we're pattern matching against it. But this seems to remove flexibility for anyone working on scheduler backends.
@holdenk, can you please provide an example of how having this as a sealed trait would limit the flexibility?
It is marked as private[spark], so the resource-manager-specific scheduler backends should be able to extend it ... no?
Duh. Sorry for my n00bness. I can totally see why this shouldn't be a sealed trait: for example, it is forcing the TestExecutorDecommissionInfo to be in this file.
@Ngone51, is there a strong reason for making this a sealed trait? Is that required by the RPC framework, for example? If not, I don't think it's worth it.
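For readers less familiar with Scala, here is a minimal sketch of the trade-off being discussed; the class names are taken from this PR's description, and the exact shapes are assumptions rather than the PR's code:

```scala
package org.apache.spark.scheduler

// Sketch only: a sealed trait can only be extended in the file where it is
// defined, which is why even the test-only reason has to live in that file.
private[spark] sealed trait ExecutorDecommissionReason {
  val reason: String = "decommissioned"
}

case class StandaloneDecommission(workerHost: Option[String] = None)
  extends ExecutorDecommissionReason

case class TestExecutorDecommission(host: Option[String] = None)
  extends ExecutorDecommissionReason

// In exchange, matches over a sealed trait are checked for exhaustiveness:
object DecommissionReasons {
  def describe(r: ExecutorDecommissionReason): String = r match {
    case StandaloneDecommission(host) => s"standalone decommission, worker host = $host"
    case TestExecutorDecommission(host) => s"test-only decommission, host = $host"
  }
}
```

The flip side, as noted above, is that a scheduler backend living in another file cannot add its own reason without touching this one.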
/**
 * For the Kubernetes workloads
 */
case class K8SDecommission() extends ExecutorTriggeredDecommission
I think maybe we could have a better level here; there isn't really anything K8s-specific about this kind of message. Rather, all external cluster manager decommissions could perhaps be the same?
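One possible reading of that suggestion, sketched out; `ClusterManagerDecommission` is a hypothetical name, and the parent is assumed to be an extensible (non-case) class, which may differ from the PR's code:

```scala
package org.apache.spark.scheduler

// Assumed shapes for the sketch, not the PR's exact code.
private[spark] trait ExecutorDecommissionReason
private[spark] class ExecutorTriggeredDecommission extends ExecutorDecommissionReason

// Hypothetical: one shared reason for any external cluster manager (K8s, YARN, ...),
// instead of a Kubernetes-specific K8SDecommission subclass.
private[spark] case class ClusterManagerDecommission()
  extends ExecutorTriggeredDecommission {
  override def toString: String = "Executor decommissioned by the cluster manager"
}
```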
Kubernetes integration test status failure
Test build #128829 has finished for PR 29788 at commit
@tgravescs, @mridulm, @squito FYI
      adjustTargetNumExecutors: Boolean,
      triggeredByExecutor: Boolean): Seq[String] = {
    killExecutors(executorsAndDecomInfo.map(_._1),
      executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)],
Is it possible that different executors have different ExecutorDecommissionReason? If it's not possible, I think we are over-engineering here.
This is how it was earlier -- so we aren't changing the semantics save the renaming :-) And plus, yes, this can happen: different executors on different hosts would have different ExecutorDecommissionReason/Info with different hosts potentially in them.
This is simply a bulk API: instead of making n calls, we are folding them into one.
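To make the bulk shape concrete, a small sketch; the method name `decommissionExecutors`, the `backend` value, and the reason class used here are assumptions based on the signature shown in the diff above, not necessarily the exact API:

```scala
// Hypothetical call: different executors can carry different reasons/hosts,
// folded into one bulk request instead of n separate calls.
val executorsAndReasons: Array[(String, ExecutorDecommissionReason)] = Array(
  ("exec-1", StandaloneDecommission(workerHost = Some("host-a"))),
  ("exec-2", StandaloneDecommission(workerHost = Some("host-b"))))

backend.decommissionExecutors(
  executorsAndReasons,
  adjustTargetNumExecutors = false,
  triggeredByExecutor = false)
```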
I think this is a great refactoring and it does help to separate out the different use cases. It's close, but I think there are some rough edges worth fixing, and we can make the changes even tighter.
 */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
-  extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(reason: String, host: Option[String] = None)
My scala knowledge is really really poor, but I would rather we make this a non-case class if you are planning to do this. Currently, I think the field "reason" is going to be duplicated in the base class ExecutorLossReason and in ExecutorDecommission.
That's also the reason why you are pattern matching it above with an additional _ (for the reason) argument, when you really don't care about the reason.
The case class is needed because we'd apply pattern matching on it.
The "reason" is necessary because of class inheritance, no? Please see ExecutorProcessLost for instance: ExecutorProcessLost also has the field _message, which is needed to assign to ExecutorProcessLost.message.
You can write unapply methods if you need to do pattern matching with something other than a case class.
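A minimal sketch of that idea; the field names and the ExecutorLossReason(message) parent are taken from the diff above, but this is illustrative rather than the PR's code:

```scala
package org.apache.spark.scheduler

// Assumed parent, matching the constructor usage shown in the diff.
private[spark] class ExecutorLossReason(val message: String)

// A plain (non-case) class: no duplicated "reason" field alongside the parent's
// message, but pattern matching still works via the hand-written unapply below.
private[spark] class ExecutorDecommission(
    reason: String,
    val host: Option[String] = None)
  extends ExecutorLossReason(reason)

private[spark] object ExecutorDecommission {
  def unapply(d: ExecutorDecommission): Option[(String, Option[String])] =
    Some((d.message, d.host))
}

// Usage in a match arm:
//   case ExecutorDecommission(_, host) => host
```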
 val workerHost = reason match {
   case ExecutorProcessLost(_, workerHost, _) => workerHost
-  case ExecutorDecommission(workerHost) => workerHost
+  case ExecutorDecommission(_, host) => host
See comment below for ExecutorDecommission ... Should this be changed to:
case decom: ExecutorDecommission => decom.workerHost // or decom.host
You don't need to add an extra '_' then.
Yeah, this is actually a good point! This is actually a rule of Databricks' Scala style guide. But I just followed the style of ExecutorProcessLost above here. I think it's acceptable when there aren't many arguments being expanded.
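For reference, the two match styles being discussed, assuming the two-field ExecutorDecommission case class from the diff above (a sketch, not the PR's code):

```scala
// Assumes: case class ExecutorDecommission(reason: String, host: Option[String] = None)
//          extends ExecutorLossReason(reason)

// Extractor pattern: every constructor field must be listed, so adding a field
// later means adding another '_' to each match arm.
def hostOf(reason: ExecutorLossReason): Option[String] = reason match {
  case ExecutorDecommission(_, host) => host
  case _ => None
}

// Typed pattern bound to a name: unaffected by changes to the constructor arity.
def hostOfTyped(reason: ExecutorLossReason): Option[String] = reason match {
  case decom: ExecutorDecommission => decom.host
  case _ => None
}
```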
package org.apache.spark.scheduler

private[spark] sealed trait ExecutorDecommissionReason {
  val reason: String = "decommissioned"
I don't think the reason field is really needed anywhere, besides it being used for toString? Should we just require overriding toString by marking toString abstract? I don't think that child classes need to override both toString and reason: I would prefer we just override methods instead of fields.
We also indirectly use the reason for logExecutorLoss. And yeah, we can just override toString indeed.
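A minimal sketch of that direction, assuming the trait shown in the diff above; the subclass names and messages are illustrative, not the PR's final code:

```scala
package org.apache.spark.scheduler

// No shared `reason` field: each subclass describes itself via toString,
// and call sites (e.g. loss-reason log messages) use that value directly.
private[spark] sealed trait ExecutorDecommissionReason

private[spark] case class StandaloneDecommission(workerHost: Option[String] = None)
  extends ExecutorDecommissionReason {
  override def toString: String = s"Worker decommission (workerHost=$workerHost)"
}

private[spark] case class DynamicAllocationDecommission()
  extends ExecutorDecommissionReason {
  override def toString: String = "Decommissioned by dynamic allocation"
}
```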
      logInfo(s"Executor $executorId on $hostPort killed by driver.")
    case ExecutorDecommission(reason, _) =>
      // use logInfo instead of logError as the loss of decommissioned executor is what we expect
      logInfo(s"Decommissioned executor $executorId on $hostPort shutdown: $reason")
Instead of 'shutdown', should we say 'is finally lost'? To be more accurate in this setting.
+1 on this change to avoid log spam.
 val exitCausedByApp: Boolean = reason match {
   case exited: ExecutorExited => exited.exitCausedByApp
-  case ExecutorKilled | ExecutorDecommission(_) => false
+  case ExecutorKilled | ExecutorDecommission(_, _) => false
I am wondering if we should instead pattern match in a separate arm like:
case _: ExecutorDecommission => false
to avoid having to change the case arms when we make changes to the structure definitions.
  }
}

/**
Can you move this test-only class somewhere in the test-only package?
See TestResourceIDs as an example.
private class KubernetesDriverEndpoint extends DriverEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ExecutorDecommissioning(executorId) =>
I didn't fully follow the need for a distinction b/w the K8s case and the simple executor-triggered case.
I thought K8s only needs the SIGPWR-based thing, and indeed ExecutorDecommissioning is only sent in response to a SIGPWR.
So I am missing why we override ExecutorDecommissioning here and the motivation for K8SDecommission.
Since there was another PR in the same area committed that broke the existing integration tests in this area, I don't feel confident keeping my soft reservations and am switching to a veto for this change (e.g. -1).
(Willing to switch back to -0 once the original issue is addressed; I just don't want us in a state where broken tests are normal.)
Thanks for everyone's review. Agree with @holdenk that we should resolve the issue (#29722 (comment)) first. We can continue the discussion after the issue is resolved. Thanks again!
This PR adds 5 classes to represent 5 different decommission reasons, but we don't really have 5 different branches to handle these reasons. I think the abstraction should be based on the real requirements; can we simplify them?
So it turns out that the PR (#29722) doesn't break the existing integration tests. (It's something else, but we don't know what yet.) Therefore, I think we are safe to continue the discussion. So as for the main concern, I think I can actually change the
Then, we'd still keep one decommission info instance. Does this sound good to you? (BTW, the updates could be late since the PR (#29722) is already reverted and we have conflicts here. And the re-submitted PR is being blocked by the broken integration tests.)
I think it would be good to see your proposal in code @Ngone51 because I'm not 100% sure what you mean.
Sounds good
If @holdenk thinks so, then I agree as well!
Sorry for the bother @holden 😢
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Split ExecutorDecommissionInfo into 5 classes for different use cases (a rough sketch of the proposed hierarchy appears at the end of this description):
- DynamicAllocationDecommission: for the case where decommission is triggered by executor dynamic allocation
- StandaloneDecommission(workerHost): for the Standalone case
- K8SDecommission: for the Kubernetes case; it extends ExecutorTriggeredDecommission.
- ExecutorTriggeredDecommission: for the case where decommission is triggered at the executor
- TestExecutorDecommission(host): test only.

And all of them extend ExecutorDecommissionReason with a specific decommission reason. On the other hand, ExecutorDecommissionState would accept the ExecutorDecommissionReason as an attribute and expose common information through functions, e.g., isHostDecommissioned().

Why are the changes needed?
We have various decommission use cases now. And ExecutorDecommissionInfo is no longer enough to distinguish the case where decommission is triggered at the executor after #29722. That's also the reason why we added triggeredByExecutor. But things like triggeredByExecutor can be annoying and not easy to extend. So we need to improve the current approach to work better with the different cases.
There are a few benefits with this PR:
- Get rid of the parameter triggeredByExecutor
- No longer need to save the redundant workerHost info
- The decommission handling logic is clearer than before

Does this PR introduce any user-facing change?
No.

How was this patch tested?
WIP: I need to check whether k8s has an existing test covered by my change.
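To make the proposed hierarchy in the summary above concrete, here is a rough sketch; the field names, constructors, and the isHostDecommissioned() logic are assumptions based on this description, not the final code:

```scala
package org.apache.spark.scheduler

private[spark] sealed trait ExecutorDecommissionReason {
  val reason: String = "decommissioned"
}

// Decommission triggered by dynamic allocation on the driver side.
case class DynamicAllocationDecommission() extends ExecutorDecommissionReason

// Standalone mode: may carry the decommissioned worker's host.
case class StandaloneDecommission(workerHost: Option[String] = None)
  extends ExecutorDecommissionReason

// Decommission first noticed by the executor itself (e.g. via SIGPWR).
class ExecutorTriggeredDecommission extends ExecutorDecommissionReason

// Kubernetes: a special case of an executor-triggered decommission.
case class K8SDecommission() extends ExecutorTriggeredDecommission

// Test only.
case class TestExecutorDecommission(host: Option[String] = None)
  extends ExecutorDecommissionReason

// Driver-side state that wraps the reason and exposes common information.
private[spark] case class ExecutorDecommissionState(reason: ExecutorDecommissionReason) {
  def isHostDecommissioned(): Boolean = reason match {
    case StandaloneDecommission(workerHost) => workerHost.isDefined
    case TestExecutorDecommission(host) => host.isDefined
    case _ => false
  }
}
```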