Skip to content

Commit

Permalink
[SPARK-40002][SQL] Don't push down limit through window using ntile
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Change `LimitPushDownThroughWindow` so that it no longer supports pushing down a limit through a window using ntile.

### Why are the changes needed?

In an unpartitioned window, the ntile function is currently applied to the result of the limit. This behavior produces results that conflict with Spark 3.1.3, Hive 2.3.9 and Prestodb 0.268

#### Example

Assume this data:
```
create table t1 stored as parquet as
select *
from range(101);
```
Also assume this query:
```
select id, ntile(10) over (order by id) as nt
from t1
limit 10;
```
With Spark 3.2.2, Spark 3.3.0, and master, the limit is applied before the ntile function:
```
+---+---+
|id |nt |
+---+---+
|0  |1  |
|1  |2  |
|2  |3  |
|3  |4  |
|4  |5  |
|5  |6  |
|6  |7  |
|7  |8  |
|8  |9  |
|9  |10 |
+---+---+
```
With Spark 3.1.3, and Hive 2.3.9, and Prestodb 0.268, the limit is applied _after_ ntile.

Spark 3.1.3:
```
+---+---+
|id |nt |
+---+---+
|0  |1  |
|1  |1  |
|2  |1  |
|3  |1  |
|4  |1  |
|5  |1  |
|6  |1  |
|7  |1  |
|8  |1  |
|9  |1  |
+---+---+
```
Hive 2.3.9:
```
+-----+-----+
| id  | nt  |
+-----+-----+
| 0   | 1   |
| 1   | 1   |
| 2   | 1   |
| 3   | 1   |
| 4   | 1   |
| 5   | 1   |
| 6   | 1   |
| 7   | 1   |
| 8   | 1   |
| 9   | 1   |
+-----+-----+
10 rows selected (1.72 seconds)
```
Prestodb 0.268:
```
 id | nt
----+----
  0 |  1
  1 |  1
  2 |  1
  3 |  1
  4 |  1
  5 |  1
  6 |  1
  7 |  1
  8 |  1
  9 |  1
(10 rows)

```
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Two new unit tests.

Closes #37443 from bersprockets/pushdown_ntile.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c9156e5)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
bersprockets authored and HyukjinKwon committed Aug 9, 2022
1 parent 509bf3b commit 16a6788
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW}
Expand All @@ -33,8 +33,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
// The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW.
private def supportsPushdownThroughWindow(
windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber,
WindowSpecDefinition(Nil, _,
case Alias(WindowExpression(_: Rank | _: DenseRank | _: RowNumber, WindowSpecDefinition(Nil, _,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
import org.apache.spark.sql.catalyst.expressions.{CurrentRow, NTile, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
Expand Down Expand Up @@ -198,4 +198,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
Optimize.execute(originalQuery.analyze),
WithoutOptimize.execute(originalQuery.analyze))
}

test("SPARK-40002: Should not push through ntile window function") {
val originalQuery = testRelation
.select(a, b, c,
windowExpr(new NTile(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("nt"))
.limit(2)

comparePlans(
Optimize.execute(originalQuery.analyze),
WithoutOptimize.execute(originalQuery.analyze))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1127,4 +1127,17 @@ class DataFrameWindowFunctionsSuite extends QueryTest
)
)
}

test("SPARK-40002: ntile should apply before limit") {
val df = Seq.tabulate(101)(identity).toDF("id")
val w = Window.orderBy("id")
checkAnswer(
df.select($"id", ntile(10).over(w)).limit(3),
Seq(
Row(0, 1),
Row(1, 1),
Row(2, 1)
)
)
}
}

0 comments on commit 16a6788

Please sign in to comment.