From 56fd78267618a3b3aab73ca01d682f4fb25a638c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 23 Dec 2015 10:08:03 -0800
Subject: [PATCH 01/10] union limit pushdown.

---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala  | 9 +++++++++
 .../catalyst/optimizer/SetOperationPushDownSuite.scala   | 8 ++++++++
 2 files changed, 17 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f6088695a9276..99504bd5d3efd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -153,6 +153,15 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
         )
       )
 
+    // Push down limit into union
+    case Limit(exp, Union(left, right)) =>
+      Limit(exp,
+        Union(
+          Limit(exp, left),
+          Limit(exp, right)
+        )
+      )
+
     // Push down deterministic projection through UNION ALL
     case p @ Project(projectList, u @ Union(left, right)) =>
       if (projectList.forall(_.deterministic)) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index 1595ad9327423..915b5e5f21fd5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -61,6 +61,14 @@ class SetOperationPushDownSuite extends PlanTest {
     comparePlans(exceptOptimized, exceptCorrectAnswer)
   }
 
+  test("union: limit to each side") {
+    val unionQuery = testUnion.limit(1)
+    val unionOptimized = Optimize.execute(unionQuery.analyze)
+    val unionCorrectAnswer =
+      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
+    comparePlans(unionOptimized, unionCorrectAnswer)
+  }
+
   test("union: project to each side") {
     val unionQuery = testUnion.select('a)
     val unionOptimized = Optimize.execute(unionQuery.analyze)

From 77105e391a0adf6a6e0a3891a1c692e6bcdaf9f4 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 23 Dec 2015 14:45:41 -0800
Subject: [PATCH 02/10] combine the limits.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 99504bd5d3efd..bf19fb3430363 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -157,8 +157,8 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     case Limit(exp, Union(left, right)) =>
       Limit(exp,
         Union(
-          Limit(exp, left),
-          Limit(exp, right)
+          CombineLimits(Limit(exp, left)),
+          CombineLimits(Limit(exp, right))
         )
       )
 
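A note on why the rewrite in PATCH 01 is safe, and why PATCH 02 follows up with CombineLimits: taking n rows from a UNION ALL can never need more than the first n rows of either branch, and when a branch already carries its own Limit the rewrite stacks two Limit nodes that should collapse to the smaller bound. The following plain-Scala sketch, which is not part of the patches, models this with ordered Seq concatenation standing in for UNION ALL; names and values are illustrative only, and Spark's Limit only promises to return some n rows, so the real guarantee is weaker but still satisfied.

object LimitOverUnionDemo {
  def main(args: Array[String]): Unit = {
    val left = Seq(1, 2, 3, 4, 5)
    val right = Seq(6, 7, 8, 9)
    val n = 3

    // Limit(n, Union(left, right)) versus Limit(n, Union(Limit(n, left), Limit(n, right))):
    // in this ordered model both produce the same rows.
    val direct = (left ++ right).take(n)
    val pushed = (left.take(n) ++ right.take(n)).take(n)
    assert(direct == pushed)

    // PATCH 02's CombineLimits collapses the nested Limits the rewrite can create:
    // Limit(le, Limit(ne, child)) keeps the smaller bound, i.e. take(le) after take(ne).
    val nested = right.take(3).take(5)
    val combined = right.take(math.min(3, 5))
    assert(nested == combined)

    println(s"direct = $direct, pushed = $pushed")
  }
}

Without that collapse, repeated optimizer runs would keep wrapping already-limited branches, which is exactly the problem the next commits wrestle with.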
From 7f25d910e579449138845c0241405a29a97450fa Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 24 Dec 2015 00:11:52 -0800
Subject: [PATCH 03/10] update the comments.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bf19fb3430363..f013e7e494525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -86,8 +86,8 @@ object SamplePushDown extends Rule[LogicalPlan] {
  * Operations that are safe to pushdown are listed as follows.
  * Union:
  *   Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
- *   safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
- *   we will not be able to pushdown Projections.
+ *   safe to pushdown Filters, Projections and Limits through it. Once we add UNION DISTINCT,
+ *   we will not be able to pushdown Projections and Limits.
  *
  * Intersect:
  *   It is not safe to pushdown Projections through it because we need to get the

From ae59f425457c5311ae2021c3654b6fca8c96f73e Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 24 Dec 2015 22:53:09 -0800
Subject: [PATCH 04/10] add a stop flag.

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala  | 13 +++++++------
 .../catalyst/plans/logical/basicOperators.scala   | 16 +++++++++++++++-
 .../optimizer/SetOperationPushDownSuite.scala     |  2 +-
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f013e7e494525..3841504d793f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -154,12 +154,13 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     )
 
     // Push down limit into union
-    case Limit(exp, Union(left, right)) =>
+    case Limit(exp, Union(left, right), optimized) if !optimized =>
       Limit(exp,
         Union(
-          CombineLimits(Limit(exp, left)),
-          CombineLimits(Limit(exp, right))
-        )
+          Limit(exp, left),
+          Limit(exp, right)
+        ),
+        optimized = true
       )
 
     // Push down deterministic projection through UNION ALL
@@ -263,7 +264,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       Join(left, prunedChild(right, allReferences), LeftSemi, condition)
 
     // Push down project through limit, so that we may have chance to push it further.
-    case Project(projectList, Limit(exp, child)) =>
+    case Project(projectList, Limit(exp, child, _)) =>
       Limit(exp, Project(projectList, child))
 
     // Push down project if possible when the child is sort.
@@ -891,7 +892,7 @@ object RemoveDispensableExpressions extends Rule[LogicalPlan] {
  */
 object CombineLimits extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
+    case ll @ Limit(le, nl @ Limit(ne, grandChild, _), _) =>
       Limit(If(LessThan(ne, le), ne, le), grandChild)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 64ef4d799659f..0a1b48d56b4d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -448,7 +448,21 @@ case class Pivot(
   }
 }
 
-case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+/** Factory for constructing new `Limit` nodes. */
+object Limit {
+  def apply(limitExpr: Expression, child: LogicalPlan): Limit = {
+    new Limit(limitExpr, child, optimized = false)
+  }
+}
+
+/**
+ * Take the first `limitExpr` rows.
+ * @param limitExpr The number of returned rows
+ * @param child Child operator
+ * @param optimized This node has been optimized. Note that this is only a flag marker used
+ *                  to avoid adding extra `Limit` nodes to the child operators more than once.
+ */
+case class Limit(limitExpr: Expression, child: LogicalPlan, optimized: Boolean) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override lazy val statistics: Statistics = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index 915b5e5f21fd5..1994f353b12db 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -65,7 +65,7 @@ class SetOperationPushDownSuite extends PlanTest {
     val unionQuery = testUnion.limit(1)
     val unionOptimized = Optimize.execute(unionQuery.analyze)
     val unionCorrectAnswer =
-      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
+      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1)), optimized = true).analyze
     comparePlans(unionOptimized, unionCorrectAnswer)
   }
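PATCH 04's `optimized` flag exists because the rule's output still matches its own input pattern: `Limit(exp, Union(...))` produces another `Limit` over a `Union`, so an executor that reruns the batch would fire the rule again and again. A toy sketch of the guard idea follows; the ADT and names are simplified stand-ins, not Catalyst classes.

sealed trait Plan
case class Rel(name: String) extends Plan
case class Union2(left: Plan, right: Plan) extends Plan
case class Lim(n: Int, child: Plan, optimized: Boolean = false) extends Plan

object StopFlagDemo {
  // Without the `optimized` guard this case would match the plan it just
  // produced, nesting Limits forever under a fixed-point executor.
  def pushLimit(p: Plan): Plan = p match {
    case Lim(n, Union2(l, r), false) =>
      Lim(n, Union2(Lim(n, l), Lim(n, r)), optimized = true)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val once = pushLimit(Lim(1, Union2(Rel("t1"), Rel("t2"))))
    assert(pushLimit(once) == once) // idempotent thanks to the flag
    println(once)
  }
}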
From 3ccf3bd11f5ee89905c4a5cc4466fd4d332d0b42 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 24 Dec 2015 23:36:46 -0800
Subject: [PATCH 05/10] fixed the build failure.

---
 .../org/apache/spark/sql/execution/SparkStrategies.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 183d9b65023b9..2084e3d164449 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -276,11 +276,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object TakeOrderedAndProject extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
+      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child), _) =>
        execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil
       case logical.Limit(
           IntegerLiteral(limit),
-          logical.Project(projectList, logical.Sort(order, true, child))) =>
+          logical.Project(projectList, logical.Sort(order, true, child)),
+          _) =>
         execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil
       case _ => Nil
     }
@@ -345,7 +346,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data) =>
         LocalTableScan(output, data) :: Nil
-      case logical.Limit(IntegerLiteral(limit), child) =>
+      case logical.Limit(IntegerLiteral(limit), child, _) =>
         execution.Limit(limit, planLater(child)) :: Nil
       case Unions(unionChildren) =>
         execution.Union(unionChildren.map(planLater)) :: Nil

From 004ed6634ac392f96bef0fcf6def4f0de78498fd Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 25 Dec 2015 21:54:11 -0800
Subject: [PATCH 06/10] revert the changes back.

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala  | 13 ++++++-------
 .../catalyst/plans/logical/basicOperators.scala   | 16 +---------------
 .../optimizer/SetOperationPushDownSuite.scala     |  2 +-
 .../spark/sql/execution/SparkStrategies.scala     |  7 +++----
 4 files changed, 11 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3841504d793f0..f013e7e494525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -154,13 +154,12 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     )
 
     // Push down limit into union
-    case Limit(exp, Union(left, right), optimized) if !optimized =>
+    case Limit(exp, Union(left, right)) =>
       Limit(exp,
         Union(
-          Limit(exp, left),
-          Limit(exp, right)
-        ),
-        optimized = true
+          CombineLimits(Limit(exp, left)),
+          CombineLimits(Limit(exp, right))
+        )
       )
 
     // Push down deterministic projection through UNION ALL
@@ -264,7 +263,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       Join(left, prunedChild(right, allReferences), LeftSemi, condition)
 
     // Push down project through limit, so that we may have chance to push it further.
-    case Project(projectList, Limit(exp, child, _)) =>
+    case Project(projectList, Limit(exp, child)) =>
       Limit(exp, Project(projectList, child))
 
     // Push down project if possible when the child is sort.
@@ -892,7 +891,7 @@ object RemoveDispensableExpressions extends Rule[LogicalPlan] {
  */
 object CombineLimits extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case ll @ Limit(le, nl @ Limit(ne, grandChild, _), _) =>
+    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
       Limit(If(LessThan(ne, le), ne, le), grandChild)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 0a1b48d56b4d6..64ef4d799659f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -448,21 +448,7 @@ case class Pivot(
   }
 }
 
-/** Factory for constructing new `Limit` nodes. */
-object Limit {
-  def apply(limitExpr: Expression, child: LogicalPlan): Limit = {
-    new Limit(limitExpr, child, optimized = false)
-  }
-}
-
-/**
- * Take the first `limitExpr` rows.
- * @param limitExpr The number of returned rows
- * @param child Child operator
- * @param optimized This node has been optimized. Note that this is only a flag marker used
- *                  to avoid adding extra `Limit` nodes to the child operators more than once.
- */
-case class Limit(limitExpr: Expression, child: LogicalPlan, optimized: Boolean) extends UnaryNode {
+case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override lazy val statistics: Statistics = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index 1994f353b12db..915b5e5f21fd5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -65,7 +65,7 @@ class SetOperationPushDownSuite extends PlanTest {
     val unionQuery = testUnion.limit(1)
     val unionOptimized = Optimize.execute(unionQuery.analyze)
     val unionCorrectAnswer =
-      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1)), optimized = true).analyze
+      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
     comparePlans(unionOptimized, unionCorrectAnswer)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2084e3d164449..183d9b65023b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -276,12 +276,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object TakeOrderedAndProject extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child), _) =>
+      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
         execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil
       case logical.Limit(
           IntegerLiteral(limit),
-          logical.Project(projectList, logical.Sort(order, true, child)),
-          _) =>
+          logical.Project(projectList, logical.Sort(order, true, child))) =>
         execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil
       case _ => Nil
     }
@@ -346,7 +345,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data) =>
         LocalTableScan(output, data) :: Nil
-      case logical.Limit(IntegerLiteral(limit), child, _) =>
+      case logical.Limit(IntegerLiteral(limit), child) =>
         execution.Limit(limit, planLater(child)) :: Nil
       case Unions(unionChildren) =>
         execution.Union(unionChildren.map(planLater)) :: Nil
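The revert in PATCH 06 is motivated by the collateral of widening the `Limit` case class: adding a field changes the generated extractor, so every pre-existing `case Limit(expr, child)` pattern stops compiling, which is exactly what PATCH 05 had to patch up across SparkStrategies. A toy illustration with a hypothetical `Lim` node, not Spark code:

sealed trait Plan
case class Rel(name: String) extends Plan
// Widening the case class changes its generated unapply: the old two-argument
// pattern `case Lim(n, child)` no longer compiles anywhere in the code base.
case class Lim(n: Int, child: Plan, optimized: Boolean) extends Plan

object ExtractorCostDemo {
  def describe(p: Plan): String = p match {
    // Every existing match site has to grow a wildcard for the new field.
    case Lim(n, _, _) => s"limit $n"
    case _            => "other plan"
  }

  def main(args: Array[String]): Unit =
    println(describe(Lim(5, Rel("t"), optimized = false)))
}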
From 6998ec9d091260c63b40964997126838812bbf03 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Dec 2015 15:07:24 -0800
Subject: [PATCH 07/10] added limitedNumRows into the logicalPlan

---
 .../sql/catalyst/optimizer/Optimizer.scala        | 10 +++++-----
 .../catalyst/plans/logical/LogicalPlan.scala      |  5 +++++
 .../plans/logical/basicOperators.scala            |  2 ++
 .../optimizer/SetOperationPushDownSuite.scala     | 21 +++++++++++++++++++
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f013e7e494525..ea206da5bfb67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -153,14 +153,14 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       )
     )
 
-    // Push down limit into union
-    case Limit(exp, Union(left, right)) =>
+    // Add an extra Limit below UNION ALL if the left or the right child is not a Limit.
+    // This heuristic is valid assuming there does not exist any Limit push-down rule.
+    case Limit(exp, Union(left, right))
+      if left.limitedNumRows.isEmpty || right.limitedNumRows.isEmpty =>
       Limit(exp,
         Union(
           CombineLimits(Limit(exp, left)),
-          CombineLimits(Limit(exp, right))
-        )
-      )
+          CombineLimits(Limit(exp, right))))
 
     // Push down deterministic projection through UNION ALL
     case p @ Project(projectList, u @ Union(left, right)) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8f8747e105932..d8690717e800b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -90,6 +90,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }
 
+  /**
+   * Returns the maximum number of rows that this plan may compute.
+   */
+  def limitedNumRows: Option[Expression] = None
+
   /**
    * Returns true if this expression and all its children have been resolved to a specific schema
    * and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 64ef4d799659f..b91844f9ac5d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -451,6 +451,8 @@ case class Pivot(
 case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
+  override def limitedNumRows: Option[Expression] = Option(limitExpr)
+
   override lazy val statistics: Statistics = {
     val limit = limitExpr.eval().asInstanceOf[Int]
     val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index 915b5e5f21fd5..9483ed10ba3a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -31,6 +31,8 @@ class SetOperationPushDownSuite extends PlanTest {
       EliminateSubQueries) ::
     Batch("Union Pushdown", Once,
       SetOperationPushDown,
+      ConstantFolding,
+      BooleanSimplification,
       SimplifyFilters) :: Nil
   }
 
@@ -69,6 +71,25 @@ class SetOperationPushDownSuite extends PlanTest {
     comparePlans(unionOptimized, unionCorrectAnswer)
   }
 
+  test("union: limit to each side with the new limit number") {
+    val testLimitUnion = Union(testRelation, testRelation2.limit(3))
+    val unionQuery = testLimitUnion.limit(1)
+    val unionOptimized = Optimize.execute(unionQuery.analyze)
+    val unionCorrectAnswer =
+      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
+    comparePlans(unionOptimized, unionCorrectAnswer)
+  }
+
+  // If users already manually added the Limit, we do not add extra Limit
+  test("union: no limit to both sides") {
+    val testLimitUnion = Union(testRelation.limit(2), testRelation2.limit(3))
+    val unionQuery = testLimitUnion.limit(2)
+    val unionOptimized = Optimize.execute(unionQuery.analyze)
+    val unionCorrectAnswer =
+      Limit(2, Union(testRelation.limit(2), testRelation2.limit(3))).analyze
+    comparePlans(unionOptimized, unionCorrectAnswer)
+  }
+
   test("union: project to each side") {
     val unionQuery = testUnion.select('a)
     val unionOptimized = Optimize.execute(unionQuery.analyze)

From 09a56729669d2b3338b6d3b205008348519557f3 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Dec 2015 15:32:31 -0800
Subject: [PATCH 08/10] changed limitedNumRows to maxRows

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala   | 2 +-
 .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +-
 .../spark/sql/catalyst/plans/logical/basicOperators.scala     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ea206da5bfb67..258be6c8575dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -156,7 +156,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // Add an extra Limit below UNION ALL if the left or the right child is not a Limit.
     // This heuristic is valid assuming there does not exist any Limit push-down rule.
     case Limit(exp, Union(left, right))
-      if left.limitedNumRows.isEmpty || right.limitedNumRows.isEmpty =>
+      if left.maxRows.isEmpty || right.maxRows.isEmpty =>
       Limit(exp,
         Union(
           CombineLimits(Limit(exp, left)),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index d8690717e800b..ad9fbbf48a754 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -93,7 +93,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   /**
    * Returns the maximum number of rows that this plan may compute.
    */
-  def limitedNumRows: Option[Expression] = None
+  def maxRows: Option[Expression] = None
 
   /**
    * Returns true if this expression and all its children have been resolved to a specific schema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index b91844f9ac5d6..b329b941673e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -451,7 +451,7 @@ case class Pivot(
 case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  override def limitedNumRows: Option[Expression] = Option(limitExpr)
+  override def maxRows: Option[Expression] = Option(limitExpr)
 
   override lazy val statistics: Statistics = {
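A side note on the rules that travel with the new guard: PATCH 07 added ConstantFolding and BooleanSimplification to the test batch because CombineLimits does not emit a literal bound. It emits the conditional `If(LessThan(ne, le), ne, le)` (see the hunk in PATCH 04), which only folds to a literal afterwards. A plain-Scala model of that interplay, using a toy Expr ADT rather than Catalyst expressions:

object CombineLimitsFoldingDemo {
  sealed trait Expr
  case class Lit(v: Int) extends Expr
  case class LessThan2(l: Expr, r: Expr) extends Expr
  case class If2(pred: Expr, t: Expr, f: Expr) extends Expr

  // CombineLimits rewrites Limit(le, Limit(ne, child)) into
  // Limit(If(LessThan(ne, le), ne, le), child): a conditional, not a literal.
  def combine(le: Expr, ne: Expr): Expr = If2(LessThan2(ne, le), ne, le)

  // Constant folding evaluates the conditional away once both bounds are
  // literals, which is why the test batch runs ConstantFolding as well.
  def fold(e: Expr): Expr = e match {
    case If2(LessThan2(Lit(a), Lit(b)), t, f) => if (a < b) fold(t) else fold(f)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // Limit(1, Limit(3, r)) should become Limit(1, r).
    assert(fold(combine(le = Lit(1), ne = Lit(3))) == Lit(1))
    println(fold(combine(Lit(1), Lit(3))))
  }
}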
From 358d62e7736191420f0d0a364269baa4fa54b0cb Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 28 Dec 2015 22:14:12 -0800
Subject: [PATCH 09/10] address the comments.

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 10 ++++++----
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  2 ++
 .../catalyst/plans/logical/basicOperators.scala    |  2 ++
 .../optimizer/SetOperationPushDownSuite.scala      |  5 +++--
 4 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 258be6c8575dc..3e0a990fb327c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -153,14 +153,16 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       )
     )
 
-    // Add an extra Limit below UNION ALL if the left or the right child is not a Limit.
-    // This heuristic is valid assuming there does not exist any Limit push-down rule.
+    // Add an extra Limit below UNION ALL iff the left or the right child does not have a known
+    // row bound and no Limit was pushed down before. This heuristic is valid assuming there does
+    // not exist any Limit push-down rule that is unable to infer the value of maxRows. Any
+    // operator that a Limit can be pushed past should override the maxRows function.
     case Limit(exp, Union(left, right))
       if left.maxRows.isEmpty || right.maxRows.isEmpty =>
       Limit(exp,
         Union(
-          CombineLimits(Limit(exp, left)),
-          CombineLimits(Limit(exp, right))))
+          Limit(exp, left),
+          Limit(exp, right)))
 
     // Push down deterministic projection through UNION ALL
     case p @ Project(projectList, u @ Union(left, right)) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index ad9fbbf48a754..e72c03ee77303 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -92,6 +92,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
   /**
    * Returns the maximum number of rows that this plan may compute.
+   *
+   * Any operator that a Limit can be pushed past should override this function.
    */
   def maxRows: Option[Expression] = None
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index b329b941673e5..e9a65bf0508e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -28,6 +28,8 @@ import scala.collection.mutable.ArrayBuffer
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
+  override def maxRows: Option[Expression] = child.maxRows
+
   override lazy val resolved: Boolean = {
     val hasSpecialExpressions = projectList.exists ( _.collect {
         case agg: AggregateExpression => agg
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index 9483ed10ba3a2..b0ed905648d1a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -31,6 +31,7 @@ class SetOperationPushDownSuite extends PlanTest {
       EliminateSubQueries) ::
     Batch("Union Pushdown", Once,
       SetOperationPushDown,
+      CombineLimits,
       ConstantFolding,
       BooleanSimplification,
       SimplifyFilters) :: Nil
@@ -82,11 +83,11 @@ class SetOperationPushDownSuite extends PlanTest {
 
   // If users already manually added the Limit, we do not add extra Limit
   test("union: no limit to both sides") {
-    val testLimitUnion = Union(testRelation.limit(2), testRelation2.limit(3))
+    val testLimitUnion = Union(testRelation.limit(2), testRelation2.select('d).limit(3))
     val unionQuery = testLimitUnion.limit(2)
     val unionOptimized = Optimize.execute(unionQuery.analyze)
     val unionCorrectAnswer =
-      Limit(2, Union(testRelation.limit(2), testRelation2.limit(3))).analyze
+      Limit(2, Union(testRelation.limit(2), testRelation2.select('d).limit(3))).analyze
     comparePlans(unionOptimized, unionCorrectAnswer)
   }
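The final commit below also teaches `Union` itself to report `maxRows`: the bound is known only when both children are bounded, and is then their sum. Sketched here with plain `Option[Int]` arithmetic; the real hunk in the next patch yields `Add(leftMax, rightMax)` over Catalyst expressions instead of adding Ints.

object UnionMaxRowsDemo {
  // Known only when both sides are known, mirroring the for-comprehension
  // added to Union in the next patch.
  def unionMaxRows(left: Option[Int], right: Option[Int]): Option[Int] =
    for (l <- left; r <- right) yield l + r

  def main(args: Array[String]): Unit = {
    assert(unionMaxRows(Some(2), Some(3)).contains(5))
    assert(unionMaxRows(Some(2), None).isEmpty) // one unbounded side makes the union unbounded
    println(unionMaxRows(Some(2), Some(3)))
  }
}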
From 2823a57b14705f961e53f53c9341f42752889474 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 29 Dec 2015 15:28:22 -0800
Subject: [PATCH 10/10] addressed comments.

---
 .../sql/catalyst/optimizer/Optimizer.scala        | 44 ++++++++++----
 .../plans/logical/basicOperators.scala            |  3 +
 .../optimizer/PushdownLimitsSuite.scala           | 67 +++++++++++++++++++
 .../optimizer/SetOperationPushDownSuite.scala     | 27 --------
 4 files changed, 102 insertions(+), 39 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushdownLimitsSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3e0a990fb327c..98e8d9d2b875b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -51,7 +51,6 @@ object DefaultOptimizer extends Optimizer {
       // Operator combine
       ProjectCollapsing,
       CombineFilters,
-      CombineLimits,
       // Constant folding
       NullPropagation,
       OptimizeIn,
@@ -62,12 +61,44 @@ object DefaultOptimizer extends Optimizer {
       SimplifyFilters,
       SimplifyCasts,
       SimplifyCaseConversionExpressions) ::
+    Batch("Push Down Limits", FixedPoint(100),
+      PushDownLimit,
+      CombineLimits,
+      ConstantFolding,
+      BooleanSimplification) ::
     Batch("Decimal Optimizations", FixedPoint(100),
       DecimalAggregates) ::
     Batch("LocalRelation", FixedPoint(100),
       ConvertToLocalRelation) :: Nil
 }
 
+/**
+ * Pushes down Limit for reducing the amount of returned data.
+ *
+ * 1. Adds an extra Limit beneath operations such as UNION ALL.
+ * 2. Project is pushed through Limit in the rule ColumnPruning.
+ *
+ * Any operator that a Limit can be pushed past should override the maxRows function.
+ *
+ * Note: This rule has to be applied once the logical plan is stable;
+ * otherwise, it could impact other rules.
+ */
+object PushDownLimit extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+
+    // Add an extra Limit below UNION ALL iff the left or the right child is neither a Limit
+    // nor has a Limit descendant. This heuristic is valid assuming there does not exist any
+    // Limit push-down rule that is unable to infer the value of maxRows.
+    case Limit(exp, Union(left, right))
+      if left.maxRows.isEmpty || right.maxRows.isEmpty =>
+      Limit(exp,
+        Union(
+          Limit(exp, left),
+          Limit(exp, right)))
+  }
+}
+
 /**
  * Pushes operations down into a Sample.
  */
@@ -153,17 +184,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       )
     )
 
-    // Add an extra Limit below UNION ALL iff the left or the right child does not have a known
-    // row bound and no Limit was pushed down before. This heuristic is valid assuming there does
-    // not exist any Limit push-down rule that is unable to infer the value of maxRows. Any
-    // operator that a Limit can be pushed past should override the maxRows function.
-    case Limit(exp, Union(left, right))
-      if left.maxRows.isEmpty || right.maxRows.isEmpty =>
-      Limit(exp,
-        Union(
-          Limit(exp, left),
-          Limit(exp, right)))
-
     // Push down deterministic projection through UNION ALL
     case p @ Project(projectList, u @ Union(left, right)) =>
       if (projectList.forall(_.deterministic)) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index e9a65bf0508e7..245269a5c6b76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -111,6 +111,9 @@ private[sql] object SetOperation {
 
 case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
 
+  override def maxRows: Option[Expression] =
+    for (leftMax <- left.maxRows; rightMax <- right.maxRows) yield Add(leftMax, rightMax)
+
   override def statistics: Statistics = {
     val sizeInBytes = left.statistics.sizeInBytes + right.statistics.sizeInBytes
     Statistics(sizeInBytes = sizeInBytes)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushdownLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushdownLimitsSuite.scala
new file mode 100644
index 0000000000000..d3c97ceb8a04a
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushdownLimitsSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class PushdownLimitsSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubQueries) ::
+      Batch("Push Down Limit", Once,
+        PushDownLimit,
+        CombineLimits,
+        ConstantFolding,
+        BooleanSimplification) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
+
+  test("Union: limit to each side") {
+    val unionQuery = Union(testRelation, testRelation2).limit(1)
+    val unionOptimized = Optimize.execute(unionQuery.analyze)
+    val unionCorrectAnswer =
+      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
+    comparePlans(unionOptimized, unionCorrectAnswer)
+  }
+
+  test("Union: limit to each side with the new limit number") {
+    val testLimitUnion = Union(testRelation, testRelation2.limit(3))
+    val unionQuery = testLimitUnion.limit(1)
+    val unionOptimized = Optimize.execute(unionQuery.analyze)
+    val unionCorrectAnswer =
+      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
+    comparePlans(unionOptimized, unionCorrectAnswer)
+  }
+
+  test("Union: no limit to both sides") {
+    val testLimitUnion = Union(testRelation.limit(2), testRelation2.select('d).limit(3))
+    val unionQuery = testLimitUnion.limit(2)
+    val unionOptimized = Optimize.execute(unionQuery.analyze)
+    val unionCorrectAnswer =
+      Limit(2, Union(testRelation.limit(2), testRelation2.select('d).limit(3))).analyze
+    comparePlans(unionOptimized, unionCorrectAnswer)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index b0ed905648d1a..0837f10736650 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -64,33 +64,6 @@ class SetOperationPushDownSuite extends PlanTest {
     comparePlans(exceptOptimized, exceptCorrectAnswer)
   }
 
-  test("union: limit to each side") {
-    val unionQuery = testUnion.limit(1)
-    val unionOptimized = Optimize.execute(unionQuery.analyze)
-    val unionCorrectAnswer =
-      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
-    comparePlans(unionOptimized, unionCorrectAnswer)
-  }
-
-  test("union: limit to each side with the new limit number") {
-    val testLimitUnion = Union(testRelation, testRelation2.limit(3))
-    val unionQuery = testLimitUnion.limit(1)
-    val unionOptimized = Optimize.execute(unionQuery.analyze)
-    val unionCorrectAnswer =
-      Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
-    comparePlans(unionOptimized, unionCorrectAnswer)
-  }
-
-  // If users already manually added the Limit, we do not add extra Limit
-  test("union: no limit to both sides") {
-    val testLimitUnion = Union(testRelation.limit(2), testRelation2.select('d).limit(3))
-    val unionQuery = testLimitUnion.limit(2)
-    val unionOptimized = Optimize.execute(unionQuery.analyze)
-    val unionCorrectAnswer =
-      Limit(2, Union(testRelation.limit(2), testRelation2.select('d).limit(3))).analyze
-    comparePlans(unionOptimized, unionCorrectAnswer)
-  }
-
   test("union: project to each side") {
     val unionQuery = testUnion.select('a)
     val unionOptimized = Optimize.execute(unionQuery.analyze)
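Putting the final design together: PushDownLimit runs in its own FixedPoint(100) batch, and the maxRows guard is what makes the fixed point reachable. The following self-contained toy model of that loop uses a simplified ADT and a hand-rolled executor, not Catalyst's RuleExecutor; names like Union2 and Lim are illustrative only.

object FixedPointDemo {
  sealed trait Plan { def maxRows: Option[Int] = None }
  case class Rel(name: String) extends Plan
  case class Union2(left: Plan, right: Plan) extends Plan {
    // Bounded only when both children are bounded, then the sum, as in PATCH 10.
    override def maxRows: Option[Int] =
      for (l <- left.maxRows; r <- right.maxRows) yield l + r
  }
  case class Lim(n: Int, child: Plan) extends Plan {
    override def maxRows: Option[Int] = Some(n)
  }

  // One pass of the push-down rule plus recursion into children,
  // a stand-in for Catalyst's `plan transform { ... }`.
  def rule(p: Plan): Plan = p match {
    case Lim(n, Union2(l, r)) if l.maxRows.isEmpty || r.maxRows.isEmpty =>
      Lim(n, Union2(Lim(n, rule(l)), Lim(n, rule(r))))
    case Union2(l, r) => Union2(rule(l), rule(r))
    case Lim(n, c)    => Lim(n, rule(c))
    case leaf         => leaf
  }

  // The batch runs under FixedPoint(100): iterate until the plan stops
  // changing or the iteration budget is exhausted.
  def fixedPoint(p: Plan, budget: Int = 100): Plan = {
    val next = rule(p)
    if (next == p || budget <= 1) next else fixedPoint(next, budget - 1)
  }

  def main(args: Array[String]): Unit = {
    val plan = Lim(1, Union2(Rel("t1"), Rel("t2")))
    // Converges in one step: afterwards both Union children report maxRows = Some(1).
    println(fixedPoint(plan))
  }
}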