Skip to content

Commit

Permalink
fix the bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Dec 24, 2015
1 parent 72f73fb commit ee91303
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -859,26 +859,56 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}

/**
* Push [[Limit]] operators through [[Join]] operators, iff the join type is outer joins.
* Adding extra [[Limit]] operators on top of the outer-side child/children.
* Push [[Limit]] operators through [[Join]] operators or [[Project]] + [[Join]] operators,
* iff the join type is outer joins.
* Case 1: If the type is [[LeftOuter]] or [[RightOuter]], add extra [[Limit]] operators
* on top of the outer-side child.
* Case 2: If the type is [[FullOuter]] and only one child is [[Limit]], add extra [[Limit]]
* operators on the child that is not [[Limit]]
* Case 3: If the type is [[FullOuter]] and no child is [[Limit]], add extra [[Limit]]
* operators on the child whose statistics is higher.
*/
object PushLimitThroughOuterJoin extends Rule[LogicalPlan] with PredicateHelper {

private def makeNewJoinWithLimit(j: Join, limitExpr: Expression): Join = {
j.joinType match {
// RightOuter join:
// Add extra Limit in the right child
case RightOuter =>
Join(j.left, CombineLimits(Limit(limitExpr, j.right)), j.joinType, j.condition)
// LeftOuter join:
// Add extra Limit in the left child
case LeftOuter =>
Join(CombineLimits(Limit(limitExpr, j.left)), j.right, j.joinType, j.condition)
// FullOuter join whose left child is not Limit but right child is Limit
// Add extra Limit in the right child
case FullOuter if !j.left.isInstanceOf[Limit] && j.right.isInstanceOf[Limit] =>
Join(j.left, CombineLimits(Limit(limitExpr, j.right)), j.joinType, j.condition)
// FullOuter join whose left child is Limit but right child is not Limit
// Add extra Limit in the left child
case FullOuter if j.left.isInstanceOf[Limit] && !j.right.isInstanceOf[Limit] =>
Join(CombineLimits(Limit(limitExpr, j.left)), j.right, j.joinType, j.condition)
// FullOuter join whose left and right children are not Limit:
// Add extra Limit in the child whose statistics is higher
case FullOuter if !j.left.isInstanceOf[Limit] && !j.right.isInstanceOf[Limit] =>
if (j.left.statistics.sizeInBytes <= j.right.statistics.sizeInBytes) {
Join(j.left, CombineLimits(Limit(limitExpr, j.right)), j.joinType, j.condition)
} else {
Join(CombineLimits(Limit(limitExpr, j.left)), j.right, j.joinType, j.condition)
}
// DO Nothing for the other cases
case _ => j
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
joinType match {
case RightOuter =>
Limit(expr, Join(left, CombineLimits(Limit(expr, right)), joinType, joinCondition))
case LeftOuter =>
Limit(expr, Join(CombineLimits(Limit(expr, left)), right, joinType, joinCondition))
case FullOuter =>
Limit(expr,
Join(
CombineLimits(Limit(expr, left)),
CombineLimits(Limit(expr, right)),
joinType, joinCondition))
case _ => f // DO Nothing for the other join types
}
case Limit(limitExpr, j: Join) =>
Limit(limitExpr,
makeNewJoinWithLimit(j, limitExpr))
case Limit(limitExpr, Project(projectList, j: Join)) =>
Limit(limitExpr,
Project(projectList,
makeNewJoinWithLimit(j, limitExpr)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,11 +796,29 @@ class FilterPushdownSuite extends PlanTest {
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.limit(1)
val left = testRelation
val right = testRelation.limit(1)
val correctAnswer =
left.join(right, FullOuter).limit(1).analyze

comparePlans(optimized, correctAnswer)
}

test("limit: push down full outer join + project") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery = {
x.join(y, FullOuter).select('a, 'b, 'd)
.limit(1)
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.select('a, 'b)
val right = testRelation1.limit(1)
val correctAnswer =
left.join(right, FullOuter).select('a, 'b, 'd).limit(1).analyze

comparePlans(optimized, correctAnswer)
}
}

0 comments on commit ee91303

Please sign in to comment.