
[SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException #5636

Closed (50 commits)

Conversation

ilganeli

The Stage class now tracks whether there have been enough consecutive failures of that stage to trigger an abort.

To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive failures of one stage. We still allow more than 4 total failures of a stage if there is an intervening successful attempt, so that in very long-lived applications, where a stage may get reused many times, we don't abort the job after failures that have already been recovered from successfully.

I've added test cases to exercise the most obvious scenarios.
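The behavior described above can be sketched as follows. This is an illustrative model only, not the actual DAGScheduler code; the class name StageFailureTracker is hypothetical:

```scala
// Illustrative sketch only, not the actual Spark patch: a per-stage counter
// of consecutive failures that resets on success and triggers an abort at 4.
val maxConsecutiveFailures = 4 // mirrors the limit described in the PR

class StageFailureTracker {
  private var consecutiveFailures = 0
  def recordFailure(): Unit = consecutiveFailures += 1
  // An intervening successful attempt resets the streak, so long-lived jobs
  // that recover from occasional failures are not aborted.
  def recordSuccess(): Unit = consecutiveFailures = 0
  def shouldAbort: Boolean = consecutiveFailures >= maxConsecutiveFailures
}

val stage = new StageFailureTracker
(1 to 3).foreach(_ => stage.recordFailure())
assert(!stage.shouldAbort)  // three in a row: still retried
stage.recordSuccess()       // a success resets the count
stage.recordFailure()
assert(!stage.shouldAbort)  // more than 4 total failures are fine if not consecutive
(1 to 3).foreach(_ => stage.recordFailure())
assert(stage.shouldAbort)   // four consecutive failures: abort the job
```

Note that in this sketch the stage has failed seven times in total, yet only the final run of four consecutive failures triggers the abort.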

…ting function to check whether to abort a stage when it fails for a single reason more than N times.
@SparkQA

SparkQA commented Apr 22, 2015

Test build #30773 has finished for PR 5636 at commit 40aefbe.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StageFailure(failureReason : String)
  • This patch does not change any dependencies.

@@ -96,6 +96,30 @@ class DAGScheduler(
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

// The maximum number of times to retry a stage before aborting
val maxStageFailures = 5
Contributor

can you make this a conf? There is already spark.task.maxFailures, so how about spark.stage.maxFailures? It should also get added to the docs.
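The suggestion could look roughly like this. It is a sketch only: a plain Map stands in for SparkConf so the snippet is self-contained, and the default of 4 follows the PR description (the diff above shows an early commit using 5):

```scala
// Sketch of the reviewer's suggestion: make the limit configurable via
// spark.stage.maxFailures, mirroring the existing spark.task.maxFailures.
// A Map stands in for SparkConf here; the real code would use something
// like conf.getInt("spark.stage.maxFailures", 4).
def maxStageFailures(conf: Map[String, String]): Int =
  conf.get("spark.stage.maxFailures").map(_.toInt).getOrElse(4)

assert(maxStageFailures(Map.empty) == 4)                               // default
assert(maxStageFailures(Map("spark.stage.maxFailures" -> "10")) == 10) // user override
```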

@SparkQA

SparkQA commented Apr 22, 2015

Test build #30775 has finished for PR 5636 at commit f8744be.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StageFailure(failureReason : String)
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 22, 2015

Test build #30778 has finished for PR 5636 at commit 8fe31e0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 22, 2015

Test build #30779 has finished for PR 5636 at commit 729b7ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@squito
Contributor

squito commented Apr 22, 2015

Thanks @ilganeli, I took a quick look and have some high-level comments:

  • Checking for the exact same string is too restrictive, IMO. E.g., will the failure message include the host names of the failed fetch? Even without that detail, I bet there are a lot of cases where the same real error can result in different messages.
  • I think the count for stage failures should be reset every time the job completes. If you have a really long-running job, I could imagine some stage that many downstream stages depend on (e.g., imagine a streaming job where some common RDD is joined against lots of incoming batches). On a big cluster, nodes will eventually go down and stages will fail, but as long as the subsequent retry works, everything is fine. Over time, that same stage might fail a number of times, but as long as there is no more than one failure between each success, that would be completely normal (even expected to some extent).
  • Maybe we should still allow the old behavior of infinite retry, e.g., if spark.stage.maxFailures is set to -1? Though to be honest, I can't really think of any reason you'd want infinite retry; I just wonder if we should leave the door open, since it is a behavior change.

Thanks for working on this! This will be a great addition; I've seen this come up in a number of cases, and it's really hard for the average user to figure out what is going on, so this will be a big help.
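On the first point, a tiny illustration of why exact-string matching undercounts. The message format below is hypothetical, loosely modeled on a FetchFailed report; it is not Spark's actual string:

```scala
// Hypothetical message format: the same root cause (a lost shuffle output)
// produces different strings when the host differs, so string equality
// would treat them as unrelated failures.
def fetchFailedMessage(host: String, shuffleId: Int): String =
  s"FetchFailed(BlockManagerId($host, 7337), shuffleId=$shuffleId)"

val a = fetchFailedMessage("host1", 0)
val b = fetchFailedMessage("host2", 0)
assert(a != b)  // same failure class, different strings
assert(a.contains("shuffleId=0") && b.contains("shuffleId=0"))
```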

@ilganeli
Author

@squito Given that we can't check against the failure message (as I expected), any ideas on what we can do instead? Is this information exposed in any way at the level of the DAGScheduler, or do I need to figure out a mechanism to propagate the error info up in a more detailed way? I can add the config change to allow infinite retries, and I'll add the clear at the end of the job; that seems reasonable.

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30893 has finished for PR 5636 at commit d5fa622.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30896 has finished for PR 5636 at commit 0335b96.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@ilganeli
Author

retest this please

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30947 has started for PR 5636 at commit 0335b96.

@ilganeli
Author

retest this please

@squito
Contributor

squito commented Apr 25, 2015

Hi @ilganeli, thanks for updating this. Not sure if you are still working on this or not, but we definitely need tests for the new behavior as well. There are already tests around fetch failures in DAGSchedulerSuite, so you can probably add something that follows those examples.

@squito
Contributor

squito commented Apr 25, 2015

btw I have no idea what is going on in those test failures ... do the tests pass when you run them locally?

@ilganeli
Author

No, Imran, they don't. However, I see the same failures on the master branch; I don't think they have anything to do with my changes.


@ilganeli
Author

Roger - I'll add tests to the suite.


@ilganeli
Author

retest this please

@SparkQA

SparkQA commented Apr 27, 2015

Test build #30990 has finished for PR 5636 at commit 0335b96.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch adds the following new dependencies:
    • tachyon-0.6.4.jar
    • tachyon-client-0.6.4.jar
  • This patch removes the following dependencies:
    • tachyon-0.5.0.jar
    • tachyon-client-0.5.0.jar

@SparkQA

SparkQA commented Apr 28, 2015

Test build #31083 has finished for PR 5636 at commit 914b2cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@ilganeli
Author

retest this please

@SparkQA

SparkQA commented Apr 28, 2015

Test build #31107 has finished for PR 5636 at commit 914b2cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.


private[scheduler] object Stage {
// The number of consecutive failures allowed before a stage is aborted
val MAX_CONSECUTIVE_FAILURES = 4
Contributor

this is really MAX_CONSECUTIVE_FETCH_FAILURES; we don't use this cap for other kinds of failures.

Contributor

to clarify a little bit: fetch failures are the only way we currently fail stages. Separately, there are task failures and job failures. In any case, it's good to make that clear here.

@andrewor14
Contributor

@ilganeli LGTM. Thanks for fixing this tricky issue; I'm sure many in the community will find it helpful. All my comments concern style, renaming, and test code. Once you address them I'll merge this.


completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)

completeNextStageWithFetchFailure(1, 0, shuffleDep)
Contributor

Wait, I think things got a little confused between all the comments from Kay, Andrew, and me...
As this stands now, it's not a single fetch failure; there is a fetch failure from every task. I think the options were either (a) move this test to be first (as you've already done) but keep the name "multiple tasks w/ fetch failures", or (b) change the other tests to only have a single fetch failure by refactoring to completeStageWithFetchFailure, and keep this one w/ multiple tasks w/ fetch failures.

Maybe the name should actually be "multiple tasks with fetch failures in a single stage attempt should not abort the stage"?

Contributor

What? Why does it matter if there are one vs multiple tasks that failed with the fetch failure? Your suggestion is very verbose...

Contributor

Was your concern that "Single fetch failure" could refer to a task? If so we can call this "Single stage fetch failure"
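The point under discussion, sketched abstractly (assumed semantics for illustration, not the suite's actual helpers): however many tasks fail with a FetchFailed within one stage attempt, that attempt counts as a single stage failure toward the abort threshold:

```scala
// Hypothetical model: stage failures are counted per attempt, not per task.
val maxConsecutiveFailures = 4

// Each element is the number of failed tasks in one stage attempt; an
// attempt with any failed task counts as exactly one stage failure.
def stageFailuresFor(failedTasksPerAttempt: Seq[Int]): Int =
  failedTasksPerAttempt.count(_ > 0)

// One attempt in which all 10 tasks hit a FetchFailed:
assert(stageFailuresFor(Seq(10)) == 1)                     // ONE stage failure
assert(stageFailuresFor(Seq(10)) < maxConsecutiveFailures) // stage is retried, not aborted

// Four failed attempts in a row reach the threshold, regardless of how
// many tasks failed within each attempt:
assert(stageFailuresFor(Seq(10, 1, 3, 2)) >= maxConsecutiveFailures)
```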

@squito
Contributor

squito commented Sep 3, 2015

thanks for the reviews @kayousterhout and @andrewor14, and the quick updates @ilganeli!

@SparkQA

SparkQA commented Sep 3, 2015

Test build #41951 has finished for PR 5636 at commit 5bb1ae6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Alright, merging into master. I fixed the test name on merge. Thanks everyone.

asfgit closed this in 4bd85d0 on Sep 3, 2015
asfgit pushed a commit that referenced this pull request Nov 11, 2015
just trying to increase test coverage in the scheduler, this already works.  It includes a regression test for SPARK-9809

copied some test utils from #5636, we can wait till that is merged first

Author: Imran Rashid <irashid@cloudera.com>

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.

(cherry picked from commit 33112f9)
Signed-off-by: Andrew Or <andrew@databricks.com>
asfgit pushed a commit that referenced this pull request Nov 11, 2015 (same commit message as the previous entry, without the cherry-pick note)
kiszk pushed a commit to kiszk/spark-gpu that referenced this pull request Dec 26, 2015 (same commit message as the Nov 11 entries)
ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016 (same commit message as the PR description above)

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>

Closes apache#5636 from ilganeli/SPARK-5945.

(cherry picked from commit 4bd85d0)