Closed
@@ -1691,7 +1691,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   override def visitWindowDef(ctx: WindowDefContext): WindowSpecDefinition = withOrigin(ctx) {
     // CLUSTER BY ... | PARTITION BY ... ORDER BY ...
     val partition = ctx.partition.asScala.map(expression)
-    val order = ctx.sortItem.asScala.map(visitSortItem)
+    val order = if (ctx.sortItem.asScala.nonEmpty) {
+      ctx.sortItem.asScala.map(visitSortItem)
+    } else if (ctx.windowFrame != null &&
+        ctx.windowFrame().frameType.getType == SqlBaseParser.RANGE) {
+      // For a RANGE window frame, we won't add a default order spec.
+      ctx.sortItem.asScala.map(visitSortItem)
+    } else {
+      // Same default behavior as Hive when the order spec is empty:
Contributor:
From the ANSI SQL spec: If WD has no window ordering clause, then the window ordering is implementation-dependent, and all rows are peers.

I don't think this is a bug fix but rather a new feature. We need to justify it: what's the behavior of other popular SQL systems like presto, snowflake, redshift, etc.? And what's the benefit for end users?

Contributor Author:

> From the ANSI SQL spec: If WD has no window ordering clause, then the window ordering is implementation-dependent, and all rows are peers.
>
> I don't think this is a bug fix but rather a new feature. We need to justify it: what's the behavior of other popular SQL systems like presto, snowflake, redshift, etc.? And what's the benefit for end users?

I will check other SQL systems later.
In our production environment, one benefit is that we can migrate Hive SQL to Spark SQL more smoothly, without having to rewrite queries one by one.

+      // set the partition spec expressions as the order spec
+      ctx.partition.asScala.map { expr =>
+        SortOrder(expression(expr), Ascending, Ascending.defaultNullOrdering, Set.empty)
Member:
Wait, why do we set the ordering column as the partition column? We should just leave it unspecified, so that only (non-window) aggregation functions work together with unbounded windows and the result isn't affected by the order. This is what the Scala API does.

Contributor Author:

> Wait, why do we set the ordering column as the partition column? We should just leave it unspecified, so that only (non-window) aggregation functions work together with unbounded windows and the result isn't affected by the order. This is what the Scala API does.

Hmm, Hive does it this way. To me, when the user doesn't write an ORDER BY clause, it means they don't care about the result order. For the Range/DataFrame case we can't support this.

Member:

But the results will be useless. When can it be useful, if the order is indeterministic for functions that depend on the order?

Contributor Author:

> But the results will be useless. When can it be useful, if the order is indeterministic for functions that depend on the order?

In PostgreSQL, if we don't specify an order column, the result follows the partition column's default sort order:

angerszhu=# explain analyze verbose select id, num, lead(id) over (partition by num) from s4;
                                                    QUERY PLAN                                                     
-------------------------------------------------------------------------------------------------------------------
 WindowAgg  (cost=158.51..198.06 rows=2260 width=12) (actual time=0.107..0.122 rows=6 loops=1)
   Output: id, num, lead(id) OVER (?)
   ->  Sort  (cost=158.51..164.16 rows=2260 width=8) (actual time=0.079..0.081 rows=6 loops=1)
         Output: num, id
         Sort Key: s4.num
         Sort Method: quicksort  Memory: 25kB
         ->  Seq Scan on public.s4  (cost=0.00..32.60 rows=2260 width=8) (actual time=0.057..0.061 rows=6 loops=1)
               Output: num, id
 Planning Time: 0.114 ms
 Execution Time: 0.214 ms

angerszhu=# explain analyze verbose select id, num, lead(id) over (partition by num order by id) from s4;
                                                    QUERY PLAN                                                     
-------------------------------------------------------------------------------------------------------------------
 WindowAgg  (cost=158.51..203.71 rows=2260 width=12) (actual time=0.976..1.017 rows=6 loops=1)
   Output: id, num, lead(id) OVER (?)
   ->  Sort  (cost=158.51..164.16 rows=2260 width=8) (actual time=0.067..0.070 rows=6 loops=1)
         Output: id, num
         Sort Key: s4.num, s4.id
         Sort Method: quicksort  Memory: 25kB
         ->  Seq Scan on public.s4  (cost=0.00..32.60 rows=2260 width=8) (actual time=0.042..0.045 rows=6 loops=1)
               Output: id, num
 Planning Time: 0.155 ms
 Execution Time: 1.208 ms
(10 rows)

Member:

I guess that's because PostgreSQL can keep the natural order; Spark can't. Is the PostgreSQL result deterministic?

Contributor Author:

> Is the PostgreSQL result deterministic?

For the same SQL, the result is deterministic.

Adding the partition columns as the default order-by columns also keeps the result deterministic.

I hit this problem when migrating Hive SQL to Spark SQL.
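As context for the determinism question: a stable sort on the partition column alone leaves every row in a partition as a tie, so an order-dependent function like lead() still reflects the rows' arrival order. A minimal self-contained sketch (hypothetical helper for illustration, not Spark code):

```scala
object LeadSketch extends App {
  // lead(value) within one partition: after sorting, each row's lead is the next
  // row's value. Scala's sortBy is stable, so tied rows keep their input order.
  def leadAfterSort(rows: Seq[(Int, String)]): Seq[Option[Int]] = {
    val sorted = rows.sortBy(_._2) // order only by the partition column => all rows tie
    sorted.indices.map(i => if (i + 1 < sorted.length) Some(sorted(i + 1)._1) else None)
  }

  val arrivedOneWay   = Seq((1, "a"), (2, "a"))
  val arrivedOtherWay = Seq((2, "a"), (1, "a")) // same rows, different arrival order

  val r1 = leadAfterSort(arrivedOneWay)   // Seq(Some(2), None)
  val r2 = leadAfterSort(arrivedOtherWay) // Seq(Some(1), None)
  println(r1 == r2)                       // false: lead() depended on arrival order
}
```

In a single-node engine like PostgreSQL the arrival order of rows is typically repeatable, which is why the same query keeps returning the same answer; in a distributed engine the arrival order itself can vary between runs.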

Member:

I think we should not fix it, because on the Spark side at least the results will be non-deterministic. I doubt it's good to add this support purely for compatibility with other DBMSes when the output is expected to be useless.

Maybe disallowing it is a better idea than finding another problem later caused by different, indeterministic data.

Do you know of other cases from distributed DBMSes such as Presto?

Contributor Author:

> I think we should not fix it, because on the Spark side at least the results will be non-deterministic. I doubt it's good to add this support purely for compatibility with other DBMSes when the output is expected to be useless.
>
> Maybe disallowing it is a better idea than finding another problem later caused by different, indeterministic data.
>
> Do you know of other cases from distributed DBMSes such as Presto?

But in my fix we add a default order spec, so the result will be deterministic.
Previously this kind of SQL couldn't run at all: it would produce a non-deterministic result and is rejected by:

case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
  failAnalysis(s"Window function $wf requires window to be ordered, please add ORDER BY " +
    s"clause. For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " +
    s"ORDER BY window_ordering) from table")

+      }
+    }
 
     // RANGE/ROWS BETWEEN ...
     val frameSpecOption = Option(ctx.windowFrame).map { frame =>
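The net effect of the new branch can be sketched as a small standalone function (stand-in types for illustration only; these are not Spark's actual SortOrder/AST classes):

```scala
// Stand-in for Spark's SortOrder; illustration only.
case class SortKey(col: String, ascending: Boolean = true)

// Mirrors the branching in visitWindowDef: an explicit ORDER BY wins; a RANGE
// frame gets no default; otherwise the partition columns become the order spec.
def defaultedOrderSpec(
    sortItems: Seq[SortKey],
    partitionCols: Seq[String],
    frameIsRange: Boolean): Seq[SortKey] = {
  if (sortItems.nonEmpty) sortItems
  else if (frameIsRange) Seq.empty
  else partitionCols.map(SortKey(_))
}
```

So under this change, `OVER (PARTITION BY area)` behaves like `OVER (PARTITION BY area ORDER BY area)` unless the window declares a RANGE frame.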
@@ -283,12 +283,13 @@ class ExpressionParserSuite extends AnalysisTest {
     WindowExpression(func, WindowSpecDefinition(partitioning, ordering, frame))
   }
 
+  val defaultOrder = Seq(SortOrder('a, Ascending, Set.empty), SortOrder('b, Ascending, Set.empty))
   // Basic window testing.
   assertEqual("foo(*) over w1", UnresolvedWindowExpression(func, WindowSpecReference("w1")))
   assertEqual("foo(*) over ()", windowed())
-  assertEqual("foo(*) over (partition by a, b)", windowed(Seq('a, 'b)))
-  assertEqual("foo(*) over (distribute by a, b)", windowed(Seq('a, 'b)))
-  assertEqual("foo(*) over (cluster by a, b)", windowed(Seq('a, 'b)))
+  assertEqual("foo(*) over (partition by a, b)", windowed(Seq('a, 'b), defaultOrder))
+  assertEqual("foo(*) over (distribute by a, b)", windowed(Seq('a, 'b), defaultOrder))
+  assertEqual("foo(*) over (cluster by a, b)", windowed(Seq('a, 'b), defaultOrder))
   assertEqual("foo(*) over (order by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc)))
   assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc)))
   assertEqual("foo(*) over (partition by a, b order by c)", windowed(Seq('a, 'b), Seq('c.asc)))
@@ -14,7 +14,7 @@ struct<>
 SELECT four, ten, SUM(SUM(four)) OVER (PARTITION BY four), AVG(ten) FROM tenk1
 GROUP BY four, ten ORDER BY four, ten
 -- !query schema
-struct<four:int,ten:int,sum(sum(CAST(four AS BIGINT))) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(ten):double>
+struct<four:int,ten:int,sum(sum(CAST(four AS BIGINT))) OVER (PARTITION BY four ORDER BY four ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint,avg(ten):double>
 -- !query output
 0 0 0 0.0
 0 2 0 2.0
@@ -306,7 +306,7 @@ SELECT last(ten) OVER (PARTITION BY four), ten, four FROM
 (SELECT * FROM tenk1 WHERE unique2 < 10 ORDER BY four, ten)s
 ORDER BY four, ten
 -- !query schema
-struct<last(ten, false) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):int,ten:int,four:int>
+struct<last(ten, false) OVER (PARTITION BY four ORDER BY four ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int,ten:int,four:int>
 -- !query output
 4 0 0
 4 0 0
@@ -341,8 +341,7 @@ struct<ten:int,two:int,gsum:bigint,wsum:bigint>
 -- !query
 SELECT count(*) OVER (PARTITION BY four), four FROM (SELECT * FROM tenk1 WHERE two = 1)s WHERE unique2 < 10
 -- !query schema
-struct<count(1) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,four:int>
--- !query output
+struct<count(1) OVER (PARTITION BY four ORDER BY four ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint,four:int>-- !query output
 2 3
 2 3
 4 1
@@ -422,7 +421,7 @@ struct<ten:int,two:int,gsum:bigint,wsum:bigint>
 -- !query
 SELECT count(*) OVER (PARTITION BY four) FROM (SELECT * FROM tenk1 WHERE FALSE)s
Member:
Okay, now I completely get what you're trying to do. You want window functions to work without specifying the ordering, while non-window functions already work without an ordering (because their results are deterministic anyway). Yes, -1, for the same reason as @hvanhovell's comment.

 -- !query schema
-struct<count(1) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint>
+struct<count(1) OVER (PARTITION BY four ORDER BY four ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint>
 -- !query output
 
 
@@ -215,7 +215,7 @@ struct<x:bigint,sum(x) OVER (ORDER BY x ASC NULLS FIRST RANGE BETWEEN CAST((- 1)
 -- !query
 SELECT count(*) OVER (PARTITION BY four) FROM (SELECT * FROM tenk1 UNION ALL SELECT * FROM tenk2)s LIMIT 0
 -- !query schema
-struct<count(1) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint>
+struct<count(1) OVER (PARTITION BY four ORDER BY four ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint>
 -- !query output
 
 
@@ -318,10 +318,18 @@ NULL a NULL
 -- !query
 SELECT udf(val), cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, udf(val)
 -- !query schema
-struct<>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,row_number() OVER (PARTITION BY cate ORDER BY cate ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
 -- !query output
-org.apache.spark.sql.AnalysisException
-Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
+NULL NULL 1
+3 NULL 2
+NULL a 1
+1 a 2
+1 a 3
+2 a 4
+1 b 1
+2 b 2
+3 b 3
 
 
 -- !query
13 changes: 10 additions & 3 deletions sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -318,10 +318,17 @@ NULL a NULL
 -- !query
 SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val
 -- !query schema
-struct<>
+struct<val:int,cate:string,row_number() OVER (PARTITION BY cate ORDER BY cate ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
 -- !query output
-org.apache.spark.sql.AnalysisException
-Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
+NULL NULL 1
+3 NULL 2
+NULL a 1
+1 a 2
+1 a 3
+2 a 4
+1 b 1
+2 b 2
+3 b 3
 
 
 -- !query
@@ -487,4 +487,48 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession {

     spark.catalog.dropTempView("nums")
   }
+
+  test("Window function set partitionSpec as order spec when orderSpec is empty") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().createOrReplaceTempView("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select month, area, product, lead(month) over (partition by area) as lead_month
+          |from windowData
+        """.stripMargin),
+      Seq(
+        (1, "a", 5, 2),
+        (2, "a", 6, null),
+        (3, "b", 7, 4),
+        (4, "b", 8, null),
+        (5, "c", 9, 6),
+        (6, "c", 10, null)
+      ).map(i => Row(i._1, i._2, i._3, i._4)))
+
+    checkAnswer(
+      sql(
+        """
+          |select month, area, product,
+          |lead(month) over (partition by area order by product desc) as lead_month
+          |from windowData
+        """.stripMargin),
+      Seq(
+        (1, "a", 5, null),
+        (2, "a", 6, 1),
+        (3, "b", 7, null),
+        (4, "b", 8, 3),
+        (5, "c", 9, null),
+        (6, "c", 10, 5)
+      ).map(i => Row(i._1, i._2, i._3, i._4)))
+  }
 }