[SPARK-20686][SQL] PropagateEmptyRelation incorrectly handles aggregate without grouping

## What changes were proposed in this pull request?

The query

```
SELECT 1 FROM (SELECT COUNT(*) WHERE FALSE) t1
```

should return a single row because the subquery is an aggregate without a group-by, which always produces exactly one row. However, Spark incorrectly returns zero rows.
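
A minimal reproduction sketch, assuming a `SparkSession` named `spark` (e.g. in `spark-shell`); the expected behavior is taken from the description above and the regression test results below:

```scala
// Hypothetical repro; `spark` is the usual SparkSession available in spark-shell.
// The subquery is an ungrouped aggregate, so it always yields exactly one row
// (COUNT(*) = 0), and the outer query should therefore return a single row of 1.
val df = spark.sql("SELECT 1 FROM (SELECT COUNT(*) WHERE FALSE) t1")
df.show()
// Expected: one row containing 1. Before this patch, the optimizer folded the
// subquery into an empty relation and the query incorrectly returned zero rows.
```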

This is caused by SPARK-16208 / #13906, a patch which added an optimizer rule to propagate EmptyRelation through operators. The logic for handling aggregates is wrong: it checks whether the aggregate expressions are non-empty when deciding whether the output should be empty, whereas it should be checking the grouping expressions instead:

An aggregate with non-empty grouping expressions will return one output row per group. If the input to the grouped aggregate is empty then all groups will be empty and thus the output will be empty. It doesn't matter whether the aggregation output columns include aggregate expressions, since that won't affect the number of output rows.

If the grouping expressions are empty, however, then the aggregate will always produce a single output row and thus we cannot propagate the EmptyRelation.

The current implementation is incorrect and also misses an optimization opportunity by not propagating EmptyRelation in the case where a grouped aggregate has aggregate expressions (in other words, `SELECT COUNT(*) from emptyRelation GROUP BY x` would _not_ be optimized to `EmptyRelation` in the old code, even though it safely could be).
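
The decision can be reduced to a check on the grouping expressions alone. Here is a simplified, self-contained sketch of that decision (the names `SimpleAggregate` and `canFoldToEmpty` are illustrative only and are not part of Spark; the real rule appears in the diff below):

```scala
// Sketch of the decision this patch implements, under illustrative names.
final case class SimpleAggregate(
    groupingExpressions: Seq[String],   // e.g. Seq("x") for GROUP BY x, Seq() for no GROUP BY
    aggregateExpressions: Seq[String])  // output columns, possibly containing COUNT/SUM/...

def canFoldToEmpty(agg: SimpleAggregate, childIsEmpty: Boolean): Boolean =
  // Only the presence of grouping expressions matters: a grouped aggregate over empty
  // input has no groups and hence no output rows; an ungrouped aggregate always emits
  // exactly one row (e.g. COUNT(*) = 0), so it must not be folded away.
  childIsEmpty && agg.groupingExpressions.nonEmpty

// SELECT COUNT(*) FROM emptyRelation GROUP BY x  -> safe to fold to an empty relation
assert(canFoldToEmpty(SimpleAggregate(Seq("x"), Seq("count(1)")), childIsEmpty = true))
// SELECT COUNT(*) FROM emptyRelation             -> NOT foldable (returns a single row)
assert(!canFoldToEmpty(SimpleAggregate(Seq(), Seq("count(1)")), childIsEmpty = true))
```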

This patch resolves this issue by modifying `PropagateEmptyRelation` to consider only the presence/absence of grouping expressions, not the aggregate functions themselves, when deciding whether to propagate EmptyRelation.

## How was this patch tested?

- Added end-to-end regression tests in `SQLQueryTestSuite`'s `group-by.sql` file.
- Updated unit tests in `PropagateEmptyRelationSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17929 from JoshRosen/fix-PropagateEmptyRelation.

(cherry picked from commit a90c5cd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
JoshRosen authored and cloud-fan committed May 10, 2017
1 parent 7b6f3a1 commit ef50a95
Showing 4 changed files with 44 additions and 13 deletions.
16 changes: 8 additions & 8 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

```diff
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
  *    - Join with one or two empty children (including Intersect/Except).
  * 2. Unary-node Logical Plans
  *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
+ *    - Aggregate with all empty children and at least one grouping expression.
  *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
 object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,10 +38,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
-  private def containsAggregateExpression(e: Expression): Boolean = {
-    e.collectFirst { case _: AggregateFunction => () }.isDefined
-  }
-
   private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
@@ -68,8 +63,13 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
       case _: LocalLimit => empty(p)
       case _: Repartition => empty(p)
      case _: RepartitionByExpression => empty(p)
-      // AggregateExpressions like COUNT(*) return their results like 0.
-      case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
+      // An aggregate with non-empty group expression will return one output row per group when the
+      // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
+      // will be empty and thus the output will be empty.
+      //
+      // If the grouping expressions are empty, however, then the aggregate will always produce a
+      // single output row and thus we cannot propagate the EmptyRelation.
+      case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
       // Generators like Hive-style UDTF may return their records within `close`.
       case Generate(_: Explode, _, _, _, _, _) => empty(p)
       case _ => p
```
8 changes: 4 additions & 4 deletions sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala

```diff
@@ -142,7 +142,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
     comparePlans(optimized, correctAnswer.analyze)
   }
 
-  test("propagate empty relation through Aggregate without aggregate function") {
+  test("propagate empty relation through Aggregate with grouping expressions") {
     val query = testRelation1
       .where(false)
       .groupBy('a)('a, ('a + 1).as('x))
@@ -153,13 +153,13 @@
     comparePlans(optimized, correctAnswer)
   }
 
-  test("don't propagate empty relation through Aggregate with aggregate function") {
+  test("don't propagate empty relation through Aggregate without grouping expressions") {
     val query = testRelation1
       .where(false)
-      .groupBy('a)(count('a))
+      .groupBy()()
 
     val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
+    val correctAnswer = LocalRelation('a.int).groupBy()().analyze
 
     comparePlans(optimized, correctAnswer)
   }
```
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/group-by.sql

```diff
@@ -53,3 +53,10 @@ set spark.sql.groupByAliases=false;
 
 -- Check analysis exceptions
 SELECT a AS k, COUNT(b) FROM testData GROUP BY k;
+
+-- Aggregate with empty input and non-empty GroupBy expressions.
+SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a;
+
+-- Aggregate with empty input and empty GroupBy expressions.
+SELECT COUNT(1) FROM testData WHERE false;
+SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t;
```
26 changes: 25 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/group-by.sql.out

```diff
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 25
 
 
 -- !query 0
@@ -203,3 +203,27 @@ struct<>
 -- !query 21 output
 org.apache.spark.sql.AnalysisException
 cannot resolve '`k`' given input columns: [a, b]; line 1 pos 47
+
+
+-- !query 22
+SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a
+-- !query 22 schema
+struct<a:int,count(1):bigint>
+-- !query 22 output
+
+
+
+-- !query 23
+SELECT COUNT(1) FROM testData WHERE false
+-- !query 23 schema
+struct<count(1):bigint>
+-- !query 23 output
+0
+
+
+-- !query 24
+SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t
+-- !query 24 schema
+struct<1:int>
+-- !query 24 output
+1
```