[SPARK-12503] [SQL] Pushing Limit Through Union All #10451

Closed
wants to merge 36 commits

Conversation

gatorsmile
Member

"Rule that applies to a Limit on top of a Union. The original Limit won't go away after applying this rule, but additional Limit nodes will be created on top of each child of Union, so that these children produce less rows and Limit can be further optimized for children Relations."

– from https://issues.apache.org/jira/browse/CALCITE-832

The same topic was also addressed in Hive: https://issues.apache.org/jira/browse/HIVE-11775. That change has been merged into Hive.

This PR is a performance improvement. The idea is similar to predicate pushdown: pushing the Limit below the Union reduces the number of rows processed by Union All.
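For illustration, a query along the following lines reproduces the plans below (this snippet is our reconstruction, not taken from the PR; it assumes a Spark 1.6-era spark-shell session where `sc` is predefined):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)   // sc: SparkContext, predefined in spark-shell
import sqlContext.implicits._

val df1 = (1 to 20).map(Tuple1.apply).toDF("i")   // the 20-row relation
val df2 = (1 to 10).map(Tuple1.apply).toDF("i")   // the 10-row relation

// explain(true) prints the analyzed and optimized logical plans shown below.
df1.unionAll(df2).limit(1).explain(true)
```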

After the improvement, we can see the changes in the optimized plan:

== Analyzed Logical Plan ==
i: int
Limit 1
+- Union
   :- Project [_1#0 AS i#1]
   :  +- LocalRelation [_1#0], [[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20]]
   +- Project [_1#2 AS i#3]
      +- LocalRelation [_1#2], [[1],[2],[3],[4],[5],[6],[7],[8],[9],[10]]

== Optimized Logical Plan ==
Limit 1
+- Union
   :- Limit 1 <---- extra Limit
   :  +- LocalRelation [i#1], [[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20]]
   +- Limit 1 <---- extra Limit
      +- LocalRelation [i#3], [[1],[2],[3],[4],[5],[6],[7],[8],[9],[10]]

@gatorsmile gatorsmile changed the title [SPARK-12503] [SQL] Pushing Limit Through Union [SPARK-12503] [SQL] Pushing Limit Through Union ALL Dec 23, 2015
@gatorsmile gatorsmile changed the title [SPARK-12503] [SQL] Pushing Limit Through Union ALL [SPARK-12503] [SQL] Pushing Limit Through Union All Dec 23, 2015
@SparkQA

SparkQA commented Dec 23, 2015

Test build #48248 has finished for PR 10451 at commit 56fd782.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

@marmbrus @rxin Could you check if this is an appropriate improvement for Spark too?

Thanks!

case Limit(exp, Union(left, right)) =>
  Limit(exp,
    Union(
      Limit(exp, left),
      Limit(exp, right)
    )
  )
Member Author

A bug exists here. Will fix it soon. Thanks!

@SparkQA

SparkQA commented Dec 24, 2015

Test build #48261 has finished for PR 10451 at commit 77105e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case Limit(exp, Union(left, right)) =>
  Limit(exp,
    Union(
      CombineLimits(Limit(exp, left)),
      CombineLimits(Limit(exp, right))
    )
  )
Contributor

We need a stop condition, or this rule will keep pushing Limit down forever.

Member Author

Thank you for your review! Since we call CombineLimits here, we will not add an extra Limit in the subsequent iteration. Thus, the plan will not change again, and the rule will stop automatically, right?

Contributor

After thinking about it more, there may be a problem: if left or right is an operator that a Limit can be pushed through (currently there is no such operator, but we can't guarantee there never will be), then every time you push down a Limit here, it will be pushed down further. CombineLimits then cannot detect that you have already pushed the Limit down, and this rule keeps generating new Limits and pushing them down.

I think we should have a better way to detect whether we have already pushed the Limit down, or add a comment saying that this rule assumes the newly added Limits on top of left and right won't be removed by other optimization rules.
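A toy sketch of the non-convergence risk (class and function names here are ours, not Catalyst's):

```scala
// Simplified stand-ins for Catalyst plan nodes.
sealed trait Plan
case class Limit(n: Int, child: Plan) extends Plan
case class Union(left: Plan, right: Plan) extends Plan
case class Leaf(name: String) extends Plan

// The unguarded rule: it fires every time it sees Limit over Union.
def pushLimit(p: Plan): Plan = p match {
  case Limit(n, Union(l, r)) => Limit(n, Union(Limit(n, l), Limit(n, r)))
  case other                 => other
}

// Catalyst applies rule batches until the plan stops changing. If another
// rule moves the freshly added inner Limits further down, Limit-over-Union
// matches again on the next iteration, so fresh Limits are generated forever.
```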

Member Author

You are right. The Limit might not converge to a fixed position after multiple pushdowns.

Let me think about it. Thank you!

@SparkQA

SparkQA commented Dec 24, 2015

Test build #48299 has finished for PR 10451 at commit 7f25d91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 29, 2015

Test build #48400 has finished for PR 10451 at commit 358d62e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

Thanks for working on this. I think it's getting pretty close. A few minor cleanups that might be nice:

  • I think we should consider pulling all the Limit rules into their own LimitPushDown rule. The reasoning here is twofold: we can clearly comment in one central place on the requirements with respect to implementing maxRows, and it will be easier to turn off if it is ever doing the wrong thing.
  • We should do a pass through and add maxRows to any other logical plans where it makes sense (see the sketch after this list). Off the top of my head:
    • Filter = child.maxRows
    • Union = for (leftMax <- left.maxRows; rightMax <- right.maxRows) yield Add(leftMax, rightMax)
    • Distinct = child.maxRows
    • Aggregate = child.maxRows
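A minimal, self-contained sketch of these overrides (toy stand-ins for the Catalyst classes, not the real Spark source):

```scala
sealed trait LogicalPlan { def maxRows: Option[Long] }

case class Relation(rows: Option[Long]) extends LogicalPlan {
  def maxRows: Option[Long] = rows                 // usually unknown for a source
}
case class Filter(child: LogicalPlan) extends LogicalPlan {
  def maxRows: Option[Long] = child.maxRows        // filtering never adds rows
}
case class Distinct(child: LogicalPlan) extends LogicalPlan {
  def maxRows: Option[Long] = child.maxRows        // deduplication only removes rows
}
case class Union(left: LogicalPlan, right: LogicalPlan) extends LogicalPlan {
  // Defined only when both sides are bounded: at most the sum of the bounds.
  def maxRows: Option[Long] =
    for (l <- left.maxRows; r <- right.maxRows) yield l + r
}
```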

// push-down rule that is unable to infer the value of maxRows. Any operator that a Limit can
// be pushed past should override this function.
case Limit(exp, Union(left, right))
    if left.maxRows.isEmpty || right.maxRows.isEmpty =>
Contributor

Is there a reason not to check left and right separately?

Member Author

Here is an example: even if one side already has a Limit child/descendant, we can still push the Limit down to reduce the number of returned rows.

https://github.com/gatorsmile/spark/blob/unionLimit/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushdownLimitsSuite.scala#L50-L57

* Any operator that a Limit can be pushed past should override the maxRows function.
*
* Note: This rule has to be done when the logical plan is stable;
* otherwise, it could impact the other rules.
Contributor

I'm not sure what this means?

Member Author

If we push Limit through Filter, Aggregate, or Distinct, the results will be wrong. For example, df.aggregate().limit(1) and df.limit(1).aggregate() will generate different results.

This statement only matters if we can push Limit through some operators. So far, we have not found any eligible operators except outer/left-outer/right-outer Join and Union. Thus, let me revert it. Thanks!
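To make the ordering problem concrete, a hedged illustration (ours, not from the PR; assumes the same spark-shell setup as the snippet in the description):

```scala
val df = (1 to 3).map(Tuple1.apply).toDF("i")

df.groupBy().sum("i").limit(1).show()   // aggregates all rows first: sum is 6
df.limit(1).groupBy().sum("i").show()   // aggregates one arbitrary row: sum is 1, 2, or 3
```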

@gatorsmile
Member Author

After rethinking the Limit push-down rules, we are unable to push Limit through any operator that could change the number of rows or generate output values based on its inputs. Thus, so far, the eligible candidates are Project, Union All, and Outer/LeftOuter/RightOuter Join. Please correct me if my understanding is not right.

Feel free to let me know if the code needs an update. Thank you!

// safe to push down Limit through it. Once we add UNION DISTINCT, we will not be able to
// push down Limit.
case Limit(exp, Union(left, right))
    if left.maxRows.isEmpty || right.maxRows.isEmpty =>
Contributor

Okay, but why not break this into two parts, so that we push to the left when the left is not limited, and push to the right when the right is not limited? Right now you push to both sides if either one is not limited.
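A sketch of that suggestion (ours, not the PR's code), reusing the toy Plan classes from the earlier sketch plus a maxRows helper:

```scala
// Bound on the rows a toy plan can return, mirroring LogicalPlan.maxRows.
def maxRows(p: Plan): Option[Int] = p match {
  case Limit(n, _) => Some(n)
  case Union(l, r) => for (lm <- maxRows(l); rm <- maxRows(r)) yield lm + rm
  case Leaf(_)     => None
}

// Push a Limit independently to each side that is not already bounded.
def pushLimitPerSide(p: Plan): Plan = p match {
  case Limit(n, Union(l, r)) =>
    val newL = if (maxRows(l).isEmpty) Limit(n, l) else l
    val newR = if (maxRows(r).isEmpty) Limit(n, r) else r
    Limit(n, Union(newL, newR))
  case other => other
}
```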

Member Author

Yeah, you are right. : )

Contributor

Should we also check the limit value? If maxRows is larger than the limit we want to push down, it seems it still makes sense to push it down.

Member Author

Yeah, that also makes sense. Will make the change after these three running test cases finish. : )

@SparkQA

SparkQA commented Dec 30, 2015

Test build #48435 has finished for PR 10451 at commit ca5c104.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 30, 2015

Test build #48431 has finished for PR 10451 at commit 2823a57.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 30, 2015

Test build #48433 has finished for PR 10451 at commit cfbeea7.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

The latest version covers both cases:

  1. If a child's maxRows is larger than the limit value, add the extra Limit.
  2. If a child's maxRows is None, add the extra Limit.

Hopefully, you like the latest implementation. : ) @marmbrus @cloud-fan
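Expressed against the same toy maxRows helper from the earlier sketches, the combined condition could look like this (our sketch, not the PR's code):

```scala
// Push when the child's bound is unknown (None) or looser than the limit.
def shouldPushLimit(n: Int, child: Plan): Boolean =
  maxRows(child).forall(_ > n)
```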

@SparkQA

SparkQA commented Dec 30, 2015

Test build #48461 has finished for PR 10451 at commit 7cf955f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 30, 2015

Test build #48465 has finished for PR 10451 at commit 7899312.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jan 12, 2016

Let's close the limit push down pull requests. We will need to design this more properly because it is expensive to push down large limits.

@gatorsmile
Member Author

Sure, let me close it.

@gatorsmile gatorsmile closed this Jan 12, 2016
asfgit pushed a commit that referenced this pull request Feb 15, 2016
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it and if neither input is limited then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11121 from JoshRosen/limit-pushdown-2.
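A simplified, self-contained sketch (ours, based on the commit message above, not the merged source) of that maxRows guard; adding the LocalLimit only when the child's bound is unknown or too loose is also what stops the rule from re-firing:

```scala
sealed trait LogicalPlan { def maxRows: Option[Long] }

case class LocalLimit(limit: Long, child: LogicalPlan) extends LogicalPlan {
  def maxRows: Option[Long] = Some(limit)        // a limited child is bounded
}
case class Scan(bound: Option[Long]) extends LogicalPlan {
  def maxRows: Option[Long] = bound
}

def maybePushLocalLimit(limit: Long, child: LogicalPlan): LogicalPlan =
  child.maxRows match {
    case Some(max) if max <= limit => child      // already tight enough: no-op
    case _                         => LocalLimit(limit, child)
  }
```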
@gatorsmile gatorsmile deleted the unionLimit branch August 6, 2016 15:38