
[SPARK-16208][SQL] Add PropagateEmptyRelation optimizer #13906

Closed
wants to merge 11 commits

Conversation

dongjoon-hyun
Member

@dongjoon-hyun commented Jun 25, 2016

What changes were proposed in this pull request?

This PR adds a new logical optimizer, PropagateEmptyRelation, to collapse logical plans consisting of only empty LocalRelations.

Optimizer Targets

  1. Binary (or higher)-node Logical Plans
    • Union with all empty children.
    • Join with one or two empty children (including Intersect/Except).
  2. Unary-node Logical Plans
    • Project/Filter/Sample/Join/Limit/Repartition with all empty children.
    • Aggregate with all empty children and without AggregateFunction expressions such as COUNT.
    • Generate with Explode only, because other UserDefinedGenerators like Hive UDTFs can return results even on empty input.

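The targets above can be sketched as a bottom-up rewrite. Here is a minimal toy model in plain Scala (a hypothetical plan ADT, not the actual Catalyst code) showing how emptiness propagates through a few node types:

```scala
// Toy plan ADT standing in for Catalyst's LogicalPlan (illustrative only).
sealed trait Plan {
  def isEmptyLocal: Boolean = this match {
    case LocalRelation(rows) => rows.isEmpty
    case _ => false
  }
}
case class LocalRelation(rows: Seq[Int]) extends Plan
case class Project(child: Plan) extends Plan
case class InnerJoin(left: Plan, right: Plan) extends Plan
case class Union(children: Seq[Plan]) extends Plan

def propagateEmpty(plan: Plan): Plan = plan match {
  // Union is empty only when all of its children are empty.
  case Union(cs) if cs.nonEmpty && cs.forall(_.isEmptyLocal) => LocalRelation(Nil)
  // An inner join with any empty side produces no rows.
  case InnerJoin(l, r) if l.isEmptyLocal || r.isEmptyLocal => LocalRelation(Nil)
  // A unary node over an empty child produces no rows.
  case Project(c) if c.isEmptyLocal => LocalRelation(Nil)
  case p => p
}
```

Applied repeatedly from the leaves up, such a rewrite collapses a whole subtree to a single empty relation, which is what happens to the sample query below.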
Sample Query

WITH t1 AS (SELECT a FROM VALUES 1 t(a)),
     t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2)
SELECT a,b
FROM t1, t2
WHERE a=b
GROUP BY a,b
HAVING a>1
ORDER BY a,b

Before

scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Sort [a#0 ASC, b#1 ASC], true, 0
+- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200)
   +- *HashAggregate(keys=[a#0, b#1], functions=[])
      +- Exchange hashpartitioning(a#0, b#1, 200)
         +- *HashAggregate(keys=[a#0, b#1], functions=[])
            +- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight
               :- *Filter (isnotnull(a#0) && (a#0 > 1))
               :  +- LocalTableScan [a#0]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  +- *Filter (isnotnull(b#1) && (b#1 > 1))
                     +- LocalTableScan <empty>, [b#1]

After

scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0, b#1]

How was this patch tested?

Pass the Jenkins tests (including a new testsuite).

@SparkQA

SparkQA commented Jun 25, 2016

Test build #61242 has finished for PR 13906 at commit 7ddf449.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case x if x.isInstanceOf[ObjectProducer] || x.isInstanceOf[ObjectConsumer] => x

// Case 1: If groupingExpressions contains all aggregation expressions, the result is empty.
case a @ Aggregate(ge, ae, child) if isEmptyLocalRelation(child) && ae.forall(ge.contains(_)) =>
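The distinction this case relies on can be illustrated in plain Scala (not Catalyst code): grouping an empty input yields zero groups and hence zero rows, whereas a global aggregate like COUNT always yields exactly one row, so it must not be collapsed.

```scala
val empty = Seq.empty[Int]

// SELECT a FROM t GROUP BY a: one output row per group, so zero rows here.
val groupedRows = empty.groupBy(identity).keys.toSeq

// SELECT COUNT(a) FROM t: a global aggregate always yields exactly one row.
val globalCountRow = Seq(empty.size)
```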
Contributor

this kind of blacklisting approach is too risky -- if we were to introduce a new logical node in the future, most likely we will forget to update this rule.

Member Author

Thank you for review, @rxin .
I see. I will update this PR to use a whitelist approach.

Member Author

At first, I thought you meant line 1080, for case p: LogicalPlan,

https://github.com/apache/spark/pull/13906/files#diff-a636a87d8843eeccca90140be91d4fafR1080 .

Did I understand your advice correctly?

@SparkQA

SparkQA commented Jun 26, 2016

Test build #61260 has finished for PR 13906 at commit bd39437.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @rxin .
I just remembered this PR while looking at your whitelist PR. :)
Any advice for this PR?

import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class CollapseEmptyPlanSuite extends PlanTest {
Contributor

you should test something that shouldn't have been converted too

Member Author

Hmm, any other scenarios besides the following existing ones?

  • test("one non-empty local relation")
  • test("one non-empty and one empty local relations")
  • test("aggregating expressions on empty plan")

Member Author

I'll update to have more.

@dongjoon-hyun
Member Author

Anyway, thank you for review again, @rxin !

@SparkQA

SparkQA commented Jun 28, 2016

Test build #61363 has finished for PR 13906 at commit c06ae60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Case 4: The following plans having at least one empty relation return empty results.
case p: LogicalPlan if p.children.exists(isEmptyLocalRelation) =>
p match {
case Join(_, _, Inner, _) | _: Intersect =>
Contributor

can we move this out rather than doing a two-level nesting?

Member Author

Sure!

@rxin
Contributor

rxin commented Jun 28, 2016

cc @cloud-fan to take a look too.

* SELECT SUM(a) FROM t WHERE 1=0 GROUP BY a HAVING COUNT(*)>1 ORDER BY a (Not optimized)
* }}}
*/
object CollapseEmptyPlan extends Rule[LogicalPlan] with PredicateHelper {
Contributor

can you actually move this into a separate file? the optimizer is becoming too large and I want to break it apart soon. No point adding new things in this file.

Member Author

Sure. What name is suitable for this optimizer?

Member Author

Is just CollapseEmptyPlan.scala okay?

Contributor

yup

@dongjoon-hyun
Member Author

Yep. It's done!

@SparkQA

SparkQA commented Jun 28, 2016

Test build #61409 has finished for PR 13906 at commit bd79a14.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

Some more thoughts about it: Currently we only execute Project locally if the underlying relation is LocalRelation. Is it possible that we can also execute Aggregate locally? If it's too hard I'm ok to handle empty relation first.

@dongjoon-hyun
Member Author

Thank you, @cloud-fan .
It seems to be a good idea to handle operators on LocalRelations.
But, if possible, may I dig into that in another PR? :)

@cloud-fan
Contributor

I'm not sure if this optimization is useful:

  1. An empty LocalRelation is a corner case and seems not worth optimizing.
  2. The optimization rule in this PR is kind of complex.
  3. If we have better handling for LocalRelation in the future (like the LocalNode), this rule will become useless.

cc @marmbrus @yhuai

@dongjoon-hyun
Member Author

Sounds interesting. You mean a LocalNode that computes all local operators on LocalRelation, right?

@dongjoon-hyun
Member Author

It sounds promising. Maybe in Spark 2.1?

@dongjoon-hyun
Member Author

By the way, regarding complexity: it's a 23-line optimizer without blanks/comments. In fact, it's shorter than NullPropagation.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Jun 29, 2016

For 3, I respect your opinion. I just made another commit for 2. Now, it's 19 lines.

@SparkQA

SparkQA commented Jun 29, 2016

Test build #61449 has finished for PR 13906 at commit 4d937dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 29, 2016

Test build #61462 has finished for PR 13906 at commit 4bc2452.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


case p: LogicalPlan if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) =>
p match {
case _: Project | _: Generate | _: Filter | _: Sample | _: Join |
Contributor

@liancheng commented Jun 30, 2016

Actually Generate can't be included here. Our Generate also supports Hive-style UDTFs, which have weird semantics: for a UDTF f, after all rows have been processed, f.close() will be called, and more rows can be generated within f.close(). This means a UDTF may generate one or more rows even if the underlying input is empty.

See here and PR #5338 for more details.
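This close()-time behavior can be sketched in plain Scala (a toy model with hypothetical names, not the actual Hive GenericUDTF API): a generator that emits a row only when closed produces output even for an empty input, so the Generate node over it cannot be collapsed.

```scala
trait Generator {
  def process(row: Int): Seq[Int]
  def close(): Seq[Int]
}

// Mimics a Hive-style UDTF that emits a final row from close().
object EmitOnClose extends Generator {
  def process(row: Int): Seq[Int] = Seq(row)
  def close(): Seq[Int] = Seq(-1)
}

// Even with empty input, the close() rows survive, so output is non-empty.
def generate(g: Generator, input: Seq[Int]): Seq[Int] =
  input.flatMap(g.process) ++ g.close()
```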

@liancheng
Contributor

My feeling is that, this optimization rule is mostly useful for binary plan nodes like inner join and intersection, where we can avoid scanning output of the non-empty side.

On the other hand, for unary plan nodes, firstly it doesn't bring much performance benefit, especially when whole-stage codegen is enabled; secondly there are non-obvious and tricky corner cases, like Aggregate and Generate.

That said, although this patch is not a big one, it does introduce non-trivial complexities. For example, I didn't immediately realize that why Aggregate must be special cased at first (COUNT(x) may return 0 for empty input). The Generate case is even trickier.

So my suggestion is to only implement this rule for inner join and intersection, which are much simpler to handle. What do you think?

*/
object CollapseEmptyPlan extends Rule[LogicalPlan] with PredicateHelper {
private def isEmptyLocalRelation(plan: LogicalPlan): Boolean =
plan.isInstanceOf[LocalRelation] && plan.asInstanceOf[LocalRelation].data.isEmpty
Contributor

plan match {
  case p: LocalRelation => p.data.isEmpty
  case _ => false
}

@cloud-fan
Contributor

@liancheng, I think we still need to keep some simple rules for unary nodes, which also help the binary cases, as the empty relation is propagated up.

@liancheng
Contributor

@cloud-fan Yea, that's a good point.

@dongjoon-hyun
Member Author

Thank you so much, @liancheng and @cloud-fan!
I indeed focused only on UnaryNode plans before.
Now that I've learned your viewpoints, I've reconsidered BinaryNode logical plans, mainly Join.

I updated the code and the PR description according to the comments:

  • Add more Join optimizations. (@liancheng)
    (Intersect is handled as LeftSemi by ReplaceIntersectWithSemiJoin)
  • Exclude all Generators except Explode. (@liancheng)
  • Handle SELECT col + 1 … GROUP BY col. (@cloud-fan)
  • Change the isEmptyLocalRelation function to use pattern matching. (@liancheng)

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61581 has finished for PR 13906 at commit 7ab1725.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Oh, please hold off on merging this PR. Scala 2.10 reports some errors. I'll follow up again.

@dongjoon-hyun
Member Author

It's the exact same Scala 2.10 match-reachability build error as the previous one at fda51f1.

The solution is the same: I split the case statements.

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61598 has finished for PR 13906 at commit 984854b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @rxin, @cloud-fan, @liancheng. It's ready for merging again.
Thank you so much for sticking with this PR!

case p: Union if p.children.forall(isEmptyLocalRelation) =>
empty(p)

case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
Contributor

sorry to be so nit-picky, but it seems better to separate the join cases, e.g.

case p @ Join(_, _, Inner, _) if p.children.exists(isEmptyLocalRelation) => empty(p)
case p @ Join(_, _, LeftOuter | LeftSemi | LeftAnti, _) if isEmptyLocalRelation(p.left) => empty(p)
...
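The suggested per-join-type split can be modeled in plain Scala (a toy sketch with hypothetical names; the real rule matches on Catalyst's Join node). Note this mirrors the reviewer's conservative condition: for the left-anchored join types, an empty left side suffices to conclude emptiness.

```scala
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType

// True when the join is guaranteed to produce no rows.
def joinIsEmpty(joinType: JoinType, leftEmpty: Boolean, rightEmpty: Boolean): Boolean =
  joinType match {
    case Inner                            => leftEmpty || rightEmpty
    case LeftOuter | LeftSemi | LeftAnti  => leftEmpty
  }
```

For example, a LeftOuter join with only an empty right side is not empty, since the left rows survive with nulls on the right.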

@asfgit asfgit closed this in c553976 Jul 1, 2016
@liancheng
Contributor

Merged to master.

@cloud-fan Sorry that I didn't notice your comment while merging. We may address it in follow-up PRs.

@dongjoon-hyun
Member Author

Thank you for merging, @liancheng , @cloud-fan , and @rxin .

@JoshRosen
Contributor

I found a bug in the part of this rule which deals with Aggregate; see #17929 for the fix.

ghost pushed a commit to dbtsai/spark that referenced this pull request May 10, 2017
…te without grouping

## What changes were proposed in this pull request?

The query

```
SELECT 1 FROM (SELECT COUNT(*) WHERE FALSE) t1
```

should return a single row of output because the subquery is an aggregate without a group-by and thus should return a single row. However, Spark incorrectly returns zero rows.

This is caused by SPARK-16208 / apache#13906, a patch which added an optimizer rule to propagate EmptyRelation through operators. The logic for handling aggregates is wrong: it checks whether aggregate expressions are non-empty for deciding whether the output should be empty, whereas it should be checking grouping expressions instead:

An aggregate with non-empty grouping expression will return one output row per group. If the input to the grouped aggregate is empty then all groups will be empty and thus the output will be empty. It doesn't matter whether the aggregation output columns include aggregate expressions since that won't affect the number of output rows.

If the grouping expressions are empty, however, then the aggregate will always produce a single output row and thus we cannot propagate the EmptyRelation.

The current implementation is incorrect and also misses an optimization opportunity by not propagating EmptyRelation in the case where a grouped aggregate has aggregate expressions (in other words, `SELECT COUNT(*) from emptyRelation GROUP BY x` would _not_ be optimized to `EmptyRelation` in the old code, even though it safely could be).

This patch resolves this issue by modifying `PropagateEmptyRelation` to consider only the presence/absence of grouping expressions, not the aggregate functions themselves, when deciding whether to propagate EmptyRelation.
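The corrected condition can be sketched as a tiny predicate (a hypothetical helper for illustration, not the actual patch code): an Aggregate over an empty child collapses to an empty relation only when it has grouping expressions, because a global aggregate always emits exactly one row.

```scala
// Collapse is safe iff the child is empty AND there is a GROUP BY;
// the aggregate functions in the output do not matter.
def aggregateCollapsesToEmpty(hasGroupingExprs: Boolean, childIsEmpty: Boolean): Boolean =
  childIsEmpty && hasGroupingExprs
```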

## How was this patch tested?

- Added end-to-end regression tests in `SQLQueryTest`'s `group-by.sql` file.
- Updated unit tests in `PropagateEmptyRelationSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#17929 from JoshRosen/fix-PropagateEmptyRelation.
asfgit pushed a commit that referenced this pull request May 10, 2017 (same commit message as above)
asfgit pushed a commit that referenced this pull request May 10, 2017 (same commit message; cherry picked from commit a90c5cd, Signed-off-by: Wenchen Fan <wenchen@databricks.com>)
liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017 (same commit message)
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018 (same commit message; cherry picked from commit a90c5cd, Signed-off-by: Wenchen Fan <wenchen@databricks.com>)