From a0c7646c57663c92f11fb4d023745525f7712be8 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Thu, 6 Apr 2023 08:03:16 +0200
Subject: [PATCH] PR comments

And remove unneeded config in spec
---
 .../main/scala/org/apache/spark/executor/TaskMetrics.scala   | 5 ++---
 core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 2 +-
 .../test/scala/org/apache/spark/executor/ExecutorSuite.scala | 2 --
 .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 2 +-
 .../spark/sql/execution/metric/SQLMetricsTestUtils.scala     | 2 +-
 5 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 3d63f5e898d65..1f9cb0f755a58 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -265,14 +265,13 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
 
-  private[spark] def externalAccums() = _externalAccums.asScala
+  private[spark] def externalAccums = _externalAccums.asScala
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
     _externalAccums.add(a)
   }
 
-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] =
-    internalAccums ++ _externalAccums.asScala
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
 
   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index dbacdc6d72f0b..001e3220e73b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics.nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics.externalAccums().filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 2655a2dd9dc1f..745720a54af36 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -275,8 +275,6 @@ class ExecutorSuite extends SparkFunSuite
   test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
     val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
     conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
-    conf.set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "500ms")
-    conf.set(Network.NETWORK_TIMEOUT_INTERVAL.key, "400ms")
     sc = new SparkContext(conf)
 
     val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator${i}"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 02e1c9cd00aac..7b9f877bdef5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -178,7 +178,7 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums().flatMap { a =>
+      event.taskMetrics.externalAccums.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 81be8d97572b1..81667d52e16ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
         res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
 
         var maxOutputRows = 0L
-        for (accum <- taskEnd.taskMetrics.externalAccums()) {
+        for (accum <- taskEnd.taskMetrics.externalAccums) {
           val info = accum.toInfo(Some(accum.value), None)
           if (info.name.toString.contains("number of output rows")) {
             info.update match {
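
For reference, a minimal standalone sketch (not part of the patch; object and value names are illustrative, Scala 2.13) of the JDK behavior the `externalAccums` accessor relies on: iterating a CopyOnWriteArrayList through its asScala wrapper works on a snapshot of the backing array, so a concurrent add from another thread (here, a heartbeat-style read racing a task registering accumulators) cannot throw ConcurrentModificationException.

import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._

object CowIterationSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for TaskMetrics._externalAccums: a COW list holding "accumulators".
    val accums = new CopyOnWriteArrayList[String]()
    (1 to 5).foreach(i => accums.add(s"accum$i"))

    // Another thread keeps registering accumulators while we read.
    val writer = new Thread(() => (6 to 10).foreach(i => accums.add(s"accum$i")))
    writer.start()

    // Reading through the asScala view (as externalAccums does) iterates a
    // snapshot of the underlying array, so the concurrent adds above cannot
    // cause a ConcurrentModificationException.
    val snapshot = accums.asScala.toSeq

    writer.join()
    println(s"saw ${snapshot.size} accumulators during iteration; list now has ${accums.size}")
  }
}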