[SPARK-12505] [SQL] Pushdown a Limit on top of an Outer-Join #10454

Closed · wants to merge 4 commits · changes from 3 commits shown
@@ -47,6 +47,7 @@ object DefaultOptimizer extends Optimizer {
PushPredicateThroughProject,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
PushLimitThroughOuterJoin,
ColumnPruning,
// Operator combine
ProjectCollapsing,
@@ -857,6 +858,30 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

/**
* Pushes [[Limit]] operators through [[Join]] operators when the join type is an outer join,
* adding extra [[Limit]] operators on top of the outer-side child/children.
*/
object PushLimitThroughOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
Contributor

I feel we will generate wrong results. It is not safe to push down the limit if we are not joining the foreign key with the primary key, right? For example, for a left outer join, we push down the limit to the right table. It is possible that all rows returned by the right side have the same join column value.

Am I missing anything?

Member Author

For left outer joins, we only push down the limit to the left table. Thus, I think it should be safe?

Basically, the rule adds additional Limit node(s) on top of the outer-side child/children.

Contributor

Oh, sorry, I was looking at the wrong line. For a full outer join, is it safe? Also, with the limit, the result of a left/right outer join may not be deterministic, right?

Contributor

Hmm. Actually, for a left/right outer join, what will happen if we have A LEFT OUTER JOIN B ON (A.key = B.key) SORT BY A.key LIMIT 10?

Member Author

If there is a sort, the plan is different: we will have a Sort below the Limit, so this rule does not apply.
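
A rough plan-shape sketch of that point (hypothetical, not from this patch), written in the same Catalyst test DSL that FilterPushdownSuite below uses and assuming its testRelation with columns 'a, 'b, 'c:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.LeftOuter

val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

// This builds Limit(10, Sort(Seq(x.a ASC), global = true, Join(x, y, LeftOuter, None))):
// the Sort sits between the Limit and the Join, so the pattern Limit(expr, Join(...))
// used by this rule does not match and nothing is pushed down.
val sortedThenLimited = x.join(y, LeftOuter).orderBy("x.a".attr.asc).limit(10)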

For the full outer join, I think the result is still correct, but it might not be the same as the result of the original plan.

In the new plan, we add an extra Limit. If the nodes below the Limit are deterministic, the result should be deterministic; if they are not, the result will not be deterministic, right? Thus, this rule does not change whether the result is deterministic. Sorry, this part is unclear to me; I am not sure my answer is correct.

Based on my understanding, when users use LIMIT, they do not expect deterministic results unless they also use ORDER BY.

Member Author

Sorry, I found a hole: a full outer join is the union of a left outer join and a right outer join, with duplicates removed. We are unable to add an extra Limit below that duplicate-removing Union.

Contributor

For example, suppose we have a table A containing 1, 2, 3, 4, 5 (say k is the column name). If we run A x FULL OUTER JOIN A y ON (x.k = y.k) LIMIT 2 and we push the limit to both sides, it is possible that we get 1, 2 from the left side and 3, 4 from the right side, right?
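
To make that concrete, here is a small self-contained sketch in plain Scala (no Spark; the table A and column k are only the hypothetical names from the example above) that simulates both plans:

object FullOuterLimitCounterexample extends App {
  // Table A with column k = 1, 2, 3, 4, 5 (hypothetical data from the example above).
  val a = Seq(1, 2, 3, 4, 5)

  // Full outer join of two key sequences on equality, returned as (left, right) pairs.
  def fullOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[(Option[Int], Option[Int])] = {
    val matched   = for (l <- left; r <- right; if l == r) yield (Some(l), Some(r))
    val leftOnly  = left.filterNot(right.contains).map(l => (Some(l), None))
    val rightOnly = right.filterNot(left.contains).map(r => (None, Some(r)))
    matched ++ leftOnly ++ rightOnly
  }

  // Original plan: join first, then LIMIT 2. Every output row is a matched pair.
  println(fullOuterJoin(a, a).take(2))
  // List((Some(1),Some(1)), (Some(2),Some(2)))

  // Plan with the limit pushed to both sides: if the left limit keeps {1, 2} and the
  // right limit happens to keep {3, 4}, the join produces only null-padded rows,
  // which the original query could never return.
  println(fullOuterJoin(Seq(1, 2), Seq(3, 4)).take(2))
  // List((Some(1),None), (Some(2),None))
}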

Member Author

Let me update the code and fix the bug. Thanks!

Member Author

For the full outer join, my idea is to add the extra Limit to the side that has higher statistics (the larger estimated size). Does that sound good?

Member Author

Need to refine the idea.

For the full outer join:

  1. If both sides already have a Limit, do nothing.
  2. If only one side has a Limit, add the extra Limit to that side.
  3. If neither side has a Limit, add the extra Limit to the side that has higher statistics.

Is that better? A rough sketch of what I mean follows below.
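
An illustration-only sketch of that idea (not the final patch), imagined next to this rule in Optimizer.scala and assuming statistics.sizeInBytes as the "higher statistics" measure:

import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.plans.logical.{Join, Limit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object PushLimitThroughFullOuterJoin extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Limit(expr, Join(left, right, FullOuter, joinCondition)) =>
      (left, right) match {
        // 1. Both sides already have a Limit: do nothing.
        case (Limit(_, _), Limit(_, _)) => f
        // 2. Only one side has a Limit: add the extra Limit to that side.
        case (Limit(_, _), _) =>
          Limit(expr, Join(CombineLimits(Limit(expr, left)), right, FullOuter, joinCondition))
        case (_, Limit(_, _)) =>
          Limit(expr, Join(left, CombineLimits(Limit(expr, right)), FullOuter, joinCondition))
        // 3. Neither side has a Limit: add the extra Limit to the side with the
        //    larger size estimate.
        case _ if left.statistics.sizeInBytes >= right.statistics.sizeInBytes =>
          Limit(expr, Join(CombineLimits(Limit(expr, left)), right, FullOuter, joinCondition))
        case _ =>
          Limit(expr, Join(left, CombineLimits(Limit(expr, right)), FullOuter, joinCondition))
      }
  }
}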


def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
Member Author

We can also push it down through a Project on top of the Join.
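
As a hypothetical illustration of that (not part of this patch), the rule could gain an extra case that matches a Project sitting between the Limit and the Join, for example:

// Hypothetical extra case; only safe when the project list is deterministic.
case Limit(expr, Project(projectList, Join(left, right, LeftOuter, joinCondition)))
    if projectList.forall(_.deterministic) =>
  Limit(expr,
    Project(projectList,
      Join(CombineLimits(Limit(expr, left)), right, LeftOuter, joinCondition)))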

joinType match {
case RightOuter =>
Limit(expr, Join(left, CombineLimits(Limit(expr, right)), joinType, joinCondition))
Contributor

We need a stop condition so that we stop pushing the Limit once it has already been pushed.

Member Author

Thank you for your review! Since we call CombineLimits here to combine the two consecutive Limits, we will not add an extra Limit in the subsequent iteration. Thus, no change will be made to the plan, and RuleExecutor will stop automatically. Am I right?

Contributor

Ah, that's right, never mind.

Member Author

Thank you!

Member

That is right. However, I think checking whether the Limit has already been pushed would avoid unnecessarily applying this rule and CombineLimits multiple times. The result should be the same, though, so it is minor.
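
For illustration (hypothetical, not in this patch), such a check could be a pair of extra cases placed before the generic Limit-over-Join case, returning the original plan unchanged when the outer side is already limited:

// Hypothetical guard cases, placed before the generic case below:
case f @ Limit(_, Join(Limit(_, _), _, LeftOuter, _)) => f   // left side already limited
case f @ Limit(_, Join(_, Limit(_, _), RightOuter, _)) => f  // right side already limited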

case LeftOuter =>
Limit(expr, Join(CombineLimits(Limit(expr, left)), right, joinType, joinCondition))
case FullOuter =>
Limit(expr,
Join(
CombineLimits(Limit(expr, left)),
CombineLimits(Limit(expr, right)),
joinType, joinCondition))
case _ => f // Do nothing for the other join types
}
}
}
Contributor

Is it possible that we add an extra limit in every iteration?

Member Author

You are right, but after this rule there is another rule, CombineLimits, which can combine the extra Limits. I think we should still fix it anyway. Thanks!


/**
* Removes [[Cast Casts]] that are unnecessary because the input is already the correct type.
*/
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftSemi, LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -41,6 +41,7 @@ class FilterPushdownSuite extends PlanTest {
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
PushLimitThroughOuterJoin,
ColumnPruning,
ProjectCollapsing) :: Nil
}
@@ -750,4 +751,56 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("limit: push down left outer join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, LeftOuter)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.limit(1)
val correctAnswer =
left.join(y, LeftOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}

test("limit: push down right outer join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, RightOuter)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val right = testRelation.limit(1)
val correctAnswer =
x.join(right, RightOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}

test("limit: push down full outer join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, FullOuter)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.limit(1)
val right = testRelation.limit(1)
val correctAnswer =
left.join(right, FullOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}
}
@@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.optimizer

import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}