Skip to content

Commit

Permalink
Eliminate sorts without limit in the subquery of Join/Aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
WangGuangxin committed Oct 3, 2019
1 parent 51d6ba7 commit 75b43f5
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
Batch("Join Reorder", FixedPoint(1),
CostBasedJoinReorder) :+
Batch("Remove Redundant Sorts", Once,
RemoveRedundantSorts) :+
RemoveRedundantSorts,
RemoveSortInSubquery) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

/**
 * Removes a [[Sort]] that is the direct input of a [[Join]] or an [[Aggregate]] (optionally
 * reached through a chain of deterministic [[Project]]s) when the ordering it produces cannot
 * influence the result. For example,
 *
 * {{{
 *   SELECT * FROM
 *     (SELECT f1 FROM tbl1 ORDER BY f2) temp1
 *   JOIN
 *     (SELECT f3 FROM tbl2) temp2
 *   ON temp1.f1 = temp2.f3
 * }}}
 *
 * is equivalent to
 *
 * {{{
 *   SELECT * FROM
 *     (SELECT f1 FROM tbl1) temp1
 *   JOIN
 *     (SELECT f3 FROM tbl2) temp2
 *   ON temp1.f1 = temp2.f3
 * }}}
 *
 * A [[Sort]] that feeds a [[Limit]] is left untouched: in that case the limit operator, not the
 * [[Sort]], is the node this rule sees below the [[Join]] / [[Aggregate]], and only [[Sort]] and
 * [[Project]] nodes are looked through.
 *
 * The [[Sort]] is kept whenever row order may be observable:
 *  - the join condition is non-deterministic,
 *  - a [[Project]] between the consumer and the [[Sort]] has a non-deterministic expression,
 *  - the [[Aggregate]] uses an aggregate function other than `min`, `max` or `count`
 *    (for instance `first`, `last` or `collect_list`, whose result depends on input order).
 */
object RemoveSortInSubquery extends Rule[LogicalPlan] with PredicateHelper {
  // Imported locally so that this object is self-contained with respect to the file header.
  import org.apache.spark.sql.catalyst.expressions.NamedExpression
  import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Max, Min}

  /**
   * Drops the first [[Sort]] found at the top of `plan`, looking through [[Project]] nodes whose
   * project list is fully deterministic. Any other plan is returned unchanged.
   */
  private def removeTopLevelSort(plan: LogicalPlan): LogicalPlan = plan match {
    case Sort(_, _, child) => child
    // A non-deterministic projection may produce different values per row depending on the
    // order in which rows arrive, so the Sort below it must stay.
    case p @ Project(fields, child) if fields.forall(_.deterministic) =>
      p.copy(child = removeTopLevelSort(child))
    case other => other
  }

  /**
   * Returns true when every aggregate function appearing in `aggs` yields the same result
   * regardless of the order of its input rows. Only `min`, `max` and `count` are accepted;
   * every other function is conservatively treated as order-sensitive.
   */
  private def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
    val aggExpressions = aggs.flatMap { e =>
      e.collect { case ae: AggregateExpression => ae }
    }
    aggExpressions.forall { ae =>
      ae.aggregateFunction match {
        case _: Min | _: Max | _: Count => true
        case _ => false
      }
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // `condition` is an Option; a join without a condition trivially passes the check.
    case j @ Join(oldLeft, oldRight, _, condition, _) if condition.forall(_.deterministic) =>
      j.copy(left = removeTopLevelSort(oldLeft), right = removeTopLevelSort(oldRight))
    case g @ Aggregate(_, aggs, oldChild) if isOrderIrrelevantAggs(aggs) =>
      g.copy(child = removeTopLevelSort(oldChild))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.{LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor

/**
 * Tests for [[RemoveSortInSubquery]]: a sort directly below a join or an aggregation is removed
 * unless a limit sits between the sort and its consumer.
 */
class RemoveSortInSubquerySuite extends PlanTest {
  /** Executor under test: limit push-down first, then the sort-removal rule. */
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Limit PushDown", FixedPoint(10), LimitPushDown) ::
      Batch("Remove Redundant Sorts", Once, RemoveSortInSubquery) :: Nil
  }

  /**
   * Applies only the limit push-down batch of [[Optimize]]. Used to build expected plans for the
   * tests with an outer limit, so they differ from the optimized plan only in what
   * [[RemoveSortInSubquery]] changed.
   */
  object PushDownOptimizer extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Limit PushDown", FixedPoint(10), LimitPushDown) :: Nil
  }

  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
  val testRelationB = LocalRelation('d.int)

  test("remove orderBy in groupBy subquery") {
    val projectPlan = testRelation.select('a, 'b)
    val unnecessaryOrderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
    val groupByPlan = unnecessaryOrderByPlan.groupBy('a)(count(1))
    // Optimize exactly once so that a rule needing several passes is not masked.
    val optimized = Optimize.execute(groupByPlan.analyze)
    val correctAnswer = projectPlan.groupBy('a)(count(1)).analyze
    comparePlans(optimized, correctAnswer)
  }

  test("should not remove orderBy with limit in groupBy subquery") {
    val projectPlan = testRelation.select('a, 'b)
    val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc).limit(10)
    val groupByPlan = orderByPlan.groupBy('a)(count(1))
    val optimized = Optimize.execute(groupByPlan.analyze)
    val correctAnswer = groupByPlan.analyze
    comparePlans(optimized, correctAnswer)
  }

  test("remove orderBy in join subquery") {
    val projectPlan = testRelation.select('a, 'b)
    val unnecessaryOrderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
    val projectPlanB = testRelationB.select('d)
    val joinPlan = unnecessaryOrderByPlan.join(projectPlanB).select('a, 'd)
    val optimized = Optimize.execute(joinPlan.analyze)
    val correctAnswer = projectPlan.join(projectPlanB).select('a, 'd).analyze
    comparePlans(optimized, correctAnswer)
  }

  test("should not remove orderBy with limit in join subquery") {
    val projectPlan = testRelation.select('a, 'b)
    val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc).limit(10)
    val projectPlanB = testRelationB.select('d)
    val joinPlan = orderByPlan.join(projectPlanB).select('a, 'd)
    val optimized = Optimize.execute(joinPlan.analyze)
    val correctAnswer = joinPlan.analyze
    comparePlans(optimized, correctAnswer)
  }

  test("should not remove orderBy in left join subquery if there is an outer limit") {
    val projectPlan = testRelation.select('a, 'b)
    val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
    val projectPlanB = testRelationB.select('d)
    val joinPlan = orderByPlan
      .join(projectPlanB, LeftOuter)
      .limit(10)
    val optimized = Optimize.execute(joinPlan.analyze)
    // Expected plan: only limit push-down applied, the sort must survive.
    // NOTE(review): presumably LimitPushDown places a limit above the sort on the left side,
    // which is what shields the sort from the rule - confirm against LimitPushDown.
    val correctAnswer = PushDownOptimizer.execute(joinPlan.analyze)
    comparePlans(optimized, correctAnswer)
  }

  test("remove orderBy in right join subquery event if there is an outer limit") {
    val projectPlan = testRelation.select('a, 'b)
    val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
    val projectPlanB = testRelationB.select('d)
    val joinPlan = orderByPlan
      .join(projectPlanB, RightOuter)
      .limit(10)
    val optimized = Optimize.execute(joinPlan.analyze)
    // The expected plan must be analyzed and go through the same limit push-down batch as the
    // optimized plan; otherwise the comparison cannot succeed.
    val correctAnswer = PushDownOptimizer.execute(
      projectPlan
        .join(projectPlanB, RightOuter)
        .limit(10)
        .analyze)
    comparePlans(optimized, correctAnswer)
  }
}

0 comments on commit 75b43f5

Please sign in to comment.