Skip to content

Commit

Permalink
[SPARK-24369][SQL] Correct handling for multiple distinct aggregation…
Browse files Browse the repository at this point in the history
…s having the same argument set

## What changes were proposed in this pull request?
This pr fixed an issue when having multiple distinct aggregations having the same argument set, e.g.,
```
scala>: paste
val df = sql(
  s"""SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
     | FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
   """.stripMargin)

java.lang.RuntimeException
You hit a query analyzer bug. Please report your query to Spark user mailing list.
```
The root cause is that `RewriteDistinctAggregates` can't detect multiple distinct aggregations if they have the same argument set. This pr modified code so that `RewriteDistinctAggregates` could count the number of aggregate expressions with `isDistinct=true`.

## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21443 from maropu/SPARK-24369.
  • Loading branch information
maropu authored and cloud-fan committed May 30, 2018
1 parent 9e7bad0 commit 1e46f92
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Extract distinct aggregate expressions.
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val distincgAggExpressions = aggExpressions.filter(_.isDistinct)
val distinctAggGroups = distincgAggExpressions.groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
Expand All @@ -132,7 +133,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Aggregation strategy can handle queries with a single distinct group.
if (distinctAggGroups.size > 1) {
if (distincgAggExpressions.size > 1) {
// Create the attributes for the grouping id and the group by clause.
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
Expand All @@ -151,7 +152,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Setup unique distinct aggregate children.
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair)
val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our MultipleDistinctRewriter should take care this case.
// column sets. Our `RewriteDistinctAggregates` should take care this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
"Spark user mailing list.")
}
Expand Down
6 changes: 5 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/group-by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ SELECT 1 from (
FROM (select 1 as x) a
WHERE false
) b
where b.z != b.z
where b.z != b.z;

-- SPARK-24369 multiple distinct aggregations having the same argument set
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y);
11 changes: 10 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/group-by.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 26
-- Number of queries: 27


-- !query 0
Expand Down Expand Up @@ -241,3 +241,12 @@ where b.z != b.z
struct<1:int>
-- !query 25 output



-- !query 26
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
-- !query 26 schema
struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint>
-- !query 26 output
1.0 1.0 3

0 comments on commit 1e46f92

Please sign in to comment.