Skip to content

Commit

Permalink
eliminate useless Aggregate and Window
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Mar 10, 2016
1 parent 4dd3e66 commit 6cf6f44
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* - LeftSemiJoin
*/
object ColumnPruning extends Rule[LogicalPlan] {
/**
 * Compares two attribute lists position by position.
 *
 * @param output1 the first attribute sequence
 * @param output2 the second attribute sequence
 * @return true only when both sequences have the same number of attributes and every
 *         attribute of `output1` is `semanticEquals` to the one at the same index in `output2`
 */
def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = {
  val sameLength = output1.size == output2.size
  // The length check comes first so a differing size is rejected before any zipping.
  sameLength && output1.zip(output2).forall {
    case (left, right) => left.semanticEquals(right)
  }
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
Expand Down Expand Up @@ -378,13 +374,21 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child

// Eliminate no-op Window
case w: Window if sameOutput(w.child.output, w.output) => w.child

// Convert Aggregate to Project if no aggregate function exists
case a: Aggregate if !containsAggregates(a.expressions) =>
Project(a.aggregateExpressions, a.child)

// Can't prune the columns on LeafNode
case p @ Project(_, l: LeafNode) => p

// Prune windowExpressions and child of Window
case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
val newWindowExprs = w.windowExpressions.filter(p.references.contains)
val newGrandChild = prunedChild(w.child, w.references ++ p.references)
val newGrandChild =
prunedChild(w.child, p.references ++ AttributeSet(newWindowExprs.flatMap(_.references)))
p.copy(child = w.copy(
windowExpressions = newWindowExprs,
child = newGrandChild))
Expand All @@ -407,6 +411,18 @@ object ColumnPruning extends Rule[LogicalPlan] {
} else {
c
}

/**
 * Checks whether two attribute sequences line up one-to-one.
 *
 * @param output1 the first attribute sequence
 * @param output2 the second attribute sequence
 * @return true when the sequences have equal size and each pair of attributes at the same
 *         position satisfies `semanticEquals`; false otherwise
 */
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
  if (output1.size != output2.size) {
    // Different lengths can never match; skip the pairwise comparison entirely.
    false
  } else {
    output1.zip(output2).forall { case (lhs, rhs) => lhs.semanticEquals(rhs) }
  }

/**
 * Tells whether the given expression node is itself one of the aggregate-related node types:
 * `AggregateExpression`, `Grouping` or `GroupingID`.
 *
 * Only the node passed in is inspected; its children are not visited here.
 */
private def isAggregateExpression(e: Expression): Boolean = e match {
  case _: AggregateExpression | _: Grouping | _: GroupingID => true
  case _ => false
}

/**
 * Returns true when at least one of the given expressions yields a non-empty result for
 * `find(isAggregateExpression)`, and false when none do (including for an empty list).
 */
private def containsAggregates(exprs: Seq[Expression]): Boolean =
  exprs.exists { expr =>
    val firstAggregate = expr.find(isAggregateExpression)
    firstAggregate.nonEmpty
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ class ColumnPruningSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.select('a)
.groupBy('a)('a).analyze
.select('a).analyze

comparePlans(optimized, correctAnswer)
}
Expand All @@ -202,7 +201,7 @@ class ColumnPruningSuite extends PlanTest {
val correctAnswer =
testRelation
.select('a)
.groupBy('a)('a as 'c).analyze
.select('a as 'c).analyze

comparePlans(optimized, correctAnswer)
}
Expand Down Expand Up @@ -263,17 +262,15 @@ class ColumnPruningSuite extends PlanTest {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)

val originalQuery =
input.groupBy('a)('a, 'b, 'c, 'd,
input.groupBy('a, 'c, 'd)('a, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).select('a, 'c)

val correctAnswer =
input.select('a, 'b, 'c).groupBy('a)('a, 'b, 'c)
.window(Seq.empty[NamedExpression], 'a :: Nil, 'b.asc :: Nil)
.select('a, 'c).analyze
input.select('a, 'c).analyze

val optimized = Optimize.execute(originalQuery.analyze)

Expand Down Expand Up @@ -319,9 +316,7 @@ class ColumnPruningSuite extends PlanTest {
UnspecifiedFrame)).as('window)).select('a, 'c)

val correctAnswer =
input.select('a, 'b, 'c)
.window(Seq.empty[NamedExpression], 'a :: Nil, 'b.asc :: Nil)
.select('a, 'c).analyze
input.select('a, 'c).analyze

val optimized = Optimize.execute(originalQuery.analyze)

Expand Down

0 comments on commit 6cf6f44

Please sign in to comment.