[SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException #5636

Closed. Wants to merge 50 commits.

Commits:
- 40aefbe (Apr 22, 2015): [SPARK-5945] Added map to track reasons for stage failures and suppor…
- f8744be (Apr 22, 2015): Fixed method scoping error
- 8fe31e0 (Apr 22, 2015): Made StageFailure private to spark scheduler
- e0f8b55 (Apr 22, 2015): Made fail() method public
- 729b7ef (Apr 22, 2015): Added config option for stageFailure count and documentation
- d5fa622 (Apr 24, 2015): Moved failure tracking to Stage class. Added clear of failre count up…
- 0335b96 (Apr 24, 2015): Removed stale documentation and fixed some erroneous spacing
- 2b91940 (Apr 28, 2015): Added test case for stage abort after N failures
- 914b2cb (Apr 28, 2015): Nit
- 77555b9 (Apr 29, 2015): Added test that also validates that the listenerBus sees the JobFaile…
- 1243b65 (Apr 30, 2015): updated to fix last few items
- 9052e39 (Apr 30, 2015): Removed extraneous assert
- 673fcb2 (Apr 30, 2015): Fixed a couple minor issues. Removed unecessary asserT
- e26ae6e (Apr 30, 2015): Fixed a couple minor issues. Removed unecessary asserT
- 75952ea (Apr 30, 2015): Fixed missing maxStageFailures
- bc88aa1 (May 4, 2015): Updated to add test cases for multiple task failures within a Stage. …
- 7ff8b21 (May 4, 2015): Typo fix
- 560a381 (May 4, 2015): Merge remote-tracking branch 'upstream/master' into SPARK-5945
- 76f226a (Jul 23, 2015): Resolved merge conflicts. Now simply count the nubmer stage failures …
- fe647d0 (Jul 23, 2015): Style
- ddfe46c (Jul 23, 2015): restoring lost files
- ee8d52e (Jul 24, 2015): Updated test suite to properly create task sets and force stage failures
- 4da3d5d (Jul 24, 2015): got rid of println
- 5e13342 (Jul 27, 2015): Updated tests for stage failures.
- 34d69fa (Jul 27, 2015): Merge remote-tracking branch 'upstream/master' into SPARK-5945
- e101ed7 (Jul 28, 2015): Updated test for fetch failures. Added validation of successfully gen…
- daad2e4 (Jul 28, 2015): Added a test for multiple fetch failures inside a single stage
- 0c054d3 (Jul 28, 2015): Added a test for multiple fetch failures inside a single stage
- f23c31b (Jul 28, 2015): Added test to ensure that stage failure only triggers with successive…
- 9978575 (Jul 28, 2015): Test updates. Some tests still failing, unsure why.
- b66d74e (Jul 29, 2015): Added stage ID checks in most places and fixed naming for attempts to…
- 2e058ba (Jul 29, 2015): All tests passing. Still need to refactor multiple fetch failures per…
- f79011b (Jul 29, 2015): Updated remaining test for sequential test failures to have three sta…
- 62532fa (Jul 29, 2015): Nit fixes
- 1c1cb72 (Jul 29, 2015): Style
- 17e85de (Jul 29, 2015): Naming
- cf94850 (Aug 4, 2015): refactored tests to eliminate reused code
- 7c6f60f (Aug 4, 2015): Style
- 13af970 (Aug 5, 2015): Nit fixes.
- 09929da (Aug 5, 2015): Style
- eb15503 (Aug 5, 2015): Style
- 01d6841 (Aug 6, 2015): Nits
- 1dd4840 (Aug 19, 2015): Fixed merge conflict
- 4da18a1 (Aug 19, 2015): Style fix
- 5e4fe99 (Sep 1, 2015): Merge remote-tracking branch 'upstream/master' into SPARK-5945
- f928ff3 (Sep 1, 2015): Updated PR description and minor nits
- e22ce7c (Sep 2, 2015): Updated with feedback from PR
- 2bd4138 (Sep 2, 2015): Style fix
- 1d44e0c (Sep 2, 2015): Test case updates and nit fixes
- 5bb1ae6 (Sep 2, 2015): Added more comments to clarify tricky test case
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,7 +1101,6 @@ class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
} else {

// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@
if (disallowStageRetryForTest) {
abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
None)
} else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
} else if (failedStages.isEmpty) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTimeMillis())

// Clear failure count for this stage, now that it's succeeded.
// We only limit consecutive failures of stage attempts, so that if a stage is
// re-used many times in a long-running job, unrelated failures don't eventually cause the
// stage to be aborted.
stage.clearFailures()
Contributor: This looks correct, but just so I understand, we need to do this in case this stage is resubmitted in the future for a different reason?

Contributor: My understanding is that this is because the stage may need to be re-run later if one of the machines where its output is stored fails, so the output needs to be re-constructed.

Contributor: yes, that falls under "different reason", i.e. fetch failure in a different stage that depends on this one. That makes sense.

} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
}

outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
30 changes: 29 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
* be updated for each attempt.
*
*/
private[spark] abstract class Stage(
private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
*/
private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

/**
* Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
* failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
Contributor: This comment is super verbose, and after reading it, it's still not obvious what this is actually storing. What about something like: "Set of IDs of stage attempts that have failed with a FetchFailure. We keep track of these failures in order to avoid endless retries if a stage keeps failing with a FetchFailure. We keep track of each attempt ID that has failed to avoid recording duplicate failures if multiple tasks from the same stage attempt fail."

private val fetchFailedAttemptIds = new HashSet[Int]

private[scheduler] def clearFailures() : Unit = {
fetchFailedAttemptIds.clear()
}

/**
* Check whether we should abort the failedStage due to multiple consecutive fetch failures.
*
* This method updates the running set of failed stage attempts and returns
* true if the number of failures exceeds the allowable number of failures.
*/
private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
fetchFailedAttemptIds.add(stageAttemptId)
fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
}

/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
case _ => false
}
}

private[scheduler] object Stage {
// The number of consecutive failures allowed before a stage is aborted
val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}
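
A minimal, self-contained sketch of the bookkeeping added above, using a hypothetical FetchFailureTracker class in place of the real Stage/DAGScheduler wiring: record each stage attempt that hits a FetchFailure, clear the record when the stage completes successfully, and abort once the consecutive-failure limit of 4 distinct failed attempts is reached with no success in between.

import scala.collection.mutable.HashSet

// Hypothetical stand-in for the Stage bookkeeping in this patch: one entry per
// stage attempt that hit a FetchFailure, cleared whenever the stage succeeds.
class FetchFailureTracker(maxConsecutiveFailures: Int = 4) {
  private val fetchFailedAttemptIds = new HashSet[Int]

  // Mirrors Stage.failedOnFetchAndShouldAbort: record the failed attempt ID and
  // report whether the consecutive-failure limit has now been reached.
  def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= maxConsecutiveFailures
  }

  // Mirrors Stage.clearFailures: a success resets the count, so only
  // consecutive failed attempts can ever abort the stage.
  def clearFailures(): Unit = fetchFailedAttemptIds.clear()
}

object FetchFailureTrackerDemo extends App {
  val tracker = new FetchFailureTracker()

  // Three failed attempts followed by a success: the counter resets, so the
  // next failure starts the count over instead of pushing it past the limit.
  assert(!tracker.failedOnFetchAndShouldAbort(0))
  assert(!tracker.failedOnFetchAndShouldAbort(1))
  assert(!tracker.failedOnFetchAndShouldAbort(2))
  tracker.clearFailures()
  assert(!tracker.failedOnFetchAndShouldAbort(3))

  // Duplicate failures from the same attempt are counted only once.
  assert(!tracker.failedOnFetchAndShouldAbort(3))

  // Four distinct failed attempts with no success in between trigger the abort.
  assert(!tracker.failedOnFetchAndShouldAbort(4))
  assert(!tracker.failedOnFetchAndShouldAbort(5))
  assert(tracker.failedOnFetchAndShouldAbort(6))
}

This is the property the new DAGSchedulerSuite tests below exercise: failures spread across different stages, or non-consecutive failures of the same stage, never reach the limit, while four consecutive failed attempts abort the job.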
282 changes: 279 additions & 3 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.CallSite
import org.apache.spark.executor.TaskMetrics

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}


// Helper function to validate state when creating tests for task failures
private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
assert(stageAttempt.stageId === stageId)
assert(stageAttempt.stageAttemptId == attempt)
}


// Helper functions to extract commonly used code in Fetch Failure test cases
private def setupStageAbortTest(sc: SparkContext) {
sc.listenerBus.addListener(new EndListener())
ended = false
jobResult = null
}

// Create a new Listener to confirm that the listenerBus sees the JobEnd message
// when we abort the stage. This message will also be consumed by the EventLoggingListener
// so this will propagate up to the user.
var ended = false
var jobResult : JobResult = null

class EndListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
jobResult = jobEnd.jobResult
ended = true
}
}

/**
* Common code to get the next stage attempt, confirm it's the one we expect, and complete it
* successfully.
*
* @param stageId - The current stageId
* @param attemptIdx - The current attempt count
* @param numShufflePartitions - The number of partitions in the next stage
*/
private def completeShuffleMapStageSuccessfully(
stageId: Int,
attemptIdx: Int,
numShufflePartitions: Int): Unit = {
Contributor: nit: multiline format, each arg on its own line

def completeNextShuffleMapSuccessfully(
    stageId: Int,
    attemptIdx: Int,
    numShufflePartitions: Int): Unit = {

Contributor: typo: Successfully (thanks to mark hamstra from my copy of this on #8402)

val stageAttempt = taskSets.last
checkStageId(stageId, attemptIdx, stageAttempt)
complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
case (task, idx) =>
(Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
}.toSeq)
}

/**
* Common code to get the next stage attempt, confirm it's the one we expect, and complete it
* with all FetchFailure.
Contributor: Is there a reason all of the tasks need to end in Failure? Can you just have one task end in a failure?

Author: This is primarily for convenience when automatically generating failing tasks.

Contributor: to expand on this slightly -- the one case where we do want to have more than one fetch failure is "Multiple tasks w/ fetch failures in same stage attempt should not abort the stage". I know the other cases could just have one fetch failure, but it seems they are still testing the right thing with all fetch failures, and this way we can reuse this method. Of course there are no end of different variants we could add tests for, but it seems to me this is a reasonably good balance. I suppose we could also add an nTasksToFail param here, with a -1 or None meaning fail all tasks. And then the other tests could even check multiple permutations, if you think that would add. I don't have any strong feelings on whether that added complexity in the test cases is worth it.

Contributor: Yeah I was just thinking that with this method, the "Multiple tasks w/ fetch failures..." test basically adds nothing, because it's just a subset of the functionality in the other tests (and so a failure in that test won't be that useful, because all of the tests will fail). A consequence is that there's no way to distinguish (in the test results) between a bug when there's a single task failure, and a bug that only manifests when multiple tasks in a stage fail. But this isn't the end of the world and if it's too hard to fix, seems OK to leave as-is (we should just re-order the tests so the unit test that's a subset of the others comes first).

Contributor: yeah, agree that as is, that test isn't really adding anything over the other tests as you've noted. I certainly don't think I'd say "too hard to fix" -- I suppose it's just my antsy-ness to get this in, but objectively, it probably makes sense to fix. All you are really asking is to change completeNextStageWithFetchFailure to oneFetchFailureInNextStage and change "Multiple tasks w/ fetch failures..." to just directly do what this method is doing now, pretty minor change. How about this: wait a day for @ilganeli to update, and if he doesn't get to it we merge as-is and I do a simple follow-up pr?

Author: Imran – I don't have cycles to do a significant refactor at the moment. I would suggest we merge and follow up later.

*
* @param stageId - The current stageId
* @param attemptIdx - The current attempt count
* @param shuffleDep - The shuffle dependency of the stage with a fetch failure
*/
private def completeNextStageWithFetchFailure(
stageId: Int,
attemptIdx: Int,
shuffleDep: ShuffleDependency[_, _, _]): Unit = {
Contributor: nit: multiline format

val stageAttempt = taskSets.last
checkStageId(stageId, attemptIdx, stageAttempt)
complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
(FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
}.toSeq)
}
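
A rough sketch of the nTasksToFail variant floated in the thread above; it is hypothetical rather than part of this patch, is meant to replace the helper as written, and assumes the suite's complete helper accepts a result list shorter than the task set (leaving the remaining tasks without a completion event).

// Hypothetical generalization of the helper above, per the review discussion:
// fail only the first nTasksToFail tasks with a FetchFailure (None keeps the
// current behaviour of failing every task) and leave the rest incomplete.
private def completeNextStageWithFetchFailure(
    stageId: Int,
    attemptIdx: Int,
    shuffleDep: ShuffleDependency[_, _, _],
    nTasksToFail: Option[Int] = None): Unit = {
  val stageAttempt = taskSets.last
  checkStageId(stageId, attemptIdx, stageAttempt)
  val numToFail = nTasksToFail.getOrElse(stageAttempt.tasks.size)
  complete(stageAttempt, stageAttempt.tasks.take(numToFail).zipWithIndex.map { case (task, idx) =>
    (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
  }.toSeq)
}

With something like this in place, the other tests could pass nTasksToFail = Some(1) to cover the single-task-failure permutation, while the multiple-tasks-with-fetch-failures scenario keeps the default of failing every task.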

/**
* Common code to get the next result stage attempt, confirm it's the one we expect, and
* complete it with a success where we return 42.
*
* @param stageId - The current stageId
* @param attemptIdx - The current attempt count
*/
private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
val stageAttempt = taskSets.last
checkStageId(stageId, attemptIdx, stageAttempt)
assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
}

/**
* In this test, we simulate a job where many tasks in the same stage fail. We want to show
* that many fetch failures inside a single stage attempt do not trigger an abort
* on their own, but only when there are enough failing stage attempts.
*/
test("Single fetch failure should not abort the stage.") {
setupStageAbortTest(sc)

val parts = 8
val shuffleMapRdd = new MyRDD(sc, parts, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
submit(reduceRdd, (0 until parts).toArray)

completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)

completeNextStageWithFetchFailure(1, 0, shuffleDep)
Contributor: wait I think things got a little confused between all the comments from Kay, Andrew, and me ... As this stands now, it's not a single fetch failure -- there is a fetch failure from every task. I think the options were either (a) move this test to be first (as you've already done), but keep the name "multiple tasks w/ fetch failures" or (b) change the other tests to only have a single fetch failure by the refactoring to completeStageWithFetchFailure, and keep this one w/ multiple tasks w/ fetch failures. Maybe the name should actually be "multiple task with fetch failures in a single stage attempt should not abort the stage"?

Contributor: What? Why does it matter if there are one vs multiple tasks that failed with the fetch failure? Your suggestion is very verbose...

Contributor: Was your concern that "Single fetch failure" could refer to a task? If so we can call this "Single stage fetch failure"


// Resubmit and confirm that now all is well
scheduler.resubmitFailedStages()

assert(scheduler.runningStages.nonEmpty)
assert(!ended)

// Complete stage 0 and then stage 1 with a "42"
completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
completeNextResultStageWithSuccess(1, 1)

// Confirm job finished successfully
sc.listenerBus.waitUntilEmpty(1000)
assert(ended === true)
assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
assertDataStructuresEmpty()
}

/**
* In this test we simulate a job failure where the first stage completes successfully and
* the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
* trigger an overall job abort to avoid endless retries.
*/
test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
setupStageAbortTest(sc)

val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))

for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
// Complete all the tasks for the current attempt of stage 0 successfully
completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)

// Now we should have a new taskSet, for a new attempt of stage 1.
// Fail all these tasks with FetchFailure
completeNextStageWithFetchFailure(1, attempt, shuffleDep)
Contributor: same here, all tasks, not one task


// this will trigger a resubmission of stage 0, since we've lost some of its
// map output, for the next iteration through the loop
scheduler.resubmitFailedStages()

if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
assert(scheduler.runningStages.nonEmpty)
assert(!ended)
} else {
// Stage should have been aborted and removed from running stages
assertDataStructuresEmpty()
sc.listenerBus.waitUntilEmpty(1000)
assert(ended)
jobResult match {
case JobFailed(reason) =>
assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
case other => fail(s"expected JobFailed, not $other")
}
}
}
}

/**
* In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
* shuffle fetch. In total, the job has had four failures overall but not four failures
* for a particular stage, and as such should not be aborted.
*/
test("Failures in different stages should not trigger an overall abort") {
setupStageAbortTest(sc)

val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
submit(finalRdd, Array(0))

// In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
// stage 2 fails.
for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
// Complete all the tasks for the current attempt of stage 0 successfully
completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)

if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
// Now we should have a new taskSet, for a new attempt of stage 1.
// Fail all these tasks with FetchFailure
completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
Contributor: and here

} else {
completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)

// Fail stage 2
completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
shuffleDepTwo)
}

// this will trigger a resubmission of stage 0, since we've lost some of its
// map output, for the next iteration through the loop
scheduler.resubmitFailedStages()
}
Contributor: Can you also successfully complete stage 0 & stage 1 at this point?

Author: Do you mean to rerun stage0 and stage1 again? What would that show?

Contributor: I just want to make sure that we can actually finish the job successfully -- you could imagine your checks all passing, but the job is stuck in some weird state where the DAGScheduler thinks the stage is still running, but no task sets actually created, or something weird like that. Your asserts are almost there, but not quite. The final call to scheduler.resubmitFailedStages should trigger another attempt for stage 0, and after completing that, you'd get another attempt for stage 1, and then finally the job should complete, but successfully. I know this is minor, I'd just like to be extra thorough.


completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)

// Succeed stage2 with a "42"
completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)

assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
}
Contributor: assertDataStructuresEmpty()


/**
* In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
* fail multiple times, succeed, then fail a few more times (because it's run again by downstream
* dependencies). The total number of failed attempts for one stage will go over the limit,
* but that doesn't matter, since they have successes in the middle.
*/
test("Non-consecutive stage failures don't trigger abort") {
setupStageAbortTest(sc)

val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
submit(finalRdd, Array(0))

// First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
// Make each task in stage 0 succeed
completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)

// Now we should have a new taskSet, for a new attempt of stage 1.
// Fail these tasks with FetchFailure
completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
Contributor: we've changed the behavior a bit here in the refactor, so the comment is no longer accurate -- now we fail all tasks w/ fetch failures.


scheduler.resubmitFailedStages()

// Confirm we have not yet aborted
assert(scheduler.runningStages.nonEmpty)
assert(!ended)
}

// Rerun stage 0 and 1 to step through the task set
completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)

// Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)

scheduler.resubmitFailedStages()

// Rerun stage 0 to step through the task set
completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)

// Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
// since we succeeded in between.
completeNextStageWithFetchFailure(1, 4, shuffleDepOne)

scheduler.resubmitFailedStages()

// Confirm we have not yet aborted
assert(scheduler.runningStages.nonEmpty)
assert(!ended)
Contributor: I'm confused by this test -- it has fetch failures, which will result in a stage retry, but the test description says the point is to test non-fetch failures

Author: Thanks for the comments @squito. I'll pick this up once the other patch is merged.


// Next, succeed all and confirm output
// Rerun stage 0 + 1
completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)

// Succeed stage 2 and verify results
completeNextResultStageWithSuccess(2, 1)

assertDataStructuresEmpty()
sc.listenerBus.waitUntilEmpty(1000)
assert(ended === true)
assert(results === Map(0 -> 42))
}

test("trivial shuffle with multiple fetch failures") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,15 +1086,15 @@ class DAGSchedulerSuite
submit(finalRdd, Array(0))
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
// complete stage 2
// complete stage 0
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
// complete stage 1
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// pretend stage 0 failed because hostA went down
// pretend stage 2 failed because hostA went down
Contributor: this is a test comment for spark-prs integration; please disregard

Contributor: update to test comment; again, please disregard

complete(taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this: