Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13732] [SPARK-13797] [SQL] Remove projectList from Window and Eliminate useless Window #11565

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
59daa48
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 5, 2016
41d5f64
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 7, 2016
25f6ff6
remove projectList from Window
gatorsmile Mar 7, 2016
467b095
clean the comment.
gatorsmile Mar 8, 2016
b169236
changed the column pruning rule for Window.
gatorsmile Mar 10, 2016
b229ea2
style fix.
gatorsmile Mar 10, 2016
472a6e3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 10, 2016
60fcafa
Merge branch 'removeProjListWindow' into removeProjListWindowNew
gatorsmile Mar 10, 2016
f0fbe78
address comments.
gatorsmile Mar 10, 2016
f8fd37f
address comments.
gatorsmile Mar 10, 2016
4dd3e66
added a comment
gatorsmile Mar 10, 2016
6cf6f44
eliminate useless Aggregate and Window
gatorsmile Mar 10, 2016
44326f1
remove aggregate replacement.
gatorsmile Mar 10, 2016
fc96d84
address comments.
gatorsmile Mar 10, 2016
6a59b42
address comments.
gatorsmile Mar 11, 2016
bd35ee7
address comments.
gatorsmile Mar 11, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class Analyzer(
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
(oldVersion, oldVersion.copy(generatorOutput = newOutput))

case oldVersion @ Window(_, windowExpressions, _, _, child)
case oldVersion @ Window(windowExpressions, _, _, child)
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
Expand Down Expand Up @@ -658,10 +658,6 @@ class Analyzer(
case p: Project =>
val missing = missingAttrs -- p.child.outputSet
Project(p.projectList ++ missingAttrs, addMissingAttr(p.child, missing))
case w: Window =>
val missing = missingAttrs -- w.child.outputSet
w.copy(projectList = w.projectList ++ missingAttrs,
child = addMissingAttr(w.child, missing))
case a: Aggregate =>
// all the missing attributes should be grouping expressions
// TODO: push down AggregateExpression
Expand Down Expand Up @@ -1166,7 +1162,6 @@ class Analyzer(
// Set currentChild to the newly created Window operator.
currentChild =
Window(
currentChild.output,
windowExpressions,
partitionSpec,
orderSpec,
Expand Down Expand Up @@ -1436,10 +1431,10 @@ object CleanupAliases extends Rule[LogicalPlan] {
val cleanedAggs = aggs.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
Aggregate(grouping.map(trimAliases), cleanedAggs, child)

case w @ Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
case w @ Window(windowExprs, partitionSpec, orderSpec, child) =>
val cleanedWindowExprs =
windowExprs.map(e => trimNonTopLevelAliases(e).asInstanceOf[NamedExpression])
Window(projectList, cleanedWindowExprs, partitionSpec.map(trimAliases),
Window(cleanedWindowExprs, partitionSpec.map(trimAliases),
orderSpec.map(trimAliases(_).asInstanceOf[SortOrder]), child)

// Operators that operate on objects should only have expressions from encoders, which should
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ package object dsl {
Aggregate(groupingExprs, aliasedExprs, logicalPlan)
}

def window(
windowExpressions: Seq[NamedExpression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder]): LogicalPlan =
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)

def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,21 +315,17 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* - LeftSemiJoin
*/
object ColumnPruning extends Rule[LogicalPlan] {
def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Window/Expand
// Prunes the unused columns from project list of Project/Aggregate/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
case p @ Project(_, a: Aggregate) if (a.outputSet -- p.references).nonEmpty =>
p.copy(
child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains)))
case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
p.copy(child = w.copy(
projectList = w.projectList.filter(p.references.contains),
windowExpressions = w.windowExpressions.filter(p.references.contains)))
case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
val newOutput = e.output.filter(a.references.contains(_))
val newProjects = e.projections.map { proj =>
Expand All @@ -343,11 +339,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
case mp @ MapPartitions(_, _, _, child) if (child.outputSet -- mp.references).nonEmpty =>
mp.copy(child = prunedChild(child, mp.references))

// Prunes the unused columns from child of Aggregate/Window/Expand/Generate
// Prunes the unused columns from child of Aggregate/Expand/Generate
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
a.copy(child = prunedChild(child, a.references))
case w @ Window(_, _, _, _, child) if (child.outputSet -- w.references).nonEmpty =>
w.copy(child = prunedChild(child, w.references))
case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
e.copy(child = prunedChild(child, e.references))
case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
Expand Down Expand Up @@ -384,9 +378,21 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child

// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this rule near the rule that filter out useless window expressions, which makes people eaiser to understand.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.


// Can't prune the columns on LeafNode
case p @ Project(_, l: LeafNode) => p

// Prune windowExpressions and child of Window
case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
val newWindowExprs = w.windowExpressions.filter(p.references.contains)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After rethink about it, seems we can still separate it into 2 rules.
We can add a def windowOutputSet = AttributeSet(windowExpressions.map(_.toAttribute)) to Window operator, and change the first case to case p @ Project(_, w: Window) if (w.windowOutputSet -- p.references).nonEmpty => and filter out useless window expressions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do it.Thanks!

val newGrandChild =
prunedChild(w.child, p.references ++ AttributeSet(newWindowExprs.flatMap(_.references)))
p.copy(child = w.copy(
windowExpressions = newWindowExprs,
child = newGrandChild))

// for all other logical plans that inherits the output from it's children
case p @ Project(_, child) =>
val required = child.references ++ p.references
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,13 @@ case class Aggregate(
}

case class Window(
projectList: Seq[Attribute],
windowExpressions: Seq[NamedExpression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: LogicalPlan) extends UnaryNode {

override def output: Seq[Attribute] =
projectList ++ windowExpressions.map(_.toAttribute)
child.output ++ windowExpressions.map(_.toAttribute)
}

private[sql] object Expand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count}
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -258,6 +259,71 @@ class ColumnPruningSuite extends PlanTest {
comparePlans(optimized1, analysis.EliminateSubqueryAliases(correctAnswer1))
}

test("Column pruning on Window with useless aggregate functions") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)

val originalQuery =
input.groupBy('a, 'c, 'd)('a, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).select('a, 'c)

val correctAnswer =
input.select('a, 'c, 'd).groupBy('a, 'c, 'd)('a, 'c).analyze

val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, correctAnswer)
}

test("Column pruning on Window with selected agg expressions") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)

val originalQuery =
input.select('a, 'b, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).where('window > 1).select('a, 'c)

val correctAnswer =
input.select('a, 'b, 'c)
.window(WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window) :: Nil,
'a :: Nil, 'b.asc :: Nil)
.select('a, 'c, 'window).select('a, 'c, 'window, 'window)
.select('a, 'c, 'window).where('window > 1).select('a, 'c).analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird to see 3 selects here, we can add CollapseProject rule into the optimizer in this test suite.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Thanks!


val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, correctAnswer)
}

test("Column pruning on Window in select") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)

val originalQuery =
input.select('a, 'b, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).select('a, 'c)

val correctAnswer =
input.select('a, 'c).analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: they can fit in one line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will do.


val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, correctAnswer)
}

test("Column pruning on Union") {
val input1 = LocalRelation('a.int, 'b.string, 'c.double)
val input2 = LocalRelation('c.int, 'd.string, 'e.double)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Filter(condition, planLater(child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.Expand(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
execution.Window(
projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
execution.Window(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, Unsaf
* of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]].
*/
case class Window(
projectList: Seq[Attribute],
windowExpression: Seq[NamedExpression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan)
extends UnaryNode {

override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
override def output: Seq[Attribute] =
child.output ++ windowExpression.map(_.toAttribute)

override def requiredChildDistribution: Seq[Distribution] = {
if (partitionSpec.isEmpty) {
Expand Down Expand Up @@ -275,7 +275,7 @@ case class Window(
val unboundToRefMap = expressions.zip(references).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
UnsafeProjection.create(
projectList ++ patchedWindowExpression,
child.output ++ patchedWindowExpression,
child.output)
}

Expand Down