Add fallback metrics for hash aggregate
c21 committed May 26, 2021
1 parent af1dba7 commit 638cbf5
Showing 3 changed files with 22 additions and 9 deletions.
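A note before the diff (context, not part of the commit): after this change every HashAggregateExec node carries a numTasksFallBacked SQL metric next to the existing spill and hash-probe metrics, and it can be read off an executed plan. A minimal sketch, assuming a Spark build that contains this commit; with small in-memory data nothing spills, so the value stays 0:

// Sketch only: read the new metric from an executed plan. AQE is disabled so
// the aggregate nodes are directly visible in df.queryExecution.executedPlan.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.adaptive.enabled", "false")

val df = Seq("a", "b", "a", "c").toDF("k").groupBy("k").count()
df.collect()  // run the query so per-task metric updates reach the driver

df.queryExecution.executedPlan.foreach {
  case agg: HashAggregateExec =>
    println(s"numTasksFallBacked = ${agg.metrics("numTasksFallBacked").value}")
  case _ =>
}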
HashAggregateExec.scala

@@ -67,7 +67,9 @@ case class HashAggregateExec(
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
-      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
+      "number of tasks fall-backed to sort-based aggregation"))
 
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
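An aside on the pattern in this hunk (illustrative, not from the diff): a metric created with SQLMetrics.createMetric is a long-valued SQL accumulator that supports in-place addition, which is why the execution code further down can simply write numTasksFallBacked += 1. A minimal sketch, assuming a live SparkContext named sc:

import org.apache.spark.sql.execution.metric.SQLMetrics

val metric = SQLMetrics.createMetric(sc, "number of tasks fall-backed to sort-based aggregation")
metric += 1                  // SQLMetric defines += for in-place addition
assert(metric.value == 1L)   // task-side updates surface through value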
@@ -86,6 +88,7 @@ case class HashAggregateExec(
     val spillSize = longMetric("spillSize")
     val avgHashProbe = longMetric("avgHashProbe")
     val aggTime = longMetric("aggTime")
+    val numTasksFallBacked = longMetric("numTasksFallBacked")
 
     child.execute().mapPartitionsWithIndex { (partIndex, iter) =>
 
@@ -112,7 +115,8 @@ case class HashAggregateExec(
           numOutputRows,
           peakMemory,
           spillSize,
-          avgHashProbe)
+          avgHashProbe,
+          numTasksFallBacked)
         if (!hasInput && groupingExpressions.isEmpty) {
           numOutputRows += 1
           Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
@@ -460,7 +464,8 @@ case class HashAggregateExec(
       sorter: UnsafeKVExternalSorter,
       peakMemory: SQLMetric,
       spillSize: SQLMetric,
-      avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
+      avgHashProbe: SQLMetric,
+      numTasksFallBacked: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes
@@ -479,6 +484,7 @@ case class HashAggregateExec(
     }
 
     // merge the final hashMap into sorter
+    numTasksFallBacked += 1
     sorter.merge(hashMap.destructAndCreateExternalSorter())
     hashMap.free()
     val sortedIter = sorter.sortedIterator()
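The hunk above is the whole-stage-codegen fallback: finishAggregate only reaches this merge when an external sorter exists, meaning the hash map spilled at least once, so the increment counts each fallen-back task exactly once. Because the metric is a SQL accumulator, each task bumps a local copy and the driver sums the updates. A loose analogy with a plain long accumulator (illustrative only, not the Spark source):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").getOrCreate()
val fellBack = spark.sparkContext.longAccumulator("fallback analogy")

spark.sparkContext.parallelize(1 to 8, numSlices = 4).foreachPartition { _ =>
  fellBack.add(1)  // pretend this partition's hash map spilled once
}
println(fellBack.value)  // 4: one increment per task, merged on the driver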
@@ -764,9 +770,10 @@ case class HashAggregateExec(
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val avgHashProbe = metricTerm(ctx, "avgHashProbe")
+    val numTasksFallBacked = metricTerm(ctx, "numTasksFallBacked")
 
     val finishRegularHashMap = s"$iterTerm = $thisPlan.finishAggregate(" +
-      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe);"
+      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe, $numTasksFallBacked);"
     val finishHashMap = if (isFastHashMapEnabled) {
       s"""
          |$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();
TungstenAggregationIterator.scala

@@ -93,7 +93,8 @@ class TungstenAggregationIterator(
     numOutputRows: SQLMetric,
     peakMemory: SQLMetric,
     spillSize: SQLMetric,
-    avgHashProbe: SQLMetric)
+    avgHashProbe: SQLMetric,
+    numTasksFallBacked: SQLMetric)
   extends AggregationIterator(
     partIndex,
     groupingExpressions,
@@ -277,6 +278,7 @@ class TungstenAggregationIterator(
 
     // Step 7: set sortBased to true.
     sortBased = true
+    numTasksFallBacked += 1
   }
 
   ///////////////////////////////////////////////////////////////////////////
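The interpreted path above mirrors the codegen path: when the hash map can no longer grow, TungstenAggregationIterator spills it to an external sorter, flips sortBased, and, with this commit, bumps the metric once per task. A plain-Scala toy of that control flow (illustrative shape only, not the Spark source):

import scala.collection.mutable

def aggregateWithFallback(
    rows: Seq[(String, Long)],
    maxKeys: Int): (Map[String, Long], Long) = {
  val hashMap = mutable.Map.empty[String, Long]
  var spilled = Seq.empty[(String, Long)]
  var sortBased = false
  var numTasksFallBacked = 0L

  val it = rows.iterator
  while (it.hasNext && !sortBased) {
    val (k, v) = it.next()
    hashMap(k) = hashMap.getOrElse(k, 0L) + v
    if (hashMap.size > maxKeys) {
      spilled = hashMap.toSeq.sortBy(_._1)  // spill a sorted run
      hashMap.clear()
      sortBased = true                      // Step 7 in the real iterator
      numTasksFallBacked += 1               // counted once per task
    }
  }
  // sort-based phase: merge the spilled run with the rest of the input
  val rest = it.toSeq.sortBy(_._1)
  val merged = (spilled ++ rest)
    .groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
  (merged ++ hashMap, numTasksFallBacked)
}

// aggregateWithFallback(Seq("a" -> 1L, "b" -> 1L, "a" -> 2L, "c" -> 1L), maxKeys = 2)
// returns (Map("a" -> 3, "b" -> 1, "c" -> 1), 1)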
SQLMetricsSuite.scala

@@ -106,10 +106,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       val expected1 = Seq(
         Map("number of output rows" -> 2L,
           "avg hash probe bucket list iters" ->
-            aggregateMetricsPattern),
+            aggregateMetricsPattern,
+          "number of tasks fall-backed to sort-based aggregation" -> 0L),
         Map("number of output rows" -> 1L,
           "avg hash probe bucket list iters" ->
-            aggregateMetricsPattern))
+            aggregateMetricsPattern,
+          "number of tasks fall-backed to sort-based aggregation" -> 0L))
       val shuffleExpected1 = Map(
         "records read" -> 2L,
         "local blocks read" -> 2L,
@@ -126,10 +128,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       val expected2 = Seq(
         Map("number of output rows" -> 4L,
           "avg hash probe bucket list iters" ->
-            aggregateMetricsPattern),
+            aggregateMetricsPattern,
+          "number of tasks fall-backed to sort-based aggregation" -> 0L),
         Map("number of output rows" -> 3L,
           "avg hash probe bucket list iters" ->
-            aggregateMetricsPattern))
+            aggregateMetricsPattern,
+          "number of tasks fall-backed to sort-based aggregation" -> 0L))
 
       val shuffleExpected2 = Map(
         "records read" -> 4L,
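The queries under test here never spill, so both expectation maps pin the new metric at 0L. A hypothetical follow-up test (not part of this commit; the config key and its "fast map, regular map" value format come from the test-only fallback hook described in HashAggregateExec's comment above, and are assumptions here) could force a nonzero value:

// Assumes SQLMetricsSuite's existing helpers (withSQLConf, testData2) and an
// import of org.apache.spark.sql.execution.aggregate.HashAggregateExec.
test("numTasksFallBacked is recorded when aggregation falls back") {
  withSQLConf("spark.sql.TungstenAggregationIterator.testFallbackStartsAt" -> "1, 2") {
    val df = testData2.groupBy("a").count()
    df.collect()
    val fallbacks = df.queryExecution.executedPlan.collect {
      case agg: HashAggregateExec => agg.metrics("numTasksFallBacked").value
    }
    assert(fallbacks.exists(_ > 0))
  }
}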
