Skip to content
Permalink
Browse files

[SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set

## What changes were proposed in this pull request?

bring back #21443

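This takes a different approach: the planner's sanity check now counts distinct column sets by converting each distinct aggregate's children with `toSet`, so the same arguments in a different order count as one set.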

## How was this patch tested?

A new test verifies the planner behavior.

Author: Wenchen Fan <wenchen@databricks.com>
Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21487 from cloud-fan/back.

(cherry picked from commit 416cd1f)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
  • Loading branch information...
cloud-fan authored and gatorsmile committed Jun 4, 2018
1 parent 1819454 commit 36f1d5e17a79bc343d6ea88af2d1fed1f02c132f
@@ -372,9 +372,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val (functionsWithDistinct, functionsWithoutDistinct) =
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our MultipleDistinctRewriter should take care this case.
// column sets. Our `RewriteDistinctAggregates` should take care of this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
"Spark user mailing list.")
}
@@ -68,4 +68,8 @@ SELECT 1 from (
FROM (select 1 as x) a
WHERE false
) b
where b.z != b.z
where b.z != b.z;

-- SPARK-24369 multiple distinct aggregations having the same argument set
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y);
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 26
-- Number of queries: 27


-- !query 0
@@ -241,3 +241,12 @@ where b.z != b.z
struct<1:int>
-- !query 25 output



-- !query 26
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
-- !query 26 schema
struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint>
-- !query 26 output
1.0 1.0 3
@@ -69,6 +69,27 @@ class PlannerSuite extends SharedSQLContext {
testPartialAggregationPlan(query)
}

test("mixed aggregates with same distinct columns") {
  // Asserts that the given physical plan contains no ExpandExec operator.
  def assertNoExpand(plan: SparkPlan): Unit = {
    val expandNodes = plan.collect { case expand: ExpandExec => expand }
    assert(expandNodes.isEmpty)
  }

  withTempView("v") {
    val rows = Seq((1, 1.0, 1.0), (1, 2.0, 2.0))
    rows.toDF("i", "j", "k").createTempView("v")

    // Every statement below is planned and checked in turn, in the order listed.
    val statements = Seq(
      // one distinct column
      "SELECT sum(DISTINCT j), max(DISTINCT j) FROM v GROUP BY i",
      // 2 distinct columns
      "SELECT corr(DISTINCT j, k), count(DISTINCT j, k) FROM v GROUP BY i",
      // 2 distinct columns with different order
      "SELECT corr(DISTINCT j, k), count(DISTINCT k, j) FROM v GROUP BY i")

    statements.foreach { statement =>
      assertNoExpand(sql(statement).queryExecution.executedPlan)
    }
  }
}

test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
def checkPlan(fieldTypes: Seq[DataType]): Unit = {
withTempView("testLimit") {

0 comments on commit 36f1d5e

Please sign in to comment.
You can’t perform that action at this time.