[SPARK-12505] [SQL] Pushdown a Limit on top of an Outer-Join #10454

Closed

Conversation

gatorsmile
Member

"Rule that applies to a Limit on top of an OUTER Join. The original Limit won't go away after applying this rule, but additional Limit node(s) will be created on top of the outer-side child (or children if it's a FULL OUTER Join). "
– from https://issues.apache.org/jira/browse/CALCITE-832

Also, the same topic exists in Hive: https://issues.apache.org/jira/browse/HIVE-11684. That change has been merged into Hive.

This PR is a performance improvement. The idea is similar to predicate pushdown: it reduces the number of rows processed by outer joins. The improvement is most significant when the number of rows in the Limit is small but the cost of the Join is large.

For example,

    // Assumes `import sqlContext.implicits._` for toDF/$ (Spark 1.6-era API).
    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as('a)
    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as('b)
    val join = df.join(df2, $"a.int" === $"a.int", "outer").limit(1)

After the improvement, we can see the changes in the optimized plan:

== Analyzed Logical Plan ==
int: int, int2: int, str: string, int: int, int2: int, str: string
Limit 1
+- Join FullOuter, Some((int#3 = int#3))
   :- Subquery a
   :  +- Project [_1#0 AS int#3,_2#1 AS int2#4,_3#2 AS str#5]
   :     +- LocalRelation [_1#0,_2#1,_3#2], [[1,2,1],[3,4,3]]
   +- Subquery b
      +- Project [_1#6 AS int#9,_2#7 AS int2#10,_3#8 AS str#11]
         +- LocalRelation [_1#6,_2#7,_3#8], [[1,3,1],[5,6,5]]

== Optimized Logical Plan ==
Limit 1
+- Join FullOuter, Some((int#3 = int#3))
   :- LocalRelation [int#3,int2#4,str#5], [[1,2,1],[3,4,3]]
   +- Limit 1  <---- extra limit is added
      +- LocalRelation [int#9,int2#10,str#11], [[1,3,1],[5,6,5]]

        case _ => f // Do nothing for the other join types
      }
  }
}
Contributor


Is it possible that we add an extra limit in every iteration?

Member Author


You are right, but after this rule there is another rule, CombineLimits, which combines the extra Limits. I think we should still fix it anyway. Thanks!
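
For reference, a minimal sketch of what CombineLimits does, assuming the 1.6-era rule shape (not the exact source): two adjacent Limits collapse into a single Limit that keeps the smaller expression, so pushed Limits cannot accumulate across optimizer iterations.

    // Minimal sketch of CombineLimits (assumed 1.6-era shape, not the
    // exact source): two adjacent Limits collapse into one Limit that
    // keeps the smaller of the two expressions.
    object CombineLimits extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Limit(le, Limit(ne, grandChild)) =>
          Limit(If(LessThan(ne, le), ne, le), grandChild)
      }
    }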

@yhuai
Contributor

yhuai commented Dec 23, 2015

Can you add an example to the description to illustrate this optimization (and the rationale for it)?

@SparkQA

SparkQA commented Dec 23, 2015

Test build #48252 has finished for PR 10454 at commit 3779874.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

object PushLimitThroughOuterJoin extends Rule[LogicalPlan] with PredicateHelper {

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
Member Author


We can also push it down through the Join's Projection.

@SparkQA

SparkQA commented Dec 24, 2015

Test build #48263 has finished for PR 10454 at commit 72f73fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
      joinType match {
        case RightOuter =>
          Limit(expr, Join(left, CombineLimits(Limit(expr, right)), joinType, joinCondition))
Contributor


Need a stop condition to stop pushing the Limit if it has already been pushed.

Member Author


Thank you for your review! Since we call CombineLimits here to combine two consecutive Limits, we will not add an extra Limit in the subsequent iteration. No change will then be made to the plan, so RuleExecutor will stop automatically. Am I right?

Contributor


ah, that's right, nvm

Member Author


Thank you!

Member


That is right. However, I think checking whether the Limit has already been pushed would avoid unnecessarily applying this rule and CombineLimits multiple times. The result should be the same either way, so it is minor.
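
Assembling the fragments quoted in this thread, the rule under discussion looks roughly like the following. This is an illustrative sketch, not the exact patch: it covers only the left/right outer cases (the full outer join handling is discussed below), and it calls CombineLimits eagerly so that an already-pushed Limit is merged rather than stacked, which lets RuleExecutor reach a fixed point.

    object PushLimitThroughOuterJoin extends Rule[LogicalPlan] with PredicateHelper {

      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
          joinType match {
            case LeftOuter =>
              // Every left-side row survives a left outer join, so limiting
              // the left input still yields enough rows for the outer Limit.
              Limit(expr, Join(CombineLimits(Limit(expr, left)), right, joinType, joinCondition))
            case RightOuter =>
              // Symmetric: the right side is preserved in full.
              Limit(expr, Join(left, CombineLimits(Limit(expr, right)), joinType, joinCondition))
            case _ => f // Do nothing for the other join types
          }
      }
    }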

@SparkQA

SparkQA commented Dec 24, 2015

Test build #48298 has finished for PR 10454 at commit ee91303.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

The new fix contains two parts:

  1. Since the full outer join removes duplicates, we are unable to add the extra limit to both sides. As long as we ensure the completeness of one child, the generated results will still be correct, as with the left/right outer join.

  2. There is usually a Projection on top of the Join operator; we can also push the Limit through this kind of combination (see the sketch after this list).
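
A sketch of part 2 follows; the Limit-over-Project-over-Join pattern and the FullOuter branch are one reading of the description above, not the exact patch:

    // Sketch of part 2 (assumed from the description, not the exact patch):
    // push the Limit through an intervening Project as well.
    case f @ Limit(expr, p @ Project(_, Join(left, right, joinType, joinCondition))) =>
      joinType match {
        case LeftOuter =>
          Limit(expr, p.copy(child =
            Join(CombineLimits(Limit(expr, left)), right, joinType, joinCondition)))
        case RightOuter =>
          Limit(expr, p.copy(child =
            Join(left, CombineLimits(Limit(expr, right)), joinType, joinCondition)))
        case FullOuter =>
          // Per part 1: limit only one side so the other child stays complete.
          Limit(expr, p.copy(child =
            Join(CombineLimits(Limit(expr, left)), right, joinType, joinCondition)))
        case _ => f
      }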

Thank you for your reviews and comments! @yhuai @cloud-fan @viirya

@rxin
Contributor

rxin commented Jan 12, 2016

@gatorsmile let's close the limit push down pull requests. We will need to design this more properly because it is expensive to push down large limits.

@gatorsmile
Member Author

Sure, let me close it.

gatorsmile closed this Jan 12, 2016
asfgit pushed a commit that referenced this pull request Feb 15, 2016
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it and if neither input is limited then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `LogicalPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11121 from JoshRosen/limit-pushdown-2.
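
A minimal sketch of the maxRows guard described in the commit message above, with assumed names and shapes (see #11121 for the actual LimitPushDown rule):

    // Minimal sketch of the maxRows guard (assumed names and shapes; see
    // #11121 for the real rule). A partition-local limit is pushed only if
    // the child may produce more rows than the limit allows, so the rule
    // stops matching once the pushed limit is in place.
    def maybePushLocalLimit(limitExpr: Expression, plan: LogicalPlan): LogicalPlan = {
      (limitExpr, plan.maxRows) match {
        case (IntegerLiteral(limit), Some(childMaxRows)) if limit >= childMaxRows =>
          plan // the child can never exceed the limit; nothing to push
        case _ =>
          LocalLimit(limitExpr, plan)
      }
    }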