From 57fc9afbda5265d389771c0e102266e39171034b Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 4 May 2016 22:56:24 +0800
Subject: [PATCH 1/7] Only send back updated accumulators to driver

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++--
 core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4dfd532e93624..175cdb1dec2d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1097,8 +1097,8 @@ class DAGScheduler(
             throw new SparkException(s"attempted to access non-existent accumulator $id")
         }
         acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
-        // To avoid UI cruft, ignore cases where value wasn't updated
-        if (acc.name.isDefined && !updates.isZero) {
+        // Only display named accumulators on UI.
+        if (acc.name.isDefined) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
           event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
         }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 95bcc7bc9653a..57fe99263054d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -155,7 +155,9 @@ private[spark] abstract class Task[T](
    */
   def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
     if (context != null) {
-      context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
+      context.taskMetrics.accumulators().filter { a =>
+        !a.isZero && (!taskFailed || a.countFailedValues)
+      }
     } else {
       Seq.empty
     }

From c523feeb58376ed4813d1e5119638fe6528f742a Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 5 May 2016 10:59:57 +0800
Subject: [PATCH 2/7] add assert

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 175cdb1dec2d3..6637a9136f58a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1089,6 +1089,9 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     try {
       event.accumUpdates.foreach { updates =>
+        // We only send back updated accumulators
+        assert(!updates.isZero)
+
         val id = updates.id
         // Find the corresponding accumulator on the driver and update it
         val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
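[Reviewer note, not part of the patch series] Patches 1 and 2 make the executor drop accumulator
updates whose value never changed before shipping them, and add a driver-side assert that only
non-zero updates arrive. A minimal, self-contained Scala sketch of that filtering rule, using a
hypothetical Accum stand-in rather than the real AccumulatorV2 API:

    case class Accum(id: Long, isZero: Boolean, countFailedValues: Boolean)

    // Keep an update only if it actually changed and, for failed tasks, only if the
    // accumulator opted in to counting failed values.
    def updatesToSend(accums: Seq[Accum], taskFailed: Boolean): Seq[Accum] =
      accums.filter(a => !a.isZero && (!taskFailed || a.countFailedValues))
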
From 41f5cb4da8d9192bc75f547ec1b4dd68d6205161 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 5 May 2016 22:33:58 +0800
Subject: [PATCH 3/7] improve

---
 .../apache/spark/scheduler/DAGScheduler.scala |  7 +---
 .../org/apache/spark/scheduler/Task.scala     |  8 +++-
 .../org/apache/spark/util/AccumulatorV2.scala |  4 +-
 .../spark/scheduler/TaskContextSuite.scala    | 37 +++++++++++++++----
 4 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6637a9136f58a..4dfd532e93624 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1089,9 +1089,6 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     try {
       event.accumUpdates.foreach { updates =>
-        // We only send back updated accumulators
-        assert(!updates.isZero)
-
         val id = updates.id
         // Find the corresponding accumulator on the driver and update it
         val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
@@ -1100,8 +1097,8 @@ class DAGScheduler(
             throw new SparkException(s"attempted to access non-existent accumulator $id")
         }
         acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
-        // Only display named accumulators on UI.
-        if (acc.name.isDefined) {
+        // To avoid UI cruft, ignore cases where value wasn't updated
+        if (acc.name.isDefined && !updates.isZero) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
           event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
         }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 57fe99263054d..f40fcf1949beb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -156,7 +156,13 @@ private[spark] abstract class Task[T](
   def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
     if (context != null) {
       context.taskMetrics.accumulators().filter { a =>
-        !a.isZero && (!taskFailed || a.countFailedValues)
+        if (taskFailed) {
+          a.countFailedValues && !a.isZero
+        } else {
+          // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
+          // value will be updated at driver side if task not fail.
+          !a.isZero || a.name == Some(InternalAccumulator.RESULT_SIZE)
+        }
       }
     } else {
       Seq.empty
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index d8f380e1230e5..c4879036f6522 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -256,7 +256,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
    * Adds v to the accumulator, i.e. increment sum by v and count by 1.
   * @since 2.0.0
   */
-  override def isZero: Boolean = _count == 0L
+  override def isZero: Boolean = _sum == 0L && _count == 0

   override def copyAndReset(): LongAccumulator = new LongAccumulator

@@ -321,7 +321,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
   private[this] var _sum = 0.0
   private[this] var _count = 0L

-  override def isZero: Boolean = _count == 0L
+  override def isZero: Boolean = _sum == 0.0 && _count == 0

   override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9aca4dbc23644..e5ec50e27496b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -168,8 +168,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark

   test("failed tasks collect only accumulators whose values count during failures") {
     sc = new SparkContext("local", "test")
-    val acc1 = AccumulatorSuite.createLongAccum("x", true)
-    val acc2 = AccumulatorSuite.createLongAccum("y", false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", false)
+    val acc2 = AccumulatorSuite.createLongAccum("y", true)
+    acc1.add(1)
+    acc2.add(1)
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val taskMetrics = TaskMetrics.empty
@@ -185,12 +187,33 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     }
     // First, simulate task success. This should give us all the accumulators.
     val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
-    val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
-    TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+    TaskMetricsSuite.assertUpdatesEquals(accumUpdates1.takeRight(2), Seq(acc1, acc2))
     // Now, simulate task failures. This should give us only the accums that count failed values.
-    val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
-    val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
-    TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+    val accumUpdates2 = task.collectAccumulatorUpdates(taskFailed = true)
+    TaskMetricsSuite.assertUpdatesEquals(accumUpdates2.takeRight(1), Seq(acc2))
+  }
+
+  test("only updated accumulators will be sent back to driver") {
+    sc = new SparkContext("local", "test")
+    val acc1 = AccumulatorSuite.createLongAccum("x")
+    val acc2 = AccumulatorSuite.createLongAccum("y")
+    // Create a dummy task. We won't end up running this; we just want to collect
+    // accumulator updates from it.
+    val taskMetrics = TaskMetrics.empty
+    val task = new Task[Int](0, 0, 0) {
+      context = new TaskContextImpl(0, 0, 0L, 0,
+        new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+        new Properties,
+        SparkEnv.get.metricsSystem,
+        taskMetrics)
+      taskMetrics.registerAccumulator(acc1)
+      taskMetrics.registerAccumulator(acc2)
+      override def runTask(tc: TaskContext): Int = 0
+    }
+    acc1.add(1)
+    val updatedAccums = task.collectAccumulatorUpdates()
+    assert(updatedAccums.map(_.id).contains(acc1.id))
+    assert(!updatedAccums.map(_.id).contains(acc2.id))
   }

   test("localProperties are propagated to executors correctly") {

From b48dda8402bd85ab02586e36bd7eb9440c140e00 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 6 May 2016 09:59:11 +0800
Subject: [PATCH 4/7] fix sql

---
 .../scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 3 ++-
 .../apache/spark/sql/execution/metric/SQLMetricsSuite.scala   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 29c54111ea7bd..84a02915fba70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -164,7 +164,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         taskEnd.taskInfo.taskId,
         taskEnd.stageId,
         taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.value), None)),
+        taskEnd.taskMetrics.accumulators().filter(_.isInstanceOf[SQLMetric])
+          .map(a => a.toInfo(Some(a.value), None)),
         finishTask = true)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d41e88a0aa853..e46967cc30eb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -91,7 +91,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       expectedMetrics.contains(node.id)
     }.map { node =>
       val nodeMetrics = node.metrics.map { metric =>
-        val metricValue = metricValues(metric.accumulatorId)
+        val metricValue = metricValues.getOrElse(metric.accumulatorId, "0")
         (metric.name, metricValue)
       }.toMap
       (node.id, node.name -> nodeMetrics)
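[Reviewer note, not part of the patch series] Patch 3 tightens isZero for the long and double
accumulators so that an accumulator counts as untouched only while both its sum and its count are
still zero; with the old count-only check, a value written without bumping the count (e.g. via
setValue, which patch 6 shows the test suite using) would look "zero" and be dropped by the
executor-side filter. A simplified stand-in showing the intended semantics (not the real
AccumulatorV2 class):

    class SimpleLongAccum {
      private var sum = 0L
      private var count = 0L
      def add(v: Long): Unit = { sum += v; count += 1 }
      def setValue(v: Long): Unit = { sum = v }        // touches only the sum
      def isZero: Boolean = sum == 0L && count == 0L   // was: count == 0L
    }

With the old check, setValue(5) on a fresh accumulator would still report isZero = true and the
update would never be shipped to the driver.
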
From cb210349233e06a368f4693b92ed50314e168eab Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 6 May 2016 12:11:58 +0800
Subject: [PATCH 5/7] do not filter out zero value external accumulator

---
 .../org/apache/spark/executor/TaskMetrics.scala  |  2 +-
 .../scala/org/apache/spark/scheduler/Task.scala  | 16 +++++++---------
 .../spark/sql/execution/ui/SQLListener.scala     |  3 +--
 .../sql/execution/metric/SQLMetricsSuite.scala   |  2 +-
 4 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7f4652c2dd765..1893167cf7261 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -218,7 +218,7 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]

   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
     externalAccums += a
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index f40fcf1949beb..7d2c219782d63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -155,15 +155,13 @@ private[spark] abstract class Task[T](
    */
   def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
     if (context != null) {
-      context.taskMetrics.accumulators().filter { a =>
-        if (taskFailed) {
-          a.countFailedValues && !a.isZero
-        } else {
-          // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
-          // value will be updated at driver side if task not fail.
-          !a.isZero || a.name == Some(InternalAccumulator.RESULT_SIZE)
-        }
-      }
+      context.taskMetrics.internalAccums.filter { a =>
+        // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
+        // value will be updated at driver side.
+        !a.isZero || a.name == Some(InternalAccumulator.RESULT_SIZE)
+        // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not filter
+        // them out.
+      } ++ context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 84a02915fba70..510a2ee3bfafb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -164,8 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         taskEnd.taskInfo.taskId,
         taskEnd.stageId,
         taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.accumulators().filter(_.isInstanceOf[SQLMetric])
-          .map(a => a.toInfo(Some(a.value), None)),
+        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
         finishTask = true)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index e46967cc30eb5..d41e88a0aa853 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -91,7 +91,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       expectedMetrics.contains(node.id)
     }.map { node =>
       val nodeMetrics = node.metrics.map { metric =>
-        val metricValue = metricValues.getOrElse(metric.accumulatorId, "0")
+        val metricValue = metricValues(metric.accumulatorId)
         (metric.name, metricValue)
       }.toMap
       (node.id, node.name -> nodeMetrics)

From b4e7385823880b622f17d5cdf57fca037fe93cb7 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 6 May 2016 15:26:04 +0800
Subject: [PATCH 6/7] fix tests

---
 .../spark/scheduler/TaskContextSuite.scala  | 16 ++++++++--------
 .../sql/execution/ui/SQLListenerSuite.scala | 12 ++++++------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index e5ec50e27496b..368668bc7e2e4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -193,10 +193,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     TaskMetricsSuite.assertUpdatesEquals(accumUpdates2.takeRight(1), Seq(acc2))
   }

-  test("only updated accumulators will be sent back to driver") {
+  test("only updated internal accumulators will be sent back to driver") {
     sc = new SparkContext("local", "test")
-    val acc1 = AccumulatorSuite.createLongAccum("x")
-    val acc2 = AccumulatorSuite.createLongAccum("y")
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val taskMetrics = TaskMetrics.empty
@@ -206,14 +204,16 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
         new Properties,
         SparkEnv.get.metricsSystem,
         taskMetrics)
-      taskMetrics.registerAccumulator(acc1)
-      taskMetrics.registerAccumulator(acc2)
+      taskMetrics.incMemoryBytesSpilled(10)
       override def runTask(tc: TaskContext): Int = 0
     }
-    acc1.add(1)
     val updatedAccums = task.collectAccumulatorUpdates()
-    assert(updatedAccums.map(_.id).contains(acc1.id))
-    assert(!updatedAccums.map(_.id).contains(acc2.id))
+    assert(updatedAccums.length == 2)
+    // the RESULT_SIZE accumulator will be sent back anyway.
+    assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
+    assert(updatedAccums(0).value == 0)
+    assert(updatedAccums(1).name == Some(InternalAccumulator.MEMORY_BYTES_SPILLED))
+    assert(updatedAccums(1).value == 10)
   }

   test("localProperties are propagated to executors correctly") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 5e08658e5efa2..67e44849ca877 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui

 import java.util.Properties

-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.mock

 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -74,13 +74,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   )

   private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = {
-    val metrics = mock(classOf[TaskMetrics])
-    when(metrics.accumulators()).thenReturn(accumulatorUpdates.map { case (id, update) =>
+    val metrics = TaskMetrics.empty
+    accumulatorUpdates.foreach { case (id, update) =>
       val acc = new LongAccumulator
       acc.metadata = AccumulatorMetadata(id, Some(""), true)
-      acc.setValue(update)
-      acc
-    }.toSeq)
+      acc.add(update)
+      metrics.registerAccumulator(acc)
+    }
     metrics
   }
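[Reviewer note, not part of the patch series] Patch 5 splits the executor-side collection into two
rules: internal metric accumulators are sent only when non-zero (RESULT_SIZE excepted, since the
driver fills that one in later), while external accumulators such as SQL metrics are always sent
unless the task failed and they opted out of failed values. A condensed model of that split, using
hypothetical stand-in types rather than the real TaskMetrics API (the RESULT_SIZE string below is
assumed, shown only for illustration):

    case class Acc(name: Option[String], isZero: Boolean, countFailedValues: Boolean)

    val ResultSize = "internal.metrics.resultSize" // assumed name of the result-size metric

    def collect(internal: Seq[Acc], external: Seq[Acc], taskFailed: Boolean): Seq[Acc] =
      internal.filter(a => !a.isZero || a.name.contains(ResultSize)) ++
        external.filter(a => !taskFailed || a.countFailedValues)
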
From 18aa4abb4ddd4cf0800e0b353077d083f66096de Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 10 May 2016 08:20:21 +0800
Subject: [PATCH 7/7] fix sql metrics

---
 .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index f82e0b8bca77a..786110477d8cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -66,7 +66,7 @@ private[sql] object SQLMetrics {

   def createMetric(sc: SparkContext, name: String): SQLMetric = {
     val acc = new SQLMetric(SUM_METRIC)
-    acc.register(sc, name = Some(name), countFailedValues = true)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }

@@ -79,7 +79,7 @@ private[sql] object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
     acc
   }

@@ -88,7 +88,7 @@ private[sql] object SQLMetrics {
     // duration(min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
     acc
   }
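
[Reviewer note, not part of the patch series] With the patch-5 rule, external accumulators are
shipped on task failure only when countFailedValues is true, so flipping the SQL metrics to
countFailedValues = false in patch 7 means metric values from failed task attempts are simply
dropped rather than merged into the SQL UI. A small self-contained model of that interaction,
using hypothetical names:

    case class ExternalAccum(name: String, value: Long, countFailedValues: Boolean)

    def sentBack(external: Seq[ExternalAccum], taskFailed: Boolean): Seq[ExternalAccum] =
      external.filter(a => !taskFailed || a.countFailedValues)

    // A metric registered with countFailedValues = false never reaches the driver
    // when its task fails, but is still reported for successful attempts.
    val rows = ExternalAccum("number of output rows", 42, countFailedValues = false)
    assert(sentBack(Seq(rows), taskFailed = true).isEmpty)
    assert(sentBack(Seq(rows), taskFailed = false).nonEmpty)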