Skip to content

Commit

Permalink
address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Mar 11, 2016
1 parent fc96d84 commit 6a59b42
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,15 @@ object ColumnPruning extends Rule[LogicalPlan] {
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Expand
// Prunes the unused columns from project list of Project/Aggregate/Window/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
case p @ Project(_, a: Aggregate) if (a.outputSet -- p.references).nonEmpty =>
p.copy(
child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains)))
case p @ Project(_, w: Window) if (w.windowOutputSet -- p.references).nonEmpty =>
p.copy(child = w.copy(
windowExpressions = w.windowExpressions.filter(p.references.contains)))
case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
val newOutput = e.output.filter(a.references.contains(_))
val newProjects = e.projections.map { proj =>
Expand Down Expand Up @@ -384,15 +387,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Can't prune the columns on LeafNode
case p @ Project(_, l: LeafNode) => p

// Prune windowExpressions and child of Window
case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
val newWindowExprs = w.windowExpressions.filter(p.references.contains)
val newGrandChild =
prunedChild(w.child, p.references ++ AttributeSet(newWindowExprs.flatMap(_.references)))
p.copy(child = w.copy(
windowExpressions = newWindowExprs,
child = newGrandChild))

// for all other logical plans that inherits the output from it's children
case p @ Project(_, child) =>
val required = child.references ++ p.references
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ case class Window(

override def output: Seq[Attribute] =
child.output ++ windowExpressions.map(_.toAttribute)

def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
}

private[sql] object Expand {
Expand Down

0 comments on commit 6a59b42

Please sign in to comment.