[SPARK-31561][SQL] Add QUALIFY Clause #55401
Force-pushed from 12c132e to 0951b60.
| ExtractWindowExpressions :: | ||
| GlobalAggregates :: | ||
| ResolveAggregateFunctions :: | ||
| ResolveQualify :: |
Because ResolveQualify runs after ExtractWindowExpressions, a SELECT-list window can cause the child Project to carry only SELECT output plus window inputs before QUALIFY gets a chance to add predicate references. For example, SELECT row_number() OVER (ORDER BY b) AS rn FROM t QUALIFY a = 1 should be valid, but a is no longer available if it is not selected or used by the window. Could we resolve QUALIFY before extraction, or otherwise preserve QUALIFY predicate references before ExtractWindowExpressions rewrites the Project?
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
Good catch. The issue is that ExtractWindowExpressions uses resolveOperatorsDownWithPruning which descends into UnresolvedQualify's child and rewrites the inner Project, pruning columns only referenced by the QUALIFY predicate. Simply reordering rules doesn't help because descent happens regardless of rule order.
The fix is to make resolveQualifyCondition recurse through the post-extraction plan shape (Project → Window → Project → ...) all the way to the base relation, resolving at each level and propagating missing attributes upward. This is done in commit 73fbe28.
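The propagation described above can be sketched on a toy plan model. This is only an illustration under stated assumptions: `Plan`, `Relation`, `Project`, `Window`, and `exposeMissing` are hypothetical stand-ins, not Spark's Catalyst classes or the actual `resolveQualifyCondition` code.

```scala
// Toy logical plan. All names are hypothetical; this is not Catalyst.
object QualifyPropagationSketch {
  sealed trait Plan { def output: Seq[String] }
  case class Relation(output: Seq[String]) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan {
    def output: Seq[String] = columns
  }
  // A Window node keeps its child's columns and appends one computed column.
  case class Window(windowColumn: String, child: Plan) extends Plan {
    def output: Seq[String] = child.output :+ windowColumn
  }

  /** Widen each node on the way down so that all `needed` columns reach the top. */
  def exposeMissing(plan: Plan, needed: Seq[String]): Plan = {
    val missing = needed.filterNot(plan.output.contains)
    if (missing.isEmpty) plan
    else plan match {
      case Project(columns, child) =>
        Project(columns ++ missing, exposeMissing(child, missing))
      case Window(windowColumn, child) =>
        Window(windowColumn, exposeMissing(child, missing))
      case Relation(_) =>
        // Reached the base relation and the column still does not exist.
        throw new IllegalArgumentException(s"Unresolved columns: ${missing.mkString(", ")}")
    }
  }

  // Assumed shape after window extraction for:
  //   SELECT row_number() OVER (ORDER BY b) AS rn FROM t QUALIFY a = 1
  val base: Plan = Relation(Seq("a", "b", "c"))
  val extracted: Plan = Project(Seq("rn"), Window("rn", Project(Seq("b"), base)))

  // `a` was pruned by the inner Project; widening restores it at every level.
  val widened: Plan = exposeMissing(extracted, Seq("a"))
}
```

In this sketch `extracted.output` contains only `rn`, while `widened.output` also contains `a`, so a filter on `a = 1` placed above it can be evaluated.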
| if (windowExpressionToAliasMap.size() > 0) { | ||
| val projectList = | ||
| windowExpressionToAliasMap.values().asScala.toSeq | ||
| Filter(newCond, Project(child.output ++ projectList, newChild)) |
When QUALIFY itself contains a window expression, resolveQualifyCondition may add missing predicate attributes to newChild, but this Project uses the original child.output, dropping those added attributes before the Filter. A query like SELECT a FROM t QUALIFY row_number() OVER (ORDER BY b) = 1 AND c > 0 leaves the Filter referencing c above a Project that only outputs a and the generated window column. Could this use newChild.output or otherwise keep the non-window predicate references until the outer trim Project?
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
Correct. Two changes were needed: (1) use newChild.output instead of child.output so the inner Project includes the missing attributes added by resolveQualifyCondition, and (2) save originalOutput before resolveQualifyCondition modifies the child, then use it for the final trim Project to ensure added predicate-only columns don't leak into query output. Fixed in the same commit.
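A row-level sketch of the two changes, using the query from the review comment (`SELECT a FROM t QUALIFY row_number() OVER (ORDER BY b) = 1 AND c > 0`). The data and the helpers `project` and `withRowNumber` are made up for illustration and operate on plain Scala maps, not on Spark plans.

```scala
// Hypothetical in-memory model: each row is a column-name -> value map.
object QualifyTrimSketch {
  type Row = Map[String, Int]

  val t: Seq[Row] = Seq(
    Map("a" -> 1, "b" -> 10, "c" -> 5),
    Map("a" -> 2, "b" -> 20, "c" -> 0),
    Map("a" -> 3, "b" -> 30, "c" -> 7))

  /** Keep only the listed columns of every row. */
  def project(rows: Seq[Row], columns: Seq[String]): Seq[Row] =
    rows.map(row => columns.map(name => name -> row(name)).toMap)

  /** Append rn = row_number() OVER (ORDER BY b). */
  def withRowNumber(rows: Seq[Row]): Seq[Row] =
    rows.sortBy(_("b")).zipWithIndex.map { case (row, i) => row + ("rn" -> (i + 1)) }

  // Saved before any predicate-only columns are added.
  val originalOutput: Seq[String] = Seq("a")
  val windowed: Seq[Row] = withRowNumber(t)

  // Projecting only the original output plus rn drops c,
  // so a filter on c above this projection has nothing to read.
  val narrow: Seq[Row] = project(windowed, originalOutput :+ "rn")
  val narrowHasC: Boolean = narrow.forall(_.contains("c"))

  // Keeping c in the inner projection lets the filter run;
  // the outer projection then trims back to the original output.
  val widened: Seq[Row] = project(windowed, originalOutput ++ Seq("c", "rn"))
  val filtered: Seq[Row] = widened.filter(row => row("rn") == 1 && row("c") > 0)
  val result: Seq[Row] = project(filtered, originalOutput)
}
```

Here `narrowHasC` is `false`, and `result` holds the single row with `a = 1`, with neither `c` nor `rn` visible in the final output.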
Thanks @viirya! Just asked Codex to do the first round of review.
| new ResolveFetchCursor(catalogManager) :: | ||
| new ResolveCursors() :: | ||
| ResolveQualify :: | ||
| ExtractWindowExpressions :: |
IIUC, we should run ResolveQualify after ExtractWindowExpressions, shouldn't we?
Yes, you're right. ResolveQualify should run after ExtractWindowExpressions. I had moved it earlier as a first attempt to fix the column pruning issue raised by @sunchao, but the actual fix (recursing through post-extraction plan shapes in resolveQualifyCondition) works regardless of ordering. Placing it after ExtractWindowExpressions is more correct because ResolveQualify checks for existing Window nodes. Moved it back to after ResolveAggregateFunctions in b8eb250.
| * in the plan and add the missing references to the Project/Aggregate and preserve | ||
| * them in all intermediate operators. | ||
| */ | ||
| /** Resolve subqueries in the condition expression using the fake Project pattern. */ |
Could you merge the above two function descriptions?
Done in b8eb250. Cleaned up the duplicate doc comments — resolveConditionSubqueries and resolveQualifyCondition now each have a single clear doc comment.
| |QUALIFY b > 1 | ||
| """.stripMargin).queryExecution.analyzed | ||
| } | ||
| assert(e.getMessage.contains("b")) |
Could you revise the assertion to check a longer part of the exception message? A single letter b is too general and could match almost any error message.
Good point. Changed to assert the specific error condition UNRESOLVED_COLUMN.WITH_SUGGESTION and verify that the objectName parameter contains b. Fixed in b8eb250.
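To illustrate why the structured check is stronger than the substring check, here is a minimal sketch. `AnalysisError` and `analyze` are hypothetical stand-ins rather than Spark's `AnalysisException` or the test in this PR, and the parameter values are invented for the example.

```scala
object ErrorConditionAssertionSketch {
  // Hypothetical structured error: a condition name plus named parameters.
  final case class AnalysisError(condition: String, parameters: Map[String, String])
      extends Exception(s"[$condition] ${parameters.mkString(", ")}")

  // Stand-in for analyzing a query whose QUALIFY references a non-grouping column b.
  def analyze(): Unit =
    throw AnalysisError(
      "UNRESOLVED_COLUMN.WITH_SUGGESTION",
      Map("objectName" -> "`b`", "proposal" -> "`a`, `total`"))

  val error: AnalysisError =
    try { analyze(); sys.error("expected an analysis error") }
    catch { case e: AnalysisError => e }

  // Weak: passes for this error, but also for unrelated messages that contain a "b".
  val weakCheck: Boolean = error.getMessage.contains("b")
  val unrelatedAlsoPasses: Boolean = "Table or view not found".contains("b")

  // Strong: pins the error condition and the specific parameter.
  val strongCheck: Boolean =
    error.condition == "UNRESOLVED_COLUMN.WITH_SUGGESTION" &&
      error.parameters.get("objectName").exists(_.contains("b"))
}
```

Both checks pass for the intended error, but only the weak one would also accept an unrelated message such as "Table or view not found".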
| def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning( | ||
| _.containsPattern(WINDOW_EXPRESSION), ruleId) { | ||
| case p: Project => | ||
| // First resolve against this Project's children output. | ||
| val resolved = resolveExpressionByPlanChildren(cond, p) |
QUALIFY aliases are only consulted after this children-output resolution fails, so a same-named input column silently wins over the SELECT alias. For example:
SELECT a AS b, ROW_NUMBER() OVER (ORDER BY c) AS rn
FROM t(a, b, c)
QUALIFY b = 1
This should filter on the visible SELECT alias a AS b, but this path resolves b against the child output first, so it filters on the input column t.b instead. That contradicts the documented alias use case and can produce wrong results without an error. Please resolve visible SELECT aliases before falling back to non-selected child attributes, and add a collision test.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
The current resolution order (input columns first, SELECT aliases as fallback) is consistent with the existing Databricks SQL QUALIFY implementation.
In your example:
SELECT a AS b, ROW_NUMBER() OVER (ORDER BY c) AS rn
FROM t(a, b, c)
QUALIFY b = 1
b resolves to the input column t.b, which is the intended behavior.
You're right that our documentation could be more precise. The QUALIFY doc says "It can refer to window functions in the SELECT list by alias" — this works correctly when there is no name collision with input columns (e.g., QUALIFY rank = 1 referencing RANK() OVER (...) AS rank, or QUALIFY total > 1 referencing SUM(b) AS total). When a SELECT alias collides with an input column name, the input column takes precedence. I'll update the documentation to clarify this precedence rule.
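The precedence rule can be sketched as a two-step lookup. This is a simplified illustration, not the analyzer code: `resolve`, `InputColumn`, and `SelectAlias` are hypothetical names, and the sketch ignores qualifiers and case sensitivity.

```scala
object QualifyNameResolutionSketch {
  sealed trait Resolved
  case class InputColumn(name: String) extends Resolved
  case class SelectAlias(name: String, expression: String) extends Resolved

  /** Try input columns first; fall back to SELECT aliases only if that fails. */
  def resolve(
      name: String,
      inputColumns: Seq[String],
      aliases: Map[String, String]): Option[Resolved] = {
    val fromInput: Option[Resolved] = inputColumns.find(_ == name).map(InputColumn(_))
    fromInput.orElse(aliases.get(name).map(expr => SelectAlias(name, expr)))
  }

  // SELECT a AS b, ROW_NUMBER() OVER (ORDER BY c) AS rn FROM t(a, b, c)
  val inputColumns: Seq[String] = Seq("a", "b", "c")
  val aliases: Map[String, String] =
    Map("b" -> "a", "rn" -> "ROW_NUMBER() OVER (ORDER BY c)")

  val b: Option[Resolved] = resolve("b", inputColumns, aliases)   // collides: input column wins
  val rn: Option[Resolved] = resolve("rn", inputColumns, aliases) // no collision: alias is used
}
```

With these inputs, `b` resolves to the input column `t.b` while `rn` resolves to the window alias, matching the behavior described above.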
Force-pushed from 583d8aa to 0000182.
Add QUALIFY clause to Spark SQL using UnresolvedQualify LogicalPlan node and a self-contained ResolveQualify rule in Analyzer, with structured error conditions.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
…dation, and strict error handling to ResolveQualify
- Add resolveConditionSubqueries to handle correlated subqueries in QUALIFY conditions, using the same fake-Project pattern as HAVING resolution.
- Validate that resolved attributes in the Aggregate case are present in grouping expressions or aggregate output, rejecting invalid references.
- Change the catch-all case in resolveQualifyCondition to throw SparkException.internalError instead of silently returning.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
…on and aggregate validation
- Add test for correlated subquery in QUALIFY condition (EXISTS).
- Add test that non-grouping column references with GROUP BY are rejected.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
…r test for QUALIFY clause
- Generate qualify.sql.out and analyzer-results/qualify.sql.out via SPARK_GENERATE_GOLDEN_FILES=1.
- Fix SparkSqlParserSuite QUALIFY test to assert node types instead of full plan tree comparison.
- Fix scalastyle issues: non-ASCII em-dash, import line length, unused import.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
…LECT list and post-window-extraction plan shapes
- In ResolveQualify.resolveQualifyCondition, recurse through Project children to resolve QUALIFY predicate references against deeper plan outputs, propagating missing attributes through intermediate nodes. This handles the case where ExtractWindowExpressions has already rewritten the child Project and pruned columns only referenced by QUALIFY.
- Use originalOutput to trim the final plan, ensuring columns added for QUALIFY predicate evaluation are not leaked into query output.
- Handle base plan nodes (SubqueryAlias, View) in resolveQualifyCondition by resolving against their output instead of throwing an error.
- Add tests for QUALIFY referencing columns not in the SELECT list and QUALIFY with window functions combined with non-selected column references.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
- Move ResolveQualify back to after ResolveAggregateFunctions (after ExtractWindowExpressions) since resolveQualifyCondition now handles post-extraction plan shapes.
- Merge duplicate doc comments on ResolveQualify helper methods.
- Remove extra blank line in ExtractWindowExpressions.apply().
- Strengthen QUALIFY non-grouping column test assertion to check the specific error condition and parameter.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
…ence in documentation
Document that input columns take precedence over SELECT aliases in the QUALIFY condition. When an alias has the same name as an input column, the input column is used.
Co-authored-by: Claude Code
Co-authored-by: Chao Sun <chao@openai.com>
Merged to master. Thanks @sunchao @dongjoon-hyun
What changes were proposed in this pull request?
Add QUALIFY clause to Spark SQL using UnresolvedQualify LogicalPlan node and a self-contained ResolveQualify rule in Analyzer, with structured error conditions.
PR #55019 models QUALIFY as a marker expression (QualifyExpression) wrapped inside a Filter. That design scatters the QUALIFY handling logic across four Analyzer rules. This PR instead models QUALIFY as a LogicalPlan node (UnresolvedQualify), resolved by a single self-contained ResolveQualify rule that completes all of its work in one pass once the child plan is resolved.
Why are the changes needed?
QUALIFY is supported by several popular SQL engines, including Snowflake and Databricks SQL, and users expect it when porting SQL that filters on window-function results. Without it, equivalent Spark queries need an extra subquery or CTE just to filter on a window alias.
This change closes that gap and makes Spark SQL more compatible with existing SQL workloads while preserving clear analyzer rules around window and aggregate semantics.
Does this PR introduce any user-facing change?
Yes. Spark SQL can now parse and analyze queries that use QUALIFY, for example:
This PR also introduces user-visible analysis errors for invalid QUALIFY usage, such as using aggregate functions directly in the QUALIFY predicate.
How was this patch tested?
Unit tests and e2e tests
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code