New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors #28619
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors #28619
Conversation
…executor kill interval
I am adding more UTs for this change. So added WIP in title. @holdenk Please give some feedback on the actual logic. |
cc @ScrapCodes, @vanzin, @jiangxb1987, @Ngone51 too |
Jenkins ok to test. |
Test build #123055 has finished for PR 28619 at commit
|
Test build #123089 has finished for PR 28619 at commit
|
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Outdated
Show resolved
Hide resolved
BTW, what do we do for those tasks if speculative is disabled? Waiting for executor lost action? |
@Ngone51 Thanks for the review.
If speculation is disabled, either the running tasks on those executors will succeed or they might fail and will be retried when the executor is taken away forcefully (due to node loss etc). |
Test build #123131 has finished for PR 28619 at commit
|
Test build #123194 has finished for PR 28619 at commit
|
…te-decommission-exec-tasks
Test build #123262 has finished for PR 28619 at commit
|
…te-decommission-exec-tasks
Test build #123367 has finished for PR 28619 at commit
|
@Ngone51 Thanks for the review. I have addressed all the review comments. Can you please take a look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM expect 2 minor comments, cc @holdenk @jiangxb1987
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Outdated
Show resolved
Hide resolved
@holdenk @cloud-fan Please review the changes. |
@holdenk @jiangxb1987 @Dooyoung-Hwang Please review the changes.
|
Test build #123668 has finished for PR 28619 at commit
|
…te-decommission-exec-tasks
…te-decommission-exec-tasks
Test build #123936 has finished for PR 28619 at commit
|
retest this please. |
Test build #124034 has finished for PR 28619 at commit
|
ConfigBuilder("spark.executor.decommission.killInterval") | ||
.doc("Duration after which a decommissioned executor will be killed forcefully." + | ||
"This config is useful for cloud environments where we know in advance when " + | ||
"an executor is going to go down after decommissioning signal Ex- around 2 mins " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: does the Ex in the "go down after decommissioning signal Ex- around 2 mins" stands for "example"?
Sorry I have not seen this abbreviation used before and still not sure it exists. What about "i.e." that even used in Spark documentation several times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to "i.e.".
"This config is useful for cloud environments where we know in advance when " + | ||
"an executor is going to go down after decommissioning signal Ex- around 2 mins " + | ||
"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " + | ||
"used to decide what tasks running on decommission executors to speculate") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Missing dot from the end of sentence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few comments/question but the changes LGTM.
One more question: the checkAndSubmitSpeculatableTask
checks whether the number of running copies from the task is one:
if (!successful(index) && copiesRunning(index) == 1 && |
It could happen that we already have two running copies of the task but both are running on decommissioning executors (i.o on the same host). I am not sure whether this is worth to be considered (I mean here or maybe in a followup PR). What do you think?
@attilapiros Thanks for the review.
Yes it is possible that two copies can be running on decommissioning executors and so we will end up not speculating for it because of above checks. I felt it might be a corner case which we can take as separate/follow-up PR. |
Test build #124058 has finished for PR 28619 at commit
|
@holdenk @cloud-fan @Dooyoung-Hwang Please review the changes. |
.doc("Duration after which a decommissioned executor will be killed forcefully." + | ||
"This config is useful for cloud environments where we know in advance when " + | ||
"an executor is going to go down after decommissioning signal i.e. around 2 mins " + | ||
"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the timeout is decided by the cloud vendors? What does this config specify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan This config can be set by users based on their setups. If they are using AWS spot nodes, timeout can be set to somewhere around 120 seconds, if they are using fix duration 6hrs spot blocks (say they decommission executors at 5:45), timeout can be set to 15 mins and so on.
If user doesn't set this timeout, things will remain as they were and tasks running on decommission executors won't get any special treatment with respect to speculation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible that Spark can get this timeout value from the cluster manager? So that users don't need to set it manually. cc @holdenk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan As per my understanding, Worker Decommissioning is getting triggered currently using SIGPWR signal (and not via some message coming from YARN/Kubernetes Cluster manager). So getting this timeout from Spark Cluster Manager might not be possible. We might be able to do this once Spark's Worker Decommissioning logic starts triggering via communication from YARN etc in future. cc @holdenk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe there are some situations where we can know the length of time from the cluster manager or from Spark it's self, but not all. I think having a configurable default for folks who know their cloud provider environment makes sense
@holdenk @cloud-fan @Dooyoung-Hwang Please review the changes. |
I'll be holding off on this while we wait for the SPIP. |
Took a quick look, thanks for working on this. I think having a timeout to kill the executors regardless (e.g. a max decommissioning time) and the speculation are both useful. I'll follow up more once we've decided on the design in OSS. |
@holdenk Please review the changes. |
This looks good to me, I'm excited for us to have this improvement. Thanks for working on this and thanks to all the folks who took the time to review it. Sorry I have not been as active with the reviews quite as much as I want (new puppy joined our household last week). |
Merged to the development branch! |
shouldn't we trigger another round of jenkins test before merging this? |
This chunk of code is pretty standalone and theres no conflicts so I didn't feel it necessary, it's a judgement call and Jenkins is in the middle of an upgrade. |
…kSchedulerImpl realm ### What changes were proposed in this pull request? The decommissioning state is a bit fragment across two places in the TaskSchedulerImpl: #29014 stored the incoming decommission info messages in TaskSchedulerImpl.executorsPendingDecommission. While #28619 was storing just the executor end time in the map TaskSetManager.tidToExecutorKillTimeMapping (which in turn is contained in TaskSchedulerImpl). While the two states are not really overlapping, it's a bit of a code hygiene concern to save this state in two places. With #29422, TaskSchedulerImpl is emerging as the place where all decommissioning book keeping is kept within the driver. So consolidate the information in _tidToExecutorKillTimeMapping_ into _executorsPendingDecommission_. However, in order to do so, we need to walk away from keeping the raw ExecutorDecommissionInfo messages and instead keep another class ExecutorDecommissionState. This decoupling will allow the RPC message class ExecutorDecommissionInfo to evolve independently from the book keeping ExecutorDecommissionState. ### Why are the changes needed? This is just a code cleanup. These two features were added independently and its time to consolidate their state for good hygiene. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #29452 from agrawaldevesh/consolidate_decom_state. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com>
… executors This PR adds functionality to consider the running tasks on decommission executors based on some config. In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds. So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation. Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors. Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running. Added UT. Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks. Authored-by: Prakhar Jain <prakharjain09@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com>
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors This PR adds functionality to consider the running tasks on decommission executors based on some config. In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds. So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation. Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors. Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running. Added UT. Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks. Authored-by: Prakhar Jain <prakharjain09@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE . To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`. Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks. The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors. Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks. This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning. New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated. This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)). Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up. Lead-authored-by: Holden Karau <hkarau@apple.com> Co-authored-by: Holden Karau <holden@pigscanfly.ca> Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed. For more relevant information see here: fabric8io/kubernetes-client#1075 No Running spark-submit to a k8s cluster. Not sure how to make an automated test for this. If someone can help me out that would be great. Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change. Authored-by: Stijn De Haes <stijndehaes@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-32217] Plumb whether a worker would also be decommissioned along with executor This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along with the DecommissionExecutor message. The primary motivation is to know whether a decommissioned executor would also be loosing shuffle files -- and thus it is important to know whether the host would also be decommissioned. In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message. In the future, this `ExecutorDecommissionInfo` can be embellished for knowing how long the executor has to live for scenarios like Cloud spot kills (or Yarn preemption) and the like. No Tweaked an existing unit test in `AppClientSuite` Closes apache#29032 from agrawaldevesh/plumb_decom_info. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-32199][SPARK-32198] Reduce job failures during decommissioning This PR reduces the prospect of a job loss during decommissioning. It fixes two holes in the current decommissioning framework: - (a) Loss of decommissioned executors is not treated as a job failure: We know that the decommissioned executor would be dying soon, so its death is clearly not caused by the application. - (b) Shuffle files on the decommissioned host are cleared when the first fetch failure is detected from a decommissioned host: This is a bit tricky in terms of when to clear the shuffle state ? Ideally you want to clear it the millisecond before the shuffle service on the node dies (or the executor dies when there is no external shuffle service) -- too soon and it could lead to some wastage and too late would lead to fetch failures. The approach here is to do this clearing when the very first fetch failure is observed on the decommissioned block manager, without waiting for other blocks to also signal a failure. Without them decommissioning a lot of executors at a time leads to job failures. The task scheduler tracks the executors that were decommissioned along with their `ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure. No Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-) - Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-). Closes apache#29014 from agrawaldevesh/decom_harden. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite. Make the block manager decommissioning test be less flaky An interesting failure happens when migrateDuring = true (and persist or shuffle is true): - We schedule the job with tasks on executors 0, 1, 2. - We wait 300 ms and decommission executor 0. - If the task is not yet done on executor 0, it will now fail because the block manager won't be able to save the block. This condition is easy to trigger on a loaded machine where the github checks run. - The task with retry on a different executor (1 or 2) and its shuffle blocks will land there. - No actual block migration happens here because the decommissioned executor technically failed before it could even produce a block. To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned. The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor. I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it. No, unit test only change. Github checks. Ran this test 100 times, 10 at a time in parallel in a script. Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-31197][CORE] Shutdown executor once we are done decommissioning Exit the executor when it has been asked to decommission and there is nothing left for it to do. This is a rebase of apache#28817 If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished. Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible. The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet. I changed the unit test to not send the executor exit message and still wait on the executor exited message. Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone. Authored-by: Holden Karau <hkarau@apple.com> Signed-off-by: Holden Karau <hkarau@apple.com> Connect decommissioning to dynamic scaling Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads Make Spark's dynamic allocation use decommissioning Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission Fix up executor add for resource profile Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits. Verify executors decommissioned, then killed by external external cluster manager are re-launched Verify some additional calls are not occuring in the executor allocation manager suite. Dont' close the watcher until the end of the test Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors bump numparts up to 6 Revert "bump numparts up to 6" This reverts commit daf96dd. Small coment & visibility cleanup CR feedback/cleanup Cleanup the merge [SPARK-21040][CORE] Speculate tasks which are running on decommission executors This PR adds functionality to consider the running tasks on decommission executors based on some config. In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds. So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation. Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors. Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running. Added UT. Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks. Authored-by: Prakhar Jain <prakharjain09@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE . To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`. Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks. The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors. Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks. This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning. New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated. This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)). Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up. Lead-authored-by: Holden Karau <hkarau@apple.com> Co-authored-by: Holden Karau <holden@pigscanfly.ca> Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed. For more relevant information see here: fabric8io/kubernetes-client#1075 No Running spark-submit to a k8s cluster. Not sure how to make an automated test for this. If someone can help me out that would be great. Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change. Authored-by: Stijn De Haes <stijndehaes@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-32217] Plumb whether a worker would also be decommissioned along with executor This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along with the DecommissionExecutor message. The primary motivation is to know whether a decommissioned executor would also be loosing shuffle files -- and thus it is important to know whether the host would also be decommissioned. In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message. In the future, this `ExecutorDecommissionInfo` can be embellished for knowing how long the executor has to live for scenarios like Cloud spot kills (or Yarn preemption) and the like. No Tweaked an existing unit test in `AppClientSuite` Closes apache#29032 from agrawaldevesh/plumb_decom_info. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-32199][SPARK-32198] Reduce job failures during decommissioning This PR reduces the prospect of a job loss during decommissioning. It fixes two holes in the current decommissioning framework: - (a) Loss of decommissioned executors is not treated as a job failure: We know that the decommissioned executor would be dying soon, so its death is clearly not caused by the application. - (b) Shuffle files on the decommissioned host are cleared when the first fetch failure is detected from a decommissioned host: This is a bit tricky in terms of when to clear the shuffle state ? Ideally you want to clear it the millisecond before the shuffle service on the node dies (or the executor dies when there is no external shuffle service) -- too soon and it could lead to some wastage and too late would lead to fetch failures. The approach here is to do this clearing when the very first fetch failure is observed on the decommissioned block manager, without waiting for other blocks to also signal a failure. Without them decommissioning a lot of executors at a time leads to job failures. The task scheduler tracks the executors that were decommissioned along with their `ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure. No Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-) - Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-). Closes apache#29014 from agrawaldevesh/decom_harden. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite. Make the block manager decommissioning test be less flaky An interesting failure happens when migrateDuring = true (and persist or shuffle is true): - We schedule the job with tasks on executors 0, 1, 2. - We wait 300 ms and decommission executor 0. - If the task is not yet done on executor 0, it will now fail because the block manager won't be able to save the block. This condition is easy to trigger on a loaded machine where the github checks run. - The task with retry on a different executor (1 or 2) and its shuffle blocks will land there. - No actual block migration happens here because the decommissioned executor technically failed before it could even produce a block. To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned. The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor. I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it. No, unit test only change. Github checks. Ran this test 100 times, 10 at a time in parallel in a script. Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky. Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> [SPARK-31197][CORE] Shutdown executor once we are done decommissioning Exit the executor when it has been asked to decommission and there is nothing left for it to do. This is a rebase of apache#28817 If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished. Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible. The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet. I changed the unit test to not send the executor exit message and still wait on the executor exited message. Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone. Authored-by: Holden Karau <hkarau@apple.com> Signed-off-by: Holden Karau <hkarau@apple.com> Connect decommissioning to dynamic scaling Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads Make Spark's dynamic allocation use decommissioning Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission Fix up executor add for resource profile Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits. Verify executors decommissioned, then killed by external external cluster manager are re-launched Verify some additional calls are not occuring in the executor allocation manager suite. Dont' close the watcher until the end of the test Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors bump numparts up to 6 Revert "bump numparts up to 6" This reverts commit daf96dd. Small coment & visibility cleanup CR feedback/cleanup Fix up the merge CR feedback, move adjustExecutors to a common utility function Exclude some non-public APIs Remove junk More CR feedback Fix adjustExecutors backport This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test Cleanup and drop watcher changes from the backport
What changes were proposed in this pull request?
This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.
Why are the changes needed?
Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.
Does this PR introduce any user-facing change?
Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.
How was this patch tested?
Added UT.