
[SPARK-32638][SQL] Corrects references when adding aliases in WidenSetOperationTypes #29485

Closed

Member

@maropu maropu commented Aug 20, 2020

What changes were proposed in this pull request?

This PR intends to fix a bug where references can be missing when adding aliases to widen data types in WidenSetOperationTypes. For example,

CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1]  <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3]
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]

In this case, WidenSetOperationTypes added the alias cast(v#1 as decimal(11,0)) AS v#3, so the reference in the top Project went missing. This PR corrects the reference (the exprId and the widened dataType) after adding aliases in the rule.
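
Conceptually, the rule adds the widening alias under a fresh exprId and then remaps every parent reference to the new attribute. A deliberately simplified sketch of that remapping step (`remapReferences` and `attrMap` are hypothetical names; the rule itself uses a more careful traversal, discussed in the review below):

```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Swap stale references for the attributes of the newly added aliases,
// e.g., v#1 (decimal(10,0)) -> v#3 (decimal(11,0)).
def remapReferences(
    parent: LogicalPlan,
    attrMap: Map[ExprId, AttributeReference]): LogicalPlan = {
  parent.transformExpressions {
    case a: AttributeReference => attrMap.getOrElse(a.exprId, a)
  }
}
```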

Why are the changes needed?

bugfixes

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests

@SparkQA

SparkQA commented Aug 20, 2020

Test build #127678 has finished for PR 29485 at commit 02557be.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Aug 20, 2020

retest this please

@SparkQA

SparkQA commented Aug 20, 2020

Test build #127679 has finished for PR 29485 at commit 02557be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val (casted, newExprIds) = plan.output.zip(targetTypes).map {
  case (e, dt) if e.dataType != dt =>
    val alias = Alias(Cast(e, dt), e.name)()
    (alias, Some(e.exprId -> (alias.exprId, dt)))
Member

Can we just store the attribute of the Alias, i.e., alias.toAttribute?
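
A sketch of that suggestion applied to the hunk above (the stored value becomes the alias's attribute, which carries both the new exprId and the widened dataType; `newAttrs` is a name chosen here for illustration):

```
val (casted, newAttrs) = plan.output.zip(targetTypes).map {
  case (e, dt) if e.dataType != dt =>
    val alias = Alias(Cast(e, dt), e.name)()
    // Store the attribute itself instead of the (exprId, dataType) pair.
    (alias, Some(e.exprId -> alias.toAttribute))
  case (e, _) => (e, None)
}.unzip
```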

Member Author

Ah, I see. I'll update.

@SparkQA

SparkQA commented Aug 21, 2020

Test build #127710 has finished for PR 29485 at commit 028140f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Looks right to me too

@maropu
Member Author

maropu commented Aug 21, 2020

Thanks for the reviews, @viirya and @HyukjinKwon ! also cc: @cloud-fan

}

// Re-maps existing references to the new ones (exprId and dataType)
// for aliases added when widening columns' data types.
Contributor

Another common way to solve this issue is to create an Alias with the existing exprId, so that we don't need to rewrite the parent nodes.

I think it's safer than rewriting the parent nodes. We rewrite parent nodes in ResolveReferences.dedupRight; we hit bugs there and ended up with a complicated solution.
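
For illustration, that alternative keeps the child's exprId when building the alias (a sketch against the hunk quoted earlier, not the PR's final approach):

```
val casted = plan.output.zip(targetTypes).map {
  case (e, dt) if e.dataType != dt =>
    // Reuse the existing exprId so parent references stay valid as-is.
    Alias(Cast(e, dt), e.name)(exprId = e.exprId)
  case (e, _) => e
}
```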

Member

I thought about it too, but I'm not sure if a duplicate exprId is okay. If this is a common way, it sounds simple and safe.

Member Author

You mean re-aliasing with exprId=1 in the example above, like this?

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1]  <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3] <----- !!!! re-alias with exprId=#1 ?!!!!!
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]

Contributor

Yes, re-aliasing with exprId=1.

Just did a quick search: the rules TimeWindowing and Aggregation do it. AFAIK it's common when we need to change the plan in the middle and don't want to affect the parent nodes.

Member Author

@maropu maropu Aug 24, 2020

Yea, I tried that first, but RemoveNoopOperators removes a Project with a rewritten alias (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L480) because it assumes projects sharing exprIds produce semantically equal output. There may be a way to avoid that case; I'll check TimeWindowing.

Member Author

In the case of TimeWindowing, it seems RemoveNoopOperators cannot remove a project having the same exprIds, because the number of output attributes differs before/after the transformation (L3622).

val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
  exprId = windowAttr.exprId, explicitMetadata = Some(metadata))
val replacedPlan = p transformExpressions {
  case t: TimeWindow => windowAttr
}
// For backwards compatibility we add a filter to filter out nulls
val filterExpr = IsNotNull(window.timeColumn)
replacedPlan.withNewChildren(
  Filter(filterExpr,
    Project(windowStruct +: child.output, child)) :: Nil)
} else {

I looked around the related code, but I couldn't find a way to avoid the case. Any suggestions?

Member

Doesn't RemoveNoopOperators check if the outputs are semantically equal? Is cast(v#1 as decimal(11,0)) AS v#1 semantically equal to v#1? Canonicalize should keep cast and alias.
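
To make that concrete, a small self-contained sketch (ids and types are illustrative; this assumes, per the comment above, that canonicalization keeps the Cast):

```
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, ExprId}
import org.apache.spark.sql.types.DecimalType

val v = AttributeReference("v", DecimalType(10, 0))(exprId = ExprId(1))
// Re-alias the widening cast under the same exprId (#1).
val widened = Alias(Cast(v, DecimalType(11, 0)), "v")(exprId = v.exprId)
// The Cast survives canonicalization, so the aliased expression is not
// semantically equal to the bare attribute, and a no-op check based on
// semanticEquals would keep the Project.
assert(!widened.semanticEquals(v))
```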

Member Author

Hm, I see. I'll check that approach.

Member Author

@maropu maropu Aug 24, 2020

On second thought, we still need to update parent nodes even if we re-alias. For example, in the plan from the PR description:

!Project [v#1]  <------ this project already has `AttributeReference(v, decimal(10, 0))#1`, so
                        we need to update the data type, too
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#1] <----- re-alias with exprId=#1
      :  +- Project [v#1] <----- dataType=decimal(10, 0)
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]

      +- Project [v#2] <----- dataType=decimal(11, 0)
         +- ...

The parent Project has an attribute reference with exprId=1 and dataType=decimal(10, 0), so, IIUC, we need to update the data type, too. If we don't update it, plan integrity can break, e.g., in PushProjectionThroughUnion:

-- !query
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v)
-- !query schema
struct<>
-- !query output

-- !query
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.errors.package$TreeNodeException
After applying rule org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken., tree:
'Union false, false
:- Project [v#183]
:  +- Project [cast(v#183 as decimal(11,0)) AS v#183]
:     +- Project [v#183]
:        +- LocalRelation [v#183]
+- Project [v#184]
   +- Project [v#184]
      +- Project [CheckOverflow((promote_precision(cast(v#183 as decimal(11,0))) + promote_precision(cast(v#183 as decimal(11,0)))), DecimalType(11,0), true) AS v#184]
         +- LocalRelation [v#183]

Contributor

This is weird. All the type coercion rules extend TypeCoercionRule, which updates parent nodes with data type changes automatically.
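
For reference, the trait's contract has roughly this shape (a paraphrased sketch, not the exact Spark source; `propagateTypes` is implemented concretely in Spark but left abstract here):

```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

trait TypeCoercionRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val coerced = coerceTypes(plan)
    // If coercion changed anything, re-resolve the parent nodes so their
    // attribute references pick up the new (widened) data types.
    if (plan.fastEquals(coerced)) plan else propagateTypes(coerced)
  }

  // Rule-specific coercion logic.
  protected def coerceTypes(plan: LogicalPlan): LogicalPlan

  // Pushes data-type changes up to parent operators.
  protected def propagateTypes(plan: LogicalPlan): LogicalPlan
}
```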

@@ -328,27 +328,46 @@ object TypeCoercion {
*/
object WidenSetOperationTypes extends Rule[LogicalPlan] {
Contributor

I think this rule should extend TypeCoercionRule

Member Author

@maropu maropu Aug 25, 2020

Oh, I see. I missed that. I'll make it extend TypeCoercionRule and then check the re-alias approach again.

Comment on lines 480 to 481
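// Remove the Project only when every entry is semantically equal to the
// child's corresponding output attribute; an exprId-reusing alias such as
// `cast(v#1 as decimal(11,0)) AS v#1` is not, so that Project is kept.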
case Project(projList, child) if projList.length == child.output.length &&
    projList.zip(child.output).forall { case (e1, e2) => e1.semanticEquals(e2) } => child
Member

Is this for Alias?

Member Author

Yea. I modified the code to handle the case below:

:- Project [v#183]
:  +- Project [cast(v#183 as decimal(11,0)) AS v#183]
:     +- Project [v#183]

Member

Can you add a comment for this?

Member

I'm not sure whether we considered this case when we added RemoveNoopOperators, but it seems this change didn't cause any test failures.

@SparkQA

SparkQA commented Aug 25, 2020

Test build #127893 has finished for PR 29485 at commit 2340afe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -85,14 +84,19 @@ case class RemoveRedundantProjects(conf: SQLConf) extends Rule[SparkPlan] {
    // to convert the rows to UnsafeRow. See DataSourceV2Strategy for more details.
    case d: DataSourceV2ScanExecBase if !d.supportsColumnar => false
    case _ =>
      def semanticEquals(exprs1: Seq[Expression], exprs2: Seq[Expression]): Boolean = {
Member

Maybe also a comment here?

Member Author

Sure!

Member

@viirya viirya left a comment

If we agree that a Project aliasing an existing exprId shouldn't be removed by RemoveNoopOperators or RemoveRedundantProjects, this change is okay.

@maropu
Member Author

maropu commented Aug 26, 2020

Thanks for the review, @viirya ! cc: @cloud-fan

@cloud-fan
Contributor

It's a bit frustrating to see the issue in RemoveNoopOperators and RemoveRedundantProjects. It should be a valid assumption that attributes with the same exprId are the same attribute, but Spark doesn't guarantee it.

Maybe we should go with the other direction: create new attributes when necessary, and rewrite the parent nodes. We need to follow ResolveReferences.rewritePlan, which uses a custom way to traverse the plan tree, instead of the normal transform.

I'll check other places that create an Alias with an existing exprId. @maropu, can you update this PR to go in the safer direction? Sorry for the back and forth!

@maropu
Member Author

maropu commented Aug 26, 2020

NVM, @cloud-fan. Okay, I'll update it to follow ResolveReferences.rewritePlan.

* @return a rewritten plan and updated references related to a root node of
* the given `plan` for rewriting it.
*/
def rewritePlan(plan: LogicalPlan, rewritePlanMap: Map[LogicalPlan, LogicalPlan])
Member Author

@maropu maropu Aug 28, 2020

I rewrote the existing rewritePlan a bit, then just reused it for WidenSetOperationTypes. Does this update satisfy your intention? #29485 (comment)

@SparkQA

SparkQA commented Aug 28, 2020

Test build #127971 has finished for PR 29485 at commit 6372515.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Sep 2, 2020

retest this please

@SparkQA

SparkQA commented Sep 3, 2020

Test build #128216 has finished for PR 29485 at commit 6372515.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  case (e, _) =>
    e -> e
}.unzip
Project(casted._1, plan) -> Project(casted._2, plan)
Contributor

What are we doing here?

Member Author

This generates a rewrite map used by Analyzer.rewritePlan. rewritePlan assumes the plan structure is the same before/after rewriting, so the WidenSetOperationTypes rule now does a two-phase transformation, as follows:

### Input Plan (Query described in the PR description) ###
Project [v#1]
+- SubqueryAlias t
   +- Union
      :- Project [v#1]
      :   +- SubqueryAlias t3
      :      ...
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               ...

### Phase-1 (Adds a Project, but does not update ExprIds) ###
Project [v#1]
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#1] <--- !!!Adds Project to widen a type!!!
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        ...
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            ...

### Phase-2 ###
// Analyzer.rewritePlan updates ExprIds based on a rewrite map:
// `Project [cast(v#1 as decimal(11,0)) AS v#1]` => Project [cast(v#1 as decimal(11,0)) AS v#3]
Project [v#3] <--- !!!Updates ExprId!!!
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3] <--- !!!Updates ExprId!!!
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        ...
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            ...
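
A condensed sketch of how phase 1 can produce both versions at once, consistent with the hunks quoted in this thread (`widenOutput` is a hypothetical name; the real rule feeds the resulting pair into the rewrite map for rewritePlan):

```
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.DataType

def widenOutput(plan: LogicalPlan, targetTypes: Seq[DataType])
    : (LogicalPlan, LogicalPlan) = {
  val casted: Seq[(NamedExpression, NamedExpression)] =
    plan.output.zip(targetTypes).map {
      case (e, dt) if e.dataType != dt =>
        // Phase-1 alias keeps e's exprId so parents still resolve;
        // phase-2 alias mints a fresh exprId (e.g., v#1 -> v#3).
        Alias(Cast(e, dt), e.name)(exprId = e.exprId) -> Alias(Cast(e, dt), e.name)()
      case (e, _) => e -> e
    }
  // The (before, after) pair becomes one entry of the rewrite map.
  Project(casted.map(_._1), plan) -> Project(casted.map(_._2), plan)
}
```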

@cloud-fan
Contributor

thanks, merging to master!

@maropu
Member Author

maropu commented Sep 4, 2020

Thanks for the reviews, @cloud-fan @viirya !

maropu pushed a commit that referenced this pull request Sep 8, 2020
…Plan

### What changes were proposed in this pull request?

This is a followup of #29485

It moves the plan rewriting methods from `Analyzer` to `QueryPlan` so that they can work with `SparkPlan` as well. This PR also improves it to support a corner case (the attribute to be replaced appears together with an unresolved attribute) and makes it more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot, as before.

### Why are the changes needed?

Code cleanup and generalization.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes #29643 from cloud-fan/cleanup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
maropu added a commit that referenced this pull request Sep 8, 2020
…denSetOperationTypes

### What changes were proposed in this pull request?

This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,
```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1]  <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3]
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]
```
In this case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, so the reference in the top `Project` went missing. This PR corrects the reference (the `exprId` and the widened `dataType`) after adding aliases in the rule.

This backport for 3.0 comes from #29485 and #29643

### Why are the changes needed?

bugfixes

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests

Closes #29680 from maropu/SPARK-32638-BRANCH3.0.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
maropu added a commit that referenced this pull request Sep 30, 2020
…ibute in logical plans

### What changes were proposed in this pull request?

Some plan transformations (e.g., `RemoveNoopOperators`) implicitly assume that the same `ExprId` refers to a unique attribute, but `RuleExecutor` does not check this integrity between logical plan transformations. So, this PR adds this check in `isPlanIntegral` of `Analyzer`/`Optimizer`.

This PR comes from the discussion with cloud-fan and viirya in #29485 (comment).

### Why are the changes needed?

For better logical plan integrity checking.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29585 from maropu/PlanIntegrityTest.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
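
For intuition, such a uniqueness check could look roughly like this (`hasUniqueExprIds` is a hypothetical helper; the actual check in the PR may differ):

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def hasUniqueExprIds(plan: LogicalPlan): Boolean = {
  // Collect every attribute reference appearing in the plan's expressions.
  val attrs = plan.flatMap(_.expressions).flatMap(_.collect {
    case a: AttributeReference => a
  })
  // Each exprId must always denote the same (name, dataType) attribute.
  attrs.groupBy(_.exprId).values.forall { as =>
    as.map(a => (a.name, a.dataType)).distinct.size == 1
  }
}
```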