
[WIP][SPARK-20629][CORE] Copy shuffle data when nodes are being shutdown #28331

Conversation

holdenk
Contributor

@holdenk holdenk commented Apr 24, 2020

Why are the changes needed?

Recomputing shuffle blocks can be expensive, so we should take advantage of our decommissioning time to migrate these blocks.

Does this PR introduce any user-facing change?

This PR introduces two new config parameters, `spark.storage.decommission.shuffle_blocks` & `spark.storage.decommission.rdd_blocks`, that control which blocks should be migrated during storage decommissioning.
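For illustration, here is a minimal sketch of how these flags might be set. The names below are the WIP ones from this description (the merged version quoted later in this thread renames them to `spark.storage.decommission.shuffleBlocks.enabled` / `spark.storage.decommission.rddBlocks.enabled`), and `spark.storage.decommission.enabled` is assumed here as the parent switch:

```scala
import org.apache.spark.SparkConf

// Illustrative only: enable block migration during storage decommissioning using the
// WIP config names from this PR; the parent `enabled` flag is an assumption.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffle_blocks", "true")
  .set("spark.storage.decommission.rdd_blocks", "true")
```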

How was this patch tested?

New unit test & expansion of the Spark on K8s decom test to assert that decommissioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

@SparkQA

SparkQA commented Apr 24, 2020

Test build #121782 has finished for PR 28331 at commit 4126c1b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId)

@holdenk
Contributor Author

holdenk commented Apr 24, 2020

If you've got some cycles to take a look at the WIP PR, @prakharjain09, I'd appreciate it. This builds on top of the cache block migration.

Why this PR is still a work in progress:

  1. Only works with indexed shuffle files
  2. I'm not sure if this is the best way to copy shuffle files between executors

Future work (not planned in this PR but in the future):

  1. Supporting write back to some type of DFS
  2. Updating the listener that tracks the location of shuffle blocks for scale down to understand block migrations
  3. Integrating decommissioning into our planned scale down, not just cluster manager/cloud triggered scale downs.

@SparkQA

SparkQA commented Apr 24, 2020

Test build #121784 has finished for PR 28331 at commit 8ee8949.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk holdenk changed the title [WIP][SPARK-20629] Copy shuffle data when nodes are being shutdown [WIP][SPARK-20629][CORE] Copy shuffle data when nodes are being shutdown Apr 24, 2020
@SparkQA

SparkQA commented Apr 25, 2020

Test build #121785 has finished for PR 28331 at commit 4071ae2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

cc @dbtsai

@SparkQA

SparkQA commented May 1, 2020

Test build #122153 has finished for PR 28331 at commit ff620ba.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@holdenk
Contributor Author

holdenk commented May 1, 2020

Jenkins retest this please

@dongjoon-hyun
Member

dongjoon-hyun commented May 1, 2020

The original cache block migration was reverted, wasn't it? IIRC, the conflicts have been here since it was reverted.

@holdenk
Contributor Author

holdenk commented May 1, 2020

Oh yeah, that explains the conflicts, but I'm curious about the 400-minute timeout in Jenkins, since I'm getting different results locally with the executor exiting.

@SparkQA

SparkQA commented May 1, 2020

Test build #122178 has finished for PR 28331 at commit ff620ba.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 2, 2020

Test build #122181 has finished for PR 28331 at commit adb03db.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 2, 2020

Test build #122183 has finished for PR 28331 at commit be2a5e7.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Comment on lines +209 to +211
if (file.exists()) {
file.delete()
}
Member

When the file exists, does it mean there is an index/data file with the same shuffle ID and map ID? When could that happen?

Contributor Author

I suppose this should never happen; I'm not sure though, so let me do some thinking on that.

Contributor Author

So this mirrors the logic inside of writeIndexFileAndCommit; the matching check there was introduced in SPARK-17547,
which I believe handles the situation where an exception occurred during a previous write and the filesystem is in a dirty state. So I think we should keep it to be safe.
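For reference, a minimal sketch of the defensive pattern under discussion, with a hypothetical prepareTargetFile helper standing in for the resolver's actual code:

```scala
import java.io.{File, IOException}

// Illustrative sketch: if an earlier write attempt died midway, a stale index/data file
// may already exist, so remove it before writing the migrated bytes, mirroring the
// check in writeIndexFileAndCommit.
def prepareTargetFile(file: File): Unit = {
  if (file.exists() && !file.delete()) {
    throw new IOException(s"Failed to delete stale shuffle file $file")
  }
}
```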

@prakharjain09
Contributor

Test build #122183 has finished for PR 28331 at commit be2a5e7.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@holdenk All tests started passing after this commit of yours, which also includes all my changes. Is there a specific change in your branch that you know of that fixed all the tests?

@holdenk
Contributor Author

holdenk commented May 4, 2020

Jenkins retest this please.

@SparkQA

SparkQA commented May 4, 2020

Test build #122279 has finished for PR 28331 at commit be2a5e7.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Contributor

@attilapiros attilapiros left a comment

I think this is a good first step to experiment with the problem.

With this solution even the handling of fetch failures can remain as it is, right?
I mean, currently a fetch failure leads to the assumption that all the blocks created by the executor are lost. Here, if we did not get enough time to finish copying the shuffle blocks, the error handling logic will still identify the decommissioned executor as the source of the failure, since only the successfully copied blocks have their locations rewritten.

I am wondering whether we could sort the shuffle files by their importance before offloading them: we definitely need the shuffle files from the last stage. We could start with them.

Comment on lines 1905 to 1906
deadPeers.map{peer =>
migrationPeers.get(peer).map(_.running = false)
Contributor

In both lines, use foreach instead of map.
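Illustrating the suggestion, with deadPeers and migrationPeers as in the quoted snippet:

```scala
// The results of these traversals are discarded, so foreach states the intent
// (side effects only) and avoids building throwaway collections.
deadPeers.foreach { peer =>
  migrationPeers.get(peer).foreach(_.running = false)
}
```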

((BlockId, ManagedBuffer), (BlockId, ManagedBuffer)) = {
// Load the index block
val indexFile = getIndexFile(shuffleId, mapId)
val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0)
Contributor

Use NOOP_REDUCE_ID instead of 0


// Load the data block
val dataFile = getDataFile(shuffleId, mapId)
val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0)
Contributor

Use NOOP_REDUCE_ID instead of 0
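A sketch of the change both comments ask for, assuming the snippets live inside the index resolver where NOOP_REDUCE_ID (the resolver's placeholder reduce-ID constant) and shuffleId/mapId are in scope:

```scala
// Use the named placeholder constant instead of the literal 0 for the reduce id.
val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
```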

@@ -650,6 +656,19 @@ private[spark] class BlockManager(
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
// Delegate shuffle blocks here to resolver if supported
if (blockId.isShuffle || blockId.isInternalShuffle) {
Contributor

The (blockId.isShuffle || blockId.isInternalShuffle) covers:

  • ShuffleBlockId
  • ShuffleBlockBatchId
  • ShuffleDataBlockId
  • ShuffleIndexBlockId

But putShuffleBlockAsStream() is prepared for only two of these.

As far as I know, shuffle block batch IDs are used when fetching blocks where contiguous reducer IDs are requested for the same map ID: those contiguous block IDs are merged together and fetched with one request, so I assume they are not what we need here.

Finally, getMigrationBlocks() returns only a ShuffleIndexBlockId and a ShuffleDataBlockId, which is correct.

We should harmonise the three places (methods) regarding the block IDs they handle: use isInternalShuffle within this condition and fix putShuffleBlockAsStream().
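For illustration, a sketch (assumed, not the merged implementation) of the distinction being drawn here:

```scala
import org.apache.spark.storage.{BlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

// "Internal" shuffle blocks are the on-disk index/data files that migration moves,
// while ShuffleBlockId / ShuffleBlockBatchId are the reducer-facing fetch IDs, which
// putShuffleBlockAsStream() does not need to accept.
def isInternalShuffle(blockId: BlockId): Boolean = blockId match {
  case _: ShuffleDataBlockId | _: ShuffleIndexBlockId => true
  case _ => false
}
```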

Contributor Author

This is a good point. I wanted to delegate all shuffle blocks for when we generalize the interface a bit, but I haven't done that yet. I'll revisit this once we get the cache blocks sorted out.

Contributor Author

I've changed the interface to be more general, so I think this probably makes sense now. For the index resolver we're still only going to see data & index blocks, but if other resolvers implement the trait then we might see other blocks transferred.

val file = blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
getIndexFile(shuffleId, mapId)
case ShuffleBlockBatchId(shuffleId, mapId, _, _) =>
Contributor

This must be ShuffleDataBlockId.

To get this method tested with small data, I suggest setting spark.network.maxRemoteBlockSizeFetchToMem to a very low value, e.g. 1.

After that, without fixing this issue, you will see that the updated exception message (see a few lines below) will be: java.lang.Exception: Unexpected shuffle block transfer shuffle_0_11_0.data as ShuffleDataBlockId

Unfortunately the test will still pass, as the corresponding index file shuffle_0_11_0.index was successfully migrated and the assert in the suite only checks whether at least one shuffle block was migrated:

assert(numLocs > 0, s"No shuffle block updates in ${blocksUpdated}")

The shuffle data file was probably still served from the decommissioned executor; otherwise the recalculation would have been detected via the number of successful tasks/the accumulator.

Please consider covering this case by introducing a new boolean flag to control this setting and checking at least one shuffle migration test along with this setting.
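A sketch of the corrected resolution, with getIndexFile/getDataFile passed in as stand-ins for the resolver's own lookups (illustrative signatures only, not the PR's actual code):

```scala
import java.io.File
import org.apache.spark.storage.{BlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

// The second case must match ShuffleDataBlockId rather than ShuffleBlockBatchId,
// otherwise migrated .data files never reach the right path.
def targetFileFor(
    blockId: BlockId,
    getIndexFile: (Int, Long) => File,
    getDataFile: (Int, Long) => File): File = blockId match {
  case ShuffleIndexBlockId(shuffleId, mapId, _) => getIndexFile(shuffleId, mapId)
  case ShuffleDataBlockId(shuffleId, mapId, _) => getDataFile(shuffleId, mapId)
  case other =>
    throw new IllegalArgumentException(s"Unexpected shuffle block transfer $other")
}
```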

Contributor Author

I'll make a more rigorous test that fails as-is, and then I'll fix the issue.

Contributor Author

And thanks for the suggestion, very useful :)

case _ =>
logError(s"Unexpected shuffle block type ${blockId}")
}
}
Contributor

I think in the case of shuffle files the rest is not needed: it is enough to update the mapOutputTracker, and we can skip updating the blockLocations, which is only used for broadcast and RDD blocks.

Contributor Author

That makes sense +1
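A sketch of the resulting rule, with a hypothetical helper name:

```scala
import org.apache.spark.storage.{BlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

// For migrated shuffle index/data files it is enough to update the MapOutputTracker;
// blockLocations bookkeeping only matters for RDD and broadcast blocks.
def needsBlockLocationUpdate(blockId: BlockId): Boolean = blockId match {
  case _: ShuffleIndexBlockId | _: ShuffleDataBlockId => false // MapOutputTracker only
  case _ => true // RDD / broadcast blocks
}
```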

eventually(timeout(15.seconds), interval(10.milliseconds)) {
if (persist) {
// One of our blocks should have moved.
val blockLocs = blocksUpdated.map{ update =>
Contributor

As there is a test case where both RDD and shuffle block migration are tested, we should filter for RDD blocks here.

Actually, I tried it with:

Suggested change
val blockLocs = blocksUpdated.map{ update =>
val blockLocs = blocksUpdated.filter(_.blockUpdatedInfo.blockId.isRDD).map{ update =>

But then the RDD tests started to fail, as it was not RDD blocks satisfying the condition below but broadcast variables; when I logged blocksToManagers.filter(_._2 > 1) I got:

broadcast_1_piece0 -> 4

Contributor Author

Yeah, I think there are some type issues with how isRDD is implemented (namely it's checking the type and we've got type erasure going on). I'll fix the isRDD impl and add that as a filter.
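A sketch of the filtering being agreed on here (illustrative, not the suite's final code):

```scala
import org.apache.spark.storage.{BlockId, RDDBlockId}

// Only count RDD block updates when asserting that a block was replicated, so
// broadcast pieces such as broadcast_1_piece0 can no longer satisfy the check.
// An instance check is one robust way to express isRDD.
def isRddBlock(blockId: BlockId): Boolean = blockId.isInstanceOf[RDDBlockId]

// In the suite (assuming blocksUpdated: Seq[SparkListenerBlockUpdated]):
// val rddUpdates = blocksUpdated.filter(u => isRddBlock(u.blockUpdatedInfo.blockId))
```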

…Resolver.scala


add more information on unexpected block.

Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
@SparkQA

SparkQA commented May 29, 2020

Test build #123297 has finished for PR 28331 at commit 069dd3b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…dex blocks have both been migrated, check that RDD blocks are duplicated not just broadcast blocks, make the number of partitions smaller so the test can run faster, avoid the Thread.sleep for all of the tests except for the midflight test where we need it, check for the broadcast blocks landing (further along in scheduling) beyond just task start, force fetching the shuffle block to local disk if in shuffle block test mode, start the job as soon as the first executor comes online.
@SparkQA

SparkQA commented May 30, 2020

Test build #123301 has finished for PR 28331 at commit 6340f9b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Force fetching to local disk
if (shuffle) {
conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1")
Contributor

I am sorry, there might be some misunderstanding here: we should test shuffle migration with both streamed and non-streamed upload.

Contributor Author

OK, so I can have this be a param to the test.
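A sketch of that parameterization, with a hypothetical decomTestConf helper:

```scala
import org.apache.spark.SparkConf

// Run the shuffle migration test both with and without forcing remote blocks to disk,
// so the streamed and non-streamed upload paths are both covered.
def decomTestConf(shuffle: Boolean, forceFetchToDisk: Boolean): SparkConf = {
  val conf = new SparkConf()
  if (shuffle && forceFetchToDisk) {
    // Per the discussion above, this also forces the streamed migration path.
    conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1")
  }
  conf
}

// e.g. decomTestConf(shuffle = true, forceFetchToDisk = true)
//      decomTestConf(shuffle = true, forceFetchToDisk = false)
```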

…py so lets call it baseRdd, and test both small blocksize to mem and not
…ote: this fails in forced migrate to disk + some logging
@SparkQA

SparkQA commented Jun 1, 2020

Test build #123394 has finished for PR 28331 at commit 4cfeb8e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…e test now passes so yay), re-enable the other tests I disabled while debugging. Add a bit more logging.
@SparkQA

SparkQA commented Jun 1, 2020

Test build #123396 has finished for PR 28331 at commit e81aa5a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Contributor Author

holdenk commented Jun 2, 2020

If this passes the new tests I'm going to make a new non-WIP PR. Thank you for all your help reviewing, @attilapiros, the suggestions have really sped up my work here :)

@SparkQA

SparkQA commented Jun 2, 2020

Test build #123400 has finished for PR 28331 at commit 155aeb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 2, 2020

Test build #123409 has finished for PR 28331 at commit 7e32341.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 2, 2020

Test build #123407 has finished for PR 28331 at commit 7f93df6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Comment on lines 1589 to 1594
/**
* :: DeveloperApi ::
* Deregister the listener from Spark's listener bus.
*/
@DeveloperApi
def getExecutorIds(): Seq[String] = {
Contributor Author

I'll move this back to private[spark] (also the comment I have here seems wrong, so I'll revert the comment).

Comment on lines 1734 to 1735
@DeveloperApi
def decommissionExecutors(executorIds: Seq[String]): Unit = {
Contributor Author

I think private[spark] as well
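A sketch of the proposed visibility, with bodies elided (illustrative only):

```scala
package org.apache.spark

// Keep the helpers callable inside Spark without exposing them as DeveloperApi.
class SparkContextVisibilitySketch {
  private[spark] def getExecutorIds(): Seq[String] = Seq.empty
  private[spark] def decommissionExecutors(executorIds: Seq[String]): Unit = ()
}
```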

@holdenk
Contributor Author

holdenk commented Jun 2, 2020

I've made a non-WIP PR based on this, so closing this one for now.

@holdenk holdenk closed this Jun 2, 2020
@SparkQA

SparkQA commented Jun 2, 2020

Test build #123446 has finished for PR 28331 at commit a3aa8eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jul 20, 2020
### What is changed?

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks, `BlockManager` delegates the storage of shuffle blocks to its shuffle manager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.
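For illustration, a sketch of what such an experimental trait might look like; the merged code names it MigratableResolver, but the exact signatures below are assumptions rather than the final API:

```scala
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.StreamCallbackWithID
import org.apache.spark.storage.BlockId

// A shuffle resolver opts in to accepting remotely pushed blocks and to enumerating
// the blocks that must be moved for one map output.
trait MigratableResolverSketch {
  /** Accept a remotely pushed shuffle block (index or data file) as a stream. */
  def putShuffleBlockAsStream(blockId: BlockId): StreamCallbackWithID

  /** Enumerate the (index, data) blocks that must be migrated for a given map output. */
  def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)]
}
```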

### Why are the changes needed?

Recomputing shuffle blocks can be expensive, so we should take advantage of our decommissioning time to migrate these blocks.

### Does this PR introduce any user-facing change?

This PR introduces two new config parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled`, that control which blocks should be migrated during storage decommissioning.

### How was this patch tested?

New unit test & expansion of the Spark on K8s decom test to assert that decommissioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made #28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes #28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>
holdenk added a commit to holdenk/spark that referenced this pull request Oct 27, 2020
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In Spark-on-cloud, we sometimes already know that an executor won't be alive for more than a fixed amount of time. For example, with AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidates for speculation.

Currently, when an executor is decommissioned, we stop scheduling new tasks on it, but the already running tasks keep on running. Depending on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time; different cloud providers give different timeouts before they take away the nodes. For example, in the case of AWS Spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments to make better decisions about speculating the already running tasks on decommissioned executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which users can explicitly set based on the cloud environment where they are running.
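For illustration, a minimal sketch of setting that config (assuming Spark's usual time-string format):

```scala
import org.apache.spark.SparkConf

// On a cloud where preempted nodes survive roughly 120 seconds after notification,
// tell Spark how long decommissioning executors have left so their still-running
// tasks can be considered for speculation.
val conf = new SparkConf()
  .set("spark.executor.decommission.killInterval", "120s")
```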

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks, `BlockManager` delegates the storage of shuffle blocks to its shuffle manager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputing shuffle blocks can be expensive, so we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new config parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled`, that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommissioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it fails with an HTTP_GONE code from the Kubernetes API, which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075
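A hypothetical sketch of the retry idea only; startWatch and WatchError below stand in for the real fabric8 client plumbing and are not its API:

```scala
import java.net.HttpURLConnection.HTTP_GONE
import scala.annotation.tailrec

// When the watch terminates with HTTP 410 (GONE) the stored resourceVersion is stale,
// so re-establish the watch instead of treating the submission as failed.
final case class WatchError(httpCode: Int, message: String)

@tailrec
def watchUntilComplete(startWatch: () => Either[WatchError, Unit]): Either[WatchError, Unit] =
  startWatch() match {
    case Left(err) if err.httpCode == HTTP_GONE => watchUntilComplete(startWatch)
    case result => result
  }
```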

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be losing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not lose the whole host with it, and thus does not clear the shuffle state if an external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others, like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.
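For illustration, a sketch of the payload being plumbed; the field names are assumptions based on this description, not necessarily the final API:

```scala
// The key extra bit is whether the whole host, and therefore any shuffle data it
// serves, is going away too.
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
```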

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their `ExecutorDecommissionInfo`. This information is used (a) when handling an `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.
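An illustrative sketch of that bookkeeping, with hypothetical names:

```scala
import scala.collection.mutable

// Remember which executors were decommissioned so ExecutorProcessLost handling and the
// DAGScheduler's fetch-failure handling can treat their loss as expected rather than
// counting it against the application.
class DecommissionTrackerSketch {
  private val decommissionedExecutors = mutable.HashSet.empty[String]

  def markDecommissioned(execId: String): Unit = decommissionedExecutors += execId

  /** Loss of an executor we already decommissioned should not count as an app failure. */
  def isExpectedLoss(execId: String): Boolean = decommissionedExecutors.contains(execId)
}
```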

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors? They seem safe to me, since they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task will retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed 300 ms wait with waiting for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor, because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.
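A sketch of that approach (illustrative, not the suite's exact code), using a SparkListener to record which executors have finished a task successfully:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Record the executor of every successfully finished task, then decommission one of
// those executors instead of sleeping a fixed 300 ms and always picking executor 0.
class SuccessfulTaskTracker extends SparkListener {
  val finishedExecs = new ConcurrentLinkedQueue[String]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    if (taskEnd.taskInfo.successful) {
      finishedExecs.add(taskEnd.taskInfo.executorId)
    }
  }
}

// In the test: register the tracker with sc.addSparkListener, launch the job
// asynchronously, wait until finishedExecs is non-empty, then decommission one of
// the executors it recorded.
```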

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit at the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by the external cluster manager, are re-launched

Verify some additional calls are not occurring in the executor allocation manager suite.

Don't close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small comment & visibility cleanup

CR feedback/cleanup

Cleanup the merge

Fix up the merge

CR feedback, move adjustExecutors to a common utility function

Exclude some non-public APIs

Remove junk

More CR feedback

Fix adjustExecutors backport

This test fails for me locally, and from what I recall it's because we use a different method of resolving the bind address than upstream, so disabling the test

Cleanup and drop watcher changes from the backport