
[SPARK-38505][SQL] Make partial aggregation adaptive #35806

Closed
wants to merge 6 commits

Conversation

wangyum (Member) commented Mar 10, 2022

What changes were proposed in this pull request?

We can skip partial aggregation when this step does not reduce the number of rows enough, avoiding the overhead of building the hash map and potential spilling.

This feature can be disabled by setting spark.sql.aggregate.adaptivePartialAggregationThreshold to 0.
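
For illustration, a minimal sketch of toggling the proposed conf at runtime (assuming it is exposed as a regular runtime SQL conf; `spark` is an active SparkSession):

// Turn the feature off (threshold = 0 disables adaptive partial aggregation).
spark.conf.set("spark.sql.aggregate.adaptivePartialAggregationThreshold", "0")
// Restore the default of 10000 rows proposed by this PR.
spark.conf.set("spark.sql.aggregate.adaptivePartialAggregationThreshold", "10000")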

Why are the changes needed?

Improves the performance of the partial aggregation phase. It also unblocks two follow-up features:

  1. SPARK-36245: Partial deduplicate the right side of left semi/anti join
  2. SPARK-38506: Push partial aggregation through join

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and TPC-H 5T benchmark test.

| SQL | Before this PR (seconds) | After this PR (seconds) |
| --- | --- | --- |
| q15 | 66 | 51 |
| q17 | 86 | 80 |
| q18 | 129 | 122 |

@wangyum wangyum marked this pull request as draft March 10, 2022 15:18
@github-actions github-actions bot added the SQL label Mar 10, 2022
wangyum (Member, Author) commented Mar 15, 2022

Another benchmark:

import org.apache.spark.benchmark.Benchmark

val numRows = 1024 * 1024 * 50

// Column a is unique, so grouping by (a, c) yields almost one group per row
// and partial aggregation barely reduces the row count.
spark.sql(s"CREATE TABLE t1 using parquet AS SELECT id AS a, id % ${numRows / 10000} AS b, id % ${numRows / 10000} AS c, id AS d FROM range(1, ${numRows}L, 1, 10)")

val benchmark = new Benchmark("Benchmark WholeStageCodegenExec", numRows, minNumIters = 2)

// threshold = 0 disables adaptive partial aggregation; 10000 enables it.
Seq(0, 10000).foreach { threshold =>
  benchmark.addCase(s"SELECT a, c, sum(b), sum(d) FROM t1 where a > 100 group by a, c and partialAggThreshold=$threshold") { _ =>
    // withSQLConf comes from Spark's test helpers (e.g. SQLHelper).
    withSQLConf("spark.sql.aggregate.adaptivePartialAggregationThreshold" -> threshold.toString) {
      spark.sql("SELECT a, c, sum(b), sum(d) FROM t1 where a > 100 group by a, c").write.format("noop").mode("Overwrite").save()
    }
  }
}

benchmark.run()
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark WholeStageCodegenExec:                                                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT a, c, sum(b), sum(d) FROM t1 where a > 100 group by a, c and partialAggThreshold=0              56519          57012         697          0.9        1078.0       1.0X
SELECT a, c, sum(b), sum(d) FROM t1 where a > 100 group by a, c and partialAggThreshold=10000          41908          42369         653          1.3         799.3       1.3X

wangyum (Member, Author) commented Mar 16, 2022

[Two screenshots: SQL metrics before this PR (left) and after this PR (right)]

@wangyum wangyum changed the title [WIP][SPARK-38505][SQL] Make partial aggregation adaptive [SPARK-38505][SQL] Make partial aggregation adaptive Mar 16, 2022
@wangyum wangyum marked this pull request as ready for review March 16, 2022 03:04
|if (!$skipPartialAggregateTerm) {
| if ($numberOfConsumedTerm == ${conf.adaptivePartialAggregationThreshold}) {
| $numberOfKeys
| if ((double) $numberOfConsumedKeysTerm / (double) $numberOfConsumedTerm > 0.85) {

Should we make this 0.85 configurable via a conf, as is done in Trino? That could add value for end users by giving them more control.

https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java#L790-L795
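
For illustration, a hypothetical conf entry following the builder pattern used elsewhere in SQLConf (the name spark.sql.aggregate.adaptivePartialAggregationRatio and its doc text are invented here, not part of this PR):

// Hypothetical sketch, not PR code: expose the 0.85 ratio as a SQL conf.
val ADAPTIVE_PARTIAL_AGGREGATION_RATIO =
  buildConf("spark.sql.aggregate.adaptivePartialAggregationRatio")
    .doc("Disable partial aggregation when the ratio of distinct group keys " +
      "to consumed rows exceeds this value once the row threshold is reached.")
    .doubleConf
    .checkValue(ratio => ratio > 0 && ratio <= 1, "The ratio must be in (0, 1].")
    .createWithDefault(0.85)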

}
s"""
|if (!$skipPartialAggregateTerm) {
| if ($numberOfConsumedTerm == ${conf.adaptivePartialAggregationThreshold}) {


Should we make this:

Suggested change
| if ($numberOfConsumedTerm == ${conf.adaptivePartialAggregationThreshold}) {
| if ($numberOfConsumedTerm >= ${conf.adaptivePartialAggregationThreshold}) {

Say conf.adaptivePartialAggregationThreshold is 10K and at that point the ratio is 0.84; after processing, say, 2K more rows the ratio rises above 0.85, but with the equality check we never re-evaluate it, so we will never disable partial aggregation.

As far as I understand, this would also be in sync with the Trino implementation:
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/PartialAggregationController.java#L63-L67

.checkValue(threshold => threshold >= 0 && threshold < (1 << 16),
"The threshold value must be bigger than or equal to 0 and less than " +
s"1 << ${FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT.key}.")
.createWithDefault(10000)


Just curious: is there a reason the default value of adaptivePartialAggregationThreshold is 10,000? In Trino the default is 100,000.

Comment on lines +162 to +171
child
.collectUntil(p => p.isInstanceOf[WholeStageCodegenExec] ||
!p.isInstanceOf[CodegenSupport] ||
p.isInstanceOf[LeafExecNode]).forall {
case _: ProjectExec | _: FilterExec | _: ColumnarToRowExec => true
case _: SerializeFromObjectExec => true
case _: InputAdapter => true
// HashAggregateExec, ExpandExec, SortMergeJoinExec ...
case _ => false


Why does it need to limit child node types? Is this for a performance reason? If so, after nodes like aggregate, join, and expand, skipping partial aggregate may still benefit performance. Why isn’t adaptivePartialAggregationThreshold good enough?

Comment on lines +914 to +918
val numberOfKeys = if (fastHashMapTerm != null) {
s"$numberOfConsumedKeysTerm = $fastHashMapTerm.getNumKeys();"
} else if (hashMapTerm != null) {
s"$numberOfConsumedKeysTerm = $hashMapTerm.getNumKeys();"


When fastHashMap is enabled, this implementation doesn't count keys in the regular hashMap. However, even when the number of processed keys is less than the fastHashMap capacity, the regular hashMap can still be non-empty, because the fastHashMap falls back to it when it:

  1. is unable to find the group key within maxSteps (maxSteps = 2), or
  2. runs out of the max supported page size.

So even though adaptivePartialAggregationThreshold must be smaller than the fastHashMap capacity, it is still possible for both the fastHashMap and the regular hashMap to be non-empty. A potential fix is numberOfKeys = fastHashMap.getNumKeys() + hashMap.getNumKeys(), combined with not enabling skipPartialAggregate once the aggregate spills. With this fix we could also remove the check that adaptivePartialAggregationThreshold is smaller than the fastHashMap's capacity; see the sketch below.
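
A rough sketch of the suggested counting change, reusing the term names from the quoted snippet (illustrative only, not tested against this PR):

// Illustrative sketch: count keys from both maps, since the regular hashMap
// can be non-empty even before the fastHashMap reaches its capacity.
val numberOfKeys = {
  val fastCount = if (fastHashMapTerm != null) s"$fastHashMapTerm.getNumKeys()" else "0"
  val regularCount = if (hashMapTerm != null) s"$hashMapTerm.getNumKeys()" else "0"
  s"$numberOfConsumedKeysTerm = $fastCount + $regularCount;"
}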

github-actions (bot) commented

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 26, 2022
@github-actions github-actions bot closed this Sep 27, 2022
DenineLu commented Dec 5, 2022

I was interested in working on this, but when I tested it with an online production task I found a performance regression: even though the aggregation time is shorter, the whole stage takes longer. Have you encountered this situation?

YuzhouSun commented

> I was interested in working on this, but when I tested it with an online production task I found a performance regression: even though the aggregation time is shorter, the whole stage takes longer. Have you encountered this situation?

Hi @DenineLu, just curious: could you share more details about the regression case you encountered? For example, how much the aggregation time is shortened, how much longer the whole stage takes, and the number of partial aggregate input and output rows with and without the optimization. Thank you.

BTW, the author moved the related changes to a newer and larger PR: #36552 (SPARK-38506: Push partial aggregation through join).

DenineLu commented Dec 7, 2022

Thanks for your reply. I have attached two screenshots showing that the HashAgg node's execution time is shorter, but the whole stage becomes slower. I suspect that because of this code, HashAgg's time statistics are no longer accurate.

|if (!$initAgg) {
|  $initAgg = true;
|  $createFastHashMap
|  $addHookToCloseFastHashMap
|  $hashMapTerm = $thisPlan.createHashMap();
|  long $beforeAgg = System.nanoTime();
|  $doAggFuncName();
|  $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
|  $shouldStopCheckCode
|}
|// The call below is not included in the time measured above, even though it runs frequently
|$childDoAgg
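
If that guess is right, one illustrative way to make the metric cover the pass-through path would be to time it as well. Per-row System.nanoTime() calls add their own overhead, so this is only a sketch of the idea (reusing the term names from the snippet above), not a proposed fix:

|// Sketch only: also attribute the pass-through path to the aggTime metric.
|long $beforeAgg = System.nanoTime();
|$childDoAgg
|$aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);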

[Screenshot: stage metrics without this patch]

The first screenshot shows the run without this patch; the second shows the run with it.

[Screenshot: stage metrics with this patch]
