From f5df21405f1eb31c7f32ffd7b32ed02abbc6d033 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Sun, 12 Feb 2017 20:39:53 +0800
Subject: [PATCH 1/3] [SPARK-19565] Improve DAGScheduler tests.

---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 42 +++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4e5f267e237c0..a96271f0121a1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1), there's no pending
+    // partitions for stage(stageId=1) now, thus downstream stage should be submitted,
+    // though there's still a running task in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
+      (Success, 1),
+      (Success, 1)))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
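For context on the two helpers the test drives the scheduler with: complete(...) finishes every task in a TaskSet, while a single runEvent(makeCompletionEvent(...)) finishes exactly one task and leaves the attempt's other tasks running; the next patch in the series relies on that distinction. The following is a self-contained toy model of those semantics, with simplified stand-in types (Task, TaskSet, completeOne, completeAll here are illustrative, not Spark's actual suite code):

// Toy model of the two completion styles used by the test. The types and
// helpers are simplified stand-ins, not Spark's actual TaskSet/DAGScheduler.
object CompletionStylesSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  final case class Task(stageId: Int, stageAttemptId: Int, partitionId: Int)
  final case class TaskSet(tasks: Seq[Task])

  // Stand-in for runEvent(makeCompletionEvent(task, reason, result)):
  // finishes exactly one task.
  def completeOne(task: Task, reason: TaskEndReason, result: Any): Unit =
    println(s"$task finished: $reason -> $result")

  // Stand-in for the suite's complete(taskSet, results): finishes every task
  // that has a listed result, so it cannot leave a task "still running".
  def completeAll(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = {
    assert(taskSet.tasks.size >= results.size)
    for (((reason, result), i) <- results.zipWithIndex) {
      completeOne(taskSet.tasks(i), reason, result)
    }
  }

  def main(args: Array[String]): Unit = {
    val attempt0 = TaskSet(Seq(Task(1, 0, 0), Task(1, 0, 1)))
    // Finish only partition 0; partition 1 of this attempt stays running,
    // which is exactly the situation the reworked test needs to set up.
    completeOne(attempt0.tasks(0), Success, "mapStatusA")
  }
}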
From f8cf4fc09d81a6a459fb2b6212664d0fe2c94ab2 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Tue, 14 Feb 2017 10:36:32 +0800
Subject: [PATCH 2/3] fix

---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 35 +++++++++++++------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a96271f0121a1..837d5548955c2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2162,6 +2162,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 
   test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
     val shuffleIdA = shuffleDepA.shuffleId
@@ -2174,29 +2175,43 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     submit(rddC, Array(0, 1))
     assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
 
+    // Complete both tasks in rddA.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 2)),
       (Success, makeMapStatus("hostA", 2))))
 
-    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
-    complete(taskSets(1), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
-        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
-      (Success, makeMapStatus("hostB", 2))))
+    // Fetch failed on hostA for task(partitionId=0) and task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
 
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
     scheduler.resubmitFailedStages()
-    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
     complete(taskSets(2), Seq(
       (Success, makeMapStatus("hostB", 2)),
       (Success, makeMapStatus("hostB", 2))))
 
-    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
 
-    // Thanks to the success from old attempt of stage(stageId=1), there's no pending
-    // partitions for stage(stageId=1) now, thus downstream stage should be submitted,
-    // though there's still a running task in the active stage attempt.
+    // Complete the task(partitionId=1) from the old attempt(stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1, stageAttempt=0), there's no
+    // pending partitions for stage(stageId=1) now, thus downstream stage should be submitted,
+    // though there's still a running task(stageId=1, stageAttempt=1, partitionId=1)
+    // in the active stage attempt.
     assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
     complete(taskSets(4), Seq(
       (Success, 1),

From 86fd6b45199cfb14e5b4cd6b90065abb25e15f75 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Wed, 15 Feb 2017 09:51:46 +0800
Subject: [PATCH 3/3] Modify comment.

---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 837d5548955c2..4750b38f829aa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2161,7 +2161,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
-  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+  test("After a fetch failure, success of old attempt of stage should be taken as valid.") {
     // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
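The behavior the test's final comment describes depends on the scheduler tracking pending partitions per stage rather than per stage attempt: a Success delivered by a task from the obsolete attempt 0 still clears its partition, so stage 1 finishes and the downstream ResultStage is submitted while attempt 1's duplicate task is still running. A minimal sketch of that bookkeeping under those assumptions (StageState and its members are illustrative names, not the DAGScheduler's actual fields):

import scala.collection.mutable

// Minimal sketch of per-stage (not per-attempt) completion bookkeeping.
final class StageState(numPartitions: Int) {
  val pendingPartitions: mutable.HashSet[Int] =
    mutable.HashSet(0 until numPartitions: _*)

  // The attempt id of the finishing task is deliberately ignored: shuffle
  // output for a partition is valid whichever attempt produced it.
  def onTaskSuccess(partitionId: Int): Unit = pendingPartitions -= partitionId

  // Once nothing is pending, the downstream stage can be submitted even if a
  // duplicate task from the newest attempt is still running.
  def readyToSubmitChild: Boolean = pendingPartitions.isEmpty
}

object StageStateDemo extends App {
  val stage1 = new StageState(numPartitions = 2)
  stage1.onTaskSuccess(0) // finished by the new attempt (stageAttempt=1)
  stage1.onTaskSuccess(1) // finished by the old attempt (stageAttempt=0)
  assert(stage1.readyToSubmitChild) // the ResultStage (taskSets(4)) follows
}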