Skip to content

Commit

Permalink
addressed comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Dec 29, 2015
1 parent 358d62e commit 2823a57
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ object DefaultOptimizer extends Optimizer {
// Operator combine
ProjectCollapsing,
CombineFilters,
CombineLimits,
// Constant folding
NullPropagation,
OptimizeIn,
Expand All @@ -62,12 +61,44 @@ object DefaultOptimizer extends Optimizer {
SimplifyFilters,
SimplifyCasts,
SimplifyCaseConversionExpressions) ::
Batch("Push Down Limits", FixedPoint(100),
PushDownLimit,
CombineLimits,
ConstantFolding,
BooleanSimplification) ::
Batch("Decimal Optimizations", FixedPoint(100),
DecimalAggregates) ::
Batch("LocalRelation", FixedPoint(100),
ConvertToLocalRelation) :: Nil
}

/**
* Pushes down Limit for reducing the amount of returned data.
*
* 1. Adding Extra Limit beneath the operations, including Union All.
* 2. Project is pushed through Limit in the rule ColumnPruning
*
* Any operator that a Limit can be pushed passed should override the maxRows function.
*
* Note: This rule has to be done when the logical plan is stable;
* Otherwise, it could impact the other rules.
*/
object PushDownLimit extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transform {

// Adding extra Limit below UNION ALL iff both left and right childs are not Limit or
// do not have Limit descendants. This heuristic is valid assuming there does not exist
// any Limit push-down rule that is unable to infer the value of maxRows.
case Limit(exp, Union(left, right))
if left.maxRows.isEmpty || right.maxRows.isEmpty =>
Limit(exp,
Union(
Limit(exp, left),
Limit(exp, right)))
}
}

/**
* Pushes operations down into a Sample.
*/
Expand Down Expand Up @@ -153,17 +184,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
)
)

// Adding extra Limit below UNION ALL iff both left and right childs are not Limit and no Limit
// was pushed down before. This heuristic is valid assuming there does not exist any Limit
// push-down rule that is unable to infer the value of maxRows. Any operator that a Limit can
// be pushed passed should override this function.
case Limit(exp, Union(left, right))
if left.maxRows.isEmpty || right.maxRows.isEmpty =>
Limit(exp,
Union(
Limit(exp, left),
Limit(exp, right)))

// Push down deterministic projection through UNION ALL
case p @ Project(projectList, u @ Union(left, right)) =>
if (projectList.forall(_.deterministic)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ private[sql] object SetOperation {

case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

override def maxRows: Option[Expression] =
for (leftMax <- left.maxRows; rightMax <- right.maxRows) yield Add(leftMax, rightMax)

override def statistics: Statistics = {
val sizeInBytes = left.statistics.sizeInBytes + right.statistics.sizeInBytes
Statistics(sizeInBytes = sizeInBytes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._

class PushdownLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubQueries) ::
Batch("Push Down Limit", Once,
PushDownLimit,
CombineLimits,
ConstantFolding,
BooleanSimplification) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)

test("Union: limit to each side") {
val unionQuery = Union(testRelation, testRelation2).limit(1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("Union: limit to each side with the new limit number") {
val testLimitUnion = Union(testRelation, testRelation2.limit(3))
val unionQuery = testLimitUnion.limit(1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("Union: no limit to both sides") {
val testLimitUnion = Union(testRelation.limit(2), testRelation2.select('d).limit(3))
val unionQuery = testLimitUnion.limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(2), testRelation2.select('d).limit(3))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,6 @@ class SetOperationPushDownSuite extends PlanTest {
comparePlans(exceptOptimized, exceptCorrectAnswer)
}

test("union: limit to each side") {
val unionQuery = testUnion.limit(1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("union: limit to each side with the new limit number") {
val testLimitUnion = Union(testRelation, testRelation2.limit(3))
val unionQuery = testLimitUnion.limit(1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

// If users already manually added the Limit, we do not add extra Limit
test("union: no limit to both sides") {
val testLimitUnion = Union(testRelation.limit(2), testRelation2.select('d).limit(3))
val unionQuery = testLimitUnion.limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(2), testRelation2.select('d).limit(3))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("union: project to each side") {
val unionQuery = testUnion.select('a)
val unionOptimized = Optimize.execute(unionQuery.analyze)
Expand Down

0 comments on commit 2823a57

Please sign in to comment.