From 882126c2671e1733d572350af9749e9f8bdca1c2 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Mon, 4 Dec 2017 20:23:14 +0800 Subject: [PATCH 1/2] Do not update accumulator for resubmitted task in result stage --- .../apache/spark/scheduler/DAGScheduler.scala | 14 ++++++++++--- .../spark/scheduler/DAGSchedulerSuite.scala | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9153751d03c1b..c2498d4808e91 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1187,9 +1187,17 @@ class DAGScheduler( // only updated in certain cases. event.reason match { case Success => - stage match { - case rs: ResultStage if rs.activeJob.isEmpty => - // Ignore update if task's job has finished. + task match { + case rt: ResultTask[_, _] => + val resultStage = stage.asInstanceOf[ResultStage] + resultStage.activeJob match { + case Some(job) => + // Only update the accumulator once for each result task. + if (!job.finished(rt.outputId)) { + updateAccumulators(event) + } + case None => // Ignore update if task's job has finished. + } case _ => updateAccumulators(event) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index feefb6a4d73f0..3c92ff8f7a9ec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1832,6 +1832,27 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assertDataStructuresEmpty() } + test("accumulator not calculated for resubmitted task in result stage") { + // just for register + val accum = AccumulatorSuite.createLongAccum("a") + val finalRdd = new MyRDD(sc, 2, Nil) + submit(finalRdd, Array(0, 1)) + // finish the first task + completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) + // verify stage exists + assert(scheduler.stageIdToStage.contains(0)) + + // finish the first task again (simulate a speculative task or a resubmitted task) + completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + + // The accumulator should only be updated once. + assert(accum.value === 1) + + runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success, 42)) + assertDataStructuresEmpty() + } + test("accumulators are updated on exception failures") { val acc1 = AccumulatorSuite.createLongAccum("ingenieur") val acc2 = AccumulatorSuite.createLongAccum("boulanger") From 756f02f1586edd14e42e32cd119e43132e9d13ee Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Tue, 5 Dec 2017 10:36:23 +0800 Subject: [PATCH 2/2] remove unnecessary comment --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3c92ff8f7a9ec..d812b5bd92c1b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1833,7 +1833,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } test("accumulator not calculated for resubmitted task in result stage") { - // just for register val accum = AccumulatorSuite.createLongAccum("a") val finalRdd = new MyRDD(sc, 2, Nil) submit(finalRdd, Array(0, 1))