
[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure #18150

Closed
wants to merge 16 commits

Conversation

sitalkedia

What changes were proposed in this pull request?

Currently, when we detect a fetch failure, we only remove the shuffle files produced by the failing executor, while the host itself might be down, in which case none of its shuffle files are accessible. When we are running multiple executors on a host, any host going down currently results in multiple fetch failures and multiple retries of the stage, which is very inefficient. If we instead remove all the shuffle files on that host on the first fetch failure, we can rerun all the tasks on that host in a single stage retry.
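In rough terms, the change amounts to something like the sketch below (not the exact diff: shuffleIdToMapStage and removeOutputsOnHost are names taken from the discussion and diff snippets later in this thread, registerMapOutputs/outputLocInMapOutputTrackerFormat follow the existing DAGScheduler bookkeeping, and the helper name itself is hypothetical):

  // Sketch only: on the first FetchFailure from a host, drop every registered map
  // output living on that host, so a single stage retry can recompute all of them.
  private def unregisterAllOutputsOnHost(host: String): Unit = {
    for ((shuffleId, stage) <- shuffleIdToMapStage) {
      // removeOutputsOnHost is the new ShuffleMapStage helper discussed below.
      stage.removeOutputsOnHost(host)
      // Re-register the surviving outputs and bump the epoch so reducers refetch.
      mapOutputTracker.registerMapOutputs(
        shuffleId, stage.outputLocInMapOutputTrackerFormat(), changeEpoch = true)
    }
  }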

How was this patch tested?

Unit tests; also ran a job on the cluster and verified that the multiple retries are gone.

@sitalkedia changed the title from "Cleanup shuffle" to "[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure" on May 30, 2017
@sitalkedia
Author

Please note that the old PR - #17088 was closed inadvertently as a stale PR. Refer to the old PR for more discussion.

cc - @tgravescs

@SparkQA

SparkQA commented May 31, 2017

Test build #77559 has finished for PR 18150 at commit 500938d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 31, 2017

Test build #77557 has finished for PR 18150 at commit be3b3db.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
stage.removeOutputsOnExecutor(execId)
hostToUnregisterOutputs match {
Contributor

So the thing I believe @JoshRosen was talking about in the jira was: instead of invalidating all stages, just invalidate the outputs for that mapStage on that host.

So something like for(all execs on failed host) stage.removeOutputsOnExecutor(exec)
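Taken literally, that alternative might look like this hypothetical sketch (execsOnHost is an assumed lookup of the executor ids known to have run on the host; only the map stage behind the failed fetch is touched):

  // Hypothetical per-executor alternative: clean only the one map stage whose
  // output fetch failed, one executor at a time.
  def removeMapStageOutputsOnHost(mapStage: ShuffleMapStage, host: String): Unit = {
    for (exec <- execsOnHost(host)) {   // execsOnHost: assumed host -> executor ids lookup
      mapStage.removeOutputsOnExecutor(exec)
    }
  }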

Author

I don't think that would be the ideal behavior in case of fetch failure. Consider the case where the DAG looks like 1 -> 2 -> 3: if we see any fetch failure while running stage 3, the current behavior invalidates the files in stage 2 as well as stage 1. That way we make sure we rerun the tasks of stage 1 first, then stage 2, and lastly stage 3.

If we do not invalidate the files in stage 1 (as per @josh's suggestion), then we would unnecessarily rerun the tasks in stage 2, only to encounter another fetch failure and realize that the files for stage 1 are also missing. This behavior would introduce significant latency overhead.

Contributor

@sitalkedia, that's a good point: the real gains here don't necessarily come from the "multiple executors on a host" scenario; instead, it seems like the key benefit is avoiding a bunch of follow-up stage failures to discover the root of what needs to be recomputed. Your example in this comment is one of the clearest problem statements of this that I've seen so far.

Contributor

Ah, I remember now: the current code already avoids the "1 -> 2 -> 3" cascading-retries issue because it already treats any fetch failure as a complete executor output loss via

handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))

Over on the earlier PR, there were a number of arguments that "fetch failure does not imply lost executor", but those are somewhat moot given that today's code, even before the changes proposed here, already handles it that way.

So now I see why the PR description focuses on the multiple executors per host scenario: this isn't actually changing behavior in the single-executor-per-host world.

In the case where the host is legitimately down then most likely we'll get fetch failures from all of the executors on that host and will remove all of the outputs, so in some scenarios we'll happen to do the right thing. But I can imagine how there are scenarios on very large clusters where we'll get unlucky and won't observe fetch failures from the complete set of executors being served from that host's shuffle service; this could be especially likely to happen if the shuffle service is serving outputs from many dead executors which OOM'd and restarted during the map phase. Therefore I can understand the argument here that it's best to just round up.

The only real question here is how often we'd regret doing this in practice: how often is a fetch failure actually a transient issue? Given that there are already fetch retry mechanisms inside of tasks, I'm guessing that the "false positive" scenario is somewhat rare.

Therefore, I'm now persuaded that this is a good change and I have a better understanding of how this fits into the larger context of fetch failure handling issues.

Since this has been somewhat controversial, what do you think about compromising and adding a feature flag which lets users opt in to the old behavior (i.e. the flag just disables the promotion of "all outputs on executor lost" to "all outputs on host lost")?
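A rough sketch of what such an opt-in flag could look like at the fetch-failure handling site (unRegisterOutputOnHostOnFetchFailure and removeExecutorAndUnregisterOutputs follow the diff snippets quoted later in this thread; the exact condition and the maybeEpoch parameter name are assumptions, not the merged code):

  // Sketch only: gate the host-level unregistration behind a config flag so users
  // can keep the old per-executor behavior.
  if (bmAddress != null) {
    val hostToUnregisterOutputs =
      if (unRegisterOutputOnHostOnFetchFailure) Some(bmAddress.host) else None
    removeExecutorAndUnregisterOutputs(
      execId = bmAddress.executorId,
      fileLost = true,
      hostToUnregisterOutputs = hostToUnregisterOutputs,
      maybeEpoch = Some(task.epoch))
  }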

Author

@JoshRosen - I think it is a good idea to make this behavior configurable so that users have a fallback option. @tgravescs - What do you think?

Contributor

@jiangxb1987 left a comment

The approach looks good in general; I would vote to merge this, mainly for two reasons:

  1. With Spark integrated with YARN, it may be reasonable to allow users to configure the job to fail quickly on FetchFailure;
  2. The function removeOutputsOnHost is really useful: if we can say for sure that a host is dead, we should use it to clean up the invalid outputs.

@sitalkedia Could you add a config for this and have it default to disabling the feature, so we can introduce this conservatively?

@@ -559,10 +559,8 @@ class DAGScheduler(
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
Contributor

nit: please keep the original format.

* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with this execId.
*/
def removeOutputsOnHost(host: String): Unit = {
Contributor

@jiangxb1987 Jun 9, 2017

This function overlaps a lot in code with removeOutputsOnExecutor. How about combining them as:

  def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = {
    ......
      val newList = prevList.filterNot(m => f(m.location))
    ......
  }

And in DAGScheduler, we can simply pass in the filter functions.
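Filled in, the suggested refactor might look roughly like the sketch below (it assumes ShuffleMapStage's existing outputLocs, numPartitions, and _numAvailableOutputs bookkeeping; the two wrappers are what the call sites in DAGScheduler would then choose between):

  // Sketch of the suggested refactor inside ShuffleMapStage.
  private def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = {
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(m => f(m.location))
      outputLocs(partition) = newList
      if (prevList != Nil && newList == Nil) {
        // This partition no longer has any live output location.
        _numAvailableOutputs -= 1
      }
    }
  }

  // Thin wrappers over the shared filter-based implementation.
  def removeOutputsOnHost(host: String): Unit =
    removeOutputsByFilter(x => x.host == host)

  def removeOutputsOnExecutor(execId: String): Unit =
    removeOutputsByFilter(x => x.executorId == execId)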

Author

Good suggestion, done.

@sitalkedia
Author

Thanks for the review @JoshRosen, @jiangxb1987. Added a config to turn off the behavior and addressed the minor comments.

@@ -190,6 +190,12 @@ class DAGScheduler(
/**
* Number of consecutive stage attempts allowed before a stage is aborted.
*/
private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
Contributor

Please place the correct comment over the value.

private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
sc.getConf.getBoolean("spark.fetch.failure.unRegister.output.on.host", true)

/**
Contributor

Same as above.

/**
* Removes all shuffle outputs which satisfies the filter. Note that this will also
* remove outputs which are served by an external shuffle server (if one exists),
* as they are still registered with this execId.
Contributor

We don't mention execId here, so we should update the comment.

@@ -138,17 +138,7 @@ private[spark] class ShuffleMapStage(
* registered with this execId.
Contributor

We should also update the comment here.

@@ -190,6 +190,12 @@ class DAGScheduler(
/**
* Number of consecutive stage attempts allowed before a stage is aborted.
*/
private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
sc.getConf.getBoolean("spark.fetch.failure.unRegister.output.on.host", true)
Contributor

How about renaming it to spark.files.fetchFailure.unRegisterOutputOnHost and making it default to false?

Author

@jiangxb1987 - Do we want to set the default to false? If we believe this is the correct and expected behavior, we should set it to true; then, in case we see issues, we can turn it off.

Contributor

@jiangxb1987 left a comment

There is no obvious right or wrong behavior here, due to the complex environments we face on FetchFailure. In some use cases (for example, the case Josh mentioned) it will be a benefit, but there are also cases where this will cause the job to run longer than expected (I think @tgravescs has explained this at length). So let's merge this feature but hide it temporarily, wait a while to gather enough feedback from other developers in the Spark community, and make it the default behavior once we have enough confidence to do so.

@@ -562,7 +568,6 @@ class DAGScheduler(
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
Contributor

nit: Let's keep this empty line too.

@@ -188,6 +188,12 @@ class DAGScheduler(
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

/**
* If enabled, fetch failure will cause all the output on that host to be unregistered.
Contributor

Rephrase this as:

Whether to unregister all the outputs on the host when we receive a FetchFailure. This is set to false by default, which means we only unregister the outputs related to the exact executor (instead of the host) on a FetchFailure.

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77853 has finished for PR 18150 at commit 5bd4bc3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

sitalkedia commented Jun 9, 2017

@jiangxb1987 - Sure, turned off the behavior by default.

Contributor

@jiangxb1987 left a comment

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77856 has finished for PR 18150 at commit 810a101.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 10, 2017

Test build #77859 has finished for PR 18150 at commit ec89ac1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 10, 2017

Test build #77864 has finished for PR 18150 at commit 78bce79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

@sitalkedia Could you rebase this with the latest master? Thanks!

@jiangxb1987
Contributor

ping @JoshRosen @tgravescs @squito

* executor(instead of the host) on a FetchFailure.
*/
private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false)
Contributor

can we put this in org.apache.spark.internal.config?

Author

done.
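For reference, a config entry in org.apache.spark.internal.config would look roughly like this (a sketch using the ConfigBuilder pattern; the constant name and doc string are assumptions, while the key comes from the diff above):

  // Sketch of a ConfigEntry for the flag, living in org.apache.spark.internal.config.
  private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
    ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost")
      .doc("Whether to unregister all the outputs on the host when a fetch failure occurs.")
      .booleanConf
      .createWithDefault(false)

DAGScheduler could then read the typed entry via sc.getConf.get(...) instead of a raw string key.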

@@ -396,6 +396,69 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}

test("All shuffle files should on the slave should be cleaned up when slave lost") {
Contributor

typo: All shuffle files should on the slave -> All shuffle files on the slave

Author

fixed, thanks

assert(initialMapStatus1.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get
Contributor

mapOutputTracker.mapStatuses.get(1)?

Author

good eye, fixed.

@cloud-fan
Contributor

LGTM, waiting for the rebase

@tgravescs
Contributor

Sorry, I've been out on vacation. I think invalidating everything and having a feature flag makes sense for now. If we get more data on it causing issues, we can revisit. Sorry, I won't have time to review in detail for a couple of days.

@SparkQA

SparkQA commented Jun 14, 2017

Test build #78015 has finished for PR 18150 at commit 74f285a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

removeExecutorAndUnregisterOutputs(
  execId = execId,
  fileLost = fileLost,
  hostToUnregisterOutputs = None,
Contributor

one more question: if a worker is lost, shouldn't we unregister the outputs on that worker/host?

Contributor

seems we can't get worker id here, nvm

@SparkQA

SparkQA commented Jun 14, 2017

Test build #78016 has finished for PR 18150 at commit ba2ca2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit closed this in dccc0aa on Jun 14, 2017
dataknocker pushed a commit to dataknocker/spark that referenced this pull request Jun 16, 2017
…of slave lost or fetch failure

## What changes were proposed in this pull request?

Currently, when we detect a fetch failure, we only remove the shuffle files produced by the failing executor, while the host itself might be down, in which case none of its shuffle files are accessible. When we are running multiple executors on a host, any host going down currently results in multiple fetch failures and multiple retries of the stage, which is very inefficient. If we instead remove all the shuffle files on that host on the first fetch failure, we can rerun all the tasks on that host in a single stage retry.

## How was this patch tested?

Unit tests; also ran a job on the cluster and verified that the multiple retries are gone.

Author: Sital Kedia <skedia@fb.com>
Author: Imran Rashid <irashid@cloudera.com>

Closes apache#18150 from sitalkedia/cleanup_shuffle.