Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-34575][SQL] Push down limit through window when partitionSpec …
…is empty ### What changes were proposed in this pull request? Push down limit through `Window` when the partitionSpec of all window functions is empty and the same order is used. This is a real case from production: ![image](https://user-images.githubusercontent.com/5399861/109457143-3900c680-7a95-11eb-9078-806b041175c2.png) This PR supports 2 cases: 1. All window functions have same orderSpec: ```sql SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn, RANK() OVER(ORDER BY a) AS rk FROM t1 LIMIT 5; == Optimized Logical Plan == Window [row_number() windowspecdefinition(a#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#4, rank(a#9L) windowspecdefinition(a#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#5], [a#9L ASC NULLS FIRST] +- GlobalLimit 5 +- LocalLimit 5 +- Sort [a#9L ASC NULLS FIRST], true +- Relation default.t1[A#9L,B#10L,C#11L] parquet ``` 2. There is a window function with a different orderSpec: ```sql SELECT a, ROW_NUMBER() OVER(ORDER BY a) AS rn, RANK() OVER(ORDER BY b DESC) AS rk FROM t1 LIMIT 5; == Optimized Logical Plan == Project [a#9L, rn#4, rk#5] +- Window [rank(b#10L) windowspecdefinition(b#10L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#5], [b#10L DESC NULLS LAST] +- GlobalLimit 5 +- LocalLimit 5 +- Sort [b#10L DESC NULLS LAST], true +- Window [row_number() windowspecdefinition(a#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#4], [a#9L ASC NULLS FIRST] +- Project [a#9L, b#10L] +- Relation default.t1[A#9L,B#10L,C#11L] parquet ``` ### Why are the changes needed? Improve query performance. 
```scala spark.range(500000000L).selectExpr("id AS a", "id AS b").write.saveAsTable("t1") spark.sql("SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rowId FROM t1 LIMIT 5").show ``` Before this pr | After this pr -- | -- ![image](https://user-images.githubusercontent.com/5399861/109456919-c68fe680-7a94-11eb-89ca-67ec03267158.png) | ![image](https://user-images.githubusercontent.com/5399861/109456927-cd1e5e00-7a94-11eb-9866-d76b2665caea.png) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #31691 from wangyum/SPARK-34575. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
- Loading branch information
Showing
4 changed files
with
280 additions
and
1 deletion.
There are no files selected for viewing
56 changes: 56 additions & 0 deletions
56
...t/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.optimizer | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} | ||
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window} | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
|
||
/**
 * Pushes down [[LocalLimit]] beneath WINDOW. This rule optimizes the following case:
 * {{{
 *   SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn FROM Tab1 LIMIT 5 ==>
 *   SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn FROM (SELECT * FROM Tab1 ORDER BY a LIMIT 5) t
 * }}}
 *
 * The rewrite only fires when all of the following hold:
 *  - the WINDOW has an empty partitionSpec and every window expression is a rank-like or
 *    row-number-like function over the frame UNBOUNDED PRECEDING to CURRENT ROW;
 *  - none of the window functions is size-based, i.e. none depends on the total number of
 *    rows in the window partition;
 *  - the child of the WINDOW may produce more rows than the limit;
 *  - the limit is below `conf.topKSortFallbackThreshold`.
 */
object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
  // Imported locally so that this object is self-contained with respect to the file-level imports.
  import org.apache.spark.sql.catalyst.expressions.{Expression, SizeBasedWindowFunction}

  /**
   * Checks whether a limit can be pushed beneath a WINDOW evaluating `windowExpressions`.
   *
   * @param windowExpressions the aliased window expressions of the WINDOW operator
   * @return true only if every expression is a supported window function, see the cases below
   */
  private def supportsPushdownThroughWindow(
      windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
    // Size-based functions (PERCENT_RANK, CUME_DIST, NTILE) read the total row count of the
    // partition. Truncating the window input to `limit` rows would change that count and
    // therefore their results, so they must never be pushed through. This case has to come
    // first because these functions are also RankLike / RowNumberLike.
    case Alias(WindowExpression(_: SizeBasedWindowFunction, _), _) => false
    // The window frame of RankLike and RowNumberLike can only be
    // UNBOUNDED PRECEDING to CURRENT ROW.
    case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _,
        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
    case _ => false
  }

  /**
   * Guard shared by both rewrite cases: the window functions must support the push down, the
   * child must be able to produce more than `limit` rows (this also stops the rule from firing
   * again on its own output), and the limit must be small enough for a top-K sort.
   */
  private def canPushLimitBelow(window: Window, limit: Int): Boolean = {
    supportsPushdownThroughWindow(window.windowExpressions) &&
      window.child.maxRows.forall(_ > limit) &&
      limit < conf.topKSortFallbackThreshold
  }

  /**
   * Returns a copy of `window` whose child is sorted by the window's orderSpec and then limited.
   * Sort is needed here because we need global sort.
   */
  private def withLimitedChild(window: Window, limitExpr: Expression): Window = {
    val sortedChild = Sort(window.orderSpec, global = true, child = window.child)
    window.copy(child = Limit(limitExpr, sortedChild))
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Adding an extra Limit below WINDOW when the partitionSpec of all window functions is empty.
    case LocalLimit(limitExpr @ IntegerLiteral(limit), window @ Window(_, Nil, _, _))
        if canPushLimitBelow(window, limit) =>
      withLimitedChild(window, limitExpr)
    // There is a Project between LocalLimit and Window if they do not have the same output.
    case LocalLimit(limitExpr @ IntegerLiteral(limit),
        project @ Project(_, window @ Window(_, Nil, _, _)))
        if canPushLimitBelow(window, limit) =>
      project.copy(child = withLimitedChild(window, limitExpr))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
190 changes: 190 additions & 0 deletions
190
.../test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.optimizer | ||
|
||
import org.apache.spark.sql.Row | ||
import org.apache.spark.sql.catalyst.dsl.expressions._ | ||
import org.apache.spark.sql.catalyst.dsl.plans._ | ||
import org.apache.spark.sql.catalyst.expressions.{CurrentRow, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding} | ||
import org.apache.spark.sql.catalyst.plans._ | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules._ | ||
import org.apache.spark.sql.internal.SQLConf | ||
|
||
class LimitPushdownThroughWindowSuite extends PlanTest {
  // CollapseProject and RemoveNoopOperators are included because the projects in the
  // compared plans need to be collapsed.
  private val rulesUnderTest = Seq(
    CollapseProject,
    RemoveNoopOperators,
    LimitPushDownThroughWindow,
    EliminateLimits,
    ConstantFolding,
    BooleanSimplification)

  // Runs every rule, including the limit push down through window.
  private object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("Limit pushdown through window", FixedPoint(100), rulesUnderTest: _*))
  }

  // Runs the same rules except LimitPushDownThroughWindow; used to normalize expected plans.
  private object WithoutOptimize extends RuleExecutor[LogicalPlan] {
    private val baselineRules =
      rulesUnderTest.filter(_.ruleName != LimitPushDownThroughWindow.ruleName)

    val batches = List(
      Batch("Without limit pushdown through window", FixedPoint(100), baselineRules: _*))
  }

  // Six identical rows, so the relation can produce more rows than the small limits used below.
  private val testRelation = LocalRelation.fromExternalRows(
    Seq("a".attr.int, "b".attr.int, "c".attr.int),
    (1 to 6).map(_ => Row(1, 2, 3)))

  private val Seq(a, b, c) = testRelation.output
  private val windowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)

  // Each helper is a `def` so that every use builds a fresh alias, as inline code would.
  private def rowNumberOverCDesc =
    windowExpr(RowNumber(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")

  private def rowNumberOverBDesc =
    windowExpr(RowNumber(), windowSpec(Nil, b.desc :: Nil, windowFrame)).as("rn")

  private def rankOverCDesc =
    windowExpr(new Rank(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rk")

  private def rankOverCAsc =
    windowExpr(new Rank(), windowSpec(Nil, c.asc :: Nil, windowFrame)).as("rk")

  /**
   * Analyzes both plans, optimizes `query` with all rules and `expected` with all rules except
   * the limit push down, and asserts that the resulting plans are the same.
   */
  private def assertOptimizesTo(query: LogicalPlan, expected: LogicalPlan): Unit = {
    val optimized = Optimize.execute(query.analyze)
    val reference = WithoutOptimize.execute(expected.analyze)
    comparePlans(optimized, reference)
  }

  test("Push down limit through window when partitionSpec is empty") {
    val query = testRelation
      .select(a, b, c, rowNumberOverCDesc)
      .limit(2)
    val expected = testRelation
      .select(a, b, c)
      .orderBy(c.desc)
      .limit(2)
      .select(a, b, c, rowNumberOverCDesc)

    assertOptimizesTo(query, expected)
  }

  test("Push down limit through window for multiple window functions") {
    val query = testRelation
      .select(a, b, c, rowNumberOverCDesc, rankOverCDesc)
      .limit(2)
    val expected = testRelation
      .select(a, b, c)
      .orderBy(c.desc)
      .limit(2)
      .select(a, b, c, rowNumberOverCDesc, rankOverCDesc)

    assertOptimizesTo(query, expected)
  }

  test("Push down limit through window respect spark.sql.execution.topKSortFallbackThreshold") {
    for (threshold <- Seq(1, 100)) {
      withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> threshold.toString) {
        val query = testRelation
          .select(a, b, c, rowNumberOverCDesc)
          .limit(2)
        // With a threshold of 1 the limit of 2 is not below the threshold: no push down.
        val expected =
          if (threshold == 1) {
            query
          } else {
            testRelation
              .select(a, b, c)
              .orderBy(c.desc)
              .limit(2)
              .select(a, b, c, rowNumberOverCDesc)
          }

        assertOptimizesTo(query, expected)
      }
    }
  }

  test("Push down to first window if order column is different") {
    val query = testRelation
      .select(a, b, c, rowNumberOverBDesc, rankOverCAsc)
      .limit(2)
    val expected = testRelation
      .select(a, b, c, rowNumberOverBDesc)
      .orderBy(c.asc)
      .limit(2)
      .select(a, b, c, $"rn".attr, rankOverCAsc)

    assertOptimizesTo(query, expected)
  }

  test("Push down if there is a Project between LocalLimit and Window") {
    val query = testRelation
      .select(a, b, rowNumberOverBDesc)
      .select(a, $"rn".attr)
      .limit(2)
    val expected = testRelation
      .select(a, b)
      .orderBy(b.desc)
      .limit(2)
      .select(a, rowNumberOverBDesc)

    assertOptimizesTo(query, expected)
  }

  test("Should not push down if partitionSpec is not empty") {
    val partitionedRowNumber =
      windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")
    val query = testRelation
      .select(a, b, c, partitionedRowNumber)
      .limit(2)

    assertOptimizesTo(query, query)
  }

  test("Should not push down when child's maxRows smaller than limit value") {
    val query = testRelation
      .select(a, b, c, rowNumberOverCDesc)
      .limit(20)

    assertOptimizesTo(query, query)
  }

  test("Should not push down if it is not RankLike/RowNumberLike window function") {
    val countOverCDesc =
      windowExpr(count(b), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")
    val query = testRelation
      .select(a, b, c, countOverCDesc)
      .limit(2)

    assertOptimizesTo(query, query)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters