Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost #28848

Closed
wants to merge 10 commits into from

Conversation

wypoon
Copy link
Contributor

@wypoon wypoon commented Jun 17, 2020

What changes were proposed in this pull request?

If an executor is lost, the DAGScheduler handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files.
In such a case, when fetches from the executor's outputs fail in the same stage, the DAGScheduler again removes the executor and by right, should unregister its outputs. It doesn't because the epoch used to track the executor failure has not increased.

We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.

Why are the changes needed?

Without the changes, the loss of a node could require two stage attempts to recover instead of one.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit test. This test fails without the change and passes with it.

@wypoon
Copy link
Contributor Author

wypoon commented Jun 17, 2020

@attilapiros


// Shuffle files for hostA-exec should be lost
val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
assert(mapStatuses.count(_ != null) == 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the change this part fails.

@SparkQA
Copy link

SparkQA commented Jun 17, 2020

Test build #124153 has finished for PR 28848 at commit 4e976ab.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Copy link
Contributor

Build failure is unrelated. I have checked and there are some other PRs with the same error:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-install-plugin:3.0.0-M1:install (default-cli) on project spark-parent_2.12: ArtifactInstallerException: Failed to install metadata org.apache.spark:spark-parent_2.12/maven-metadata.xml: Could not parse metadata /home/jenkins/.m2/repository/org/apache/spark/spark-parent_2.12/maven-metadata-local.xml: in epilog non whitespace content is not allowed but got > (position: END_TAG seen ...</metadata>\n>... @13:2) -> [Help 1]

Copy link
Contributor

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @wypoon!

There are just some comments otherwise it is fine.

@attilapiros
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Jun 17, 2020

Test build #124160 has finished for PR 28848 at commit 4e976ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Copy link
Member

Ngone51 commented Jun 17, 2020

cc @jiangxb1987

@wypoon
Copy link
Contributor Author

wypoon commented Jun 17, 2020

Thanks @attilapiros for the review and suggestions. Thanks @Ngone51 for reviewing as well.

@SparkQA
Copy link

SparkQA commented Jun 17, 2020

Test build #124180 has finished for PR 28848 at commit 4c0b98c.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Copy link
Contributor Author

wypoon commented Jun 18, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Jun 18, 2020

Test build #124184 has finished for PR 28848 at commit 4c0b98c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 18, 2020

Test build #124225 has finished for PR 28848 at commit 4b18369.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Copy link
Contributor Author

wypoon commented Jun 18, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Jun 18, 2020

Test build #124226 has finished for PR 28848 at commit 4b18369.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Copy link
Contributor Author

wypoon commented Jun 18, 2020

@jiangxb1987 @squito can you please review?

@Ngone51
Copy link
Member

Ngone51 commented Jun 19, 2020

@wypoon Just a reminder, please do not resolve conversations before we reach into an agreement.

@SparkQA
Copy link

SparkQA commented Jun 19, 2020

Test build #124300 has finished for PR 28848 at commit 0069e71.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA
Copy link

SparkQA commented Jun 20, 2020

Test build #124305 has finished for PR 28848 at commit 0069e71.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Thank you for making a PR, @wypoon .

@SparkQA
Copy link

SparkQA commented Jul 15, 2020

Test build #125863 has finished for PR 28848 at commit 0e00862.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wypoon
Copy link
Contributor Author

wypoon commented Jul 15, 2020

PySpark packaging tests fail with

Writing pyspark-3.1.0.dev0/setup.cfg
creating dist
Creating tar archive
removing 'pyspark-3.1.0.dev0' (and everything under it)
Installing dist into virtual env
Processing ./python/dist/pyspark-3.1.0.dev0.tar.gz
Collecting py4j==0.10.9 (from pyspark==3.1.0.dev0)
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Installing collected packages: py4j, pyspark
  Found existing installation: py4j 0.10.9
    Uninstalling py4j-0.10.9:
      Successfully uninstalled py4j-0.10.9
  Found existing installation: pyspark 3.1.0.dev0
Exception:
Traceback (most recent call last):
  File "/home/anaconda/envs/py36/lib/python3.6/site-packages/pip/_internal/cli/base_command.py", line 179, in main
    status = self.run(options, args)
  File "/home/anaconda/envs/py36/lib/python3.6/site-packages/pip/_internal/commands/install.py", line 393, in run
    use_user_site=options.use_user_site,
  File "/home/anaconda/envs/py36/lib/python3.6/site-packages/pip/_internal/req/__init__.py", line 50, in install_given_reqs
    auto_confirm=True
  File "/home/anaconda/envs/py36/lib/python3.6/site-packages/pip/_internal/req/req_install.py", line 816, in uninstall
    uninstalled_pathset = UninstallPathSet.from_dist(dist)
  File "/home/anaconda/envs/py36/lib/python3.6/site-packages/pip/_internal/req/req_uninstall.py", line 505, in from_dist
    '(at %s)' % (link_pointer, dist.project_name, dist.location)
AssertionError: Egg-link /home/jenkins/workspace/SparkPullRequestBuilder@4/python does not match installed location of pyspark (at /home/jenkins/workspace/SparkPullRequestBuilder/python)
Cleaning up temporary directory - /tmp/tmp.nkhMEcKRla
[error] running /home/jenkins/workspace/SparkPullRequestBuilder/dev/run-pip-tests ; received return code 2

but I don't understand what that means. Any help?

@attilapiros
Copy link
Contributor

@wypoon

Check this out: https://issues.apache.org/jira/browse/SPARK-32303

@wypoon
Copy link
Contributor Author

wypoon commented Jul 21, 2020

@tgravescs @squito I made some small tweaks to the scaladoc comments and also had to rebase since you approved the change. It has been open for some time now to any additional feedback. Can you give it the once over again and merge it?

@squito
Copy link
Contributor

squito commented Jul 21, 2020

Jenkins, retest this please

@squito
Copy link
Contributor

squito commented Jul 21, 2020

lgtm still. Since its been a few days and there was a bit of flakiness before, I triggered one more set of tests just to make sure everything is still OK and then will merge

@SparkQA
Copy link

SparkQA commented Jul 21, 2020

Test build #126279 has finished for PR 28848 at commit 0e00862.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wypoon
Copy link
Contributor Author

wypoon commented Jul 22, 2020

I created #29182 for the backport to branch-2.4.

@@ -548,6 +560,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mapStatus2(2).location.host === "hostB")
}

test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. Do we need to describe when external shuffle service is used assumption here because we have conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") at line 567?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personally, I'm OK with this as is, I think its OK for some of the details to be down in the test itself and balance a super-duper long name.

@dongjoon-hyun since you called this a nit I'm assuming you're OK with me merging this anyhow, but if not lemme know, can submit a quick followup.

@dongjoon-hyun
Copy link
Member

cc @holdenk and @dbtsai

@Ngone51
Copy link
Member

Ngone51 commented Jul 22, 2020

still LGTM

@asfgit asfgit closed this in e8c06af Jul 22, 2020
@squito
Copy link
Contributor

squito commented Jul 22, 2020

merged to master, thanks everyone!

@squito
Copy link
Contributor

squito commented Jul 22, 2020

@wypoon sorry, I mistakenly only merged this to master, but I should have also merged it to branch-3.0. Would you mind also opening a pr targeting branch-3.0?

cloud-fan pushed a commit that referenced this pull request Aug 18, 2020
### What changes were proposed in this pull request?

The DecommissionWorkerSuite started becoming flaky and it revealed a real regression. Recently closed #29211 necessitates remembering the decommissioning shortly beyond the removal of the executor.

In addition to fixing this issue, ensure that DecommissionWorkerSuite continues to pass when executors haven't had a chance to exit eagery. That is the old behavior before #29211 also still works.

Added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout.

Hardened the test DecommissionWorkerSuite to make it wait for successful job completion.

### Why are the changes needed?

First, let me describe the intended behavior of decommissioning: If a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job. This fetch failure can happen before the executor is truly marked "lost" because of heartbeat delays.

- However, #29211 eagerly exits the executors when they are done decommissioning. This removal of the executor was racing with the fetch failure. By the time the fetch failure is triggered the executor is already removed and thus has forgotten its decommissioning information. (I tested this by delaying the decommissioning). The fix is to keep the decommissioning information around for some time after removal with some extra logic to finally purge it after a timeout.

- In addition the executor loss can also bump up `shuffleFileLostEpoch` (added in #28848). This happens because when the executor is lost, it forgets the shuffle state about just that executor and increments the `shuffleFileLostEpoch`. This incrementing precludes the clearing of state of the entire host when the fetch failure happens because the failed task is still reusing the old epoch. The fix here is also simple: Ignore the `shuffleFileLostEpoch` when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

I am strategically making both of these fixes be very local to decommissioning to avoid other regressions. Especially the version stuff is tricky (it hasn't been fundamentally changed since it was first introduced in 2013).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually ran DecommissionWorkerSuite several times using a script and ensured it all passed.

### (Internal) Configs added
I added two configs, one of which is sort of meant for testing only:
- `spark.test.executor.decommission.initial.sleep.millis`: Initial delay by the decommissioner shutdown thread. Default is same as before of 1 second. This is used for testing only. This one is kept "hidden" (ie not added as a constant to avoid config bloat)
- `spark.executor.decommission.removed.infoCacheTTL`: Number of seconds to keep the removed executors decom entries around. It defaults to 5 minutes. It should be around the average time it takes for all of the shuffle data to be fetched from the mapper to the reducer, but I think that can take a while since the reducers also do a multistep sort.

Closes #29422 from agrawaldevesh/decom_fixes.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
wangyum pushed a commit that referenced this pull request May 26, 2023
… outputs for executor on fetch failure after executor is lost

If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files.
In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and by right, should unregister its outputs. It doesn't because the epoch used to track the executor failure has not increased.

We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.

Without the changes, the loss of a node could require two stage attempts to recover instead of one.

No.

New unit test. This test fails without the change and passes with it.

Closes #28848 from wypoon/SPARK-32003.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
9 participants