
[SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN #31666

Closed · wants to merge 44 commits

Commits (changes shown are from 42 of the 44 commits)
1c5ab03
Only use metadata columns for resolution as last resort
karenfeng Feb 26, 2021
2fe733f
Resolve deduplicated common columns in NATURAL/USING JOIN with hidden…
karenfeng Feb 26, 2021
2c261bb
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Feb 26, 2021
80beda8
Fix behavior in Scala
karenfeng Feb 27, 2021
e1719d3
Fix nested expression
karenfeng Feb 27, 2021
6fa70ba
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 2, 2021
0ba1916
Address comments
karenfeng Mar 4, 2021
2b7e730
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 4, 2021
0c116a5
Push SQL output
karenfeng Mar 4, 2021
bf87f55
Re-do changes
karenfeng Mar 7, 2021
b5dc44f
Fixup tag
karenfeng Mar 8, 2021
6e32b3d
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 8, 2021
7c3f5df
Add another failing test
karenfeng Mar 8, 2021
181751a
Merge add metadata and resolve missing references
karenfeng Mar 29, 2021
ad5e824
Resolve down to avoid prematurely projecting out
karenfeng Mar 29, 2021
e36e853
Remove printlns
karenfeng Mar 29, 2021
db44c53
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Mar 29, 2021
73b7c8a
Clean up
karenfeng Mar 29, 2021
1eb01e2
Style fixup
karenfeng Mar 29, 2021
9fd2490
Formatting fix
karenfeng Mar 29, 2021
f5cc3ae
Address comments
karenfeng Mar 30, 2021
fa7207e
Metadata output should be empty by default
karenfeng Mar 31, 2021
7af12ae
Clean up
karenfeng Mar 31, 2021
07f9ad5
Always resolve with metadata
karenfeng Mar 31, 2021
c474745
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 6, 2021
0f267e7
Revert accidental metadata changes
karenfeng Apr 6, 2021
ed0270c
Revert DatasourceV2SQLSuite
karenfeng Apr 6, 2021
66ad572
Revert unneeded style changes
karenfeng Apr 6, 2021
fc3b16d
Revert accidental metadata output change
karenfeng Apr 6, 2021
f665030
Add childrens' hidden output to Project metadataOutput
karenfeng Apr 7, 2021
85b81b1
Retrigger tests
karenfeng Apr 7, 2021
eab7964
Add comments
karenfeng Apr 7, 2021
44ee9f8
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 7, 2021
c84f396
Address comments and fix propagation through Projects
karenfeng Apr 7, 2021
0fe04a2
Retrigger tests
karenfeng Apr 7, 2021
c7c3df6
Retrigger tests
karenfeng Apr 7, 2021
b1bf28d
Retrigger tests
karenfeng Apr 8, 2021
47be66d
Wording
karenfeng Apr 9, 2021
8c5144e
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 9, 2021
333a815
propagate hidden columns from nested NATURAL/USING JOINs
karenfeng Apr 12, 2021
49de5c5
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 12, 2021
9e62d7d
Retrigger tests
karenfeng Apr 12, 2021
446d4bc
address comments
karenfeng Apr 13, 2021
8f70c2d
Merge branch 'master' of https://github.com/apache/spark into spark-3…
karenfeng Apr 13, 2021
@@ -914,41 +914,30 @@ class Analyzer(override val catalogManager: CatalogManager)
* Adds metadata columns to output for child relations when nodes are missing resolved attributes.
*
* References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
* but the relation's output does not include the metadata columns until the relation is replaced
* using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
* relation's output, the analyzer will detect that nothing produces the columns.
* but the relation's output does not include the metadata columns until the relation is replaced.
* Unless this rule adds metadata to the relation's output, the analyzer will detect that nothing
* produces the columns.
*
* This rule only adds metadata columns when a node is resolved but is missing input from its
* children. This ensures that metadata columns are not added to the plan unless they are used. By
* checking only resolved nodes, this ensures that * expansion is already done so that metadata
* columns are not accidentally selected by *.
* columns are not accidentally selected by *. This rule resolves operators downwards to avoid
* projecting away metadata columns prematurely.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

private def hasMetadataCol(plan: LogicalPlan): Boolean = {
plan.expressions.exists(_.find {
case a: Attribute => a.isMetadataCol
case _ => false
}.isDefined)
}
import org.apache.spark.sql.catalyst.util._

private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
case r: DataSourceV2Relation => r.withMetadataColumns()
case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
// Add metadata output to all node types
case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
val inputAttrs = AttributeSet(node.children.flatMap(_.output))
val metaCols = node.expressions.flatMap(_.collect {
case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
})
val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains)
if (metaCols.isEmpty) {
node
} else {
val newNode = addMetadataCol(node)
// We should not change the output schema of the plan. We should project away the extr
// We should not change the output schema of the plan. We should project away the extra
// metadata columns if necessary.
if (newNode.sameOutput(node)) {
newNode
@@ -957,6 +946,36 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}

private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
Contributor:
nit: we can avoid building a new Seq frequently. The check can be
plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))

Contributor:
The same to hasMetadataCol

plan.expressions.flatMap(_.collect {
case a: Attribute if a.isMetadataCol => a
case a: Attribute if childMetadataOutput.exists(_.exprId == a.exprId) =>
Contributor Author:
This occurs in the case that a column is resolved below the level at which it becomes labeled as metadata. For the NATURAL/USING JOIN, this occurs when the column is resolved at the level of the root table - it is only labeled as hidden when it is used as a key column in the join.

childMetadataOutput.find(_.exprId == a.exprId).get
})
}

private def hasMetadataCol(plan: LogicalPlan): Boolean = {
lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
val hasMetaCol = plan.expressions.exists(_.find {
case a: Attribute =>
// If an attribute is resolved before being labeled as metadata
// (i.e. from the originating Dataset), we check with expression ID
a.isMetadataCol || childMetadataOutput.exists(_.exprId == a.exprId)
case _ => false
}.isDefined)
hasMetaCol
}

private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
case r: DataSourceV2Relation => r.withMetadataColumns()
case p: Project =>
p.copy(
projectList = p.metadataOutput ++ p.projectList,
child = addMetadataCol(p.child))
case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
}
}
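
As context for the rule above, here is a minimal sketch of the end-to-end behavior it enables; this is not code from the PR, and the table and column names are hypothetical. After a USING join, the right side's key is removed from the visible output but stays resolvable through its qualifier as a hidden metadata column.

```scala
// Hedged sketch of the intended behavior, assuming a local Spark build.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("using-join-demo").getOrCreate()

spark.sql("CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT 1 AS key, 'a' AS v1")
spark.sql("CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT 1 AS key, 'b' AS v2")

// Visible output of the join is (key, v1, v2); t2.key is de-duplicated away.
spark.sql("SELECT * FROM t1 JOIN t2 USING (key)").printSchema()

// With this change, t2.key can still be referenced: it is resolved from the
// hidden output that AddMetadataColumns re-attaches to the plan.
spark.sql("SELECT t2.key, v1, v2 FROM t1 JOIN t2 USING (key)").show()
```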

/**
@@ -1897,10 +1916,10 @@
}

/**
* This method tries to resolve expressions and find missing attributes recursively. Specially,
* when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved
* attributes which are missed from child output. This method tries to find the missing
* attributes out and add into the projection.
* This method tries to resolve expressions and find missing attributes recursively.
* Specifically, when the expressions used in `Sort` or `Filter` contain unresolved attributes
* or resolved attributes which are missing from child output. This method tries to find the
* missing attributes and add them into the projection.
*/
private def resolveExprsAndAddMissingAttrs(
exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = {
@@ -3144,7 +3163,9 @@
joinType: JoinType,
joinNames: Seq[String],
condition: Option[Expression],
hint: JoinHint) = {
hint: JoinHint): LogicalPlan = {
import org.apache.spark.sql.catalyst.util._

val leftKeys = joinNames.map { keyName =>
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
@@ -3164,26 +3185,32 @@
val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))

// the output list looks like: join keys, columns from left, columns from right
val projectList = joinType match {
val (projectList, hiddenList) = joinType match {
case LeftOuter =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
(leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)), rightKeys)
case LeftExistence(_) =>
leftKeys ++ lUniqueOutput
(leftKeys ++ lUniqueOutput, Seq.empty)
case RightOuter =>
rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
(rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput, leftKeys)
case FullOuter =>
// in full outer join, joinCols should be non-null if there is.
val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
joinedCols ++
(joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true))
rUniqueOutput.map(_.withNullability(true)),
leftKeys ++ rightKeys)
case _ : InnerLike =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput
(leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys)
case _ =>
sys.error("Unsupported natural join type " + joinType)
}
// use Project to trim unnecessary fields
Project(projectList, Join(left, right, joinType, newCondition, hint))
// use Project to hide duplicated common keys
// propagate hidden columns from nested USING/NATURAL JOINs
val project = Project(projectList, Join(left, right, joinType, newCondition, hint))
project.setTagValue(
Project.hiddenOutputTag,
hiddenList.map(_.asHiddenCol()) ++ project.child.metadataOutput.filter(_.isHiddenCol))
project
}
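
As a rough illustration of the hiddenList built above, reusing the hypothetical t1/t2 views from the earlier sketch: for a FULL OUTER join the visible key becomes coalesce(t1.key, t2.key), while both original key columns are tagged as hidden and should remain addressable by qualifier.

```scala
// Hedged sketch; assumes the t1/t2 temporary views defined in the earlier example.
// Visible output: (key, v1, v2) where key = coalesce(t1.key, t2.key).
spark.sql("SELECT * FROM t1 FULL OUTER JOIN t2 USING (key)").printSchema()

// For FULL OUTER, both sides' keys are in the hidden list, so qualified
// references to them are expected to resolve after this change.
spark.sql("SELECT key, t1.key, t2.key FROM t1 FULL OUTER JOIN t2 USING (key)").show()
```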

/**
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
@@ -340,11 +340,11 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
* Returns true if the nameParts is a subset of the last elements of qualifier of the attribute.
*
* For example, the following should all return true:
* - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", "ns2", "t") and
* - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
* - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and
* - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
* - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and
* - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
* qualifier is Seq("ns1", "ns2", "t").
*/
private def matchedQualifier(
@@ -366,10 +366,13 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
override def expand(
input: LogicalPlan,
resolver: Resolver): Seq[NamedExpression] = {
// If there is no table specified, use all input attributes.
// If there is no table specified, use all non-hidden input attributes.
if (target.isEmpty) return input.output

val expandedAttributes = input.output.filter(matchedQualifier(_, target.get, resolver))
// If there is a table specified, use hidden input attributes as well
val hiddenOutput = input.metadataOutput.filter(_.isHiddenCol)
val expandedAttributes = (hiddenOutput ++ input.output).filter(
matchedQualifier(_, target.get, resolver))

if (expandedAttributes.nonEmpty) return expandedAttributes

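
A short, hypothetical example of what the qualified-star change above is meant to allow, again assuming the t1/t2 views from the first sketch: an unqualified star keeps the de-duplicated join output, while a qualified t2.* now also pulls in the hidden join key.

```scala
// Hedged sketch; assumes the t1/t2 temporary views from the first example.
// Unqualified star: hidden columns are excluded, so the output is (key, v1, v2).
spark.sql("SELECT * FROM t1 JOIN t2 USING (key)").show()

// Qualified star: hidden attributes matching the qualifier are included,
// so t2.* expands to (key, v2) rather than just (v2).
spark.sql("SELECT t2.* FROM t1 JOIN t2 USING (key)").show()
```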
@@ -87,7 +87,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
if (!analyzed) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
if (self fastEquals afterRuleOnChildren) {
val newNode = if (self fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(self, identity[LogicalPlan])
}
@@ -96,6 +96,8 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
}
}
newNode.copyTagsFrom(this)
Contributor Author:
This exists in transformUp, but not in resolveOperatorsUp - was the difference intentional or unintentional? Without the tags, the metadata cannot be resolved properly (isMetadataCol is always false).

Contributor:
I think it's a mistake.

newNode
}
} else {
self
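
To make the reviewed change above concrete, here is a minimal standalone sketch, using catalyst classes directly and not code from this PR, of why the tag has to be copied: TreeNodeTag values live on a particular node instance, so a rewritten node produced by a rule starts without them unless copyTagsFrom is called.

```scala
// Hedged sketch, assuming Spark's catalyst module on the classpath.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

val tag = TreeNodeTag[String]("example")
val original = Project(Nil, LocalRelation('a.int))
original.setTagValue(tag, "hidden-output info")

// A rule that returns a new node instance does not carry the tag over...
val rewritten = Project(Nil, LocalRelation('a.int))
assert(rewritten.getTagValue(tag).isEmpty)

// ...which is what the added copyTagsFrom call in resolveOperatorsUp fixes.
rewritten.copyTagsFrom(original)
assert(rewritten.getTagValue(tag).contains("hidden-output info"))
```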
@@ -25,16 +25,17 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.TreePattern.{INNER_LIKE_JOIN, JOIN,
LEFT_SEMI_OR_ANTI_JOIN, OUTER_JOIN, TreePattern}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.random.RandomSampler

/**
* When planning take() or collect() operations, this special node that is inserted at the top of
* When planning take() or collect() operations, this special node is inserted at the top of
* the logical plan before invoking the query planner.
*
* Rules can pattern-match on this node in order to apply transformations that only take effect
@@ -68,7 +69,6 @@ object Subquery {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def metadataOutput: Seq[Attribute] = Nil
override def maxRows: Option[Long] = child.maxRows

override lazy val resolved: Boolean = {
@@ -85,10 +85,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
override lazy val validConstraints: ExpressionSet =
getAllValidConstraints(projectList)

override def metadataOutput: Seq[Attribute] =
getTagValue(Project.hiddenOutputTag).getOrElse(Nil)

override protected def withNewChildInternal(newChild: LogicalPlan): Project =
copy(child = newChild)
}

object Project {
val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = TreeNodeTag[Seq[Attribute]]("hidden_output")
}

/**
* Applies a [[Generator]] to a stream of input rows, combining the
* output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
@@ -1077,7 +1084,7 @@ case class SubqueryAlias(

override def metadataOutput: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.metadataOutput.map(_.withQualifier(qualifierList))
child.metadataOutput.filterNot(_.isHiddenCol).map(_.withQualifier(qualifierList))
}

override def doCanonicalize(): LogicalPlan = child.canonicalized
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{NumericType, StringType}
import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -201,4 +201,30 @@ package object util extends Logging {
def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
truncatedString(seq, "", sep, "", maxFields)
}

val METADATA_COL_ATTR_KEY = "__metadata_col"

/**
* Hidden columns are a type of metadata column that are candidates during qualified star
* expansions. They are propagated through Projects that have hidden children output,
Contributor:
The comment needs update again.

* so that nested hidden output is not lost.
*/
val HIDDEN_COL_ATTR_KEY = "__hidden_col"
Contributor:
The semantics are clear now; let's refine the naming.

We only have metadata columns, and a metadata column can be included in a qualified star if required. We can just add a new property to metadata columns to indicate this.

The property name can be __support_qualified_star, and the helper class can be

implicit class MetadataColumnHelper(attr: Attribute) {
  def isMetadataCol: Boolean ...
  def supportQualifiedStar: Boolean ...
  def markAsSupportQualifiedStar: Attribute ...
}


implicit class SpecialColumnHelper(attr: Attribute) {
def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)

def isHiddenCol: Boolean = attr.isMetadataCol &&
attr.metadata.contains(HIDDEN_COL_ATTR_KEY) &&
attr.metadata.getBoolean(HIDDEN_COL_ATTR_KEY)

def asHiddenCol(): Attribute = attr.withMetadata(
new MetadataBuilder()
.withMetadata(attr.metadata)
.putBoolean(METADATA_COL_ATTR_KEY, true)
.putBoolean(HIDDEN_COL_ATTR_KEY, true)
.build()
)
}
}
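
Putting the new Project.hiddenOutputTag and the asHiddenCol() helper together, here is a rough sketch of how the analyzer is expected to wire them up; the plan is hypothetical and this is not code from the PR. The hidden attributes are stored as a tag on the Project, and metadataOutput reads them back without widening the visible output.

```scala
// Hedged sketch, assuming Spark's catalyst module on the classpath.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.util._

// A tiny plan: SELECT value FROM relation, with `key` projected away.
val relation = LocalRelation('key.int, 'value.string)
val Seq(key, value) = relation.output
val project = Project(Seq(value), relation)

// The analyzer would attach the de-duplicated join keys as hidden columns...
project.setTagValue(Project.hiddenOutputTag, Seq(key.asHiddenCol()))

// ...so the visible schema is unchanged, but metadataOutput still exposes `key`
// for later resolution (qualified references or qualified star expansion).
assert(project.output.map(_.name) == Seq("value"))
assert(project.metadataOutput.map(_.name) == Seq("key"))
```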
@@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Implicits {
private val METADATA_COL_ATTR_KEY = "__metadata_col"

implicit class TableHelper(table: Table) {
def asReadable: SupportsRead = {
table match {
@@ -101,11 +100,6 @@ object DataSourceV2Implicits {
def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
}

implicit class MetadataColumnHelper(attr: Attribute) {
def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
}

implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)