[SPARK-35529][SQL] Add fallback metrics for hash aggregate #32671

Status: Closed · wants to merge 2 commits

Changes from all commits:
HashAggregateExec.scala

```diff
@@ -67,7 +67,8 @@ case class HashAggregateExec(
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
-      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks"))
 
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
```
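Not part of the diff, but for orientation: metrics registered through SQLMetrics are accumulator-backed, so the new counter can also be read off the executed plan after a run. A minimal sketch (assumes a spark-shell style `spark` session; with adaptive query execution enabled the plan may need unwrapping first):

```scala
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.functions.col

// Aggregate something, then locate the HashAggregateExec node and read the
// newly added metric. A value of 0 means no task fell back to sorting.
val df = spark.range(1000).groupBy(col("id") % 10).count()
df.collect()

val fallbackTasks = df.queryExecution.executedPlan.collectFirst {
  case agg: HashAggregateExec => agg.metrics("numTasksFallBacked").value
}
println(s"number of sort fallback tasks: ${fallbackTasks.getOrElse(-1L)}")
```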
```diff
@@ -86,6 +87,7 @@ case class HashAggregateExec(
     val spillSize = longMetric("spillSize")
     val avgHashProbe = longMetric("avgHashProbe")
     val aggTime = longMetric("aggTime")
+    val numTasksFallBacked = longMetric("numTasksFallBacked")
 
     child.execute().mapPartitionsWithIndex { (partIndex, iter) =>
 
```
```diff
@@ -112,7 +114,8 @@ case class HashAggregateExec(
           numOutputRows,
           peakMemory,
           spillSize,
-          avgHashProbe)
+          avgHashProbe,
+          numTasksFallBacked)
       if (!hasInput && groupingExpressions.isEmpty) {
         numOutputRows += 1
         Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
```
```diff
@@ -460,7 +463,8 @@ case class HashAggregateExec(
       sorter: UnsafeKVExternalSorter,
       peakMemory: SQLMetric,
       spillSize: SQLMetric,
-      avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
+      avgHashProbe: SQLMetric,
+      numTasksFallBacked: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes
```
```diff
@@ -479,6 +483,7 @@ case class HashAggregateExec(
     }
 
     // merge the final hashMap into sorter
+    numTasksFallBacked += 1
     sorter.merge(hashMap.destructAndCreateExternalSorter())
     hashMap.free()
     val sortedIter = sorter.sortedIterator()
```
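Judging by the surrounding context, this increment sits after the early return for the no-spill case, so it fires once per task whose hash map spilled into the external sorter and therefore finished its aggregation sort-based. `finishAggregate` is the completion path invoked from whole-stage-generated code.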
```diff
@@ -764,9 +769,10 @@ case class HashAggregateExec(
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val avgHashProbe = metricTerm(ctx, "avgHashProbe")
+    val numTasksFallBacked = metricTerm(ctx, "numTasksFallBacked")
 
     val finishRegularHashMap = s"$iterTerm = $thisPlan.finishAggregate(" +
-      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe);"
+      s"$hashMapTerm, $sorterTerm, $peakMemory, $spillSize, $avgHashProbe, $numTasksFallBacked);"
     val finishHashMap = if (isFastHashMapEnabled) {
       s"""
          |$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();
```
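`metricTerm` exposes the metric as a variable inside the generated Java source, and the widened `finishAggregate` call threads it through, so the interpreted and code-generated paths report the same counter.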
ObjectHashAggregateExec.scala

```diff
@@ -75,8 +75,7 @@ case class ObjectHashAggregateExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
-    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
-      "number of tasks fall-backed to sort-based aggregation")
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks")
   )
 
   protected override def doExecute(): RDD[InternalRow] = {
```
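This hunk only renames the existing `ObjectHashAggregateExec` metric so both aggregate operators report the same display name, "number of sort fallback tasks". For reference (a usage note, not part of the diff), object hash aggregation falls back once a partition exceeds `spark.sql.objectHashAggregate.sortBased.fallbackThreshold` in-memory grouping keys, which is how the suite below drives the metric above zero:

```scala
// Sketch: lower the fallback threshold so ObjectHashAggregate switches to
// sort-based aggregation after 2 in-memory keys, making the metric non-zero.
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "2")
```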
TungstenAggregationIterator.scala

```diff
@@ -93,7 +93,8 @@ class TungstenAggregationIterator(
     numOutputRows: SQLMetric,
     peakMemory: SQLMetric,
     spillSize: SQLMetric,
-    avgHashProbe: SQLMetric)
+    avgHashProbe: SQLMetric,
+    numTasksFallBacked: SQLMetric)
   extends AggregationIterator(
     partIndex,
     groupingExpressions,
```
```diff
@@ -277,6 +278,7 @@ class TungstenAggregationIterator(
 
     // Step 7: set sortBased to true.
     sortBased = true
+    numTasksFallBacked += 1
   }
 
   ///////////////////////////////////////////////////////////////////////////
```
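The increment closes out `switchToSortBasedAggregation`, so it fires exactly once per task that runs out of hash-map memory (or hits the forced-fallback test hook described in the `HashAggregateExec` comment above). A sketch of forcing the switch in a test, assuming the internal, test-only config that comment refers to and its two-number format:

```scala
// Hypothetical test setup: fall back to the unsafe-row hash map after 2
// input rows and to sort-based aggregation after 3.
spark.conf.set("spark.sql.TungstenAggregate.testFallbackStartsAt", "2, 3")
```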
SQLMetricsSuite.scala

```diff
@@ -106,10 +106,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern),
+          aggregateMetricsPattern,
+        "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 1L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern))
+          aggregateMetricsPattern,
+        "number of sort fallback tasks" -> 0L))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
       "local blocks read" -> 2L,
```
```diff
@@ -126,10 +128,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern),
+          aggregateMetricsPattern,
+        "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 3L,
         "avg hash probe bucket list iters" ->
-          aggregateMetricsPattern))
+          aggregateMetricsPattern,
+        "number of sort fallback tasks" -> 0L))
 
     val shuffleExpected2 = Map(
       "records read" -> 4L,
```
```diff
@@ -212,15 +216,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 4L,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L))),
+        "number of sort fallback tasks" -> 0L))),
       1L -> (("Exchange", Map(
         "shuffle records written" -> 4L,
         "records read" -> 4L,
         "local blocks read" -> 4L,
         "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 3L,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L))))
+        "number of sort fallback tasks" -> 0L))))
     )
 
     // 2 partitions and each partition contains 2 keys, with fallback to sort-based aggregation
```
```diff
@@ -229,10 +233,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     testSparkPlanMetrics(df3, 1, Map(
       2L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 4L,
-        "number of tasks fall-backed to sort-based aggregation" -> 2L))),
+        "number of sort fallback tasks" -> 2L))),
       0L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 3L,
-        "number of tasks fall-backed to sort-based aggregation" -> 1L))))
+        "number of sort fallback tasks" -> 1L))))
     )
     testSparkPlanMetricsWithPredicates(df3, 1, Map(
       2L -> (("ObjectHashAggregate", Map(
```
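The updated expectations track the new behavior: with the default threshold nothing falls back (0L everywhere), while in the `df3` case both partial-aggregate tasks fall back (2L) and, presumably because only one post-shuffle task sees enough keys, a single final-aggregate task does (1L).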