[SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling #29367

Conversation

@holdenk (Contributor) commented Aug 6, 2020

What changes were proposed in this pull request?

If graceful decommissioning is enabled, Spark's dynamic scaling uses this instead of directly killing executors.

Why are the changes needed?

When scaling down Spark we should avoid triggering recomputes as much as possible.

Does this PR introduce any user-facing change?

Hopefully users' jobs run faster, or at the same speed. It also enables experimental shuffle-service-free dynamic scaling when graceful decommissioning is enabled (using the same code path as shuffle-tracking dynamic scaling).
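
For context, a minimal configuration sketch of how a user might opt into this combination. The config keys below are taken from Spark 3.1-era documentation and are illustrative assumptions for this summary, not something added by this PR:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: enable graceful decommissioning together with dynamic
// allocation in shuffle-tracking mode (no external shuffle service).
val spark = SparkSession.builder()
  .appName("decommission-with-dynamic-allocation")
  .config("spark.decommission.enabled", "true")                      // graceful executor decommissioning
  .config("spark.storage.decommission.enabled", "true")              // migrate blocks off decommissioning executors
  .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .config("spark.storage.decommission.rddBlocks.enabled", "true")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") // shuffle tracking instead of a shuffle service
  .getOrCreate()
```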

How was this patch tested?

For now I've extended the ExecutorAllocationManagerSuite for both core & streaming.

@holdenk (Contributor Author) commented Aug 6, 2020

This is a rebase of #28818 now that its prerequisites have been merged.

@holdenk (Contributor Author) commented Aug 6, 2020

cc @attilapiros & @agrawaldevesh

@holdenk (Contributor Author) commented Aug 6, 2020

cc @cloud-fan who asked about the progress on a related PR in case he is interested.

@agrawaldevesh (Contributor) commented Aug 6, 2020

@holdenk, I am a bit confused by the commit message of the only commit in this PR: "Shutdown executor once we are done decommissioning". Isn't this the recently merged PR #29211 (it went to master, right)?

Can you make sure that this commit is appropriately rebased on master with a commit message like "Use graceful decommissioning as part of dynamic scaling"?

@HyukjinKwon (Member):

cc @tgravescs, @mridulm, @squito, @Ngone51, @jiangxb1987 as well FYI

@SparkQA commented Aug 6, 2020

Test build #127114 has finished for PR 29367 at commit 427b26c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 6, 2020

Test build #127113 has finished for PR 29367 at commit 3839d31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor Author) commented Aug 6, 2020

> @holdenk, I am a bit confused by the commit message of the only commit in this PR: "Shutdown executor once we are done decommissioning". Isn't this the recently merged PR #29211 (it went to master, right)?
>
> Can you make sure that this commit is appropriately rebased on master with a commit message like "Use graceful decommissioning as part of dynamic scaling"?

Ah yeah, if you click expand you can see it's all squashed down into one commit and the full commit text covers everything. When it gets merged, the commit message is picked from the title anyway, but I'll rename the title line of the commit.

@holdenk force-pushed the SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling branch from 427b26c to 38a413e on August 6, 2020 03:47
@agrawaldevesh (Contributor):

> Ah yeah, if you click expand you can see it's all squashed down into one commit and the full commit text covers everything. When it gets merged, the commit message is picked from the title anyway, but I'll rename the title line of the commit.

It would really help the review if you could please force-push the rebased version with the commits properly separated/pruned.

Is this PR ready enough for review that you can do that? Thanks!

@agrawaldevesh (Contributor) left a comment

I am still confused about whether this PR is properly rebased onto the master branch or not.

As of commit 375d348, #29211 has been pushed to master.

I am not sure if I ended up re-reviewing some of the already pushed code or how much of this code is new.

@SparkQA commented Aug 6, 2020

Test build #127122 has finished for PR 29367 at commit 38a413e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member) commented Aug 6, 2020

@holdenk Would you mind adding more description of the basic idea in this PR for integrating decommissioning and dynamic allocation?

@holdenk (Contributor Author) commented Aug 6, 2020

I think the javadoc failure in GHA is unrelated; I'll rebase this in a bit (I can't reproduce it locally, though).

@holdenk force-pushed the SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling branch from 780b00b to 3fa3313 on August 6, 2020 21:31
@SparkQA commented Aug 6, 2020

Test build #127154 has finished for PR 29367 at commit 780b00b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 7, 2020

Test build #127156 has finished for PR 29367 at commit 3fa3313.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor Author) commented Aug 7, 2020

I'm taking the next few days off (Friday-Sunday), I'll take another poke at this on Monday :)

@agrawaldevesh (Contributor) left a comment

Taking one of the last looks after checking it out locally.

@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
     //
     // This means that an executor may be marked as having shuffle data, and thus prevented
     // from being removed, even though the data may not be used.
+    // TODO: Only track used files (SPARK-31974)
Contributor:

Is this comment change intended?

Contributor (Author):

Yes. Since we're eventually going to want to use intelligent metrics to decide which executors to scale down, I'd like us to only track shuffle files that are actually being used, not speculative ones. It doesn't need to be addressed right now, which is why it's a TODO.

@agrawaldevesh (Contributor) left a comment

Hi Holden,

I went through the code again. I feel that this is not yet in a state to be merged because of the issues marked "[blocker]" inline.

I am also happy to sync offline to discuss them further.

@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(

     var newNextTimeout = Long.MaxValue
     timedOutExecs = executors.asScala
-      .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
+      .filter { case (_, exec) =>
+        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning}
Contributor:

I went through all of the usages of the executor.pendingRemoval and executor.decommissioning flags: they are treated identically right now. That is, for all practical purposes, an executor being decommissioned is treated the same as an executor pending removal.

Do you have a use case in mind for distinguishing between these two states? If you don't need to distinguish them, the change would become simpler if you treated a decommissioned executor as pending removal.

I cannot see where this distinction is relevant in this PR, so perhaps you have a future use case in mind for it?

Contributor:

^^^ @holdenk ... any thoughts/follow-up on this?

Contributor (Author):

Eventually I'd like us to have better logging and metrics around decommissioning and understanding its impact versus blacklisting, although to be fair that isn't in the short term.
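
To make the flag semantics above concrete, here is a small self-contained sketch of the idle-timeout filter with pendingRemoval and decommissioning kept as separate flags (so they could later be reported separately). The Tracker shape and object are illustrative stand-ins, not Spark's actual ExecutorMonitor:

```scala
object TimedOutFilterSketch {
  // Illustrative stand-in for per-executor tracking state.
  final case class Tracker(
      executorId: String,
      timeoutAt: Long,
      hasActiveShuffle: Boolean,
      pendingRemoval: Boolean,
      decommissioning: Boolean)

  // Mirrors the predicate in the diff: an executor only counts as idle-timed-out
  // when it is past its timeout and is in none of the "going away" states.
  def timedOutExecutors(executors: Seq[Tracker], now: Long): Seq[String] = {
    executors
      .filter { exec =>
        exec.timeoutAt <= now &&
          !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning
      }
      .map(_.executorId)
  }

  def main(args: Array[String]): Unit = {
    val execs = Seq(
      Tracker("1", timeoutAt = 0L, hasActiveShuffle = false, pendingRemoval = false, decommissioning = false),
      Tracker("2", timeoutAt = 0L, hasActiveShuffle = false, pendingRemoval = false, decommissioning = true))
    // Prints List(1): executor 2 is excluded because it is already decommissioning.
    println(timedOutExecutors(execs, now = 100L))
  }
}
```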

@agrawaldevesh (Contributor) left a comment

Thanks for the refactoring of that helper method.

My other inline comments are mainly just redrawing your attention to some of the other comments I made over the weekend. No rush if you were already planning to address them in a bit!

(As an aside, do you have the ability to mark these older resolved comments as resolved? I no longer see the resolve-comment button, even on my own comments.)

@holdenk (Contributor Author) commented Aug 10, 2020

Sorry, I'm dealing with some other things, so I only had the cycles for a partial response to the comments. I'll try and get back to them tonight or tomorrow.

@holdenk (Contributor Author) commented Aug 10, 2020

(I'll also try and go through and resolve the old comments tonight).

@SparkQA commented Aug 10, 2020

Test build #127292 has finished for PR 29367 at commit 01d4137.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk force-pushed the SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling branch from 4a8ba4d to a099152 on August 11, 2020 04:42
@SparkQA commented Aug 11, 2020

Test build #127306 has finished for PR 29367 at commit a099152.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

holdenk and others added 4 commits August 11, 2020 11:39

  • Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads
  • Make Spark's dynamic allocation use decommissioning
  • Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low; update the streaming ExecutorAllocationManager to also delegate to decommission
  • Fix up executor add for resource profile
  • Fix our exiting and cleanup thread for better debugging next time. Clean up the locks we use in decommissioning and clarify some more bits.
  • Verify executors decommissioned, then killed by the external cluster manager, are re-launched
  • Verify some additional calls are not occurring in the executor allocation manager suite.
  • Don't close the watcher until the end of the test
  • Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors
  • bump numparts up to 6
  • Revert "bump numparts up to 6" (this reverts commit daf96dd)
  • Small comment & visibility cleanup
  • CR feedback/cleanup
@holdenk force-pushed the SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling branch from 4d8b6cd to cc76ff5 on August 11, 2020 18:40
@SparkQA commented Aug 11, 2020

Test build #127342 has finished for PR 29367 at commit 4d8b6cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 11, 2020

Test build #127343 has finished for PR 29367 at commit cc76ff5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh (Contributor) left a comment

Looks much better; one blocker remaining if I am understanding the code properly.

I would also like to know more about the DecommissionWorkerSuite failure, please.

Thanks!

@@ -242,8 +242,10 @@ class DecommissionWorkerSuite
       assert(jobResult === 2)
     }
     // 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage attempt 1 and 2.
     val tasksSeen = listener.getTasksFinished()
Contributor:

Would you happen to recall the GitHub Actions error you got that led to this change? I would like to dig further, because I invoke the listener using TestUtils.withListener(sc, listener), which waits for the listener to drain and also removes the listener.

So I don't think wrapping this in an eventually should actually be doing anything: the listener has already been removed. Perhaps I ought to bring back the "waiting for job done" inside of getTasksFinished, or as a separate call.

I would like to understand further just so that I can learn about some of the gotchas with this listener stuff.
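
(For readers unfamiliar with the pattern under discussion: the change wrapped the check in a ScalaTest eventually retry, roughly like the self-contained sketch below. The names and values are placeholders, and as the thread concludes, the change was ultimately backed out in favor of a fix on master.)

```scala
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  // Retry the assertion until it passes or 10 seconds elapse, polling every 100 ms,
  // instead of asserting once and failing on a slow or busy machine.
  def waitForSixTasks(getTasksFinished: () => Set[String]): Unit = {
    eventually(timeout(10.seconds), interval(100.millis)) {
      assert(getTasksFinished().size == 6)
    }
  }
}
```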

Contributor (Author):

Yeah, good point, it's probably not the listener. It's only showing up for me in GHA though: https://github.com/apache/spark/pull/29367/checks?check_run_id=972990200#step:14:13579

Contributor:

Let's undo this change then. I am rerunning this PR locally to debug. Thanks for sharing the GHA link; it helps.

Contributor:
I think I know the race. I will file another PR for this either against this PR or against master. The race is simply that we need to wait for the decommissioning to have happened before triggering the fetch failure. On a busy machine, the listener can be delayed.

Contributor (Author):
Sounds good, I'll back this change out.

Contributor:
This will take me a while to fix. I will make this fix against the master branch.

Apparently #29211 broke some of my state keeping that I was relying on in #29032 :-P. Let me think through how to fix this for real. So I think the test failure is real and it is worrisome that it isn't failing as frequently as it should.

Stay tuned for a PR to fix this but in the meanwhile please back out this test change. Thanks for surfacing this issue.

Contributor:
Okay so @HyukjinKwon also reported a test failure: #29014 (comment) and that is encouraging. I will work on a fix for this ASAP.

@SparkQA commented Aug 11, 2020

Test build #127345 has finished for PR 29367 at commit 6a69126.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    }

    // If we don't want to replace the executors we are decommissioning
    if (adjustTargetNumExecutors) {
@agrawaldevesh (Contributor) commented Aug 12, 2020

Should there be a check for executorsToDecommission.nonEmpty? Otherwise, we will request executors again with no change in the adjustExecutors helper method. That could again lead to some unnecessary strain on the driver.

Not a big deal because this is one time, since doDecommission isn't called again and again.

Contributor (Author):
Let me put that logic inside adjustExecutors :)
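
A minimal, self-contained sketch of the guard being discussed: only make the (potentially expensive) total-executor adjustment when there is actually something to decommission. Names such as adjustExecutors and doRequestTotalExecutors follow the conversation, but the bodies here are illustrative stand-ins, not Spark's actual scheduler-backend code:

```scala
object AdjustExecutorsSketch {
  @volatile private var requestedTotal: Int = 10

  // Stand-in for the cluster-manager round trip (doRequestTotalExecutors in the discussion).
  private def doRequestTotalExecutors(total: Int): Boolean = {
    println(s"requesting $total total executors from the cluster manager")
    true
  }

  // Stand-in for the adjustExecutors helper: adjust the requested total and notify
  // the cluster manager only when the list of executors to decommission is non-empty.
  def adjustExecutors(executorIds: Seq[String]): Seq[String] = {
    if (executorIds.nonEmpty) {
      requestedTotal = math.max(0, requestedTotal - executorIds.size)
      doRequestTotalExecutors(requestedTotal)
      executorIds
    } else {
      Seq.empty // nothing to do, so skip the cluster-manager request entirely
    }
  }

  def main(args: Array[String]): Unit = {
    println(adjustExecutors(Seq("1", "2"))) // one request sent, for 8 executors
    println(adjustExecutors(Seq.empty))     // no request sent
  }
}
```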

@holdenk (Contributor Author) commented Aug 12, 2020

Just to be clear, what's the outstanding blocker in your opinion?

@agrawaldevesh (Contributor) left a comment

PR looks good to me and I have no blockers. The two things that we arrived at were:

  • Please back out the DecommissionWorkerSuite test change.
  • Consider not doing doRequestTotalExecutors in adjustExecutors if the input arg list is empty.

This is going to be great and finally brings decommissioning closer to prime time.

@holdenk (Contributor Author) commented Aug 12, 2020

Gotcha, I've got those two changes in now and I'll see how it goes in Jenkins/GHA :) Just an FYI to other folks: since there are no outstanding blockers, I intend to merge this once CI completes. If anyone needs more time to review, please leave a comment and I'll hold off on merging.

@SparkQA commented Aug 12, 2020

Test build #127389 has finished for PR 29367 at commit e970cb1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in 548ac7c on Aug 13, 2020
@dongjoon-hyun (Member):

Thank you, @holdenk and @agrawaldevesh.
