Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][SPARK-31197][CORE] Exit the executor once all tasks and migrations are finished built on top of on top of spark20629 #28817

Conversation

holdenk
Copy link
Contributor

@holdenk holdenk commented Jun 13, 2020

What changes were proposed in this pull request?

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

Why are the changes needed?

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.

Does this PR introduce any user-facing change?

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

How was this patch tested?

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

holdenk and others added 23 commits June 3, 2020 17:23
CR feedback "Nit: I think the comment is not needed as your code is self-explanatory here:"

Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
…Resolver.scala


If we have a failure during block migration, log the exception.

Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
### What changes were proposed in this pull request?
Enable `date.sql` and run it via Thrift Server in `ThriftServerQueryTestSuite`.

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the enabled tests via:
```
$ build/sbt -Phive-thriftserver "hive-thriftserver/test-only *ThriftServerQueryTestSuite -- -z date.sql"
```

Closes apache#28721 from MaxGekk/enable-date.sql-for-thrift.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
… well (help avoid cascading block migration)
…he and log the scheduler when asked to decom and we can't
…RemoteBlockSizeFetchToMem test that we don't need anymore, add back in submitting the thread I accidently took out in applying some CR feedback (a little fast on the ctrl-k)
@SparkQA
Copy link

SparkQA commented Jun 13, 2020

Test build #123957 has finished for PR 28817 at commit ea8efc7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@agrawaldevesh agrawaldevesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @holdenk .. I know that this PR is in WIP but I couldn't spare myself taking a look :-).

Great and super thorough !. Thanks again for working on this.

// Tell master we are are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
private var previousAllBlocksMigrated = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this variable be marked volatile ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, this will only be accessed in one thread.

if (executor != null) {
executor.decommission()
} else {
// If there's a running task it could store blocks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic of previousAllBlocks and allBlocks migrated is a bit confusing. Its not clear why the previous state has to be considered. I wonder if the following code can make this "history" aspect a bit clearer:

val allBlocksMigrated = !env.conf.get(STORAGE_DECOMMISSION_ENABLED) ||
      env.blockManager.decommissionManager.map(_.allBlocksMigrated).orElse(false)
val exitCondition = allBlocksMigrated && numRunningTasks == 0
if (exitCondition) { exitExecutor(...) }

Also, should we really be checking for numRunningTasks here ? What if some race condition caused some tasks to be scheduled onto us while we were marked for decom ?

Finally, should there be a timeout for how much time the executor will stay alive in decommissioned state ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a task is scheduled before we are asked to decom. You can verify this is covered by taking the logic out and watching the tests fail :) (There's an ungly thread sleep in the tests to make this possible).

Since the block migrations are not atomic, I do think we need the 2x logic, unfortunately, think of this situation:

  1. Task launches on executor
  2. Executor asked to decomission
  3. All blocks currently stored on executor are migrated
  4. Task stores a block
  5. We check numTasks & see all blocks are migrated, then exit without migrating the block stored by task SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID #4.

Now that being said that's probably a corner case, and arguably not super important since we're really only doing best effort, but I think for the overhead of one extra boolean it's worth it to cover this corner case.


private def decommissionSelf(): Boolean = {
if (!decommissioned) {
logInfo("Decommissioning self w/sync")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should expand what 'w/sync' stands for in the log message ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

@@ -233,6 +233,7 @@ private[spark] class Executor(
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
logInfo("Executor asked to decommission. Starting shutdown thread.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment looks stale. It should probably be moved to the CoarseGrainedBackendExecutor. Its also not clear to me what the decommission flag does in the Executor besides just logging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just logging for now. The reason I propagate the message to the executor is so that if we end up in a state where the executor believes it decommissioned (say local SIGPWR) but the driver doesn't it could be weird so having some logging is useful.

@@ -52,6 +52,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

case object DecommissionSelf extends CoarseGrainedClusterMessage // Mark as decommissioned.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, the DecommissionSelf naming is a bit ambiguous: "Who is self here" ? The sender or the receiver ?

This message is now send from the driver to the executor: So perhaps we should just repurpose DecommissionExecutor with a check for the executorId ?

Not a big deal but trying to reduce the number of message types introduced by this feature ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think decommissionself is pretty clearly telling the receiver to decommission itself. That being said I'm open to renaming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good ;-) I checked that DecommissionSelf is not indeed used anywhere else, so it should be unambiguous. Lets keep the name.

@@ -1887,7 +1891,7 @@ private[spark] class BlockManager(
* but rather shadows them.
* Requires an Indexed based shuffle resolver.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the comment needs to be updated to reflect what the return Boolean indicates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

@volatile private var stopped = false
// Since running tasks can add more blocks this can change.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure I am totally understanding this: You mean that the running tasks that were already running when the decommissioning was started at the executor ? Because, I think we refuse launching new tasks when the decommissioning has started, so the new blocks being written must be written by already running tasks. Did I get this right ?

Also, just to confirm I am still following along: I don't see this case handled in the existing BlockManagerSuite: I believe we are not testing writing new blocks while the decom/offload is in progress.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is covered, you can verify this by disabling this logic and seeing the test fail (albiet you'll have to run the test a few times because it becomes a race condition). Look at the "migrateDuring" flag for details.

Copy link
Contributor

@agrawaldevesh agrawaldevesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more comments on how the executor should be exiting.

}
if (allBlocksMigrated) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exitExecutor asynchronously sends RemoveExecutor to the driver. Does that actually make it to the driver ? There is also this question about if we should be using the same Shutdown/StopExecutor codepath for doing the stopping ? (But althought it seems that we do want to intimate to the driver that the executor is being removed).

Interestingly, the driver does indeed respond back with a StopExecutor and does trigger the clean shutdown path in the executor, but again I wonder if it is too late for it. Perhaps we shouldn't be calling System.exit here ?

Also, as currently written, this exitExecutor could cause job failures: Since the TaskSchedulerImpl will treat the ExecutorLossReason send by the executor to the driver as an exitCausedByApp and thus penalize the task. Instead, I think we shouldn't penalize the running job on a planned executor decommission. One workaround might be to actually respond back to the driver with ExecutorDecommission (which is not used elsewhere currently) and then handle that specifically in the TaskSchedulerImpl's determination of exitCausedByApp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it's my understanding the TaskSchedulerImpl shouldn't have any job failures because we've waited for all the tasks on the executor to finish before calling this code path. Unless is there something I've missed there?

I think swapping out exit executor for instead telling the driver to stop the executor and avoiding the system.exit makes sense either way though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was talking about the case where we get shot down before we had a chance to cleanly exit on line 276. Say for example, some time out expires and the executor/node is brought down.

Are decom.sh and decommission-slave.sh expected to wait until the executor/worker process has properly shut down ? I think they have some timeouts in them to kill the executor ? Or consider a spot kill scenario where you got some warning (like 2 minutes) and then the machine is yanked out.

In this case, the executor will eventually be marked loss via a heartbeat/timeout. And that loss would be deemed as the fault of the task, and could cause job failures. I am wondering if we can fix that scenario of an unclean exit ?

One workaround I suggested above was to send a message to the driver saying that the executor is going to go away soon. When that happens (in a clean or unclean way), that loss shouldn't be attributed to the task.

Perhaps this unclean executor loss/timeout handling is follow up work ? We (or rather I) can create Jira's for this under the parent ticket :-).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, although I think this behaviour is covered by the changes in https://github.com/apache/spark/pull/26440/files (we only increment failures if the executors previous state was not decommissioning).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please double check that ? I couldn't find this behavior when scouring TaskSchedulerImpl, and TaskSetManager. The only place we check for an executor being decommissioned in that PR is when scheduling tasks (in CoarseGrainedSchedulerBackend#isExecutorActive). Thanks !

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point to where in TaskSchedulerImpl it's going to fail the job? core/src/main/scala/org/apache/spark/deploy/master/Master.scala is where the current code is, but there might be an additional case that needs to be covered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {

In this match block, we will hit the default case which will treat the failure as having been caused by the app and thus penalize it.

This routine is called from TaskScheduler.executorLost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :) If you want to make a PR for that I'd be happy to review/merge since I think that would not depend on any of the in-flight PRs just the current code in master.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely !. Thanks

@SparkQA
Copy link

SparkQA commented Jun 14, 2020

Test build #123983 has finished for PR 28817 at commit a2c0557.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

cc @Ngone51 @jiangxb1987

@HyukjinKwon
Copy link
Member

cc @tgravescs too FYI

@holdenk
Copy link
Contributor Author

holdenk commented Jun 18, 2020

To be clear this a WIP PR and not yet ready for review. I created it to give additional context after talking with @agrawaldevesh about our respective goals. I'll try and get a "read for review" PR out next week.

@cloud-fan
Copy link
Contributor

any progress here?

asfgit pushed a commit that referenced this pull request Aug 5, 2020
### What changes were proposed in this pull request?

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of #28817

### Why are the changes needed?

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

### Does this PR introduce _any_ user-facing change?

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

### How was this patch tested?

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes #29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
@holdenk
Copy link
Contributor Author

holdenk commented Aug 5, 2020

It's been merged in the non-WIP version of this PR.

@holdenk holdenk closed this Aug 5, 2020
holdenk added a commit to holdenk/spark that referenced this pull request Oct 27, 2020
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by external external cluster manager are re-launched

Verify some additional calls are not occuring in the executor allocation manager suite.

Dont' close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small coment & visibility cleanup

CR feedback/cleanup

Cleanup the merge

[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.

Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.

Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .

To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.

Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.

The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.

Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.

This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.

New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.

This PR is a cleaned-up version of the previous WIP PR I made apache#28331 (thanks to attilapiros for his very helpful reviewing on it :)).

Closes apache#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed.

For more relevant information see here: fabric8io/kubernetes-client#1075

No

Running spark-submit to a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.

Closes apache#28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.

The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.

In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.

In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.

No

Tweaked an existing unit test in `AppClientSuite`

Closes apache#29032 from agrawaldevesh/plumb_decom_info.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

Without them decommissioning a lot of executors at a time leads to job failures.

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

No

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes apache#29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

No, unit test only change.

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes apache#29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of apache#28817

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes apache#29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>

Connect decommissioning to dynamic scaling

Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads

Make Spark's dynamic allocation use decommissioning

Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission

Fix up executor add for resource profile

Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits.

Verify executors decommissioned, then killed by external external cluster manager are re-launched

Verify some additional calls are not occuring in the executor allocation manager suite.

Dont' close the watcher until the end of the test

Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors

bump numparts up to 6

Revert "bump numparts up to 6"

This reverts commit daf96dd.

Small coment & visibility cleanup

CR feedback/cleanup

Fix up the merge

CR feedback, move adjustExecutors to a common utility function

Exclude some non-public APIs

Remove junk

More CR feedback

Fix adjustExecutors backport

This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test

This test fails for me locally and from what I recall it's because we use a different method of resolving the bind address than upstream so disabling the test

Cleanup and drop watcher changes from the backport
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants