[SPARK-32850][CORE][K8S] Simplify the RPC message flow of decommission #29817
Conversation
@holdenk, seems like the test failure wasn't caused by this PR. Does your -1 here still stand? cc @dongjoon-hyun as well per #29751. |
Kubernetes integration test starting |
Force-pushed from 5ca0fe8 to 15f6085
The -1 is only on the PR that I mentioned it on (the refactoring with many sub classes) which still stands. I would like to review this PR more though since I think we probably need better test coverage of this change than it originally had. Sound good? Thanks for checking in about that :) |
Got it about the -1, but how about we push this as is and work on the tests as follow-ups? It's a bit odd that we reverted it for a reason this PR didn't cause, and now ask for more things before merging it back in. |
Kubernetes integration test status failure |
I think we should not commit this with the K8s test being broken. It’s in the same chunk of code and changes the logging string (although another PR also changed that string first too?). I do not believe this PR was appropriately tested when first merged, given it changed decommissioning messages and did not run the decommission tests. For clarity: if you want to fix the tests in a separate PR that’s ok with me, but I would prefer not to commit this without passing integration testing. |
Test build #128934 has finished for PR 29817 at commit
Test build #128935 has finished for PR 29817 at commit
We don't. That's also why I added |
@holdenk, why don't you take a look at the test failure, since it blocks all changes to decommission in K8s and you were mainly involved in the development there? |
Can we find out which commit caused the test failure in the first place? We should either revert that commit, or fix it soon, as the test failure blocks others. Since this PR is resubmitted (although the revert is not necessary now given the test failure was already there), I think it's a good chance for @holdenk to take a closer look before re-merging. And I agree with @holdenk that we can't merge a PR when the related test is already broken. We should fix that first. @holdenk can you give some hints about it? I took a look at https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33556/ , but I don't even see how the test failed. The output is very different from normal Spark tests. |
Thanks for reverting this and resubmitting it. I know you believe the original PR didn't cause the test failure, but that's only half true: this PR just broke the test some more.
That being said, I still have concerns that this PR is not sufficiently tested. Can you add some more tests for the new flows you've introduced?
core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
  SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-   "disabling worker decommission feature.")(decommissionSelf)
+   "disabling worker decommission feature.") {
+     self.send(WorkerSigPWRReceived)
I think this might mean we return from handling the signal right away rather than waiting for decommissionSelf to finish. Is this an intentional change?
Also, this will no longer report a decommissioning failure via the signal return value, so it may block pod deletion or other cleanup tasks longer than needed.
This is the Worker; I guess you care more about the executor? In the Worker, decommissionSelf always returns true. In the executor, there's a chance it returns false to fail decommissionSelf, but that seems to happen rarely. If you insist on returning the value, I think we can use askSync instead.
Can you look into what difference this behavior might cause at the system level and then tell me whether that's a desired change? I'm OK with us making changes here; I just want us to be intentional and to know whether we need to test the change, and it seems like this change was incidental.
The return value of the signal handling decides whether we should forward the signal to the other handlers. If true, no other handlers will handle the PWR signal except ourselves. If false, we will handle it (for decommission) and other handlers will handle it too. Do you expect other handlers to continue handling SIGPWR when the system isn't really experiencing a power failure?
I do. I think if the signal is unhandled then the process will be killed immediately. If we think of decommissioning/graceful shutdown, I believe that behavior is desirable, since if we can't shut down gracefully the least we can do is exit quickly.
Updated, but please note that I only updated the executor's case, since the Worker's case always returned true before.
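For readers following the thread, here is a minimal sketch of the executor-side registration this exchange converges on, assuming the askSync approach suggested above; ExecutorSigPWRReceived comes from the PR's diff, but the handler body and the log message are illustrative rather than the merged code.

```scala
// Sketch only (assumed shapes, not the merged code): register the SIGPWR handler
// so that its Boolean result comes from a blocking askSync to the executor's own
// endpoint. Returning false would let other handlers still see the signal, per
// the discussion above.
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
  "disabling executor decommission feature.") {
  // Blocks until the endpoint replies whether decommissioning was accepted.
  self.askSync[Boolean](ExecutorSigPWRReceived)
}
```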
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
if (decommissioned) {
  val msg = "Asked to launch a task while decommissioned."
  logError(msg)
  driver match {
    case Some(endpoint) =>
      logInfo("Sending DecommissionExecutor to driver.")
      endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
    case _ =>
      logError("No registered driver to send Decommission to.")
  }
}
I don't think we should just take this out. Async sends could fail; if we receive a request indicating the master hasn't received our notification, that means we should resend the message.
First, we use askSync to send the decommission notice to the driver whenever it's needed (see ExecutorSigPWRReceived). Second, even if the driver receives the decommission notice successfully, there could still be a LaunchTask request due to the asynchrony between LaunchTask and the decommission notice. Third, this part also uses an async send, so we still cannot ensure the decommission notice is received by the driver successfully.
Right, so we should resend the notice then right?
No. Sorry if I didn't explain it clearly.
We already send the decommission notice to the driver when decommissioned = true, using askSync. If askSync still fails, I wouldn't expect another send to succeed.
Yea, I don't see the point of resending the notice to the driver, especially in this race condition. If we want to make sure the driver is notified, we should design a mechanism for it, instead of doing it here ad hoc.
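To make the outcome of this thread concrete, here is a hedged sketch of the guard once the resend is dropped; it mirrors the removed block quoted above, and the exact wording in the merged code may differ.

```scala
// Sketch of the simplified guard (assumed wording): the executor only logs,
// because the decommission notice was already delivered to the driver via
// askSync when SIGPWR was handled, so there is nothing left to resend here.
if (decommissioned) {
  logError("Asked to launch a task while decommissioned.")
}
```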
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
    context.reply(decommissionExecutor(executorId, decommissionInfo,
      adjustTargetNumExecutors = false))
  case ExecutorDecommissioning(executorId) =>
    logWarning(s"Received executor $executorId decommissioned message")
This might be where you broke the test suite last time, so double-check it.
  }

- def decommissionBlockManager(): Unit = synchronized {
+ def decommissionBlockManager(): Unit = storageEndpoint.ask(DecommissionBlockManager)
Why?
?
Why did you make this change?
If I understand your question correctly: we didn't really change decommissionBlockManager. The original decommissionBlockManager has been renamed to decommissionSelf to avoid the naming collision.
Makes sense, although maybe introducing a new name instead of changing the use of a previous function name would be easier to verify.
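For context, here is a hedged sketch of the rename described above; the first line matches the quoted diff, while the decommissionSelf signature and body are placeholders rather than the PR's actual code.

```scala
// Sketch only: decommissionBlockManager now just forwards the request to the
// block manager's own RPC endpoint, while the old synchronized body lives on
// under the new name decommissionSelf (placeholder body shown here).
def decommissionBlockManager(): Unit = storageEndpoint.ask(DecommissionBlockManager)

private[storage] def decommissionSelf(): Unit = synchronized {
  // formerly the body of decommissionBlockManager
}
```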
I really like the idea of simplifying the RPC message flow; thanks for taking this on @Ngone51, and I'm sorry the code here is so brittle to these types of changes (the K8s integration tests are kind of limited). |
@holdenk, you're kidding, right? There was only one test failure in the K8s tests that was not caused by this PR, and that test was just fixed by you. How come this PR broke more tests? Can you be more explicit about that? Which tests were broken further, and how did you test? |
I’m not joking. The test has multiple conditions. Another PR broke one of the conditions; this PR, in its original form, broke another one. It was reported in the same test because the K8s integration tests focus on the integration, so they cover multiple pieces. If you want, I’m happy to do a video call (and I can be flexible on my timezone if that’s a constraint) and screen share so we can discuss the details. |
If it helps I called out the part of the PR I believe most likely responsible for that during my code review already. |
Except for breaking the log (like you just fixed), what other conditions does this PR break? |
There's only one new flow, which is from Master to Worker. I can update the existing test by verifying the Worker's decommission status... What other concerns do you have? Could you elaborate so I can improve the PR accordingly? |
I’m mostly concerned with the change around how the storage decommissioning is being done now; I’d like to see some tests showing that the flow from the master to the worker results in storage decommissioning. |
Also, there are multiple log conditions; this PR broke one of them, and another PR had broken another one. |
@Ngone51, shall we add a test #29817 (comment) and fix the test as requested? Seems like it's otherwise good to go. |
Force-pushed from 15f6085 to 23bfdf5
Kubernetes integration test starting |
Force-pushed from e625051 to 3c1e033
Kubernetes integration test starting |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Kubernetes integration test status success |
Test build #130159 has finished for PR 29817 at commit
Test build #130163 has finished for PR 29817 at commit
Merged to master. |
thanks all!! |
  .set(config.DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_ENABLED, isEnabled)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 6000)
I got a test failure in my PR #31131 (I believe the PR is not related to the test):
[info] BlockManagerDecommissionIntegrationSuite:
[info] - SPARK-32850: BlockManager decommission should respect the configuration (enabled=false) *** FAILED *** (6 seconds, 165 milliseconds)
[info] java.util.concurrent.TimeoutException: Can't find 2 executors before 6000 milliseconds elapsed
[info] at org.apache.spark.TestUtils$.waitUntilExecutorsUp(TestUtils.scala:374)
[info] at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.$anonfun$new$2(BlockManagerDecommissionIntegrationSuite.scala:52)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
What changes were proposed in this pull request?
This PR cleans up the RPC message flow among the multiple decommission use cases. It includes the following changes:

- Keep the Worker's decommission status consistent between the case where decommission starts from the Worker and the case where it starts from the MasterWebUI: send DecommissionWorker from the Master to the Worker in the latter case.
- Change from two-way communication to one-way communication when notifying decommission between driver and executor (see the sketch after this list): it's obviously unnecessary for the executor to acknowledge the decommission status to the driver, since the decommission request comes from the driver. And the same holds in reverse.
- Only send one message instead of two (DecommissionSelf / DecommissionBlockManager) when decommissioning the executor: the executor and the BlockManager are in the same JVM.
- Clean up the code around here.
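As a hedged illustration of the two-way vs. one-way point in the second bullet (endpoint and variable names here are assumptions for illustration, not the PR's exact code):

```scala
// Illustration only (assumed names): notifying an executor of decommission.
// Before: two-way, the driver waits for the executor's acknowledgement.
val acknowledged: Boolean =
  executorEndpoint.askSync[Boolean](DecommissionExecutor(executorId, decomInfo))

// After: one-way, a fire-and-forget send is enough because the request
// originates from the driver itself, so no acknowledgement is needed.
executorEndpoint.send(DecommissionExecutor(executorId, decomInfo))
```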
Why are the changes needed?
Before: (diagram of the RPC message flow before this change)
After: (diagram of the RPC message flow after this change: rpc-flow-after)
(Note the diagrams only counts those RPC calls that needed to go through the network. Local RPC calls are not counted here.)
After this change, we removed six of the original RPC calls and added one RPC call to keep the Worker's decommission status consistent. The RPC flow also becomes clearer.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Updated existing tests.