[SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING #44352
Conversation
// a table `t` has columns `c1` and `c2`, for query `SELECT ... FROM t GROUP BY c1 HAVING c2 = 0`,
// even though we can resolve column `c2` here, we should undo it and fail with
// "Column c2 not found".
protected def resolveColWithAgg(e: Expression, plan: LogicalPlan): Expression = plan match {
  case Filter(_, agg: Aggregate) => resolveColWithAgg(e, agg)
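The invariant described in the comment above can be sketched in plain Python. This is an illustrative model only, not Spark's actual resolver; the function name and parameters are made up, and the real code tentatively wraps the column in a `TempResolvedColumn` and undoes the resolution later:

```python
# Illustrative model (not Spark code): a bare non-grouping column in HAVING
# must fail with "Column c2 not found" even though it is resolvable against
# the Aggregate's child output.

def resolve_having_column(col, grouping_cols, child_cols, in_agg_function=False):
    """Return 'resolved' if `col` is a legal HAVING reference, else raise."""
    if col not in child_cols:
        raise ValueError(f"Column {col} not found")
    # A bare column is only legal if it is a grouping column; a non-grouping
    # column is legal only inside an aggregate function such as sum(c2).
    if col in grouping_cols or in_agg_function:
        return "resolved"
    # Undo the tentative resolution and surface the user-facing error.
    raise ValueError(f"Column {col} not found")

print(resolve_having_column("c1", {"c1"}, {"c1", "c2"}))        # resolved
print(resolve_having_column("c2", {"c1"}, {"c1", "c2"}, True))  # resolved
```

With `GROUP BY c1`, a bare `c2` in HAVING takes the final `raise` branch, which is the "undo it and fail" behavior the comment describes.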
Can we move this into ResolveReferencesInSort? I think it is clearer to make the call side resolve the Aggregate through the Filter.
Thanks, changed as suggested
@@ -102,12 +102,11 @@ Project [udf(b)#x, udf(c)#x]
SELECT udf(b), udf(c) FROM test_having
GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)
-- !query analysis
Project [udf(b)#x, udf(c)#x]
do you know why the plan is changed?
I think the previous resolution matches item 4 of the ResolveReferencesInSort comments:

- Resolves the column to [[AttributeReference]] with the output of a descendant plan node.
  Spark will propagate the missing attributes from the descendant plan node to the Sort node.
  This is to allow users to ORDER BY columns that are not in the SELECT clause, which is
  widely supported in other SQL dialects. For example, `SELECT a FROM t ORDER BY b`.
With this patch, it should match item 3:

- If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to
  [[TempResolvedColumn]] with the output of Aggregate's child plan.
  This is to allow Sort to host grouping expressions and aggregate functions, which can
  be pushed down to the Aggregate later. For example,
  `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`.
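The push-down in item 3 can be sketched as a toy model. This is not Catalyst; the function name is made up, and real expressions are trees rather than strings. The idea: an ORDER BY expression not already produced by the Aggregate gets hosted in the Aggregate's output, and a Project on top then prunes it back out:

```python
# Toy sketch (not Catalyst): push ORDER BY expressions that are missing from
# the Aggregate's output down into the Aggregate so Sort can reference them.

def push_order_by_into_aggregate(order_by, agg_exprs):
    """Return (new_agg_exprs, needs_pruning_project)."""
    new_agg = list(agg_exprs)
    pushed = False
    for expr in order_by:
        if expr not in new_agg:
            new_agg.append(expr)  # host the expression in the Aggregate
            pushed = True
    # If anything was pushed down, a Project on top restores the SELECT list.
    return new_agg, pushed

agg, needs_project = push_order_by_into_aggregate(
    order_by=["min(a)"], agg_exprs=["max(a)"])
print(agg)            # ['max(a)', 'min(a)']
print(needs_project)  # True
```

For `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`, `min(a)` is appended to the Aggregate and later pruned by the top-level Project.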
oh so the plan is actually more efficient now?
I think so, the plan shows it eliminates some unnecessary column propagation across operators.
That is the analyzed plan; is the optimized plan the same with this PR?
both Analyzed plan and Optimized plan are changed :)
before:

```
== Analyzed Logical Plan ==
udf(b): int, udf(c): double
Project [udf(b)#24, udf(c)#25]
+- Sort [udf(b#21) ASC NULLS FIRST, udf(cast(c#22 as double)) ASC NULLS FIRST], true
   +- Filter (udf(b)#24 = 3)
      +- Aggregate [b#21, c#22], [udf(b#21) AS udf(b)#24, udf(cast(c#22 as double)) AS udf(c)#25, b#21, c#22]
         +- SubqueryAlias spark_catalog.default.test_having
            +- Relation spark_catalog.default.test_having[a#20,b#21,c#22,d#23] parquet

== Optimized Logical Plan ==
Project [udf(b)#24, udf(c)#25]
+- Sort [udf(b#21) ASC NULLS FIRST, udf(cast(c#22 as double)) ASC NULLS FIRST], true
   +- Aggregate [b#21, c#22], [udf(b#21) AS udf(b)#24, udf(cast(c#22 as double)) AS udf(c)#25, b#21, c#22]
      +- Project [b#21, c#22]
         +- Filter (isnotnull(b#21) AND (udf(b#21) = 3))
            +- Relation spark_catalog.default.test_having[a#20,b#21,c#22,d#23] parquet
```

after:

```
== Analyzed Logical Plan ==
udf(b): int, udf(c): double
Sort [udf(b)#9 ASC NULLS FIRST, udf(c)#10 ASC NULLS FIRST], true
+- Filter (udf(b)#9 = 3)
   +- Aggregate [b#6, c#7], [udf(b#6) AS udf(b)#9, udf(cast(c#7 as double)) AS udf(c)#10]
      +- SubqueryAlias spark_catalog.default.test_having
         +- Relation spark_catalog.default.test_having[a#5,b#6,c#7,d#8] parquet

== Optimized Logical Plan ==
Sort [udf(b)#9 ASC NULLS FIRST, udf(c)#10 ASC NULLS FIRST], true
+- Aggregate [b#6, c#7], [udf(b#6) AS udf(b)#9, udf(cast(c#7 as double)) AS udf(c)#10]
   +- Project [b#6, c#7]
      +- Filter (isnotnull(b#6) AND (udf(b#6) = 3))
         +- Relation spark_catalog.default.test_having[a#5,b#6,c#7,d#8] parquet
```
LGTM
does this query work in other databases?
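As a quick cross-check in one other dialect (my addition, not from the thread; the table name `hav` and the sample rows are made up), SQLite accepts the same pattern of HAVING on an aggregate combined with ORDER BY on an aggregate:

```python
import sqlite3

# Minimal SQLite check; `hav` and the inserted rows are made-up sample data.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE hav(k TEXT, v INT);
    INSERT INTO hav VALUES ('one', 1), ('one', 2), ('two', 3), ('two', 4);
""")
rows = conn.execute(
    "SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)"
).fetchall()
print(rows)  # [('one', 3), ('two', 7)]
```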
assertAnalysisSuccess(
  parsePlan(
    """
      |WITH t1 as (SELECT 1 id, 'one' name)
we can put this in a golden file test as well.
Since this just checks analysis success, we can remove it if the test is already in the golden file.
@@ -33,3 +33,6 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY GROUPING SETS(t.c1) HAVING t.
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY CUBE(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY ROLLUP(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1;

-- SPARK-28386: Cannot resolve ORDER BY columns with GROUP BY and HAVING
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
does it work if we order by a different aggregate function (different from HAVING)?
Good question!
* 3. If the child plan is Aggregate, resolves the column to [[TempResolvedColumn]] with the output
*    of Aggregate's child plan. This is to allow Sort to host grouping expressions and aggregate
*    functions, which can be pushed down to the Aggregate later. For example,
*    `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`.
Shall we update the example?
Actually, it was reported by my colleague that the same SQL works on Impala but not Spark. I will investigate other popular RDBMS.
In the SQL standard, ORDER BY can only reference columns in the SELECT list, but many databases extend it to support other cases. I think in Spark the extension is that we can push down grouping expressions and aggregate functions from ORDER BY to SELECT.
I think we should dig into #44352 (comment) more. It seems we have an optimization that if the ORDER BY expression directly matches something from the SELECT list, we replace it with
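That optimization can be modeled as a toy sketch (the helper name is made up and this is not the actual Catalyst rule): if the ORDER BY expression matches a SELECT-list expression, reuse that output attribute; otherwise it has to be pushed down into the Aggregate.

```python
# Toy model (not the actual Catalyst rule): decide whether a sort key can
# reuse an existing SELECT-list output or must be pushed into the Aggregate.

def resolve_sort_key(order_expr, select_list):
    if order_expr in select_list:
        return ("attr", select_list.index(order_expr))  # reuse SELECT output
    return ("push_down", order_expr)                    # host it in Aggregate

print(resolve_sort_key("sum(v)", ["k", "sum(v)"]))  # ('attr', 1)
print(resolve_sort_key("min(v)", ["k", "sum(v)"]))  # ('push_down', 'min(v)')
```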
@cloud-fan I believe Spark already supports it when HAVING is absent, but it does not work when HAVING is present; that's why I say "maybe we can call it a bugfix".

```
SELECT xxx
FROM xxx
GROUP BY xxx
+ HAVING xxx
ORDER BY xxx
```

Let me try.
The plan change log shows it is indeed caused by this patch: see the master plan change log and this patch's plan change log.

BTW, I think this may be out of scope of this PR. The key point of this PR is:

```
SELECT xxx
FROM xxx
GROUP BY xxx
+ HAVING xxx
ORDER BY xxx
```
LGTM, please address the review comments regarding tests
@cloud-fan @beliefer I have updated the test cases as requested, please take another look when you have time
CI failure seems irrelevant
-- SPARK-28386: Resolve ORDER BY column with/without HAVING clause, while the column is present in the SELECT list
SELECT k FROM hav GROUP BY k ORDER BY k;
SELECT k FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY k;
I think these two tests can pass without this PR? This is the basic functionality of ORDER BY: referencing columns in the SELECT list.
yes, should I only retain the fixed SQL?
updated, only the fixed SQL is retained now
-- SPARK-28386: Resolve ORDER BY scalar function with/without HAVING clause, while the scalar function is not present in the SELECT list
SELECT k FROM hav GROUP BY k ORDER BY length(k);
SELECT k FROM hav GROUP BY k HAVING max(v) > 2 ORDER BY length(k);
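The two tests just above can be mirrored in SQLite as a sanity check (my addition; the sample rows are made up, chosen so key lengths differ and the ordering is deterministic):

```python
import sqlite3

# ORDER BY a scalar function of the grouping column that is not in the
# SELECT list, with and without HAVING. Sample data is made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE hav(k TEXT, v INT);
    INSERT INTO hav VALUES ('one', 5), ('seven', 2), ('mississippi', 4);
""")
r1 = conn.execute(
    "SELECT k FROM hav GROUP BY k ORDER BY length(k)").fetchall()
print(r1)  # [('one',), ('seven',), ('mississippi',)]
r2 = conn.execute(
    "SELECT k FROM hav GROUP BY k HAVING max(v) > 2 ORDER BY length(k)"
).fetchall()
print(r2)  # [('one',), ('mississippi',)]
```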
ditto for these two tests
updated
Thanks, merged to master.
Late LGTM.
What changes were proposed in this pull request?
This PR enhances the analyzer to properly handle queries that combine all three clauses, i.e. the `SELECT ... FROM ... GROUP BY ... HAVING ... ORDER BY ...` pattern shown above.
Why are the changes needed?
The above code demonstrates the failure case: the query failed during the analysis phase when both HAVING and ORDER BY clauses are present, but succeeded if only one is present.

Does this PR introduce any user-facing change?
Yes, maybe we can call it a bugfix.
How was this patch tested?
New UTs are added
Was this patch authored or co-authored using generative AI tooling?
No.