New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-31561][SQL] Add QUALIFY clause #39691
Conversation
In fact databricks also supports this clause. |
CALCITE added support for it too: https://issues.apache.org/jira/browse/CALCITE-5268. |
@@ -1729,6 +1729,23 @@ class Analyzer(override val catalogManager: CatalogManager) | |||
resolveExpressionByPlanChildren(resolvedWithAgg, u, allowOuter = true) | |||
} | |||
|
|||
case u @ UnresolvedQualify(cond, child) if !u.resolved && child.resolved => | |||
if (!u.containsPattern(WINDOW_EXPRESSION)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we have a subquery that contains a window function but not the main query:
SELECT (SELECT window_func w FROM t QUALIFY t.c1 > 0) FROM t QUALIFY t.c1 > 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CREATE TABLE dealer (id INT, city STRING, car_model STRING, quantity INT) using parquet;
SELECT * from (SELECT *, ROW_NUMBER() OVER(PARTITION BY city ORDER BY id) AS rn FROM dealer QUALIFY id > 0) t QUALIFY t.id > 0;
The analyzed logical plan:
== Analyzed Logical Plan ==
id: int, city: string, car_model: string, quantity: int, rn: int
Filter (id#2 > 0)
+- Project [id#2, city#3, car_model#4, quantity#5, rn#0]
+- SubqueryAlias t
+- Filter (id#2 > 0)
+- Project [id#2, city#3, car_model#4, quantity#5, rn#0]
+- Project [id#2, city#3, car_model#4, quantity#5, rn#0, rn#0]
+- Window [row_number() windowspecdefinition(city#3, id#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#0], [city#3], [id#2 ASC NULLS FIRST]
+- Project [id#2, city#3, car_model#4, quantity#5]
+- SubqueryAlias spark_catalog.default.dealer
+- Relation spark_catalog.default.dealer[id#2,city#3,car_model#4,quantity#5] parquet
throw QueryCompilationErrors.expressionWithoutWindowExpressionError(cond) | ||
} else { | ||
if (u.missingInput.nonEmpty) { | ||
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we treat the resolution order for QUALIFY the same as GROUP BY, then we need to resolve the attributes in the QUALIFY clause first using the columns in the FROM clause and then using columns and aliases in the SELECT clause.
For example: SELECT c1, window_func AS c2 FROM t QUALIFY c2 > 0;
here c2
should be resolved as t.c2
instead of the window_func as c2
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Behavior looks consistent with aggregate:
CREATE TABLE dealer (id INT, city STRING, car_model STRING, quantity INT) using parquet;
SELECT ROW_NUMBER() OVER(PARTITION BY city ORDER BY id) AS id FROM dealer QUALIFY id > 0;
SELECT city, max(id) AS id FROM dealer GROUP BY city HAVING id > 0;
== Analyzed Logical Plan ==
id: int
Filter (id#0 > 0)
+- Project [id#0]
+- Project [city#3, id#2, id#0, id#0]
+- Window [row_number() windowspecdefinition(city#3, id#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id#0], [city#3], [id#2 ASC NULLS FIRST]
+- Project [city#3, id#2]
+- SubqueryAlias spark_catalog.default.dealer
+- Relation spark_catalog.default.dealer[id#2,city#3,car_model#4,quantity#5] parquet
== Analyzed Logical Plan ==
city: string, id: int
Filter (id#0 > 0)
+- Aggregate [city#2], [city#2, max(id#1) AS id#0]
+- SubqueryAlias spark_catalog.default.dealer
+- Relation spark_catalog.default.dealer[id#1,city#2,car_model#3,quantity#4] parquet
@@ -571,7 +571,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { | |||
messageParameters = Map.empty) | |||
} | |||
|
|||
def expressionWithoutWindowExpressionError(expr: NamedExpression): Throwable = { | |||
def expressionWithoutWindowExpressionError(expr: Expression): Throwable = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a new error for this? Currnetly the error message is not very user-friendly. E.g.:
SELECT * FROM t QUALIFY c1 > 0
('c1 > 0) does not have any WindowExpression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new message:
[QUALIFY_EXPRESSION_MUST_CONTAIN_WINDOW_FUNCTION] QUALIFY expression '(id > 1)' must contain a window function.; line 1 pos 21
@@ -465,3 +465,60 @@ SELECT | |||
SUM(salary) OVER w sum_salary | |||
FROM | |||
basic_pays; | |||
|
|||
-- Test QUALIFY clause |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can split these tests into a new file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
@cloud-fan @allisonwang-db take another look? |
@@ -1303,6 +1303,11 @@ | |||
"Protobuf type not yet supported: <protobufType>." | |||
] | |||
}, | |||
"QUALIFY_EXPRESSION_MUST_CONTAIN_WINDOW_FUNCTION" : { | |||
"message" : [ | |||
"QUALIFY expression '<sqlExpr>' must contain a window function." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: QUALIFY
is a clause so what this expression
refers to?
@@ -693,6 +693,19 @@ case class UnresolvedHaving( | |||
copy(child = newChild) | |||
} | |||
|
|||
/** | |||
* Represents unresolved qualify clause, It is turned by the analyzer into a Filter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: QUALIFY (all uppercase) + "that becomes Filter after query analysis"
qualifyCondition: Expression, | ||
child: LogicalPlan) | ||
extends UnaryNode { | ||
override lazy val resolved: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lazy is not needed here, is it?
Is this feature still on the planning? |
^ Same question -- Any plan to release this feature? |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
The
QUALIFY
clause is used to filter the results of window functions. For example:Why are the changes needed?
To easily migrate SQL queries to Spark SQL.
The following databases support this clause:
https://docs.databricks.com/sql/language-manual/sql-ref-syntax-qry-select-qualify.html
https://docs.snowflake.com/en/sql-reference/constructs/qualify.html
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#qualify_clause
https://duckdb.org/docs/sql/query_syntax/qualify.html
https://docs.teradata.com/reader/2_MC9vCtAJRlKle2Rpb0mA/19NnI91neorAi7LX6SJXBw
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.