[SPARK-49563][SQL] Add SQL pipe syntax for the WINDOW operator #48649
cc @cloud-fan @gengliangwang :)
Thanks for working on this!! I tried to think of testing ideas.
Thank you @dtenedor for the considerate suggestions! I have updated the code accordingly and added more test cases.
kindly ping @dtenedor
The extra test coverage helps a lot, thanks for adding it!
(Apologies for delay in review, will try to respond faster next time.)
48e4332 to 7aae504
Thanks, this approach looks right! Just a final suggestion for a better error message in one location and an update to the SQL golden file tests to make them deterministic, then this PR should be complete.
Thanks again for the review! @dtenedor Please feel free to let me know any other comments!
table windowTestData
|> select cate, val, sum(val) over w1, first_value(cate) over w2
   window w1 as (partition by cate)
   window w2 as (order by val)
I'm a bit confused, aren't w1 and w2 both defined?
hmm, is it a bug? @srielau
If it is, it seems like the same bug for both regular SQL syntax and pipe operators, and the behavior would work the same for both (now, and if we were to change it later).
|> select cate, sum(val) over val
   window val as (partition by cate order by val);

-- WINDOW definition can be referred in the downstream SELECT clause.
@Angryrou the query in this test case should fail now, since the window w is not yet defined in the |> select operator. Do we need to re-generate the golden files?
Fixed! I added logic to ensure that each window name used in UnresolvedWindowExpression is properly defined within the same |> SELECT clause.
In the previous logic, UnresolvedWindowExpression was analyzed only after scanning all pipe operators. As a result, in this test case, the window w in the first |> SELECT clause could incorrectly reference the definition in the second |> SELECT.
With this change, an error will be raised with INVALID_SQL_SYNTAX.WINDOW_REFERENCE_NOT_FOUND if a window name is used but no window clause exists, or INVALID_SQL_SYNTAX.UNRESOLVED_WINDOW_REFERENCE if a window name in UnresolvedWindowExpression is not defined within the same |> SELECT clause.
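The two error cases described in this comment can be sketched with a small standalone model. This is only an illustration of the decision logic, not the AstBuilder code: the object, the result types, and `checkWindowNames` are hypothetical names, and the real implementation throws parsing errors rather than returning values.

```scala
object WindowNameCheckSketch {
  // Hypothetical result type standing in for the two error conditions.
  sealed trait WindowCheck
  case object Ok extends WindowCheck
  // Models WINDOW_REFERENCE_NOT_FOUND: a name is used but there is no WINDOW clause.
  final case class ReferenceNotFound(name: String) extends WindowCheck
  // Models UNRESOLVED_WINDOW_REFERENCE: a WINDOW clause exists but lacks this name.
  final case class UnresolvedReference(name: String) extends WindowCheck

  // `used`: window names referenced by one |> SELECT.
  // `defined`: window names declared in that same |> SELECT's WINDOW clause.
  def checkWindowNames(used: Seq[String], defined: Set[String]): WindowCheck =
    if (used.isEmpty) Ok
    else if (defined.isEmpty) ReferenceNotFound(used.head)
    else used.find(n => !defined.contains(n)) match {
      case Some(missing) => UnresolvedReference(missing)
      case None => Ok
    }
}
```

Under this model, `checkWindowNames(Seq("w"), Set.empty)` yields `ReferenceNotFound("w")`, while `checkWindowNames(Seq("w1", "w2"), Set("w1"))` yields `UnresolvedReference("w2")`.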
Generally LGTM, just have a question about one remaining test case.
  throw QueryParsingErrors.cannotFindWindowReferenceError(unresolvedWindowNames.head, ctx)
} else {
  // Find any unresolved window names not defined in windowDefs
  unresolvedWindowNames.find(!windowDefs.contains(_)) match {
It's better to not do analysis work in the parser, as we may miss something when re-implementing the analysis work. E.g. shall we consider case sensitivity here?
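To make the case-sensitivity question concrete, here is a toy comparison of an exact-match lookup (what a plain `contains` on a set of names does) against a case-insensitive one. Both helpers are hypothetical, and the sketch does not assert which behavior Spark applies to window names; it only shows how a check re-implemented in the parser could silently diverge from whatever the analyzer does.

```scala
object CaseSensitivitySketch {
  // Hypothetical helper: exact string match, as a plain Set lookup behaves.
  def definedExact(name: String, defs: Set[String]): Boolean =
    defs.contains(name)

  // Hypothetical helper: case-insensitive match.
  def definedIgnoreCase(name: String, defs: Set[String]): Boolean =
    defs.exists(_.equalsIgnoreCase(name))
}
```

For a reference written as `W` against a definition `w`, `definedExact("W", Set("w"))` is false while `definedIgnoreCase("W", Set("w"))` is true, so the two checks would disagree on the same query.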
@Angryrou can you highlight the logical plan differences between classic and pipe SQL syntaxes? I'd like to understand why we can reference previously defined window frame in the pipe syntax and see how we can fix it in the analyzer.
Differences @cloud-fan
- with the pipe syntax, the unresolvedwindowexpression is wrapped by a pipeselect expression
- the classic syntax has an auto-generated subquery name for the subquery
In the pipe syntax, the WINDOW clause in the 2nd |> SELECT leads to a WithWindowDefinition for w at the top of the plan tree. The 1st |> SELECT seems to be able to get the information as well.
In the classic syntax, the outer query defines w and gets a WithWindowDefinition at the top of the tree as well. So the inner subquery is able to get the window definition.
SQLs
-- pipe syntax
table windowTestData
|> select cate, val, first_value(cate) over w as first_val
|> select cate, val, sum(val) over w as sum_val
window w as (order by val);
-- classic syntax
select cate, val, sum(val) over w as sum_val
from (
select cate, val, first_value(cate) over w as first_val
from windowTestData
)
window w as (order by val);
Parsed Logical Plans (parser.parsePlan(...))
-- from pipe syntax
'WithWindowDefinition [w=windowspecdefinition('val ASC NULLS FIRST, unspecifiedframe$())]
+- 'Project ['cate, 'val, pipeselect(unresolvedwindowexpression('sum('val), WindowSpecReference(w))) AS sum_val#3]
   +- 'Project ['cate, 'val, pipeselect(unresolvedwindowexpression('first_value('cate), WindowSpecReference(w))) AS first_val#2]
      +- 'UnresolvedRelation [windowTestData], [], false
-- from classic syntax
'WithWindowDefinition [w=windowspecdefinition('val ASC NULLS FIRST, unspecifiedframe$())]
+- 'Project ['cate, 'val, unresolvedwindowexpression('sum('val), WindowSpecReference(w)) AS sum_val#1]
   +- 'SubqueryAlias __auto_generated_subquery_name
      +- 'Project ['cate, 'val, unresolvedwindowexpression('first_value('cate), WindowSpecReference(w)) AS first_val#0]
         +- 'UnresolvedRelation [windowTestData], [], false
Analyzed Logical Plans (from the golden file)
https://docs.google.com/document/d/1qa1jUAoWa0aS5037oazydJRYQSAFFklUlPmnwLxOZDM/edit?usp=sharing
@Angryrou thanks for the detailed explanation!
Looking at the analyzer rule WindowsSubstitution, we actually allow all the child nodes to reference the window frame definitions inside WithWindowDefinition. This makes sense for classic SQL, as the WINDOW clause is for the entire query, but it's pretty weird for the pipe SQL.
It looks like we want a variant of WithWindowDefinition that only works for its direct child node. How about we add a flag forPipeSQL in WithWindowDefinition, and only resolve the window frames in the direct child.
object WindowsSubstitution extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
    _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
    // Lookup WindowSpecDefinitions. This rule works with unresolved children.
    case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {
      case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
        val windowSpecDefinition = windowDefinitions.getOrElse(windowName,
          throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
        WindowExpression(c, windowSpecDefinition)
    }
  }
}
child.resolveExpressions actually traverses down the entire plan tree, and we should call child.transformExpressions to only resolve expressions in the current node, when the forPipeSQL flag is true.
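The difference between resolving through the whole subtree and resolving only the direct child can be sketched with a toy plan model. None of the classes below are Spark's: `WindowRef`, `ResolvedWindow`, `Project`, `Relation`, `resolveAll`, and `resolveDirectChild` are hypothetical stand-ins, and unlike the real rule this sketch leaves unknown names untouched instead of throwing.

```scala
object WindowScopeSketch {
  // Toy expressions: stand-ins for UnresolvedWindowExpression / WindowExpression.
  sealed trait Expr
  final case class WindowRef(name: String) extends Expr
  final case class ResolvedWindow(spec: String) extends Expr

  // Toy plan nodes forming a linear chain, like the parsed plans in this thread.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Project(exprs: Seq[Expr], child: Plan) extends Plan

  private def substitute(defs: Map[String, String], e: Expr): Expr = e match {
    case WindowRef(n) if defs.contains(n) => ResolvedWindow(defs(n))
    case other => other
  }

  // Whole-subtree behavior: rewrite this node and every descendant.
  def resolveAll(plan: Plan, defs: Map[String, String]): Plan = plan match {
    case Project(es, c) => Project(es.map(e => substitute(defs, e)), resolveAll(c, defs))
    case r: Relation => r
  }

  // Direct-child behavior: rewrite only the node right under the definition.
  def resolveDirectChild(plan: Plan, defs: Map[String, String]): Plan = plan match {
    case Project(es, c) => Project(es.map(e => substitute(defs, e)), c)
    case r: Relation => r
  }
}
```

Modeling the two-step pipe query as `Project(Seq(WindowRef("w")), Project(Seq(WindowRef("w")), Relation("windowTestData")))` with `Map("w" -> "order by val")`, `resolveAll` resolves both references, whereas `resolveDirectChild` resolves only the outer one and leaves the inner `WindowRef("w")` unresolved, which is the scoping the pipe syntax wants.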
@cloud-fan Thanks for the guidance! I have changed it accordingly.
This solution generally LGTM, if Wenchen agrees :)
thanks, merging to master!
What changes were proposed in this pull request?
This PR adds SQL pipe syntax support for the WINDOW clause within the pipe SELECT operator |> SELECT.
For example:
Notes:
|> WHERE is not extended to use the WINDOW clause because |> WHERE does not support window functions in its expressions.
Why are the changes needed?
The SQL pipe operator syntax will let users compose queries in a more flexible fashion.
Does this PR introduce any user-facing change?
Yes, see above.
How was this patch tested?
Yes
Was this patch authored or co-authored using generative AI tooling?
No