[SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN #31666

Closed (wants to merge 44 commits)
Commits
1c5ab03
Only use metadata columns for resolution as last resort
karenfeng Feb 26, 2021
2fe733f
Resolve deduplicated common columns in NATURAL/USING JOIN with hidden…
karenfeng Feb 26, 2021
2c261bb
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Feb 26, 2021
80beda8
Fix behavior in Scala
karenfeng Feb 27, 2021
e1719d3
Fix nested expression
karenfeng Feb 27, 2021
6fa70ba
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 2, 2021
0ba1916
Address comments
karenfeng Mar 4, 2021
2b7e730
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 4, 2021
0c116a5
Push SQL output
karenfeng Mar 4, 2021
bf87f55
Re-do changes
karenfeng Mar 7, 2021
b5dc44f
Fixup tag
karenfeng Mar 8, 2021
6e32b3d
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 8, 2021
7c3f5df
Add another failing test
karenfeng Mar 8, 2021
181751a
Merge add metadata and resolve missing references
karenfeng Mar 29, 2021
ad5e824
Resolve down to avoid prematurely projecting out
karenfeng Mar 29, 2021
e36e853
Remove printlns
karenfeng Mar 29, 2021
db44c53
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 29, 2021
73b7c8a
Clean up
karenfeng Mar 29, 2021
1eb01e2
Style fixup
karenfeng Mar 29, 2021
9fd2490
Formatting fix
karenfeng Mar 29, 2021
f5cc3ae
Address comments
karenfeng Mar 30, 2021
fa7207e
Metadata output should be empty by default
karenfeng Mar 31, 2021
7af12ae
Clean up
karenfeng Mar 31, 2021
07f9ad5
Always resolve with metadata
karenfeng Mar 31, 2021
c474745
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 6, 2021
0f267e7
Revert accidental metadata changes
karenfeng Apr 6, 2021
ed0270c
Revert DatasourceV2SQLSuite
karenfeng Apr 6, 2021
66ad572
Revert unneeded style changes
karenfeng Apr 6, 2021
fc3b16d
Revert accidental metadata output change
karenfeng Apr 6, 2021
f665030
Add childrens' hidden output to Project metadataOutput
karenfeng Apr 7, 2021
85b81b1
Retrigger tests
karenfeng Apr 7, 2021
eab7964
Add comments
karenfeng Apr 7, 2021
44ee9f8
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 7, 2021
c84f396
Address comments and fix propagation through Projects
karenfeng Apr 7, 2021
0fe04a2
Retrigger tests
karenfeng Apr 7, 2021
c7c3df6
Retrigger tests
karenfeng Apr 7, 2021
b1bf28d
Retrigger tests
karenfeng Apr 8, 2021
47be66d
Wording
karenfeng Apr 9, 2021
8c5144e
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 9, 2021
333a815
propagate hidden columns from nested NATURAL/USING JOINs
karenfeng Apr 12, 2021
49de5c5
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 12, 2021
9e62d7d
Retrigger tests
karenfeng Apr 12, 2021
446d4bc
address comments
karenfeng Apr 13, 2021
8f70c2d
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 13, 2021
@@ -979,7 +979,7 @@ class Analyzer(override val catalogManager: CatalogManager)
*
* References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
* but the relation's output does not include the metadata columns until the relation is replaced
* using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
* with a copy adding them to the output. Unless this rule adds metadata to the relation's output,
* relation's output, the analyzer will detect that nothing produces the columns.
*
* This rule only adds metadata columns when a node is resolved but is missing input from its
@@ -988,31 +988,43 @@ class Analyzer(override val catalogManager: CatalogManager)
* columns are not accidentally selected by *.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.catalyst.util._

private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
Contributor:
nit: we can avoid building a new Seq frequently. The check can be
plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))

Contributor:
The same to hasMetadataCol
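
Applied to hasMetadataCol, the reviewer's suggestion might look roughly like this (a sketch of the idea, not the committed code):

private def hasMetadataCol(plan: LogicalPlan): Boolean = {
  plan.expressions.exists(_.find {
    case a: Attribute =>
      // Check the metadata flag first; only then probe each child's metadataOutput,
      // so no combined Seq is materialized per attribute.
      a.isMetadataCol ||
        plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))
    case _ => false
  }.isDefined)
}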

plan.expressions.flatMap(_.collect {
case a: Attribute if a.isMetadataCol => a
case a: Attribute if childMetadataOutput.exists(_.exprId == a.exprId) =>
Contributor Author:
This occurs in the case that a column is resolved below the level at which it becomes labeled as metadata. For the NATURAL/USING JOIN, this occurs when the column is resolved at the level of the root table - it is only labeled as hidden when it is used as a key column in the join.
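
A concrete case is the DataFrame API, with hypothetical DataFrames df1 and df2 that both have a key column (the snippet is illustrative, not taken from this change):

val joined = df1.join(df2, Seq("key"), "full_outer").select(df1("key"))

Here df1("key") binds to the attribute of the base relation before anything marks it as hidden; the hidden flag only appears on the copy kept in the join Project's metadataOutput, so matching on exprId against the children's metadataOutput is what lets the rule recognize the reference.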

childMetadataOutput.find(_.exprId == a.exprId).get
})
}

private def hasMetadataCol(plan: LogicalPlan): Boolean = {
lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
plan.expressions.exists(_.find {
case a: Attribute => a.isMetadataCol
case a: Attribute =>
a.isMetadataCol || childMetadataOutput.exists(_.exprId == a.exprId)
case _ => false
}.isDefined)
}

private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
case r: DataSourceV2Relation => r.withMetadataColumns()
case p: Project => p.copy(
projectList = p.metadataOutput ++ p.projectList,
child = addMetadataCol(p.child))
case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
val inputAttrs = AttributeSet(node.children.flatMap(_.output))
val metaCols = node.expressions.flatMap(_.collect {
case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
})
val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains)
if (metaCols.isEmpty) {
node
} else {
val newNode = addMetadataCol(node)
// We should not change the output schema of the plan. We should project away the extr
// We should not change the output schema of the plan. We should project away the extra
// metadata columns if necessary.
if (newNode.sameOutput(node)) {
newNode
@@ -3283,6 +3295,59 @@ class Analyzer(override val catalogManager: CatalogManager)
* Then apply a Project on a normal Join to eliminate natural or using join.
*/
object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
private def commonNaturalJoinProcessing(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
joinNames: Seq[String],
condition: Option[Expression],
hint: JoinHint): LogicalPlan = {
import org.apache.spark.sql.catalyst.util._

val leftKeys = joinNames.map { keyName =>
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
}
}
val rightKeys = joinNames.map { keyName =>
right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, right, "right")
}
}
val joinPairs = leftKeys.zip(rightKeys)

val newCondition = (condition ++ joinPairs.map(EqualTo.tupled)).reduceOption(And)

// columns not in joinPairs
val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att))
val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))

// the output list looks like: join keys, columns from left, columns from right
val (projectList, hiddenList) = joinType match {
case LeftOuter =>
(leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)), rightKeys)
case LeftExistence(_) =>
(leftKeys ++ lUniqueOutput, Seq.empty)
case RightOuter =>
(rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput, leftKeys)
case FullOuter =>
// in full outer join, joinCols should be non-null if there is.
val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
(joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true)),
leftKeys ++ rightKeys)
case _ : InnerLike =>
(leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys)
case _ =>
sys.error("Unsupported natural join type " + joinType)
}
// use Project to hide duplicated common keys
val project = Project(projectList, Join(left, right, joinType, newCondition, hint))
project.setTagValue(project.hiddenOutputTag, hiddenList.map(_.asHiddenCol()))
project
}
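
As a worked example (hypothetical tables t1(key, a) and t2(key, b), not part of this change): for SELECT * FROM t1 JOIN t2 USING (key), the inner-join branch yields projectList = [key, a, b] with the left key kept visible, and hiddenList = [t2.key]; the rewritten plan is then roughly Project([key, a, b], Join(t1, t2, Inner, Some(t1.key = t2.key), hint)), with t2.key recorded in hiddenOutputTag as a hidden column.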

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case j @ Join(left, right, UsingJoin(joinType, usingCols), _, hint)
if left.resolved && right.resolved && j.duplicateResolved =>
@@ -3370,54 +3435,6 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

private def commonNaturalJoinProcessing(
Contributor:
why do we move this method? It creates a lot of code diff and makes it harder to review.

Contributor Author:
I can move it back - I just wasn't sure why it lived outside of this class, given that it's not shared.

left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
joinNames: Seq[String],
condition: Option[Expression],
hint: JoinHint) = {
val leftKeys = joinNames.map { keyName =>
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
}
}
val rightKeys = joinNames.map { keyName =>
right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, right, "right")
}
}
val joinPairs = leftKeys.zip(rightKeys)

val newCondition = (condition ++ joinPairs.map(EqualTo.tupled)).reduceOption(And)

// columns not in joinPairs
val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att))
val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))

// the output list looks like: join keys, columns from left, columns from right
val projectList = joinType match {
case LeftOuter =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
case LeftExistence(_) =>
leftKeys ++ lUniqueOutput
case RightOuter =>
rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
case FullOuter =>
// in full outer join, joinCols should be non-null if there is.
val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true))
case _ : InnerLike =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput
case _ =>
sys.error("Unsupported natural join type " + joinType)
}
// use Project to trim unnecessary fields
Project(projectList, Join(left, right, joinType, newCondition, hint))
}

/**
* Replaces [[UnresolvedDeserializer]] with the deserialization expression that has been resolved
* to the given input attributes.
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.parser.ParserUtils
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
@@ -316,11 +316,11 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
* Returns true if the nameParts is a subset of the last elements of qualifier of the attribute.
*
* For example, the following should all return true:
* - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", "ns2", "t") and
* - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
* - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and
* - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
* - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and
* - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
* qualifier is Seq("ns1", "ns2", "t").
*/
private def matchedQualifier(
@@ -342,10 +342,13 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
override def expand(
input: LogicalPlan,
resolver: Resolver): Seq[NamedExpression] = {
// If there is no table specified, use all input attributes.
// If there is no table specified, use all non-hidden input attributes.
if (target.isEmpty) return input.output

val expandedAttributes = input.output.filter(matchedQualifier(_, target.get, resolver))
// If there is a table specified, use hidden input attributes as well
val hiddenOutput = input.metadataOutput.filter(_.isHiddenCol)
val expandedAttributes = (hiddenOutput ++ input.output).filter(
matchedQualifier(_, target.get, resolver))

if (expandedAttributes.nonEmpty) return expandedAttributes
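
An illustrative query (hypothetical tables, not from this change's tests): with hidden columns now eligible for qualified star expansion,

spark.sql("SELECT t2.* FROM t1 JOIN t2 USING (key)")

expands t2.* to include the de-duplicated join key from t2, while an unqualified * still sees only the regular output.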

@@ -85,7 +85,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
if (!analyzed) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
if (self fastEquals afterRuleOnChildren) {
val newNode = if (self fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(self, identity[LogicalPlan])
}
@@ -94,6 +94,8 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
}
}
newNode.copyTagsFrom(this)
Contributor Author:
This exists in transformUp, but not in resolveOperatorsUp - was the difference intentional or unintentional? Without the tags, the metadata cannot be resolved properly (isMetadataCol is always false).

Contributor:
I think it's a mistake.
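
A minimal sketch of the failure mode being addressed (the rule body and the plan are hypothetical):

// Any rule that returns a rebuilt node yields a fresh TreeNode; before this change,
// per-node tags such as Project.hiddenOutputTag were dropped in the process.
val resolved = plan.resolveOperatorsUp {
  case Project(projectList, child) => Project(projectList, child) // new node, same shape
}
// With copyTagsFrom, the rebuilt node keeps the tags of the node it replaces.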

newNode
}
} else {
self
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.random.RandomSampler
@@ -76,6 +77,13 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)

override lazy val validConstraints: ExpressionSet =
getAllValidConstraints(projectList)

val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = TreeNodeTag[Seq[Attribute]]("hiddenOutput")

override def metadataOutput: Seq[Attribute] = {
child.metadataOutput ++
getTagValue(hiddenOutputTag).getOrElse(Seq.empty[Attribute])
Contributor:
It's unfortunate that we need to use TreeNodeTag to store the extra information in Project, but I don't have a better idea without changing the Project constructor.

Contributor Author:
We could make this more generic by adding this LogicalPlan's metadataOutput, but that would complicate how we can add these hidden columns in AddMetadataColumns.

}
}

/**
@@ -950,7 +958,7 @@ case class SubqueryAlias(

override def metadataOutput: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.metadataOutput.map(_.withQualifier(qualifierList))
child.metadataOutput.filterNot(_.isHiddenCol).map(_.withQualifier(qualifierList))
}

override def doCanonicalize(): LogicalPlan = child.canonicalized
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{NumericType, StringType}
import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -193,4 +193,29 @@ package object util extends Logging {
def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
truncatedString(seq, "", sep, "", maxFields)
}

val METADATA_COL_ATTR_KEY = "__metadata_col"
implicit class MetadataColumnHelper(attr: Attribute) {
def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
}

/**
* Hidden columns are a type of metadata column that are not propagated through subquery aliases,
* and are candidates during qualified star expansions.
*/
val HIDDEN_COL_ATTR_KEY = "__hidden_col"
Contributor:
The semantics are clear now, let's refine the naming.

We only have metadata columns, and a metadata column can be included in a qualified star if required. We can just add a new property to metadata columns to indicate this.

The property name can be __support_qualified_star, and the helper class can be

implicit class MetadataColumnHelper(attr: Attribute) {
  def isMetadataCol: Boolean ...
  def supportQualifiedStar: Boolean ...
  def markAsSupportQualifiedStar: Attribute ...
}
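
Following the asHiddenCol pattern just below, the suggested helper might be fleshed out roughly like this (a sketch; the property key and method names come from the suggestion above, the constant name QUALIFIED_STAR_ATTR_KEY is an assumption):

val QUALIFIED_STAR_ATTR_KEY = "__support_qualified_star"

implicit class MetadataColumnHelper(attr: Attribute) {
  def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
    attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)

  def supportQualifiedStar: Boolean = isMetadataCol &&
    attr.metadata.contains(QUALIFIED_STAR_ATTR_KEY) &&
    attr.metadata.getBoolean(QUALIFIED_STAR_ATTR_KEY)

  def markAsSupportQualifiedStar: Attribute = attr.withMetadata(
    new MetadataBuilder()
      .withMetadata(attr.metadata)
      .putBoolean(METADATA_COL_ATTR_KEY, true)
      .putBoolean(QUALIFIED_STAR_ATTR_KEY, true)
      .build())
}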

implicit class HiddenColumnHelper(attr: Attribute) {
def isHiddenCol: Boolean = attr.isMetadataCol &&
attr.metadata.contains(HIDDEN_COL_ATTR_KEY) &&
attr.metadata.getBoolean(HIDDEN_COL_ATTR_KEY)

def asHiddenCol(): Attribute = attr.withMetadata(
new MetadataBuilder()
.withMetadata(attr.metadata)
.putBoolean(METADATA_COL_ATTR_KEY, true)
.putBoolean(HIDDEN_COL_ATTR_KEY, true)
.build()
)
}
}
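
A small usage sketch of the new helpers (the attribute here is a stand-in, not code from the change):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types.StringType

val attr = AttributeReference("key", StringType)()
val hidden = attr.asHiddenCol()
assert(hidden.isMetadataCol && hidden.isHiddenCol) // asHiddenCol sets both metadata flags
assert(!attr.isHiddenCol)                          // the original attribute is unchanged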
@@ -21,14 +21,13 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Implicits {
private val METADATA_COL_ATTR_KEY = "__metadata_col"

implicit class TableHelper(table: Table) {
def asReadable: SupportsRead = {
table match {
@@ -103,11 +102,6 @@ object DataSourceV2Implicits {
def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
}

implicit class MetadataColumnHelper(attr: Attribute) {
def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
}

implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)