simple test w/ failure involving a shared dependency
squito committed Sep 4, 2015
1 parent cf42138 commit cfcf4e6
Showing 1 changed file with 49 additions and 2 deletions.
@@ -556,11 +556,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
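As context for the change above: the new optional partitionToResult argument lets a test control the value each result task reports, instead of the hard-coded 42. A minimal usage sketch, with a hypothetical call site and mapping that are not part of this commit:

    // Hypothetical call inside a DAGSchedulerSuite test body: complete the current result stage
    // (stage 1, attempt 0) with task i returning i * 10 instead of the default 42.
    completeNextResultStageWithSuccess(1, 0, idx => idx * 10)
    // With two result partitions, the job's collected results would then be:
    assert(results === Map(0 -> 0, 1 -> 10))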

   /**
@@ -1017,6 +1023,47 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }

+  /**
+   * Run two jobs with a shared dependency. We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency. One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency.
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // Submit another job with the shared dependency, and simulate a fetch failure.
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only because the shared dependency produces a new,
+    // skipped stage. If it instead reused the existing stage, this would be stage 2.
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // The scheduler now creates a new task set to regenerate the missing map output, but this
+    // time using a different stage, the "skipped" one.
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure the internal
+    // accumulators are still set up.
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
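For readers connecting this scheduler-level test to user-visible behavior: a "skipped" stage appears whenever a later job reuses shuffle map output that an earlier job already produced. Below is a minimal, self-contained sketch of that situation at the RDD API level; it is illustrative only, and the object name and job structure are assumptions, not part of this commit.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative only: two actions sharing one shuffle dependency. The second job finds the
    // map output already registered, so its map stage is shown as "skipped".
    object SkippedStageExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("skipped-stage").setMaster("local[2]"))
        val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))
        val summed = pairs.reduceByKey(_ + _)  // introduces one ShuffleDependency
        summed.count()                         // job 1: runs the shuffle map stage
        summed.values.sum()                    // job 2: reuses the map output; map stage skipped
        sc.stop()
      }
    }

The test above exercises the scheduler bookkeeping behind exactly this kind of reuse, including recovering when part of the reused map output is later lost to a fetch failure.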

   /**
    * Makes sure that failures of stage used by multiple jobs are correctly handled.
    *