From 7e790412ed6409fdda96216dde7f4f408bb04a57 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 10 Nov 2018 17:04:25 +0100 Subject: [PATCH 1/2] [SPARK-26003] Improve SQLAppStatusListener.aggregateMetrics performance --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 6978ec3a85715..1a1f357809ca8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -159,7 +159,7 @@ class SQLAppStatusListener( } private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { - val metricIds = exec.metrics.map(_.accumulatorId).sorted + val metricIds = exec.metrics.map(_.accumulatorId).toSet val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap val metrics = exec.stages.toSeq .flatMap { stageId => Option(stageMetrics.get(stageId)) } @@ -170,7 +170,7 @@ class SQLAppStatusListener( .filter { case (id, _) => metricIds.contains(id) } .groupBy(_._1) .map { case (id, values) => - id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq) + id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2)) } // Check the execution again for whether the aggregated metrics data has been calculated. 
From 031d512b84e0b84a1876c098e0842f13d37c38e8 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 12 Nov 2018 16:55:42 +0100 Subject: [PATCH 2/2] Address comment: filter on metricTypes keys instead of a separate metricIds set --- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 1a1f357809ca8..45954f21c5925 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -159,7 +159,6 @@ class SQLAppStatusListener( } private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = { - val metricIds = exec.metrics.map(_.accumulatorId).toSet val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap val metrics = exec.stages.toSeq .flatMap { stageId => Option(stageMetrics.get(stageId)) } @@ -167,7 +166,7 @@ class SQLAppStatusListener( .flatMap { metrics => metrics.ids.zip(metrics.values) } val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq) - .filter { case (id, _) => metricIds.contains(id) } + .filter { case (id, _) => metricTypes.contains(id) } .groupBy(_._1) .map { case (id, values) => id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))