Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-41985][SQL] Centralize more column resolution rules #39508

Closed
wants to merge 6 commits into from

Conversation

cloud-fan
Copy link
Contributor

What changes were proposed in this pull request?

This is a followup of #38888 .

When I search for all the matching of UnresolvedAttribute, I found that there are still a few rules doing column resolution:

  1. ResolveAggAliasInGroupBy
  2. ResolveGroupByAll
  3. ResolveOrderByAll
  4. ResolveDefaultColumns

This PR merges the first 3 into ResolvedReferences. The last one will be done with a separate PR, as it's more complicated.

To avoid making the rule ResolvedReferences bigger and bigger, this PR pulls out the resolution code for Aggregate to a separated virtual rule (only be used by ResolvedReferences). The same to Sort. We can refactor and add more virtual rules later.

Why are the changes needed?

It's problematic to not centralize all the column resolution logic, as the execution order of the rules is not reliable. It actually leads to regression after #38888 : select a from t where exists (select 1 as a group by a). The group by a should be resolved as 1 as a, but now it's resolved as outer reference a. This is because ResolveReferences runs before ResolveAggAliasInGroupBy, and resolves outer references too early.

Does this PR introduce any user-facing change?

Fixes a bug, but the bug is not released yet.

How was this patch tested?

new tests

@github-actions github-actions bot added the SQL label Jan 11, 2023
* or resolved attributes which are missing from child output. This method tries to find the
* missing attributes and add them into the projection.
*/
private def resolveExprsAndAddMissingAttrs(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these functions are pulled out to ColumnResolutionHelper without actual changes.

val finalGroupExprs = resolveGroupByAll(
resolvedAggExprsNoOuter,
resolveGroupByAlias(resolvedAggExprsNoOuter, resolvedGroupingExprs)
).map(resolveOuterRef)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual change. Now we resolve group by alias and group by all before resolving outer references.

@cloud-fan
Copy link
Contributor Author

cc @viirya

@viirya
Copy link
Member

viirya commented Jan 12, 2023

There are some conflicts.

import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

trait ColumnResolutionHelper extends Logging {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No new code, all code are moved from Analyzer.

Copy link
Contributor

@anchovYu anchovYu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented one issue with a failed case. Tricky, we need to note that once an expression is resolved to OuterReference there is no way back.

// Otherwise, we might incorrectly pull an actual aggregate expression over to the grouping
// expression list (because we don't know they would be aggregate expressions until resolved).
if (resolvedAggExprsNoOuter.forall(_.resolved)) {
val finalGroupExprs = resolveGroupByAll(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests for this resolution order: 1) resolve group by alias 2) resolve group by all.
E.g. select .. as all .. group by all

if (resolvedAggExprsNoOuter.forall(_.resolved)) {
val finalGroupExprs = resolveGroupByAll(
resolvedAggExprsNoOuter,
resolveGroupByAlias(resolvedAggExprsNoOuter, resolvedGroupingExprs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed, select a + 1 as b, b + 1 as c .. group by c will fail as LCA now doesn't handle grouping expr. Need a followup if we want to support above query (which I think we should).

val resolvedAggExprsWithLCA = resolveLateralColumnAlias(resolvedAggExprsNoOuter)
val resolvedAggExprsWithOuter = resolvedAggExprsWithLCA.map(resolveOuterRef)
.map(_.asInstanceOf[NamedExpression])
a.copy(resolvedGroupingExprs.map(resolveOuterRef), resolvedAggExprsWithOuter, a.child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be problematic, try with this query:

      sql(s"select * from values(1) as t(something) " +
        s"where exists (select salary * 1.5 as something from $testTable group by something)")

where testTable has a salary field. (I just wrote in LCA suite.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! It's important to remember that rules will be executed repeatedly and we need to make sure the rule won't conflict with its previous runs.

@dtenedor
Copy link
Contributor

Thanks @cloud-fan for doing this as well! Very useful for the health of the analyzer.

// resolve it after `aggregateExpressions` are all resolved. Note: the basic resolution is
// needed as `aggregateExpressions` may rely on `groupingExpressions` as well, for the session
// window feature. See the rule `SessionWindowing` for more details.
if (resolvedAggExprsWithOuter.forall(_.resolved)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the agg exprs are not changed from here.
Shall we abstract the code as

val resolvedGroupExprs = if (resolvedAggExprsWithOuter.forall(_.resolved)) {
  if (resolvedAggExprsWithOuter.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE))) {
    ...
  } else {
    ...
  }
} else {
...
}
a.copy(...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, even use single-layer if, because all conditions are on one variable resolvedAggExprsWithOuter

* includes metadata columns as well.
* 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
* `SELECT col, current_date FROM t`.
* 3. If `Aggregate.aggregateExpressions` are all resolved, resolve GROUP BY alias and GROUP BY ALL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use "aggregation expressions" instead of Aggregate.aggregateExpressions? The comment is using "grouping expressions" below

* 3.2. If the grouping expressions only have one single unresolved column named 'ALL', expanded it
* to include all non-aggregate columns in the SELECT list. This is to support SQL pattern like
* `SELECT col1, col2, agg_expr(...) FROM t GROUP BY ALL`.
* 4. Resolves the column in `Aggregate.aggregateExpressions` to [[LateralColumnAliasReference]] if
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: column => columns

* `ResolveLateralColumnAliasReference` will further resolve [[LateralColumnAliasReference]] and
* rewrite the plan. This is to support SQL pattern like
* `SELECT col1 + 1 AS x, x + 1 AS y, y + 1 AS z FROM t`.
* 5. Resolves the column to outer references with the outer plan if we are resolving subquery
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for putting the outer ref at the last order. outer ref can only be used in where/having so it makes sense to resolve as the other references first.

Comment on lines 51 to 52
-- GROUP BY alias is not triggered if SELECT list has outer reference.
SELECT * FROM testData WHERE a = 1 AND EXISTS (SELECT a AS k GROUP BY k);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is not group by alias but group by outer reference, it works? From ResolveReferencesInAggregate seems so, just want to confirm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually reading the code I think group by alias is triggered, because the whole resolvedAggExprsWithOuter are resolved (to outer reference)..

Comment on lines 54 to 56
-- GROUP BY alias inside subquery expression with conflicting outer reference
SELECT * FROM testData WHERE a = 1 AND EXISTS (SELECT 1 AS a GROUP BY a);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GROUP BY alias takes precedence than outer reference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@@ -45,6 +45,15 @@ SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS
SELECT a AS k, COUNT(b) FROM testData GROUP BY k;
SELECT a AS k, COUNT(b) FROM testData GROUP BY k HAVING k > 1;

-- GROUP BY alias is not triggered if SELECT list has lateral column alias.
SELECT 1 AS x, x + 1 AS k FROM testData GROUP BY k;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add test for group by all:

SELECT 1 AS x, x + 1 AS k FROM testData GROUP BY all;

// resolve it after `aggregateExpressions` are all resolved. Note: the basic resolution is
// needed as `aggregateExpressions` may rely on `groupingExpressions` as well, for the session
// window feature. See the rule `SessionWindowing` for more details.
if (resolvedAggExprsWithOuter.forall(_.resolved)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, even use single-layer if, because all conditions are on one variable resolvedAggExprsWithOuter

if (resolvedAggExprsWithOuter.forall(_.resolved)) {
// TODO: currently we don't support LCA in `groupingExpressions` yet.
if (resolvedAggExprsWithOuter.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE))) {
a.copy(resolvedGroupExprsNoOuter.map(resolveOuterRef), resolvedAggExprsWithOuter, a.child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add extra handling to report user-friendly error message, showing that queries of group by all and group by alias are not supported when there is LCA in the select clause? For example, check if the grouping expression is all or alias, throw exception immediately.

Currently I don't have a clear solution how to support the above case with the LCA design. So this unsupported situation may last for some time, and we better provide clean error messages.

Comment on lines 51 to 52
-- GROUP BY alias is not triggered if SELECT list has outer reference.
SELECT * FROM testData WHERE a = 1 AND EXISTS (SELECT a AS k GROUP BY k);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually reading the code I think group by alias is triggered, because the whole resolvedAggExprsWithOuter are resolved (to outer reference)..

val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
val finalOrdering = orderByAllResolved.map(e => resolveOuterRef(e).asInstanceOf[SortOrder])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, do we require the child of Sort to be resolved? Or is it implied somewhere? Any risk of not doing so (the outer reference thing)? Though i can't think of any negative case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we do, ResolveRefererences only invokes these sub-rules if all children are resolved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha I forgot ..

val resolvedNoOuter = s.order.map(resolveExpressionByPlanOutput(_, s.child))
val resolvedWithAgg = resolvedNoOuter.map(resolveColWithAgg(_, s.child))
val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any tests covering this order: resolveColWithAgg > resolveOrderByAll ?

@github-actions github-actions bot added the CORE label Jan 29, 2023
@@ -0,0 +1,30 @@
-- Tests covering column resolution priority in Aggregate.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anchovYu can you help review the new tests? I think I've covered all the cases.

@@ -0,0 +1,20 @@
--SET spark.sql.leafNodeDefaultParallelism=1
-- Tests covering column resolution priority in Sort.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

CREATE TEMPORARY VIEW v2 AS VALUES (1, 2, 2), (2, 1, 1) AS t(a, b, all);

-- Relation output columns have higher priority than missing reference.
-- Results will be [2, 1] if we order by the column `v1.b`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing reference won't be able to cover this case, it should only add grouping expressions to the Aggregate. v1.b is not a grouping expression, so if order by v1.b the query should throw exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test is good. just comment needs correction.

val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
val finalOrdering = orderByAllResolved.map(e => resolveOuterRef(e).asInstanceOf[SortOrder])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha I forgot ..

// can't find the grouping expressions via `semanticEquals` and the analysis will fail.
// Example rules: ResolveGroupingAnalytics (See SPARK-31670 for more details) and
// ResolveLateralColumnAliasReference.
groupingExpressions = resolvedGroupExprs.map(trimAliases),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this extends the fix of https://issues.apache.org/jira/browse/SPARK-31670.

Alias in grouping expressions is harmful. Semantically, alias in grouping expressions should be ignored. We trim aliases in grouping expressions at the end of analysis by the rule CleanupAliases, so that CheckAnalysis can correctly match grouping expressions with aggregate expressions. However, there are rules doing the same matching during analysis, and CleanupAliases is too late for them.

Copy link
Contributor

@anchovYu anchovYu Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For details in how it affects LCA:
Without the trim,

=== Applying Rule org.apache.spark.sql.catalyst.analysis.ResolveLateralColumnAliasReference ===
!Aggregate [salary#19 AS lca#15], [salary#19 AS lca#15, (lateralAliasReference(lca#15, lca, lca#15) + 1) AS col#16]   Project [lca#15, (lateralAliasReference(lca#15, lca, lca#15) + 1) AS col#16]
!+- SubqueryAlias spark_catalog.default.employee                                                                      +- Aggregate [salary#19 AS lca#15], [salary#19 AS lca#15]
!   +- Relation spark_catalog.default.employee[dept#17,name#18,salary#19,bonus#20,properties#21] orc                     +- SubqueryAlias spark_catalog.default.employee
!                                                                                                                           +- Relation spark_catalog.default.employee[dept#17,name#18,salary#19,bonus#20,properties#21] orc

ResolveLateralColumnAliasReference matches the whole salary#19 AS lca#15 same as the grouping expression and pushes it down to the Aggregate (this rule finds matches in grouping expression and aggregate functions to push down). It causes problem for later application of this rule on Project: there is no alias in the Project any more so the lateralAliasReference can't find an alias to match and be removed.

@cloud-fan
Copy link
Contributor Author

The pyspark failure is unrelated, I'm merging it to master/3.4 (as it fixes a regression), thanks for the review!

@cloud-fan cloud-fan closed this in 40ca27c Feb 1, 2023
cloud-fan added a commit that referenced this pull request Feb 1, 2023
### What changes were proposed in this pull request?

This is a followup of #38888 .

When I search for all the matching of `UnresolvedAttribute`, I found that there are still a few rules doing column resolution:
1. ResolveAggAliasInGroupBy
2. ResolveGroupByAll
3. ResolveOrderByAll
4. ResolveDefaultColumns

This PR merges the first 3 into `ResolvedReferences`. The last one will be done with a separate PR, as it's more complicated.

To avoid making the rule `ResolvedReferences` bigger and bigger, this PR pulls out the resolution code for `Aggregate` to a separated virtual rule (only be used by `ResolvedReferences`). The same to `Sort`. We can refactor and add more virtual rules later.

### Why are the changes needed?

It's problematic to not centralize all the column resolution logic, as the execution order of the rules is not reliable. It actually leads to regression after #38888  : `select a from t where exists (select 1 as a group by a)`. The `group by a` should be resolved as `1 as a`, but now it's resolved as outer reference `a`. This is because `ResolveReferences` runs before `ResolveAggAliasInGroupBy`, and resolves outer references too early.

### Does this PR introduce _any_ user-facing change?

Fixes a bug, but the bug is not released yet.

### How was this patch tested?

new tests

Closes #39508 from cloud-fan/column.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 40ca27c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
MaxGekk pushed a commit that referenced this pull request Feb 3, 2023
…xpr is resolved

### What changes were proposed in this pull request?

This is a followup of #39508 to fix a regression. We should not remove aliases from grouping expressions if they are not resolved, as the alias may be necessary for resolution, such as `CreateNamedStruct`.

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #39867 from cloud-fan/column.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk pushed a commit that referenced this pull request Feb 3, 2023
…xpr is resolved

### What changes were proposed in this pull request?

This is a followup of #39508 to fix a regression. We should not remove aliases from grouping expressions if they are not resolved, as the alias may be necessary for resolution, such as `CreateNamedStruct`.

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #39867 from cloud-fan/column.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 02b39f0)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
### What changes were proposed in this pull request?

This is a followup of apache#38888 .

When I search for all the matching of `UnresolvedAttribute`, I found that there are still a few rules doing column resolution:
1. ResolveAggAliasInGroupBy
2. ResolveGroupByAll
3. ResolveOrderByAll
4. ResolveDefaultColumns

This PR merges the first 3 into `ResolvedReferences`. The last one will be done with a separate PR, as it's more complicated.

To avoid making the rule `ResolvedReferences` bigger and bigger, this PR pulls out the resolution code for `Aggregate` to a separated virtual rule (only be used by `ResolvedReferences`). The same to `Sort`. We can refactor and add more virtual rules later.

### Why are the changes needed?

It's problematic to not centralize all the column resolution logic, as the execution order of the rules is not reliable. It actually leads to regression after apache#38888  : `select a from t where exists (select 1 as a group by a)`. The `group by a` should be resolved as `1 as a`, but now it's resolved as outer reference `a`. This is because `ResolveReferences` runs before `ResolveAggAliasInGroupBy`, and resolves outer references too early.

### Does this PR introduce _any_ user-facing change?

Fixes a bug, but the bug is not released yet.

### How was this patch tested?

new tests

Closes apache#39508 from cloud-fan/column.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 40ca27c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
…xpr is resolved

### What changes were proposed in this pull request?

This is a followup of apache#39508 to fix a regression. We should not remove aliases from grouping expressions if they are not resolved, as the alias may be necessary for resolution, such as `CreateNamedStruct`.

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes apache#39867 from cloud-fan/column.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 02b39f0)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants