[SPARK-41405][SQL] Centralize the column resolution logic #38888

Closed
wants to merge 3 commits

Conversation

@cloud-fan cloud-fan commented Dec 2, 2022

What changes were proposed in this pull request?

This PR is a major refactor of how Spark resolves columns. Today, the column resolution logic is spread across several rules, which makes it hard to understand. Maintaining the resolution precedence is also fragile, as you have to carefully deal with the interactions between these rules.

This PR centralizes the column resolution logic into a single rule: the existing ResolveReferences rule, so that we no longer need to worry about the interactions between multiple rules. The detailed resolution precedence is also documented.
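As an illustration of why resolution precedence matters (my own sketch, not an example from this PR): a single name can be a candidate for more than one resolution target, and every rule that resolves columns has to agree on which one wins.

    -- Assuming a table t(a INT, b INT, c INT):
    -- the name `b` in GROUP BY binds to the table column t.b, not to the
    -- SELECT-list alias `a AS b`, even though both are in scope.
    SELECT a AS b, max(c) FROM t GROUP BY b;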

Why are the changes needed?

code cleanup

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing tests

@github-actions github-actions bot added the SQL label Dec 2, 2022
@cloud-fan cloud-fan changed the title [WIP] centralize the column resolution logic [SPARK-41405][SQL] Centralize the column resolution logic Dec 6, 2022
// is fully resolved, similar to the rule `ResolveAggregateFunctions`. However, Aggregate
// with GROUPING SETS is marked as unresolved and many analyzer rules can't apply to
// UnresolvedHaving because its child is not resolved. Here we explicitly resolve columns
// and subqueries of UnresolvedHaving so that the rewrite works in most cases.
cloud-fan (author):

This follows the previous code and has the same issues as before. For example:

    create temp view t as select 1 a, 2 b, 3d c;
    select max(a) from t group by grouping sets ((b, c), (b + c)) having b + c > 0;

    org.apache.spark.sql.AnalysisException: Column 'b' does not exist. Did you mean one of the following? [max(a)]

This fails because b + c needs type coercion to be resolved, which will never happen as Aggregate with GROUPING SETS is marked as unresolved. Then Spark never knows that b + c is actually the grouping expression and can't rewrite HAVING.
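For comparison (my own sketch, extrapolating from the explanation above rather than something verified in this thread), the same query should work when `c` is an integer, because `b + c` then resolves without type coercion and matches the grouping expression:

    create temp view t2 as select 1 a, 2 b, 3 c;
    select max(a) from t2 group by grouping sets ((b, c), (b + c)) having b + c > 0;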

// the missing attributes from the descendant node to the current node, and project them away
// at the end via an extra Project.
case s @ Sort(order, _, child) if !s.resolved || s.missingInput.nonEmpty =>
val resolvedNoOuter = order.map(resolveExpressionByPlanOutput(_, child))
cloud-fan (author):

I didn't use resolveExpressionByPlanChildren, in order to follow the previous code: https://github.com/apache/spark/pull/38888/files#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47L1469 . I'm not sure whether it would make a difference, but I want to be safe.

* or resolved attributes which are missing from child output. This method tries to find the
* missing attributes and add them into the projection.
*/
private def resolveExprsAndAddMissingAttrs(
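To make the "missing attributes" case concrete (my own sketch, not taken from the diff): sorting by a column that is not in the SELECT list forces the analyzer to add that column to the child projection so the Sort can resolve, and then to project it away again with an extra Project on top:

    -- Assuming a table t(a INT, b INT): `b` is missing from the Project
    -- output [a], so it is added for the Sort and stripped off afterwards.
    SELECT a FROM t ORDER BY b;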
@cloud-fan

cc @viirya @allisonwang-db @MaxGekk

Comment on lines +658 to +674
// If it has been tried but failed to resolve, mark it as unresolved so that other rules can
// try to resolve it again.
viirya (Member):

The purpose of this expression is to hold the original column name for a resolved column, so that the column resolution can be undone. With this new hasTried, does it become something that is resolved but also failed to resolve?

cloud-fan (author):

Yes, I'll update the classdoc later. Now this expression can be used to undo column resolution, or redo it with a different priority.
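A small sketch of the undo/redo behavior (my own example, based on the description above):

    -- In HAVING, `b` is first tentatively resolved against the aggregate's
    -- child and wrapped in TempResolvedColumn. Here it turns out to be a
    -- grouping column, so the tentative resolution is kept.
    SELECT max(a) FROM t GROUP BY b HAVING b > 0;
    -- If the tentative resolution turns out to be wrong (the name is neither a
    -- grouping expression nor inside an aggregate function), the column is
    -- marked with hasTried instead, so it can be re-resolved later, e.g. as an
    -- outer reference when the query is a subquery.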

@viirya viirya left a comment

This PR centralizes the column resolution logic into a single rule

I don't see a new rule added by this, but two rules were removed. Which single rule are you referring to? ResolveReferences?

@cloud-fan

Yes, I've updated the PR description to make it clear.

@@ -149,9 +149,9 @@ org.apache.spark.sql.AnalysisException
"queryContext" : [ {
cloud-fan (author):

The error class:

  "_LEGACY_ERROR_TEMP_2422" : {
    "message" : [
      "grouping expressions sequence is empty, and '<sqlExpr>' is not an aggregate function. Wrap '<aggExprs>' in windowing function(s) or wrap '<sqlExpr>' in first() (or first_value) if you don't care which value you get."
    ]
  },

The query context actually becomes more accurate.

Comment on lines 652 to 653
* resolution with a different priority if the analyzer has tried to resolve it with the default
* priority before but failed.
viirya (Member):

Suggested change
- * resolution with a different priority if the analyzer has tried to resolve it with the default
- * priority before but failed.
+ * resolution with a different priority if the analyzer has tried to resolve it with the default
+ * priority before but failed (i.e. `hasTried` is true).

Comment on lines 4237 to 4238
* `ResolveAggregationFunctions` will replace [[TempResolvedColumn]] with [[AttributeReference]] if
* it's inside aggregate functions or group expressions, or mark it as `hasTried` otherwise, hoping
viirya (Member):

I'm not sure I read this correctly. hasTried will be set to true, if the expression hosting TempResolvedColumn cannot be resolved, OR if it is not inside aggregate functions or group expressions?

cloud-fan (author):

If it is not inside aggregate functions or group expressions. Let me rephrase the doc a bit more.

.map(_.asInstanceOf[NamedExpression])
a.copy(resolvedGroupingExprs, resolvedAggExprsWithOuter, a.child)

// Special case for Project as it supports literal column alias.
@viirya viirya Dec 22, 2022

Suggested change
- // Special case for Project as it supports literal column alias.
+ // Special case for Project as it supports Lateral column alias.

@viirya viirya left a comment

So far it looks good to me, although there are some changes I haven't read through yet.

@@ -547,8 +547,7 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {

test("Lateral alias of a complex type") {
// test both Project and Aggregate
// TODO(anchovyu): re-enable aggregate tests when fixed the having issue
cloud-fan (author):

This bug is fixed by this refactor.

@cloud-fan

@viirya can you take another look when you have time? thanks!


viirya commented Dec 29, 2022

I will take another look today.

// We should resolve the references normally based on child (agg.output) first.
val maybeResolved = resolveExpressionByPlanOutput(cond, agg)
resolveOperatorWithAggregate(Seq(maybeResolved), agg, (newExprs, newChild) => {
case Filter(cond, agg: Aggregate) if agg.resolved && cond.resolved =>
viirya (Member):

Suggested change
- case Filter(cond, agg: Aggregate) if agg.resolved && cond.resolved =>
+ case Filter(cond, agg: Aggregate) if agg.resolved && !cond.resolved =>

?

viirya (Member):

Oh, never mind, I got it after reading the existing code.

@viirya viirya left a comment

Looks good to me. I think this is much clearer than before.

@cloud-fan

The last commit is just a minor code simplification.

@cloud-fan

Thanks for the review, merging to master!

@dtenedor

Sorry for missing this earlier; late LGTM. Changes like this move analysis logic in a good direction, closer to a single pass. Ideally we could, for example, start making ResolveReferences return an error immediately if an unresolved reference cannot be resolved, rather than waiting until CheckAnalysis. We can make such improvements iteratively if we wish.

cloud-fan added a commit that referenced this pull request Feb 1, 2023
### What changes were proposed in this pull request?

This is a followup of #38888 .

When I searched for all the matches of `UnresolvedAttribute`, I found that there are still a few rules doing column resolution:
1. ResolveAggAliasInGroupBy
2. ResolveGroupByAll
3. ResolveOrderByAll
4. ResolveDefaultColumns

This PR merges the first 3 into `ResolveReferences`. The last one will be done in a separate PR, as it's more complicated.

To avoid making the rule `ResolveReferences` bigger and bigger, this PR pulls the resolution code for `Aggregate` out into a separate virtual rule (used only by `ResolveReferences`). The same goes for `Sort`. We can refactor and add more virtual rules later.

### Why are the changes needed?

It's problematic not to centralize all the column resolution logic, as the execution order of the rules is not reliable. It actually led to a regression after #38888: `select a from t where exists (select 1 as a group by a)`. The `group by a` should be resolved to the local alias `1 as a`, but now it's resolved as the outer reference `a`. This is because `ResolveReferences` runs before `ResolveAggAliasInGroupBy` and resolves outer references too early.

### Does this PR introduce _any_ user-facing change?

Fixes a bug, but the bug is not released yet.

### How was this patch tested?

new tests

Closes #39508 from cloud-fan/column.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Feb 1, 2023
(cherry picked from commit 40ca27c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request May 28, 2023
### What changes were proposed in this pull request?

This PR refactors the default column value resolution so that we don't need an extra DS v2 API for external v2 sources. The general idea is to split the default column value resolution into two parts:
1. Resolve the column "DEFAULT" to the column default expression. This applies to `Project`/`UnresolvedInlineTable` under `InsertIntoStatement`, and to assignment expressions in `UpdateTable`/`MergeIntoTable`.
2. Fill missing columns with column default values for the input query. This does not apply to UPDATE or to the non-INSERT actions of MERGE, as they use the column from the target table as the default value.

The first part should be done for all the data sources, as it's part of column resolution. The second part should not be applied to v2 data sources with `ACCEPT_ANY_SCHEMA`, as they are free to define how to handle missing columns.
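A minimal sketch of the two parts (assumed table and syntax, purely for illustration; column defaults require a data source that supports them):

    -- Assuming: CREATE TABLE t (id INT, ts TIMESTAMP DEFAULT current_timestamp())
    -- Part 1: the name DEFAULT in the VALUES list is resolved to the column's
    -- default expression, current_timestamp().
    INSERT INTO t VALUES (1, DEFAULT);
    -- Part 2: the column `ts` is missing from the input query, so it is filled
    -- with its default value.
    INSERT INTO t (id) VALUES (2);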

More concretely, this PR:
1. Puts the column "DEFAULT" resolution logic in the rule `ResolveReferences`, with two new virtual rules. This follows #38888.
2. Puts the missing-column handling in `TableOutputResolver`, which is shared by both the v1 and v2 insertion resolution rules. External v2 data sources can add custom catalyst rules to deal with missing columns themselves.
3. Removes the old rule `ResolveDefaultColumns`. Note that, with the refactor, we no longer need to manually look up the table; we deal with column default values after the target table of INSERT/UPDATE/MERGE is resolved.
4. Removes the rule `ResolveUserSpecifiedColumns` and merges it into `PreprocessTableInsertion`. These two rules both resolve v1 insertion, and it's tricky to reason about their interactions; it's clearer to resolve the insertion in one pass.

### Why are the changes needed?

code cleanup and remove unneeded DS v2 API.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

updated tests

Closes #41262 from cloud-fan/def-val.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023