Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown #28708

Conversation

holdenk
Copy link
Contributor

@holdenk holdenk commented Jun 2, 2020

What is changed?

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the MapOutputTracker has been extended to allow the location of shuffle files to be updated with updateMapOutput. When a shuffle block is put, a block update message will be sent which triggers the updateMapOutput.

Instead of rejecting remote puts of shuffle blocks BlockManager delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Why are the changes needed?

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

Does this PR introduce any user-facing change?

This PR introduces two new configs parameters, spark.storage.decommission.shuffleBlocks.enabled & spark.storage.decommission.rddBlocks.enabled that control which blocks should be migrated during storage decommissioning.

How was this patch tested?

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made #28331 (thanks to @attilapiros for his very helpful reviewing on it :)).

@holdenk
Copy link
Contributor Author

holdenk commented Jun 2, 2020

cc reviewers from the WIP PR: @attilapiros , @dongjoon-hyun , @prakharjain09 , @viirya

@SparkQA
Copy link

SparkQA commented Jun 2, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28066/

@SparkQA
Copy link

SparkQA commented Jun 2, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28066/

@SparkQA
Copy link

SparkQA commented Jun 2, 2020

Test build #123448 has finished for PR 28708 at commit 4730a52.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait MigratableResolver
  • class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId)

@holdenk holdenk force-pushed the SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up branch from 4730a52 to 7fb4356 Compare June 3, 2020 00:23
@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28072/

@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28072/

@HyukjinKwon
Copy link
Member

cc @vanzin, @jiangxb1987, @Ngone51 too FYI

@holdenk
Copy link
Contributor Author

holdenk commented Jun 3, 2020

I think the k8s failure is unrelated (e.g. java.lang.RuntimeException: Unable to load a Suite class that was discovered in the runpath: org.apache.spark.deploy.master.ui.MasterWebUISuite) seems like a problem with the UI suite.

@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Test build #123452 has finished for PR 28708 at commit 7fb4356.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait MigratableResolver
  • class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId)

@attilapiros
Copy link
Contributor

I think we should test the offloading of the shuffle blocks at the unit test level so I suggest to add new test to the BlockManagerSuite:

  test("test migration of shuffle blocks during decommissioning") {
    val store1 = makeBlockManager(3500, "exec1")
    val store2 = makeBlockManager(3500, "exec2")

    val blockSize = 5
    val shuffleDataBlockContent = Array[Byte](0, 1, 2, 3, 4)
    val shuffleData = ShuffleDataBlockId(0, 0, 0)
    Files.write(store1.diskBlockManager.getFile(shuffleData).toPath(), shuffleDataBlockContent)
    val shuffleIndexBlockContent = Array[Byte](5, 6, 7, 8, 9)
    val shuffleIndex = ShuffleIndexBlockId(0, 0, 0)
    Files.write(store1.diskBlockManager.getFile(shuffleIndex).toPath(), shuffleIndexBlockContent)

    mapOutputTracker.registerShuffle(0, 1)
    mapOutputTracker.registerMapOutput(0, 0, MapStatus(store1.blockManagerId, Array(blockSize), 0))
    assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === store1.blockManagerId)

    val env = mock(classOf[SparkEnv])
    when(env.blockManager).thenReturn(store1)
    when(env.conf).thenReturn(conf)

    SparkEnv.set(env)
    store1.offloadShuffleBlocks()

    eventually(timeout(1.second), interval(10.milliseconds)) {
      assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === store2.blockManagerId)
    }
    assert(Files.readAllBytes(store2.diskBlockManager.getFile(shuffleData).toPath())
      === shuffleDataBlockContent)
    assert(Files.readAllBytes(store2.diskBlockManager.getFile(shuffleIndex).toPath())
      === shuffleIndexBlockContent)
  }

I think this could be a good starting point for any other tests at this level. This way we could check the streaming upload here and remove it from the BlockManagerReplicationSuite which is a higher level test (gravitate towards the integration testing). What do you think?

Copy link
Contributor

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a bunch of NITs and potential thread leak so no big issues but I plan to do some more testing.

@holdenk holdenk force-pushed the SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up branch from 7fb4356 to d988e7f Compare June 3, 2020 19:35
@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28122/

@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28122/

@holdenk
Copy link
Contributor Author

holdenk commented Jun 3, 2020

So I don't want to stop the executor directly once the block migration is done. Instead, I have a follow-up JIRA which I've started working on that shutdowns the executor once the block migration has completed and there are no running tasks. I think it's ok to (temporarily) leak threads since decommissioning (as triggered currently) is only in the situation where the executor will be exiting soon anyways.

@holdenk
Copy link
Contributor Author

holdenk commented Jun 3, 2020

I like the idea of adding a more specific unit test for the streaming upload so we can save the (slower) more integration style test for the other components, thanks for writing the test @attilapiros :)

@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Test build #123499 has finished for PR 28708 at commit d988e7f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait MigratableResolver
  • class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId)

@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28133/

@SparkQA
Copy link

SparkQA commented Jun 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28134/

@SparkQA
Copy link

SparkQA commented Jun 4, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28134/

@SparkQA
Copy link

SparkQA commented Jun 4, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28133/

@holdenk holdenk force-pushed the SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up branch from 7dde80e to 9c8836a Compare June 4, 2020 00:23
@holdenk
Copy link
Contributor Author

holdenk commented Jun 4, 2020

Hey @attilapiros can you explain to my why you think we need to test the different kinds of block fetches? When we migrate we're always migrating to disk so I'm not seeing how it should matter.

@SparkQA
Copy link

SparkQA commented Jun 4, 2020

Test build #123510 has finished for PR 28708 at commit b2da4c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 4, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28137/

@SparkQA
Copy link

SparkQA commented Jun 4, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/28137/

@shaneknapp
Copy link
Contributor

test this please

… need an explicit test for it here and the block manager is stubbed out in IndexShuffleBlockResolverSuite
@holdenk
Copy link
Contributor Author

holdenk commented Jul 17, 2020

Hey folks it's the end of the workweek. I just want to check in and see if people believe this PR is still under active discussion, or if once it resolves in Jenkins it's ok to move forward with the merge.

@jiangxb1987
Copy link
Contributor

We can merge it, I may create some followup tickets but I don't want to block the PR for longer.

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30686/

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30689/

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30686/

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30689/

@holdenk
Copy link
Contributor Author

holdenk commented Jul 17, 2020

sounds good, I'll work on resolving the test issues. @jiangxb1987 if you want to make the follow up issues under the decommissioning umbrella issue it'll make tracking it easier :)

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Test build #126073 has finished for PR 28708 at commit 9d210f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Copy link
Contributor Author

holdenk commented Jul 17, 2020

It looks like the R test is failing even with the upgrade. I'm going to disable it and file a blocker to re-enable it unless folks object to that approach. (For context the R K8s tests have been broken for an extended period of time, otherwise I'd normally wait awhile).

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Test build #126076 has finished for PR 28708 at commit 2467732.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30691/

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30693/

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30691/

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/30693/

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Test build #126080 has finished for PR 28708 at commit 8494bdd.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 18, 2020

Test build #126078 has finished for PR 28708 at commit 16b7376.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Copy link
Contributor Author

holdenk commented Jul 19, 2020

The python packaging tests are failing on Jenkins post upgrade and this passes all of the GH actions so unless there is any more discussion I intend to merge this tomorrow.

Copy link
Contributor

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through the code once more and I found no issues to block this PR.

LGTM

logDebug("Nothing to migrate")
// Nothing to do right now, but maybe a transfer will fail or a new block
// will finish being committed.
val SLEEP_TIME_SECS = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine as I have seen this pattern used in the code somewhere else too, ie.:

I believe this is for emphasizing here is something to be set only once with a correct default (by code as opposed to a config param). So reading the code it raises attention that here an important choice was made.

@asfgit asfgit closed this in a4ca355 Jul 20, 2020
@holdenk
Copy link
Contributor Author

holdenk commented Jul 20, 2020

Merged to dev branch

@dbtsai
Copy link
Member

dbtsai commented Jul 20, 2020

Thanks! This is a great milestone.

@dongjoon-hyun
Copy link
Member

Thank you all!

@holdenk
Copy link
Contributor Author

holdenk commented Jul 20, 2020

Thanks everyone who took the time to review I know the discussion thread hear got a bit longer than usual. I’m really excited to get back to reviewing the PRs that build on top of this this week.

holdenk added a commit to holdenk/spark that referenced this pull request Oct 27, 2020
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by external external cluster manager are re-launched

Verify some additional calls are not occuring in the executor allocation manager suite.

Dont' close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small coment & visibility cleanup

CR feedback/cleanup

Cleanup the merge

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by external external cluster manager are re-launched

Verify some additional calls are not occuring in the executor allocation manager suite.

Dont' close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small coment & visibility cleanup

CR feedback/cleanup

Fix up the merge

CR feedback, move adjustExecutors to a common utility function

Exclude some non-public APIs

Remove junk

More CR feedback

Fix adjustExecutors backport

This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test

This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test

Cleanup and drop watcher changes from the backport
Comment on lines +170 to +182
if (persist) {
// One of our blocks should have moved.
val rddUpdates = blocksUpdated.filter { update =>
val blockId = update.blockUpdatedInfo.blockId
blockId.isRDD}
val blockLocs = rddUpdates.map { update =>
(update.blockUpdatedInfo.blockId.name,
update.blockUpdatedInfo.blockManagerId)}
val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
}
Copy link
Contributor

@juliuszsompolski juliuszsompolski Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk
If shuffle == true and when == TASK_STARTED or TASK_ENDED (edit: at this point in time it was migrateDuring == true, isn't it feasible that a block wouldn't have moved:
Node decomissioning was triggered during the mapper stage (after task started, or after task ended). Wouldn't it be feasible that decomissioning finished before the reducer stage even started, and hence the persisted rdd blocks never moved, but the reducer stage was executed on the 2 remaining executors in the first place?

The test passes, so it seems to not be happening like that, but I am facing some failures here when making an unrelated change, so I'm wondering if I understand something wrong?

Copy link
Contributor

@juliuszsompolski juliuszsompolski Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, the (later added) test with both shuffle==true and persist==true was running only with migrateDuring==false (JobEnded). So this scenario was not exercised.

  test("verify that both migrations can work at the same time") {
    runDecomTest(true, true, JobEnded)
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet