[SPARK-37001][SQL] Disable two level of map for final hash aggregation by default

### What changes were proposed in this pull request?

This PR disables the two-level hash map for final hash aggregation by default. The feature was introduced in #32242, and we found it can lead to query performance regressions when the final aggregation receives rows with many distinct keys: the 1st-level hash map fills up, so many rows waste a 1st-level lookup before being inserted into the 2nd-level map. The feature still benefits queries with fewer distinct keys, so this PR introduces a config, `spark.sql.codegen.aggregate.final.map.twolevel.enabled`, to let a query enable the feature when it sees a benefit.
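The cost pattern behind the regression can be sketched in plain Scala. This is a simplified illustration, not Spark's actual generated aggregation map code; the class and field names here are hypothetical. A bounded 1st-level map is probed first, and once it is full, every row with a new key pays for that probe before falling back to the unbounded 2nd-level map:

```scala
import scala.collection.mutable

// Hypothetical sketch of a two-level aggregation map: a small, bounded
// 1st-level map backed by an unbounded 2nd-level map.
class TwoLevelMap(firstLevelCapacity: Int) {
  private val firstLevel = mutable.HashMap.empty[String, Long]
  private val secondLevel = mutable.HashMap.empty[String, Long]
  var wastedProbes = 0L // 1st-level lookups that missed with no room to insert

  def add(key: String, value: Long): Unit = {
    if (firstLevel.contains(key)) {
      firstLevel(key) += value // fast path: hit in the 1st level
    } else if (firstLevel.size < firstLevelCapacity) {
      firstLevel(key) = value // still room in the 1st level
    } else {
      wastedProbes += 1 // 1st level full: the probe was wasted
      secondLevel(key) = secondLevel.getOrElse(key, 0L) + value
    }
  }
}

val map = new TwoLevelMap(firstLevelCapacity = 4)
// Many distinct keys, as in a final aggregation with high cardinality:
// most rows miss the already-full 1st level.
(0 until 100).foreach(i => map.add(s"key$i", 1L))
println(map.wastedProbes) // 96 of 100 rows paid a useless 1st-level probe
```

With few distinct keys the 1st level absorbs almost everything and the probe is pure profit, which is why the feature remains useful for partial aggregation.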

### Why are the changes needed?

Fix a query performance regression.

### Does this PR introduce _any_ user-facing change?

Yes, it introduces the `spark.sql.codegen.aggregate.final.map.twolevel.enabled` config.

### How was this patch tested?

Existing unit test in `AggregationQuerySuite.scala`.

Also verified the generated code for an example query:

```scala
spark.sql(
    """
      |SELECT key, avg(value)
      |FROM agg1
      |GROUP BY key
    """.stripMargin)
```

Verified that the generated code for final hash aggregation does not use two-level maps by default:
https://gist.github.com/c21/d4ce87ef28a22d1ce839e0cda000ce14 .

Verified that the generated code for final hash aggregation does use two-level maps when the config is enabled:
https://gist.github.com/c21/4b59752c1f3f98303b60ccff66b5db69 .
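Based on the SQLConf entry added in the diff below, re-enabling the two-level map for final aggregation could look roughly like this. This is a sketch, not part of the patch: it assumes a local SparkSession and registers a small `agg1` temp view for the example query.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("two-level-final-agg-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Default (true): the two-level hash map is used for partial aggregation only.
// Setting it to false re-enables the two-level map for final aggregation too.
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.partialOnly", "false")

// Minimal stand-in for the agg1 table used by the example query.
Seq((1, 10.0), (1, 20.0), (2, 30.0)).toDF("key", "value")
  .createOrReplaceTempView("agg1")

spark.sql(
  """
    |SELECT key, avg(value)
    |FROM agg1
    |GROUP BY key
  """.stripMargin).show()
```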

Closes #34270 from c21/agg-fix.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 3354a21)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
c21 authored and cloud-fan committed Oct 14, 2021
1 parent d93d056 commit f886d51
Showing 3 changed files with 19 additions and 2 deletions.
@@ -217,9 +217,9 @@ case class Grouping(child: Expression) extends Expression with Unevaluable
Examples:
> SELECT name, _FUNC_(), sum(age), avg(height) FROM VALUES (2, 'Alice', 165), (5, 'Bob', 180) people(age, name, height) GROUP BY cube(name, height);
Alice 0 2 165.0
Bob 0 5 180.0
Alice 1 2 165.0
NULL 3 7 172.5
Bob 0 5 180.0
Bob 1 5 180.0
NULL 2 2 165.0
NULL 2 5 180.0
@@ -1685,6 +1685,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

  val ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY =
    buildConf("spark.sql.codegen.aggregate.map.twolevel.partialOnly")
      .internal()
      .doc("Enable two-level aggregate hash map for partial aggregate only, " +
        "because final aggregate might get more distinct keys compared to partial aggregate. " +
        "Overhead of looking up 1st-level map might dominate when having a lot of distinct keys.")
      .version("3.2.1")
      .booleanConf
      .createWithDefault(true)

val ENABLE_VECTORIZED_HASH_MAP =
buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
.internal()
@@ -667,7 +667,14 @@ case class HashAggregateExec(
val isNotByteArrayDecimalType = bufferSchema.map(_.dataType).filter(_.isInstanceOf[DecimalType])
.forall(!DecimalType.isByteArrayDecimalType(_))

isSupported && isNotByteArrayDecimalType
    val isEnabledForAggModes =
      if (modes.forall(mode => mode == Partial || mode == PartialMerge)) {
        true
      } else {
        !conf.getConf(SQLConf.ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY)
      }

    isSupported && isNotByteArrayDecimalType && isEnabledForAggModes
}

private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
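The mode check added above can be illustrated standalone. This is a simplified sketch with hypothetical stand-ins for Spark's aggregate mode types, and the config value passed in as a plain boolean: partial-only plans always qualify, while plans containing a final aggregate qualify only when the (inverted) `partialOnly` config is false.

```scala
// Hypothetical stand-ins for Spark's AggregateMode hierarchy.
sealed trait AggregateMode
case object Partial extends AggregateMode
case object PartialMerge extends AggregateMode
case object Final extends AggregateMode

// Mirrors the new check: the two-level map stays enabled for plans that only
// contain partial aggregates; otherwise it depends on the inverted config.
def isEnabledForAggModes(modes: Seq[AggregateMode], partialOnlyConf: Boolean): Boolean =
  if (modes.forall(m => m == Partial || m == PartialMerge)) true
  else !partialOnlyConf

println(isEnabledForAggModes(Seq(Partial), partialOnlyConf = true))  // true
println(isEnabledForAggModes(Seq(Final), partialOnlyConf = true))    // false
println(isEnabledForAggModes(Seq(Final), partialOnlyConf = false))   // true
```

With the default `partialOnly = true`, a final aggregate therefore falls back to the single-level map, which is exactly the behavior change this PR makes.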
