Address comment to rename metrics
c21 committed May 26, 2021
commit 0456d2d (parent 638cbf5)
Showing 3 changed files with 10 additions and 12 deletions.
HashAggregateExec.scala
@@ -68,8 +68,7 @@ case class HashAggregateExec(
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
       SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
-    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
-      "number of tasks fall-backed to sort-based aggregation"))
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks"))
 
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
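As an aside on the comment above: a minimal sketch of how a suite can force that fallback and check the renamed metric. It assumes the internal, test-only config `spark.sql.TungstenAggregate.testFallbackStartsAt` that this class reads (the pair "2, 3" means: switch to the unsafe-row hash map after 2 input rows, then to sort-based aggregation after 3), and a test class mixing in `SharedSparkSession` (for `spark` and `testImplicits`) and `SQLTestUtils` (for `withSQLConf`). This sketch is not part of the commit.

```scala
// Sketch only, not part of this commit. Assumes it runs inside a suite with
// SharedSparkSession and SQLTestUtils mixed in.
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import testImplicits._

withSQLConf(
    "spark.sql.adaptive.enabled" -> "false", // keep the plan flat so collect() below sees the aggregate
    "spark.sql.TungstenAggregate.testFallbackStartsAt" -> "2, 3") {
  val df = spark.range(10).groupBy($"id" % 4).count()
  df.collect() // run the query so task-side metric updates reach the driver

  val fallbackTasks = df.queryExecution.executedPlan.collect {
    case agg: HashAggregateExec => agg.metrics("numTasksFallBacked").value
  }.sum
  // With the tiny thresholds above, at least the partial aggregation should fall back.
  assert(fallbackTasks > 0, "expected some sort fallback tasks")
}
```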
ObjectHashAggregateExec.scala
@@ -75,8 +75,7 @@ case class ObjectHashAggregateExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
-    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext,
-      "number of tasks fall-backed to sort-based aggregation")
+    "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks")
   )
 
   protected override def doExecute(): RDD[InternalRow] = {
SQLMetricsSuite.scala
@@ -107,11 +107,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       Map("number of output rows" -> 2L,
         "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L),
+        "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 1L,
         "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L))
+        "number of sort fallback tasks" -> 0L))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
       "local blocks read" -> 2L,
@@ -129,11 +129,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       Map("number of output rows" -> 4L,
         "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L),
+        "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 3L,
         "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L))
+        "number of sort fallback tasks" -> 0L))
 
     val shuffleExpected2 = Map(
       "records read" -> 4L,
@@ -216,15 +216,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 4L,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L))),
+        "number of sort fallback tasks" -> 0L))),
       1L -> (("Exchange", Map(
         "shuffle records written" -> 4L,
         "records read" -> 4L,
         "local blocks read" -> 4L,
         "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 3L,
-        "number of tasks fall-backed to sort-based aggregation" -> 0L))))
+        "number of sort fallback tasks" -> 0L))))
     )
 
     // 2 partitions and each partition contains 2 keys, with fallback to sort-based aggregation
@@ -233,10 +233,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     testSparkPlanMetrics(df3, 1, Map(
       2L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 4L,
-        "number of tasks fall-backed to sort-based aggregation" -> 2L))),
+        "number of sort fallback tasks" -> 2L))),
       0L -> (("ObjectHashAggregate", Map(
         "number of output rows" -> 3L,
-        "number of tasks fall-backed to sort-based aggregation" -> 1L))))
+        "number of sort fallback tasks" -> 1L))))
     )
     testSparkPlanMetricsWithPredicates(df3, 1, Map(
       2L -> (("ObjectHashAggregate", Map(

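After this rename, both aggregate operators report the shorter description "number of sort fallback tasks" wherever SQL metrics surface, such as the SQL tab of the UI. Below is a minimal standalone sketch of reading the renamed metric programmatically; it is not part of the commit, and it assumes a local SparkSession with adaptive execution disabled, plus the config `spark.sql.objectHashAggregate.sortBased.fallbackThreshold` (default 128) set low enough to provoke the fallback.

```scala
// Standalone sketch, not part of this commit: provoke the object-hash
// fallback and read the renamed metric straight off the executed plan.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.adaptive.enabled", "false") // keep the aggregate visible to plan.collect
  .config("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "1") // fall back almost immediately
  .getOrCreate()
import spark.implicits._

// collect_list is a TypedImperativeAggregate, so it plans as ObjectHashAggregateExec.
val df = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("k", "v")
  .groupBy($"k")
  .agg(collect_list($"v"))

df.collect() // execute so task-side metric updates reach the driver

val fallbackTasks = df.queryExecution.executedPlan.collect {
  case agg: ObjectHashAggregateExec => agg.metrics("numTasksFallBacked").value
}.sum
println(s"number of sort fallback tasks: $fallbackTasks")

spark.stop()
```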