From 3e3ea3916b3cf64ae4a7fc60ce46c63b21731bd0 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 25 Aug 2015 15:33:57 -0500 Subject: [PATCH 1/3] asserts on numAvailableOutputs to make it clear what is happening --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2e8688cf41d99..17b8edc87793e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -656,19 +656,25 @@ class DAGSchedulerSuite val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) val taskSet = taskSets(0) + val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.numAvailableOutputs === 0) // should be ignored for being too old runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) - // should work because it's a non-failed host + assert(shuffleStage.numAvailableOutputs === 0) + // should work because it's a non-failed host (so the available map outputs will increase) runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + assert(shuffleStage.numAvailableOutputs === 1) // should be ignored for being too old runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + assert(shuffleStage.numAvailableOutputs === 1) // should work because it's a new epoch taskSet.tasks(1).epoch = newEpoch runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) From 8e2a969759fbf1dfd415a5a964374412edd017f6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 25 Aug 2015 15:35:52 -0500 Subject: [PATCH 2/3] formatting of runEvent(CompletionEvent(...)) --- .../spark/scheduler/DAGSchedulerSuite.scala | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 17b8edc87793e..84fdc0d5292e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -659,21 +659,41 @@ class DAGSchedulerSuite val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage] assert(shuffleStage.numAvailableOutputs === 0) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + runEvent(CompletionEvent( + taskSet.tasks(0), + Success, + makeMapStatus("hostA", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) assert(shuffleStage.numAvailableOutputs === 0) // should work because it's a non-failed host (so the available map outputs will increase) - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + runEvent(CompletionEvent( + taskSet.tasks(0), + Success, + makeMapStatus("hostB", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) assert(shuffleStage.numAvailableOutputs === 1) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + runEvent(CompletionEvent( + taskSet.tasks(0), + Success, + makeMapStatus("hostA", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) assert(shuffleStage.numAvailableOutputs === 1) // should work because it's a new epoch taskSet.tasks(1).epoch = newEpoch - runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + runEvent(CompletionEvent( + taskSet.tasks(1), + Success, + makeMapStatus("hostA", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) From 94927c8ec3d5a63eb8f0416176df2f5b505b9039 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 2 Sep 2015 09:46:10 -0500 Subject: [PATCH 3/3] blank lines and a few more comments --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 84fdc0d5292e9..56a7238f019b9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -650,14 +650,19 @@ class DAGSchedulerSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) + // pretend we were told hostA went away val oldEpoch = mapOutputTracker.getEpoch runEvent(ExecutorLost("exec-hostA")) val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) + + // now start completing some tasks in the shuffle map stage, under different hosts + // and epochs, and make sure scheduler updates its state correctly val taskSet = taskSets(0) val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage] assert(shuffleStage.numAvailableOutputs === 0) + // should be ignored for being too old runEvent(CompletionEvent( taskSet.tasks(0), @@ -667,6 +672,7 @@ class DAGSchedulerSuite createFakeTaskInfo(), null)) assert(shuffleStage.numAvailableOutputs === 0) + // should work because it's a non-failed host (so the available map outputs will increase) runEvent(CompletionEvent( taskSet.tasks(0), @@ -676,6 +682,7 @@ class DAGSchedulerSuite createFakeTaskInfo(), null)) assert(shuffleStage.numAvailableOutputs === 1) + // should be ignored for being too old runEvent(CompletionEvent( taskSet.tasks(0), @@ -685,7 +692,9 @@ class DAGSchedulerSuite createFakeTaskInfo(), null)) assert(shuffleStage.numAvailableOutputs === 1) - // should work because it's a new epoch + + // should work because it's a new epoch, which will increase the number of available map + // outputs, and also finish the stage taskSet.tasks(1).epoch = newEpoch runEvent(CompletionEvent( taskSet.tasks(1), @@ -697,6 +706,8 @@ class DAGSchedulerSuite assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) + + // finish the next stage normally, which completes the job complete(taskSets(1), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty()