Skip to content

Commit

Permalink
merge with RemoveRedudanctSorts
Browse files Browse the repository at this point in the history
  • Loading branch information
WangGuangxin committed Oct 24, 2019
1 parent a9e9be9 commit 425f76d
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
Batch("Join Reorder", FixedPoint(1),
CostBasedJoinReorder) :+
Batch("Remove Redundant Sorts", Once,
RemoveRedundantSorts,
RemoveSortInSubquery) :+
RemoveRedundantSorts) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
Expand Down Expand Up @@ -968,12 +967,18 @@ object EliminateSorts extends Rule[LogicalPlan] {
* Removes redundant Sort operation. This can happen:
* 1) if the child is already sorted
* 2) if there is another Sort operator separated by 0...n Project/Filter operators
* 3) if the Sort operator is within Join and without Limit
* 4) if the Sort operator is within GroupBy and the aggregate function is order irrelevant
*/
object RemoveRedundantSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
child
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
case j @ Join(originLeft, originRight, _, _, _) =>
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
g.copy(child = recursiveRemoveSort(originChild))
}

def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
Expand All @@ -988,6 +993,16 @@ object RemoveRedundantSorts extends Rule[LogicalPlan] {
case f: Filter => f.condition.deterministic
case _ => false
}

def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
val aggExpressions = aggs.flatMap { e =>
e.collect {
case ae: AggregateExpression => ae
}
}

aggExpressions.forall(_.aggregateFunction.isInstanceOf[OrderIrrelevantAggs])
}
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,21 @@ class RemoveRedundantSortsSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Limit PushDown", Once,
LimitPushDown) ::
Batch("Remove Redundant Sorts", Once,
RemoveRedundantSorts) ::
Batch("Collapse Project", Once,
CollapseProject) :: Nil
}

object PushDownOptimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Limit PushDown", FixedPoint(10), LimitPushDown) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelationB = LocalRelation('d.int)

test("remove redundant order by") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
Expand Down Expand Up @@ -96,7 +104,7 @@ class RemoveRedundantSortsSuite extends PlanTest {
}

test("sort should not be removed when there is a node which doesn't guarantee any order") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
val orderedPlan = testRelation.select('a, 'b)
val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
val optimized = Optimize.execute(groupedAndResorted.analyze)
val correctAnswer = groupedAndResorted.analyze
Expand Down Expand Up @@ -135,4 +143,96 @@ class RemoveRedundantSortsSuite extends PlanTest {
.select(('b + 1).as('c)).orderBy('c.asc).analyze
comparePlans(optimizedThrice, correctAnswerThrice)
}

test("remove orderBy in groupBy clause with count aggs") {
val projectPlan = testRelation.select('a, 'b)
val unnecessaryOrderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val groupByPlan = unnecessaryOrderByPlan.groupBy('a)(count(1))
val optimized = Optimize.execute(groupByPlan.analyze)
val correctAnswer = projectPlan.groupBy('a)(count(1)).analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("remove orderBy in groupBy clause with sum aggs") {
val projectPlan = testRelation.select('a, 'b)
val unnecessaryOrderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val groupByPlan = unnecessaryOrderByPlan.groupBy('a)(sum('a))
val optimized = Optimize.execute(groupByPlan.analyze)
val correctAnswer = projectPlan.groupBy('a)(sum('a)).analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("remove orderBy in groupBy clause with first aggs") {
val projectPlan = testRelation.select('a, 'b)
val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val groupByPlan = orderByPlan.groupBy('a)(first('a))
val optimized = Optimize.execute(groupByPlan.analyze)
val correctAnswer = groupByPlan.analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("remove orderBy in groupBy clause with first and count aggs") {
val projectPlan = testRelation.select('a, 'b)
val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val groupByPlan = orderByPlan.groupBy('a)(first('a), count(1))
val optimized = Optimize.execute(groupByPlan.analyze)
val correctAnswer = groupByPlan.analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("should not remove orderBy with limit in groupBy clause") {
val projectPlan = testRelation.select('a, 'b)
val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc).limit(10)
val groupByPlan = orderByPlan.groupBy('a)(count(1))
val optimized = Optimize.execute(groupByPlan.analyze)
val correctAnswer = groupByPlan.analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("remove orderBy in join clause") {
val projectPlan = testRelation.select('a, 'b)
val unnecessaryOrderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val projectPlanB = testRelationB.select('d)
val joinPlan = unnecessaryOrderByPlan.join(projectPlanB).select('a, 'd)
val optimized = Optimize.execute(joinPlan.analyze)
val correctAnswer = projectPlan.join(projectPlanB).select('a, 'd).analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("should not remove orderBy with limit in join clause") {
val projectPlan = testRelation.select('a, 'b)
val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc).limit(10)
val projectPlanB = testRelationB.select('d)
val joinPlan = orderByPlan.join(projectPlanB).select('a, 'd)
val optimized = Optimize.execute(joinPlan.analyze)
val correctAnswer = joinPlan.analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("should not remove orderBy in left join clause if there is an outer limit") {
val projectPlan = testRelation.select('a, 'b)
val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val projectPlanB = testRelationB.select('d)
val joinPlan = orderByPlan
.join(projectPlanB, LeftOuter)
.limit(10)
val optimized = Optimize.execute(joinPlan.analyze)
val correctAnswer = PushDownOptimizer.execute(joinPlan.analyze)
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("remove orderBy in right join clause event if there is an outer limit") {
val projectPlan = testRelation.select('a, 'b)
val orderByPlan = projectPlan.orderBy('a.asc, 'b.desc)
val projectPlanB = testRelationB.select('d)
val joinPlan = orderByPlan
.join(projectPlanB, RightOuter)
.limit(10)
val optimized = Optimize.execute(joinPlan.analyze)
val noOrderByPlan = projectPlan
.join(projectPlanB, RightOuter)
.limit(10)
val correctAnswer = PushDownOptimizer.execute(noOrderByPlan.analyze)
comparePlans(Optimize.execute(optimized), correctAnswer)
}
}

This file was deleted.

0 comments on commit 425f76d

Please sign in to comment.