[SPARK-12505] [SQL] Pushdown a Limit on top of an Outer-Join #10454

@@ -47,6 +47,7 @@ object DefaultOptimizer extends Optimizer {
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
       PushPredicateThroughAggregate,
+      PushLimitThroughOuterJoin,
       ColumnPruning,
       // Operator combine
       ProjectCollapsing,
@@ -857,6 +858,60 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }

/**
 * Pushes [[Limit]] operators through [[Join]] operators, or through [[Project]] + [[Join]]
 * operators, iff the join type is an outer join.
 * Case 1: If the join type is [[LeftOuter]] or [[RightOuter]], add an extra [[Limit]]
 * operator on top of the outer-side child.
 * Case 2: If the join type is [[FullOuter]] and exactly one child is a [[Limit]], add an
 * extra [[Limit]] operator on top of that child (the two limits are then combined).
 * Case 3: If the join type is [[FullOuter]] and neither child is a [[Limit]], add an extra
 * [[Limit]] operator on top of the child whose estimated size is larger.
 */
object PushLimitThroughOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
Contributor:

I feel this will generate wrong results. It is not safe to push down the limit if we are not joining a foreign key against a primary key, right? For example, for a left outer join, suppose we push the limit down to the right table. It is possible that all of the rows returned by the right side have the same join column value.

Am I missing anything?

Member Author:

For left outer joins, we only push the limit down to the left table, so I think it should be safe?

Basically, the rule adds additional Limit node(s) on top of the outer-side child/children.
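
A quick way to see the claim (a minimal sketch over plain Scala collections, not Catalyst; the leftOuterJoin helper and the sample data here are hypothetical): every preserved-side row contributes at least one output row, so joining only the first n left rows still yields at least n rows, each of which is a valid row of the true result.

  // Toy model of a left outer join over in-memory pairs (not Spark's implementation).
  def leftOuterJoin(left: Seq[(Int, String)],
                    right: Seq[(Int, String)]): Seq[((Int, String), Option[(Int, String)])] =
    left.flatMap { l =>
      val matches = right.filter(_._1 == l._1)
      // An unmatched left row is preserved with a None (i.e. null) right side,
      // so each left row yields at least one output row.
      if (matches.isEmpty) Seq((l, None)) else matches.map(r => (l, Some(r)))
    }

  val a = Seq(1 -> "a1", 2 -> "a2", 3 -> "a3")
  val b = Seq(2 -> "b2")

  // LIMIT 2 over the full join vs. limiting the left side to 2 rows first:
  // both give 2 rows, and every pushed-down row is a row of the true join
  // (which 2 rows you get is not deterministic in general, as discussed below).
  val full   = leftOuterJoin(a, b).take(2)
  val pushed = leftOuterJoin(a.take(2), b).take(2)
  assert(pushed.size == 2)
  assert(pushed.forall(leftOuterJoin(a, b).contains))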

Contributor:

Oh, sorry, I was looking at the wrong line. Is it safe for a full outer join? Also, with the limit, the result of a left/right outer join may not be deterministic, right?

Contributor:

Hmm. Actually, for a left/right outer join, what will happen if we have A left outer join B on (A.key = B.key) sort by A.key limit 10?

Member Author:

If there is a sort, the plan is different: we will have a Sort below the Limit, so this rule does not apply.

For the full outer join, I think the result is still correct, but it might not be the same as the result of the original plan.

In the new plan, we add an extra Limit. If the nodes below the Limit are deterministic, the result should be deterministic; if they are not, the result will not be deterministic either. Right? Thus, this rule does not change whether the result is deterministic. Sorry, this part is unclear to me; I am not sure my answer is correct.

Based on my understanding, users do not expect deterministic results from a limit unless they also use an order by.

Member Author:

Sorry, I found a hole: full outer = union(left outer, right outer), where the union removes the duplicates. We cannot add an extra limit below a Union that removes duplicates.

Contributor:

For example, say we have a table A containing 1, 2, 3, 4, 5 (call the column k). If we run A x FULL OUTER JOIN A y ON (x.k = y.k) limit 2 and push the limit to both sides, it is possible that we get 1, 2 from the left side and 3, 4 from the right side, right?
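
The counterexample can be checked with a small in-memory model (hypothetical fullOuterJoin helper over plain Scala collections, not Catalyst):

  // Toy full outer join on a single Int key (not Spark's implementation).
  def fullOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[(Option[Int], Option[Int])] = {
    val matched   = for (l <- left; r <- right if l == r) yield (Some(l), Some(r))
    val leftOnly  = left.filterNot(right.contains).map(l => (Some(l), None))
    val rightOnly = right.filterNot(left.contains).map(r => (None, Some(r)))
    matched ++ leftOnly ++ rightOnly
  }

  val a = Seq(1, 2, 3, 4, 5)
  // True result of A x FULL OUTER JOIN A y ON (x.k = y.k): (1,1) ... (5,5).
  val trueJoin = fullOuterJoin(a, a)

  // Push LIMIT 2 to both sides, with the left scan returning 1, 2 and the
  // right scan returning 3, 4:
  val broken = fullOuterJoin(Seq(1, 2), Seq(3, 4)).take(2)
  // broken contains (Some(1), None), i.e. (1, null) -- but 1 has a match in
  // the true result, so this row can never appear in any correct LIMIT 2 answer.
  assert(broken.exists(row => !trueJoin.contains(row)))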

Member Author:

Let me update the code and fix the bug. Thanks!

Member Author:

For the full outer join, my idea is to add the extra limit to the side whose estimated statistics are larger. Does that sound good?

Member Author:

Need to refine the idea.

For the full outer join:

  1. If both sides already have a Limit, do nothing.
  2. If only one side has a Limit, add the extra Limit to that side.
  3. If neither side has a Limit, add the extra Limit to the side whose estimated statistics are larger.

Is that better?


  private def makeNewJoinWithLimit(j: Join, limitExpr: Expression): Join = {
    j.joinType match {
      // RightOuter join:
      // add an extra Limit on top of the right child
      case RightOuter =>
        Join(j.left, CombineLimits(Limit(limitExpr, j.right)), j.joinType, j.condition)
      // LeftOuter join:
      // add an extra Limit on top of the left child
      case LeftOuter =>
        Join(CombineLimits(Limit(limitExpr, j.left)), j.right, j.joinType, j.condition)
      // FullOuter join whose left child is not a Limit but whose right child is:
      // add an extra Limit on top of the right child
      case FullOuter if !j.left.isInstanceOf[Limit] && j.right.isInstanceOf[Limit] =>
        Join(j.left, CombineLimits(Limit(limitExpr, j.right)), j.joinType, j.condition)
      // FullOuter join whose left child is a Limit but whose right child is not:
      // add an extra Limit on top of the left child
      case FullOuter if j.left.isInstanceOf[Limit] && !j.right.isInstanceOf[Limit] =>
        Join(CombineLimits(Limit(limitExpr, j.left)), j.right, j.joinType, j.condition)
      // FullOuter join where neither child is a Limit:
      // add an extra Limit on top of the child whose estimated size is larger
      case FullOuter if !j.left.isInstanceOf[Limit] && !j.right.isInstanceOf[Limit] =>
        if (j.left.statistics.sizeInBytes <= j.right.statistics.sizeInBytes) {
          Join(j.left, CombineLimits(Limit(limitExpr, j.right)), j.joinType, j.condition)
        } else {
          Join(CombineLimits(Limit(limitExpr, j.left)), j.right, j.joinType, j.condition)
        }
      // do nothing for the other cases
      case _ => j
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Limit(limitExpr, j: Join) =>
      Limit(limitExpr,
        makeNewJoinWithLimit(j, limitExpr))
    case Limit(limitExpr, Project(projectList, j: Join)) =>
      Limit(limitExpr,
        Project(projectList,
          makeNewJoinWithLimit(j, limitExpr)))
  }
}
Contributor:

Is it possible that we add an extra limit in every iteration?

Member Author:

You are right, but after this rule there is another rule, CombineLimits, which can combine the extra limits. I think we should still fix it anyway. Thanks!
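
For context, here is a standalone sketch of what a CombineLimits-style rule does (toy plan classes for illustration; Spark's actual Limit takes an Expression, not an Int):

  // Toy logical-plan nodes.
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class Lim(n: Int, child: Plan) extends Plan

  // Adjacent limits collapse into a single limit taking the smaller count.
  def combineLimits(plan: Plan): Plan = plan match {
    case Lim(outer, Lim(inner, child)) => combineLimits(Lim(math.min(outer, inner), child))
    case Lim(n, child)                 => Lim(n, combineLimits(child))
    case other                         => other
  }

  // Even if the pushdown rule wraps an already-limited child again, the
  // stacked limits collapse back into one node, so the plan stays bounded:
  assert(combineLimits(Lim(10, Lim(10, Scan("a")))) == Lim(10, Scan("a")))

A complementary fix would be to guard the LeftOuter/RightOuter cases with the same isInstanceOf[Limit] check that the FullOuter cases already use, so the rule stops matching once the extra Limit is in place.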


/**
 * Removes [[Cast Casts]] that are unnecessary because the input is already the correct type.
 */
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftSemi, LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -41,6 +41,7 @@ class FilterPushdownSuite extends PlanTest {
        PushPredicateThroughJoin,
        PushPredicateThroughGenerate,
        PushPredicateThroughAggregate,
+       PushLimitThroughOuterJoin,
        ColumnPruning,
        ProjectCollapsing) :: Nil
  }
@@ -750,4 +751,74 @@ class FilterPushdownSuite extends PlanTest {

    comparePlans(optimized, correctAnswer)
  }

test("limit: push down left outer join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, LeftOuter)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.limit(1)
val correctAnswer =
left.join(y, LeftOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}

test("limit: push down right outer join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, RightOuter)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val right = testRelation.limit(1)
val correctAnswer =
x.join(right, RightOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}

test("limit: push down full outer join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, FullOuter)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation.limit(1)
val correctAnswer =
left.join(right, FullOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}

test("limit: push down full outer join + project") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery = {
x.join(y, FullOuter).select('a, 'b, 'd)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.select('a, 'b)
val right = testRelation1.limit(1)
val correctAnswer =
left.join(right, FullOuter).select('a, 'b, 'd).limit(1).analyze

comparePlans(optimized, correctAnswer)
}
}
@@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.optimizer

-import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}