Skip to content

Commit

Permalink
[SPARK-32761][SQL] Allow aggregating multiple foldable distinct expre…
Browse files Browse the repository at this point in the history
…ssions

### What changes were proposed in this pull request?
For queries with multiple foldable distinct columns, since they will be eliminated during
execution, it's not mandatory to let `RewriteDistinctAggregates` handle this case. And
in the current code, `RewriteDistinctAggregates` *dose* miss some "aggregating with
multiple foldable distinct expressions" cases.
For example: `select count(distinct 2), count(distinct 2, 3)` will be missed.

But in the planner, this will trigger an error that "multiple distinct expressions" are not allowed.
As the foldable distinct columns can be eliminated finally, we can allow this in the aggregation
planner check.

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added test case

Closes #29607 from linhongliu-db/SPARK-32761.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
linhongliu-db authored and cloud-fan committed Sep 1, 2020
1 parent fea9360 commit a410658
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
Expand Up @@ -432,7 +432,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val (functionsWithDistinct, functionsWithoutDistinct) =
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
if (functionsWithDistinct.map(
_.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our `RewriteDistinctAggregates` should take care this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
Expand Down Expand Up @@ -463,7 +464,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// to be [COUNT(DISTINCT foo), MAX(DISTINCT foo)], but
// [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed because those two distinct
// aggregates have different column expressions.
val distinctExpressions = functionsWithDistinct.head.aggregateFunction.children
val distinctExpressions =
functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable)
val normalizedNamedDistinctExpressions = distinctExpressions.map { e =>
// Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here
// because `distinctExpressions` is not extracted during logical phase.
Expand Down
Expand Up @@ -2546,6 +2546,10 @@ class DataFrameSuite extends QueryTest
val df = Seq(Double.NaN).toDF("d")
checkAnswer(df.selectExpr("ln(d)"), Row(Double.NaN))
}

test("SPARK-32761: aggregating multiple distinct CONSTANT columns") {
checkAnswer(sql("select count(distinct 2), count(distinct 2,3)"), Row(1, 1))
}
}

case class GroupByKey(a: Int, b: Int)

0 comments on commit a410658

Please sign in to comment.