From 3b549f4309497ecbe9f0b7a20d22a9a4417abb8b Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Mon, 6 Jun 2022 20:58:23 +0800
Subject: [PATCH] [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

### What changes were proposed in this pull request?

Follows up from https://github.com/apache/spark/pull/31666. That PR introduced a bug where the qualified star expansion of a subquery alias containing a NATURAL/USING JOIN output duplicated columns.

### Why are the changes needed?

Duplicated, hidden columns should not be output from a star expansion.

### Does this PR introduce _any_ user-facing change?

The query
```
val df1 = Seq((3, 8)).toDF("a", "b")
val df2 = Seq((8, 7)).toDF("b", "d")
val joinDF = df1.join(df2, "b")
joinDF.alias("r").select("r.*")
```
now outputs a single column `b`, instead of two (duplicate) columns for `b`.

### How was this patch tested?

UTs

Closes #36763 from karenfeng/SPARK-39376.

Authored-by: Karen Feng
Signed-off-by: Wenchen Fan
(cherry picked from commit 18ca369f01905b421a658144e23b5a4e60702655)
Signed-off-by: Wenchen Fan
---
 .../plans/logical/basicLogicalOperators.scala |  3 ++-
 .../apache/spark/sql/DataFrameJoinSuite.scala | 22 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 692601be75d10..774f6956162e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1328,7 +1328,8 @@ case class SubqueryAlias(
 
   override def metadataOutput: Seq[Attribute] = {
     val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }
 
   override def maxRows: Option[Long] = child.maxRows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a803fa88ed313..1fda13f996a47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -499,4 +499,26 @@ class DataFrameJoinSuite extends QueryTest
       )
     }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery alias from USING JOIN") {
+    val joinDf = testData2.as("testData2").join(
+      testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+    val equivalentQueries = Seq(
+      joinDf.select($"*"),
+      joinDf.as("r").select($"*"),
+      joinDf.as("r").select($"r.*")
+    )
+    equivalentQueries.foreach { query =>
+      checkAnswer(query,
+        Seq(
+          Row(1, 1, null),
+          Row(1, 2, null),
+          Row(2, 1, 2),
+          Row(2, 2, 2),
+          Row(3, 1, null),
+          Row(3, 2, null)
+        )
+      )
+    }
+  }
 }