From 638cbf5b415d7c7a796af70d29eaf2d664ee469a Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Tue, 25 May 2021 23:17:48 -0700
Subject: [PATCH 1/2] Add fallback metrics for hash aggregate

---
 .../execution/aggregate/HashAggregateExec.scala   | 15 +++++++++++----
 .../aggregate/TungstenAggregationIterator.scala   |  4 +++-
 .../sql/execution/metric/SQLMetricsSuite.scala    | 12 ++++++++----
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 3c1304e9cdad8..2947f2c7321e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -67,7 +67,9 @@ case class HashAggregateExec(
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
-      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
+      "number of tasks fall-backed to sort-based aggregation"))
 
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
@@ -86,6 +88,7 @@ case class HashAggregateExec(
     val spillSize = longMetric("spillSize")
     val avgHashProbe = longMetric("avgHashProbe")
     val aggTime = longMetric("aggTime")
+    val numTasksFallBacked = longMetric("numTasksFallBacked")
 
     child.execute().mapPartitionsWithIndex { (partIndex, iter) =>
 
@@ -112,7 +115,8 @@ case class HashAggregateExec(
         numOutputRows,
         peakMemory,
         spillSize,
-        avgHashProbe)
+        avgHashProbe,
+        numTasksFallBacked)
       if (!hasInput && groupingExpressions.isEmpty) {
         numOutputRows += 1
         Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
@@ -460,7 +464,8 @@ case class HashAggregateExec(
       sorter: UnsafeKVExternalSorter,
       peakMemory: SQLMetric,
       spillSize: SQLMetric,
-      avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
+      avgHashProbe: SQLMetric,
+      numTasksFallBacked: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes
@@ -479,6 +484,7 @@ case class HashAggregateExec(
     }
 
     // merge the final hashMap into sorter
+    numTasksFallBacked += 1
     sorter.merge(hashMap.destructAndCreateExternalSorter())
     hashMap.free()
     val sortedIter = sorter.sortedIterator()
@@ -764,9 +770,10 @@ case class HashAggregateExec(
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val avgHashProbe = metricTerm(ctx, "avgHashProbe")
+    val numTasksFallBacked = metricTerm(ctx, "numTasksFallBacked")
 
     val finishRegularHashMap = s"$iterTerm = $thisPlan.finishAggregate(" +
-      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe);"
+      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe, $numTasksFallBacked);"
     val finishHashMap = if (isFastHashMapEnabled) {
       s"""
          |$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 99358fbf4e94f..0a5e8838e1531 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -93,7 +93,8 @@ class TungstenAggregationIterator(
     numOutputRows: SQLMetric,
     peakMemory: SQLMetric,
     spillSize: SQLMetric,
-    avgHashProbe: SQLMetric)
+    avgHashProbe: SQLMetric,
+    numTasksFallBacked: SQLMetric)
   extends AggregationIterator(
     partIndex,
     groupingExpressions,
@@ -277,6 +278,7 @@ class TungstenAggregationIterator(
 
     // Step 7: set sortBased to true.
     sortBased = true
+    numTasksFallBacked += 1
   }
 
   ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index ac9edc4d23e3b..6cf8431f0c546 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -106,10 +106,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern),
+          aggregateMetricsPattern,
+        "number of tasks fall-backed to sort-based aggregation" -> 0L),
       Map("number of output rows" -> 1L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern))
+          aggregateMetricsPattern,
+        "number of tasks fall-backed to sort-based aggregation" -> 0L))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
       "local blocks read" -> 2L,
@@ -126,10 +128,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern),
+          aggregateMetricsPattern,
+        "number of tasks fall-backed to sort-based aggregation" -> 0L),
       Map("number of output rows" -> 3L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern))
+          aggregateMetricsPattern,
+        "number of tasks fall-backed to sort-based aggregation" -> 0L))
 
     val shuffleExpected2 = Map(
       "records read" -> 4L,

From 0456d2dcc6c7777406c9bbb47fe3e48e4aa9ad10 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Wed, 26 May 2021 00:30:52 -0700
Subject: [PATCH 2/2] Address comment to rename metrics

---
 .../execution/aggregate/HashAggregateExec.scala    |  3 +--
 .../aggregate/ObjectHashAggregateExec.scala        |  3 +--
 .../sql/execution/metric/SQLMetricsSuite.scala     | 16 ++++++++--------
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 2947f2c7321e9..60023af646499 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -68,8 +68,7 @@ case class HashAggregateExec(
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
       SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
-    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
sort-based aggregation")) + "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks")) // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash // map and/or the sort-based aggregation once it has processed a given number of input rows. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 559f545dc05ad..fa56bb875a776 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -75,8 +75,7 @@ case class ObjectHashAggregateExec( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), - "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, - "number of tasks fall-backed to sort-based aggregation") + "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks") ) protected override def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 6cf8431f0c546..179f6171464bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -107,11 +107,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils Map("number of output rows" -> 2L, "avg hash probe bucket list iters" -> aggregateMetricsPattern, - "number of tasks fall-backed to sort-based aggregation" -> 0L), + "number of sort fallback tasks" -> 0L), Map("number of output rows" -> 1L, "avg hash probe bucket list iters" -> aggregateMetricsPattern, - "number of tasks fall-backed to sort-based aggregation" -> 0L)) + "number of sort fallback tasks" -> 0L)) val shuffleExpected1 = Map( "records read" -> 2L, "local blocks read" -> 2L, @@ -129,11 +129,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils Map("number of output rows" -> 4L, "avg hash probe bucket list iters" -> aggregateMetricsPattern, - "number of tasks fall-backed to sort-based aggregation" -> 0L), + "number of sort fallback tasks" -> 0L), Map("number of output rows" -> 3L, "avg hash probe bucket list iters" -> aggregateMetricsPattern, - "number of tasks fall-backed to sort-based aggregation" -> 0L)) + "number of sort fallback tasks" -> 0L)) val shuffleExpected2 = Map( "records read" -> 4L, @@ -216,7 +216,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils testSparkPlanMetrics(df2, 1, Map( 2L -> (("ObjectHashAggregate", Map( "number of output rows" -> 4L, - "number of tasks fall-backed to sort-based aggregation" -> 0L))), + "number of sort fallback tasks" -> 0L))), 1L -> (("Exchange", Map( "shuffle records written" -> 4L, "records read" -> 4L, @@ -224,7 +224,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils "remote blocks read" -> 0L))), 0L -> (("ObjectHashAggregate", Map( "number of output rows" -> 3L, - "number of tasks fall-backed to sort-based aggregation" -> 0L)))) + "number of sort 
fallback tasks" -> 0L)))) ) // 2 partitions and each partition contains 2 keys, with fallback to sort-based aggregation @@ -233,10 +233,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils testSparkPlanMetrics(df3, 1, Map( 2L -> (("ObjectHashAggregate", Map( "number of output rows" -> 4L, - "number of tasks fall-backed to sort-based aggregation" -> 2L))), + "number of sort fallback tasks" -> 2L))), 0L -> (("ObjectHashAggregate", Map( "number of output rows" -> 3L, - "number of tasks fall-backed to sort-based aggregation" -> 1L)))) + "number of sort fallback tasks" -> 1L)))) ) testSparkPlanMetricsWithPredicates(df3, 1, Map( 2L -> (("ObjectHashAggregate", Map(