
[SPARK-12503][SPARK-12505] Limit pushdown in UNION ALL and OUTER JOIN #11121

Closed
wants to merge 16 commits

Conversation

JoshRosen
Contributor

This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

  • If a limit is on top of a UNION ALL operator, then a partition-local limit operator will be pushed to each of the union operator's children.
  • If a limit is on top of an OUTER JOIN, then a partition-local limit will be pushed to one side of the join. For LEFT OUTER and RIGHT OUTER joins, the limit will be pushed to the left and right side, respectively. For FULL OUTER joins, we will only push limits when at most one of the inputs is already limited: if one input is limited, we will push a smaller limit on top of it; if neither input is limited, we will limit the input that is estimated to be larger.
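
For illustration, here is a rough sketch of how such a rule might be structured in Catalyst. This is a simplified sketch based on the description above, not the exact code in the patch; the maybePushLimit helper (sketched below) is the one that appears in the review diff further down.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object LimitPushDown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Case 1: push a partition-local limit into each child of a UNION ALL.
    case LocalLimit(exp, Union(children)) =>
      LocalLimit(exp, Union(children.map(maybePushLimit(exp, _))))
    // Case 2: push a partition-local limit to one side of an outer join.
    case LocalLimit(exp, join @ Join(left, right, joinType, _)) =>
      val newJoin = joinType match {
        case RightOuter => join.copy(right = maybePushLimit(exp, right))
        case LeftOuter => join.copy(left = maybePushLimit(exp, left))
        case _ => join // FullOuter needs extra care; see the review discussion below
      }
      LocalLimit(exp, newJoin)
  }
}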

These optimizations were proposed previously by @gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred because, at that time, Spark's physical Limit operator would trigger a full shuffle to perform global limits, so pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the Limit operator into separate LocalLimit and GlobalLimit operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of @gatorsmile's patches, with changes and simplifications made possible by partition-local limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching after the limit has been pushed down. To handle this, this patch adds a maxRows method to LogicalPlan which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children whose maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.
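
As a hedged sketch, the maxRows guard inside maybePushLimit could look roughly like this, using the Option[Long] form of maxRows that the review eventually settles on (the IntegerLiteral extractor is an assumption about how the literal limit value is read):

// Only wrap the child in a LocalLimit when doing so can actually reduce
// the number of rows the child produces.
private def maybePushLimit(exp: Expression, plan: LogicalPlan): LogicalPlan = {
  (exp, plan.maxRows) match {
    case (IntegerLiteral(limit), Some(childMaxRows)) if limit < childMaxRows =>
      LocalLimit(exp, plan)
    case (_, None) =>
      LocalLimit(exp, plan)
    case _ => plan // the child is already at least as limited; leave it alone
  }
}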

@SparkQA

SparkQA commented Feb 8, 2016

Test build #50940 has finished for PR 11121 at commit 7b86111.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case RightOuter => join.copy(right = maybePushLimit(exp, right))
case LeftOuter => join.copy(left = maybePushLimit(exp, left))
case FullOuter =>
join.copy(left = maybePushLimit(exp, left), right = maybePushLimit(exp, right))
Member

This is not right. Please check the original PR. @yhuai and I had a discussion about this issue.

Since the full outer join will remove duplicates, we are unable to add an extra limit to both sides. As long as we can ensure the completeness of one child, the generated results will still be correct, just like the left/right outer join cases.

Thanks!

Contributor Author

I'll fix this up now and will add a brief comment summarizing this.

Member

Great! Please also update the description of this PR. Thanks!

Contributor Author

I have one concern about the rule as implemented in your PR:

If we have a full outer join where neither child is initially limited, and we then push a limit to the side with larger statistics, then a second firing of the LimitPushDown rule would match one of the cases where only a single side is limited and would push a limit to the other side, producing a wrong answer because both sides would end up limited.

Therefore, I think we might want to restrict this rule to only fire when neither side of the full outer join has a pre-existing limit.
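
For reference, a sketch of a FullOuter case along the lines the PR description ends up with, written so that a second firing cannot limit both sides (statistics.sizeInBytes is an assumed size estimate):

case FullOuter =>
  (left.maxRows, right.maxRows) match {
    case (None, None) =>
      // Neither side is limited yet: limit the side estimated to be larger.
      if (left.statistics.sizeInBytes >= right.statistics.sizeInBytes) {
        join.copy(left = maybePushLimit(exp, left))
      } else {
        join.copy(right = maybePushLimit(exp, right))
      }
    case (Some(_), None) => join.copy(left = maybePushLimit(exp, left))
    case (None, Some(_)) => join.copy(right = maybePushLimit(exp, right))
    // Both sides already limited: pushing any further limit could end up
    // limiting both inputs, which is unsafe.
    case (Some(_), Some(_)) => join
  }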

Contributor Author

Also, I wonder whether we should check whether maxRows is defined rather than checking whether the outer join's children are Limits, since that frees us from having to reason about whether the limit could be further pushed. On the other hand, if we always leave the original LocalLimit in place then I don't think we currently need to worry about the limit being further pushed down to a point where the child would no longer be a limit.

Member

Yeah, you are right. If one side has a pre-existing limit, we just need to add it on that side. Of course, two adjacent limits can be combined.
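
For reference, the pre-existing CombineLimits rule collapses two adjacent limits into the smaller of the two, along these lines (a sketch of that existing Catalyst rule, not code from this patch):

object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Keep whichever of the two stacked limit expressions is smaller.
    case Limit(le, Limit(ne, grandChild)) =>
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}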

Member

Yeah, maxRows was added for this purpose. The original idea is from @marmbrus.

@JoshRosen
Contributor Author

Alright, updated to address the FullOuter bugs. I'm going to take another self-review pass later to make sure that I haven't overlooked anything else.

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, x.join(LocalLimit(1, yBig), FullOuter)).analyze
comparePlans(optimized, correctAnswer)
}
Contributor Author

I'll also add tests for the cases where both inputs are limited.

Contributor Author

I've added more tests now.

@SparkQA

SparkQA commented Feb 9, 2016

Test build #50947 has finished for PR 11121 at commit 9abb38f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Feb 10, 2016

Note: when we merge this, we should remove the triggering of the rule from the optimizer, and only add it back once we have whole-stage codegen for Limit.

@SparkQA

SparkQA commented Feb 10, 2016

Test build #51045 has finished for PR 11121 at commit d59d2c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 10, 2016

Test build #51052 has finished for PR 11121 at commit ad5e40f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

I've updated this to disable the optimizer rule for now (it's still tested in the LimitPushdownSuite, though).

@rxin
Contributor

rxin commented Feb 11, 2016

cc @cloud-fan for review

* Any operator that a Limit can be pushed past should override this function (e.g., Union).
* Any operator that can push through a Limit should override this function (e.g., Project).
*/
def maxRows: Option[Expression] = None
Contributor

Are we going to handle non-literal maxRows in the future? If not, maybe defining it as Option[Long] would be simpler and better?

Contributor Author

I experimented with this but ran into problems because the argument of LIMIT can be an expression.

Contributor

How about returning None if the argument of Limit is a non-literal expression?

Contributor

+1

It feels to me that this is only useful when we know the value, not if it is some subquery.

Contributor Author

How does this interact with the constant-folding rules? Will expressions be maximally constant-folded before this is invoked? Just trying to reason about whether there are any ordering issues here.

Contributor

It's a def, not a lazy val, so I think it's fine with the constant-folding rules.
BTW, using Option[Expression] may be sub-optimal for something like Union: its maxRows would be Some(children.flatMap(_.maxRows).reduce { (a, b) => Add(a, b) }), which can't be constant-folded (maxRows is a method) and will always be a non-literal expression, so we couldn't push a limit through it.
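
With Option[Long], the Union case becomes a plain sum that needs no constant folding; a sketch, assuming the union's bound is only known when every child reports one:

// Sketch of Union.maxRows with Option[Long]: any unbounded child makes
// the union unbounded.
override def maxRows: Option[Long] = {
  if (children.exists(_.maxRows.isEmpty)) {
    None
  } else {
    Some(children.flatMap(_.maxRows).sum)
  }
}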

@cloud-fan
Contributor

LGTM except one comment

@JoshRosen
Contributor Author

I've updated this patch to change maxRows into an Option[Long] and have added a test to make sure that constant-folding works as expected.
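
A sketch of what the literal-only maxRows on the limit operators could look like (an assumed shape; assuming the pushdown rule runs in the same fixed-point optimizer batch as ConstantFolding, a constant limit expression such as 5 + 5 will have been folded to a literal before maxRows matters):

// Report a row bound only when the limit expression is a literal.
override def maxRows: Option[Long] = limitExpr match {
  case IntegerLiteral(limit) => Some(limit)
  case _ => None
}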

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Feb 13, 2016

Test build #51227 has finished for PR 11121 at commit c28343f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

#11171 made a few changes to the conversion from logical plans to SQL. I believe this is the cause of the build failure.

@SparkQA

SparkQA commented Feb 13, 2016

Test build #51234 has finished for PR 11121 at commit 020fafe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

I've updated this to fix compilation after that SQLBuilder change. One quick question: do we have to worry about the logical plan -> SQL conversion being applied to optimized plans? If so, things might get tricky because we now have separate GlobalLimit and LocalLimit logical plan nodes. Because of the unapply method that I defined on the Limit object, we'll continue to match the previous behavior for GlobalLimit(x, LocalLimit(x, child)), so this should continue to work for queries which haven't undergone optimization.
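
For context, a sketch of that Limit companion object as described above: apply builds the GlobalLimit-over-LocalLimit pair, and unapply only matches when both nodes share the same limit expression.

object Limit {
  def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
    GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
  }

  def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
    p match {
      case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 =>
        Some((le1, child))
      case _ => None
    }
  }
}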

@gatorsmile
Member

IMO, in the future, if we need to convert optimized plans to SQL, we will need to add a few rules to SQLBuilder to revert the changes made by some optimization rules. Otherwise, the parser will be unable to parse the generated SQL. I have already hit a couple of issues caused by Analyzer rules.

Also cc @liancheng

@rxin
Contributor

rxin commented Feb 14, 2016

We could consider adding a partition-local limit, or some hint for it, to the parser at some point.

cc @hvanhovell

@JoshRosen
Contributor Author

To confirm: is merging this patch blocking on anything or can concerns related to converting optimized plans to SQL be addressed in a followup patch?

Would you like me to add explicit test cases for plan -> SQL generation for a variety of queries involving limits?

@cloud-fan
Contributor

I think this is good to go; we can defer the "converting optimized plans to SQL" work to a followup patch.
cc @liancheng

@rxin
Contributor

rxin commented Feb 15, 2016

Yeah, I don't think we ever turn an optimized plan into SQL right now.

@liancheng
Contributor

@cloud-fan I agree. For SQL generation, we can currently focus only on resolved plans parsed from HiveQL. @gatorsmile I think after finishing that part, we may gain better knowledge about how to handle arbitrary resolved logical plans. What do you think?

@rxin
Contributor

rxin commented Feb 15, 2016

I'm going to merge this in master.

@asfgit asfgit closed this in a8bbc4f Feb 15, 2016
@JoshRosen JoshRosen deleted the limit-pushdown-2 branch February 15, 2016 01:42
@gatorsmile
Member

@liancheng I agree. : )
