
[SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN #31666

Closed
wants to merge 44 commits

Conversation

karenfeng
Contributor

@karenfeng karenfeng commented Feb 26, 2021

What changes were proposed in this pull request?

Adds the duplicated common columns as hidden columns to the Project used to rewrite NATURAL/USING JOINs.

Why are the changes needed?

Allows users to resolve either side of the NATURAL/USING JOIN's common keys.
Previously, the user could only resolve the following columns:

| Join type | Left key columns | Right key columns |
|-----------|------------------|-------------------|
| Inner     | Yes              | No                |
| Left      | Yes              | No                |
| Right     | No               | Yes               |
| Outer     | No               | No                |

Does this PR introduce any user-facing change?

Yes. The user can now symmetrically resolve the common columns from a NATURAL/USING JOIN.

How was this patch tested?

SQL-side tests. The behavior matches PostgreSQL and MySQL.
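As an illustration of the target semantics (using SQLite rather than Spark, purely as a sketch; the PR states the behavior matches PostgreSQL and MySQL), both sides of a USING join's key column become addressable:

```python
import sqlite3

# Illustration of the target semantics using SQLite (not Spark): after a
# USING join, the common key resolves unqualified and via either side.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE nt1(k TEXT, v1 INTEGER)")
cur.execute("CREATE TABLE nt2(k TEXT, v2 INTEGER)")
cur.executemany("INSERT INTO nt1 VALUES (?, ?)", [("one", 1), ("two", 2)])
cur.executemany("INSERT INTO nt2 VALUES (?, ?)", [("one", 10), ("three", 30)])

# Resolve the unqualified key, the left-side key, and the right-side key.
rows = cur.execute(
    "SELECT k, nt1.k, nt2.k FROM nt1 JOIN nt2 USING (k)"
).fetchall()
print(rows)
```

Before this PR, the qualified right-side reference (`nt2.k` here) was not resolvable in Spark for an inner or left USING join.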

Signed-off-by: Karen Feng <karen.feng@databricks.com>
… column

Signed-off-by: Karen Feng <karen.feng@databricks.com>
…4527

Signed-off-by: Karen Feng <karen.feng@databricks.com>
@github-actions github-actions bot added the SQL label Feb 26, 2021
@SparkQA

SparkQA commented Feb 27, 2021

Test build #135521 has finished for PR 31666 at commit 2c261bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait PartitionSpec extends LeafExpression with Unevaluable
  • trait V2PartitionCommand extends Command
  • case class TruncateTable(table: LogicalPlan) extends Command
  • case class TruncatePartition(
  • case class TruncatePartitionExec(

Signed-off-by: Karen Feng <karen.feng@databricks.com>
@SparkQA

SparkQA commented Feb 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40108/

@SparkQA

SparkQA commented Feb 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40108/

@SparkQA

SparkQA commented Feb 27, 2021

Test build #135527 has finished for PR 31666 at commit 80beda8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Signed-off-by: Karen Feng <karen.feng@databricks.com>
@SparkQA

SparkQA commented Feb 27, 2021

Test build #135531 has finished for PR 31666 at commit e1719d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…4527

Signed-off-by: Karen Feng <karen.feng@databricks.com>
@SparkQA

SparkQA commented Mar 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40256/

@SparkQA

SparkQA commented Mar 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40256/

@SparkQA

SparkQA commented Mar 3, 2021

Test build #135674 has finished for PR 31666 at commit 6fa70ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class JavaModelSelectionViaRandomHyperparametersExample
  • class GangliaSink(
  • case class Limits[T: Numeric](x: T, y: T)
  • abstract class Generator[T: Numeric]
  • class ParamRandomBuilder extends ParamGridBuilder
  • class ParamRandomBuilder(ParamGridBuilder):
  • case class Product(child: Expression)
  • case class AnalyzeTables(
  • case class AnalyzeTablesCommand(

@@ -94,6 +94,8 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
}
}
newNode.copyTagsFrom(this)
Contributor Author

This exists in transformUp, but not in resolveOperatorsUp - was the difference intentional or unintentional? Without the tags, the metadata cannot be resolved properly (isMetadataCol is always false).

Contributor

I think it's a mistake.

lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
plan.expressions.flatMap(_.collect {
case a: Attribute if a.isMetadataCol => a
case a: Attribute if childMetadataOutput.exists(_.exprId == a.exprId) =>
Contributor Author

This occurs in the case that a column is resolved below the level at which it becomes labeled as metadata. For the NATURAL/USING JOIN, this occurs when the column is resolved at the level of the root table - it is only labeled as hidden when it is used as a key column in the join.

@cloud-fan
Contributor

This fix applies in SQL but does not apply in Scala; this seems to be related to the metadata column framework in the DSv2 API.

This is still true? I think your previous bug fix PR solved it. We can add some tests to verify it (even if it fails, we need to show people the behavior of the Scala API)

@karenfeng
Contributor Author

This fix applies in SQL but does not apply in Scala; this seems to be related to the metadata column framework in the DSv2 API.

This is still true? I think your previous bug fix PR solved it. We can add some tests to verify it (even if it fails, we need to show people the behavior of the Scala API)

Whoops, I forgot to change the PR description. This no longer holds. Thanks for the catch @cloud-fan!

testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
val dfQuery = joinDf.select(
$"a", $"testData2.a", $"testData2.b", $"testData3.a", $"testData3.b")
val dfQuery2 = joinDf.select(
Contributor Author

These demonstrate that the behavior now works in Scala.

@@ -3370,54 +3435,6 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

private def commonNaturalJoinProcessing(
Contributor

why do we move this method? It creates a lot of code diff and makes it harder to review.

Contributor Author

I can move it back - I just wasn't sure why it lived outside of this class, given that it's not shared.


override def metadataOutput: Seq[Attribute] = {
child.metadataOutput ++
getTagValue(hiddenOutputTag).getOrElse(Seq.empty[Attribute])
Contributor

It's unfortunate that we need to use TreeNodeTag to store the extra information in Project, but I don't have a better idea without changing the Project constructor.

Contributor Author

We could make this more generic by adding this LogicalPlan's metadataOutput, but that would complicate how we can add these hidden columns in AddMetadataColumns.
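The tag-based side channel discussed here can be sketched in plain Python (not Spark's actual classes, just a model of the pattern in the diff above): a node exposes its child's metadata output plus any hidden attributes stashed in a `TreeNodeTag`-like store.

```python
# Minimal Python model (not Spark) of the pattern in the diff above:
# a Project-like node exposes its child's metadata output plus hidden
# attributes stashed in a side-channel tag, mirroring
# `child.metadataOutput ++ getTagValue(hiddenOutputTag).getOrElse(Seq.empty)`.
class Node:
    def __init__(self, child=None):
        self.child = child
        self.tags = {}  # analogous to TreeNodeTag storage

    def metadata_output(self):
        inherited = self.child.metadata_output() if self.child else []
        return inherited + self.tags.get("hidden_output", [])

leaf = Node()
leaf.tags["hidden_output"] = ["t2.key"]
project = Node(child=leaf)
project.tags["hidden_output"] = ["t1.key"]
print(project.metadata_output())  # ['t2.key', 't1.key']
```

This also shows why tags must be copied when a node is rewritten (the `copyTagsFrom` point raised earlier): a rewritten node with an empty `tags` dict would silently drop its hidden output.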

Signed-off-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Karen Feng <karen.feng@databricks.com>
@SparkQA

SparkQA commented Mar 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40359/

@SparkQA

SparkQA commented Apr 13, 2021

Test build #137253 has finished for PR 31666 at commit 9e62d7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -957,6 +946,36 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}

private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
Contributor

nit: we can avoid building a new Seq frequently. The check can be
plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))

Contributor

The same applies to hasMetadataCol.
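The reviewer's nit, modeled in plain Python (not Spark): a short-circuiting exists-style membership check avoids materializing the flattened metadata-output list on every call.

```python
# Two equivalent membership checks over per-child metadata outputs.
# The exists-style version short-circuits and builds no intermediate list.
children = [["m1", "m2"], ["m3"]]

def has_metadata_col_flatmap(attr):
    flattened = [m for c in children for m in c]  # builds a new list each call
    return attr in flattened

def has_metadata_col_exists(attr):
    return any(attr in c for c in children)       # no intermediate list

assert has_metadata_col_flatmap("m3") == has_metadata_col_exists("m3")
```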


/**
* Hidden columns are a type of metadata column that are candidates during qualified
* star expansions. They are propagated through Projects that have hidden children output,
Contributor

The comment needs update again.

* star expansions. They are propagated through Projects that have hidden children output,
* so that nested hidden output is not lost.
*/
val HIDDEN_COL_ATTR_KEY = "__hidden_col"
Contributor

The semantic is clear now, let's refine the naming.

We only have metadata column, and metadata column can be included in qualified star if required. We can just add a new property to metadata columns to indicate it.

The property name can be __support_qualified_star, and the helper class can be

implicit class MetadataColumnHelper(attr: Attribute) {
  def isMetadataCol: Boolean ...
  def supportQualifiedStar: Boolean ...
  def markAsSupportQualifiedStar: Attribute ...
}

Contributor

@cloud-fan cloud-fan left a comment

LGTM except for some minor comments

Signed-off-by: Karen Feng <karen.feng@databricks.com>
…4527

Signed-off-by: Karen Feng <karen.feng@databricks.com>
@SparkQA

SparkQA commented Apr 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41881/

@SparkQA

SparkQA commented Apr 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41881/

@SparkQA

SparkQA commented Apr 13, 2021

Test build #137301 has finished for PR 31666 at commit 8f70c2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WriteToDataSourceV2(
  • case class WriteToMicroBatchDataSource(

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 816f6dd Apr 14, 2021
cloud-fan pushed a commit that referenced this pull request Jun 6, 2022
…ery alias from NATURAL/USING JOIN

### What changes were proposed in this pull request?

Follows up on #31666, which introduced a bug where the qualified star expansion of a subquery alias containing a NATURAL/USING JOIN output duplicated columns.

### Why are the changes needed?

Duplicated, hidden columns should not be output from a star expansion.

### Does this PR introduce _any_ user-facing change?

The query

```
val df1 = Seq((3, 8)).toDF("a", "b")
val df2 = Seq((8, 7)).toDF("b", "d")
val joinDF = df1.join(df2, "b")
joinDF.alias("r").select("r.*")
```

Now outputs a single column `b`, instead of two (duplicate) columns for `b`.
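The fixed star expansion matches what standard SQL engines do for USING joins; a small SQLite sketch of the same shape (not Spark, table names mirror the DataFrames above):

```python
import sqlite3

# SQLite sketch of the fixed behavior: star expansion over a USING join
# emits the common column once, not twice.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE df1(a INTEGER, b INTEGER)")
cur.execute("CREATE TABLE df2(b INTEGER, d INTEGER)")
cur.execute("INSERT INTO df1 VALUES (3, 8)")
cur.execute("INSERT INTO df2 VALUES (8, 7)")

cur.execute("SELECT * FROM df1 JOIN df2 USING (b)")
cols = [c[0] for c in cur.description]  # join column 'b' appears once
rows = cur.fetchall()
print(cols, rows)
```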

### How was this patch tested?

UTs

Closes #36763 from karenfeng/SPARK-39376.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 18ca369)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Jun 6, 2022
…ery alias from NATURAL/USING JOIN
cloud-fan pushed a commit that referenced this pull request Jun 6, 2022
…ery alias from NATURAL/USING JOIN


-- !query
SELECT k FROM nt1 inner join nt2 using (k)


SELECT t.* FROM (SELECT k FROM nt1 inner join nt2 using (k)) as t
That used to return the same results, but now it returns the results of the query on line 302.


I should add that I haven't tested this exact scenario, but I have run into a very similar issue when I attempted to switch to spark 3.2.0. I'll open a bug report when I get a chance.

Contributor

Have you tried #36763 ? The issue should have been fixed.


I did not know about that, thank you. Now I just have to wait for EMR to support Spark 3.2.2.

cloud-fan added a commit that referenced this pull request Sep 7, 2022
### What changes were proposed in this pull request?

This PR fixes a regression caused by #32017.

In #32017, we tried to be more conservative and decided to not propagate metadata columns in certain operators, including `Project`. However, the decision was made only considering SQL API, not DataFrame API. In fact, it's very common to chain `Project` operators in DataFrame, e.g. `df.withColumn(...).withColumn(...)...`, and it's very inconvenient if metadata columns are not propagated through `Project`.

This PR makes 2 changes:
1. Project should propagate metadata columns
2. SubqueryAlias should only propagate metadata columns if the child is a leaf node or also a SubqueryAlias

The second change is needed to still forbid weird queries like `SELECT m from (SELECT a from t)`, which is the main motivation of #32017 .

After propagating metadata columns, a problem from #31666 is exposed: the natural join metadata columns may confuse the analyzer and lead to wrong analyzed plan. For example, `SELECT t1.value FROM t1 LEFT JOIN t2 USING (key) ORDER BY key`, how shall we resolve `ORDER BY key`? It should be resolved to `t1.key` via the rule `ResolveMissingReferences`, which is in the output of the left join. However, if `Project` can propagate metadata columns, `ORDER BY key` will be resolved to `t2.key`.

To solve this problem, this PR only allows qualified access for metadata columns of natural join. This has no breaking change, as people can only do qualified access for natural join metadata columns before, in the `Project` right after `Join`. This actually enables more use cases, as people can now access natural join metadata columns in ORDER BY. I've added a test for it.
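The resolution question above can be illustrated outside Spark; in SQLite (used here purely as a sketch, not Spark's analyzer), an unqualified `key` in ORDER BY over a left USING join resolves to the left side's key, mirroring what `ResolveMissingReferences` picks:

```python
import sqlite3

# Illustration (SQLite, not Spark): in t1 LEFT JOIN t2 USING (key),
# an unqualified 'key' in ORDER BY resolves to the left side's key.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t1(key INTEGER, value TEXT)")
cur.execute("CREATE TABLE t2(key INTEGER, other TEXT)")
cur.executemany("INSERT INTO t1 VALUES (?, ?)", [(2, "b"), (1, "a")])
cur.execute("INSERT INTO t2 VALUES (1, 'x')")

rows = cur.execute(
    "SELECT t1.value FROM t1 LEFT JOIN t2 USING (key) ORDER BY key"
).fetchall()
print(rows)  # sorted by t1.key: 'a' (key=1) before 'b' (key=2)
```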

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

For SQL API, there is no change, as a `SubqueryAlias` always comes with a `Project` or `Aggregate`, so we still don't propagate metadata columns through a SELECT group.

For DataFrame API, the behavior becomes more lenient. The only breaking case is an operator that can propagate metadata columns followed by a `SubqueryAlias`, e.g. `df.filter(...).as("t").select("t.metadata_col")`. But this is a weird use case and I don't think we should support it in the first place.

### How was this patch tested?

new tests

Closes #37758 from cloud-fan/metadata.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Sep 7, 2022
cloud-fan added a commit to cloud-fan/spark that referenced this pull request Sep 7, 2022
cloud-fan added a commit that referenced this pull request Sep 7, 2022
backport #37758 to 3.2

Closes #37818 from cloud-fan/backport.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…ery alias from NATURAL/USING JOIN
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
backport apache#37758 to 3.2
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
backport apache#37758 to 3.2