[SPARK-34354][SQL] Fix failure when applying CostBasedJoinReorder on self-join #31470

Closed · wants to merge 41 commits
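
This PR moves attribute deduplication out of `ResolveReferences` and into a dedicated `DeduplicateRelations` analyzer rule, so that plans containing the same relation more than once no longer carry conflicting `ExprId`s into later optimizer rules such as `CostBasedJoinReorder`. As a rough illustration only (the table name, data, and session settings below are hypothetical and not the exact reproduction from the JIRA), a CBO-enabled self-join chain of the following shape is the kind of query the fix targets:

```scala
// Hypothetical, minimal sketch: build a CBO-enabled session and run a self-join
// chain over one table. Names, data, and settings are illustrative only.
import org.apache.spark.sql.SparkSession

object SelfJoinReorderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-34354 sketch")
      .config("spark.sql.cbo.enabled", "true")
      .config("spark.sql.cbo.joinReorder.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Persist a small table and compute column stats so CostBasedJoinReorder can kick in.
    Seq((1, 2), (3, 4), (5, 6)).toDF("a", "b").write.mode("overwrite").saveAsTable("t")
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL COLUMNS")

    // Joining the same table with itself more than once puts multiple instances of the
    // same base relation (and, before this PR's DeduplicateRelations rule, potentially
    // the same ExprIds) into a single join tree.
    spark.sql(
      """SELECT *
        |FROM t AS t1
        |JOIN t AS t2 ON t1.a = t2.a
        |JOIN t AS t3 ON t2.b = t3.b
        |""".stripMargin).explain(true)

    spark.stop()
  }
}
```

With CBO and join reordering enabled, the reordered plan can end up referencing the same attribute IDs from multiple join sides; resolving those conflicts up front in `DeduplicateRelations` is what the changes below implement.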

Commits (41)
fccb34a  deduplicate relations (cloud-fan, Jan 4, 2021)
2ed60b0  fix (Ngone51, Feb 4, 2021)
06bb709  fix SPARK-34319 (Ngone51, Feb 4, 2021)
2e440e9  fix SPARK-25278 (Ngone51, Feb 8, 2021)
fc05305  fix stream self join (Ngone51, Feb 8, 2021)
77c9b05  regen golden (Ngone51, Feb 8, 2021)
9052170  fix regression of reuse exchange (Ngone51, Mar 9, 2021)
ad2499d  shortcut import (Ngone51, Mar 9, 2021)
fec4694  fix typo (Ngone51, Mar 9, 2021)
33f5969  add jira id (Ngone51, Mar 9, 2021)
272afdf  add comment (Ngone51, Mar 9, 2021)
f17557e  fix AnalysisSuite (Ngone51, Mar 15, 2021)
53cc894  fix LeftSemiPushdownSuite (Ngone51, Mar 15, 2021)
29c00d4  fix (Ngone51, Mar 18, 2021)
1acaea5  fix (Ngone51, Mar 19, 2021)
ba875b3  fix (Ngone51, Mar 22, 2021)
a67444c  regen (Ngone51, Mar 22, 2021)
1dbba32  fix SPARK-22748 (Ngone51, Mar 23, 2021)
0a41d26  gen golden files (Ngone51, Mar 23, 2021)
1a9f23b  Revert "gen golden files" (Ngone51, Mar 23, 2021)
eb23ddf  Revert "regen" (Ngone51, Mar 23, 2021)
89cd341  Revert "fix regression of reuse exchange" (Ngone51, Mar 23, 2021)
4dc1ea7  Revert "regen golden" (Ngone51, Mar 23, 2021)
2f03409  add back subquery fix (Ngone51, Mar 23, 2021)
f1160f2  gen golden files (Ngone51, Mar 23, 2021)
6fcf259  add comment (Ngone51, Mar 24, 2021)
e4c53e5  Merge branch 'master' of github.com:apache/spark into join-reorder (Ngone51, Mar 24, 2021)
9211170  gen golden (Ngone51, Mar 25, 2021)
ace12e8  Merge branch 'master' of github.com:apache/spark into join-reorder (Ngone51, Mar 25, 2021)
7e46f72  Revert "gen golden" (Ngone51, Mar 25, 2021)
f0a2754  revert (Ngone51, Mar 25, 2021)
39f1615  regen golden files (Ngone51, Mar 25, 2021)
36a74ad  fix hasConflictingAttrs (Ngone51, Mar 26, 2021)
8bbacf6  compare exprId (Ngone51, Mar 26, 2021)
a04c4e7  skip DeduplicateRelations if no conflicting attributes (Ngone51, Mar 26, 2021)
b27d4d3  remove uncessary pattern match (Ngone51, Mar 26, 2021)
31599d3  Merge branch 'master' of github.com:apache/spark into join-reorder (Ngone51, Mar 26, 2021)
6243b43  revert skip if no conflicting attrs (Ngone51, Mar 26, 2021)
c6f9714  gen golden file (Ngone51, Mar 26, 2021)
5d85df3  match SubqueryExpression (Ngone51, Mar 29, 2021)
f0c7ce4  fix scala2.13 error (Ngone51, Mar 29, 2021)

Diff (changes from all commits):

@@ -253,6 +253,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveTables ::
ResolvePartitionSpec ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
@@ -1433,124 +1434,30 @@ class Analyzer(override val catalogManager: CatalogManager)
* a logical plan node's children.
*/
object ResolveReferences extends Rule[LogicalPlan] {
/**
* Generate a new logical plan for the right child with different expression IDs
* for all conflicting attributes.
*/
private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
s"between $left and $right")

/**
* For logical plans like MultiInstanceRelation, Project, Aggregate, etc., whose output doesn't
* inherit directly from their children, we can simply stop collecting at that node, because we
* can always replace the lower conflicting attributes with the new attributes from the new
* plan. Theoretically, we should collect recursively for Generate and Window, but we leave that
* to the next batch to reduce possible overhead, because this should be a corner case.
*/
def collectConflictPlans(plan: LogicalPlan): Seq[(LogicalPlan, LogicalPlan)] = plan match {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.newInstance()
newVersion.copyTagsFrom(oldVersion)
Seq((oldVersion, newVersion))

case oldVersion: SerializeFromObject
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(
serializer = oldVersion.serializer.map(_.newInstance()))))

// Handle projects that create conflicting aliases.
case oldVersion @ Project(projectList, _)
if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(projectList = newAliases(projectList))))

// We don't need to search the child plan recursively if the projectList of a Project
// is composed only of Aliases and doesn't contain any conflicting attributes,
// because even if the child plan has some conflicting attributes, they will be
// aliased to non-conflicting attributes by the Project at the end.
case _ @ Project(projectList, _)
if findAliases(projectList).size == projectList.size =>
Nil

case oldVersion @ Aggregate(_, aggregateExpressions, _)
if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(
aggregateExpressions = newAliases(aggregateExpressions))))

// We don't search the child plan recursively for the same reason as the above Project.
case _ @ Aggregate(_, aggregateExpressions, _)
if findAliases(aggregateExpressions).size == aggregateExpressions.size =>
Nil

case oldVersion @ FlatMapGroupsInPandas(_, _, output, _)
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))

case oldVersion @ FlatMapCoGroupsInPandas(_, _, _, output, _, _)
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))

case oldVersion @ MapInPandas(_, output, _)
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))

case oldVersion: Generate
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
Seq((oldVersion, oldVersion.copy(generatorOutput = newOutput)))

case oldVersion: Expand
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val producedAttributes = oldVersion.producedAttributes
val newOutput = oldVersion.output.map { attr =>
if (producedAttributes.contains(attr)) {
attr.newInstance()
} else {
attr
}
}
Seq((oldVersion, oldVersion.copy(output = newOutput)))

case oldVersion @ Window(windowExpressions, _, _, child)
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
Seq((oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions))))

case oldVersion @ ScriptTransformation(_, _, output, _, _)
if AttributeSet(output).intersect(conflictingAttributes).nonEmpty =>
Seq((oldVersion, oldVersion.copy(output = output.map(_.newInstance()))))

case _ => plan.children.flatMap(collectConflictPlans)
}

val conflictPlans = collectConflictPlans(right)

/*
* Note that `conflictPlans` can be empty, which implies that there is a logical plan
* node producing new references that this rule cannot handle. When that is the case,
* there must be another rule that resolves these conflicts; otherwise, the analysis
* will fail.
*/
if (conflictPlans.isEmpty) {
right
} else {
val planMapping = conflictPlans.toMap
right.transformUpWithNewOutput {
case oldPlan =>
val newPlanOpt = planMapping.get(oldPlan)
newPlanOpt.map { newPlan =>
newPlan -> oldPlan.output.zip(newPlan.output)
}.getOrElse(oldPlan -> Nil)
/** Returns true if there are conflicting attributes among the outputs of a plan's children. */
def hasConflictingAttrs(p: LogicalPlan): Boolean = {
p.children.length > 1 && {
// Note that duplicated attributes are allowed within a single node,
// e.g., df.select($"a", $"a"), so we should only check conflicting
// attributes between nodes.
val uniqueAttrs = mutable.HashSet[ExprId]()
p.children.head.outputSet.foreach(a => uniqueAttrs.add(a.exprId))
p.children.tail.exists { child =>
val uniqueSize = uniqueAttrs.size
val childSize = child.outputSet.size
child.outputSet.foreach(a => uniqueAttrs.add(a.exprId))
uniqueSize + childSize > uniqueAttrs.size
}
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case p: LogicalPlan if !p.childrenResolved => p

// Wait for the rule `DeduplicateRelations` to resolve conflicting attrs first.
case p: LogicalPlan if hasConflictingAttrs(p) => p

// If the projection list contains Stars, expand it.
case p: Project if containsStar(p.projectList) =>
p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
@@ -1572,37 +1479,12 @@ class Analyzer(override val catalogManager: CatalogManager)
case g: Generate if containsStar(g.generator.children) =>
throw QueryCompilationErrors.invalidStarUsageError("explode/json_tuple/UDTF")

// To resolve duplicate expression IDs for Join and Intersect
case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
j.copy(right = dedupRight(left, right))
case f @ FlatMapCoGroupsInPandas(leftAttributes, rightAttributes, _, _, left, right) =>
val leftRes = leftAttributes
.map(x => resolveExpressionByPlanOutput(x, left).asInstanceOf[Attribute])
val rightRes = rightAttributes
.map(x => resolveExpressionByPlanOutput(x, right).asInstanceOf[Attribute])
f.copy(leftAttributes = leftRes, rightAttributes = rightRes)
// Intersect/Except will be rewritten to Join at the beginning of the optimizer. Here we need
// to deduplicate the right side plan, so that we won't produce an invalid self-join later.
case i @ Intersect(left, right, _) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))
case e @ Except(left, right, _) if !e.duplicateResolved =>
e.copy(right = dedupRight(left, right))
// Only after we finish by-name resolution for Union
case u: Union if !u.byName && !u.duplicateResolved =>
// Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
// feature in streaming.
val newChildren = u.children.foldRight(Seq.empty[LogicalPlan]) { (head, tail) =>
head +: tail.map {
case child if head.outputSet.intersect(child.outputSet).isEmpty =>
child
case child =>
val projectList = child.output.map { attr =>
Alias(attr, attr.name)()
}
Project(projectList, child)
}
}
u.copy(children = newChildren)

// When resolving `SortOrder`s in Sort based on its child, don't report errors, as
// we still have a chance to resolve them based on its descendants.
@@ -1676,9 +1558,6 @@ class Analyzer(override val catalogManager: CatalogManager)
// implementation and should be resolved based on the table schema.
o.copy(deleteExpr = resolveExpressionByPlanOutput(o.deleteExpr, o.table))

case m @ MergeIntoTable(targetTable, sourceTable, _, _, _) if !m.duplicateResolved =>
m.copy(sourceTable = dedupRight(targetTable, sourceTable))

case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
if !m.resolved && targetTable.resolved && sourceTable.resolved =>

@@ -1759,17 +1638,6 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {
expressions.map {
case a: Alias => Alias(a.child, a.name)()
case other => other
}
}

def findAliases(projectList: Seq[NamedExpression]): AttributeSet = {
AttributeSet(projectList.collect { case a: Alias => a.toAttribute })
}

// This method is used to trim the top-level GetStructField Alias of
// groupByExpressions/selectedGroupByExpressions. Since these expressions are not
// NamedExpressions originally, we are safe to trim the top-level GetStructField Alias.
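
For readers skimming the diff, here is a plain-Scala sketch of the set-size trick used by `hasConflictingAttrs` above (the attribute IDs are made up and no Spark types are involved): if merging a child's output IDs into the running set grows it by less than the child's size, some ID was already contributed by an earlier child, i.e. two children conflict.

```scala
// Plain-Scala sketch (hypothetical IDs, no Spark dependencies) of the overlap check
// performed by hasConflictingAttrs: grow one set with each child's attribute IDs and
// flag a conflict whenever the set grows by less than the child's size.
import scala.collection.mutable

object ConflictCheckSketch {
  def hasConflicts(childOutputIds: Seq[Set[Long]]): Boolean = {
    childOutputIds.length > 1 && {
      val uniqueIds = mutable.HashSet[Long]()
      uniqueIds ++= childOutputIds.head
      childOutputIds.tail.exists { child =>
        val sizeBefore = uniqueIds.size
        uniqueIds ++= child
        // If every ID in `child` were new, the set would grow by exactly child.size;
        // any smaller growth means an ID was shared with a previous child.
        sizeBefore + child.size > uniqueIds.size
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(hasConflicts(Seq(Set(1L, 2L), Set(3L, 4L))))  // false: disjoint outputs
    println(hasConflicts(Seq(Set(1L, 2L), Set(2L, 3L))))  // true: ID 2 appears in both children
  }
}
```

As in the real rule, duplicates within a single child are deliberately not flagged; only IDs shared across different children count as a conflict.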