Skip to content

Commit

Permalink
[SPARK-27314][SQL] Deduplicate exprIds for Union.
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

We have been having a potential problem with `Union` when the children have the same expression id in their outputs, which happens when self-union.

## How was this patch tested?

Modified some tests to adjust plan changes.

Closes #24236 from ueshin/issues/SPARK-27314/dedup_union.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
ueshin authored and cloud-fan committed Mar 29, 2019
1 parent 61561c1 commit f176dd3
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,22 @@ class Analyzer(
i.copy(right = dedupRight(left, right))
case e @ Except(left, right, _) if !e.duplicateResolved =>
e.copy(right = dedupRight(left, right))
case u @ Union(children) if !u.duplicateResolved =>
// Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
// feature in streaming.
val newChildren = children.foldRight(Seq.empty[LogicalPlan]) { (head, tail) =>
head +: tail.map {
case child if head.outputSet.intersect(child.outputSet).isEmpty =>
child
case child =>
val projectList = child.output.map { attr =>
Alias(attr, attr.name)()
}
Project(projectList, child)
}
}
u.copy(children = newChildren)

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on its descendants
case s @ Sort(ordering, global, child) if child.resolved && !s.resolved =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
}
}

def duplicateResolved: Boolean = {
children.map(_.outputSet.size).sum ==
AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
}

// updating nullability to make all the children consistent
override def output: Seq[Attribute] =
children.map(_.output).transpose.map(attrs =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ class FoldablePropagationSuite extends PlanTest {

test("Propagate in inner join") {
val ta = testRelation.select('a, Literal(1).as('tag))
.union(testRelation.select('a, Literal(2).as('tag)))
.union(testRelation.select('a.as('a), Literal(2).as('tag)))
.subquery('ta)
val tb = testRelation.select('a, Literal(1).as('tag))
.union(testRelation.select('a, Literal(2).as('tag)))
.union(testRelation.select('a.as('a), Literal(2).as('tag)))
.subquery('tb)
val query = ta.join(tb, Inner,
Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
}

test("Pushing a single nested field projection - negative") {
val ops = Array(
val ops = Seq(
(input: LogicalPlan) => input.distribute('name)(1),
(input: LogicalPlan) => input.distribute($"name.middle")(1),
(input: LogicalPlan) => input.orderBy('name.asc),
Expand All @@ -156,11 +156,15 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
.analyze
}

val optimizedQueries = queries.map(Optimize.execute)
val expectedQueries = queries
val optimizedQueries :+ optimizedUnion = queries.map(Optimize.execute)
val expectedQueries = queries.init
optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected) =>
comparePlans(optimized, expected)
}
val expectedUnion =
contact.select('name).union(contact.select('name.as('name)))
.select(GetStructField('name, 1, Some("middle"))).analyze
comparePlans(optimizedUnion, expectedUnion)
}

test("Pushing a single nested field projection through filters - negative") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,14 @@ class OptimizerRuleExclusionSuite extends PlanTest {
PropagateEmptyRelation.ruleName,
CombineUnions.ruleName)

val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation2 = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation3 = LocalRelation('a.int, 'b.int, 'c.int)

withSQLConf(
OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) {
val optimizer = new SimpleTestOptimizer()
val originalQuery = testRelation.union(testRelation.union(testRelation)).analyze
val originalQuery = testRelation1.union(testRelation2.union(testRelation3)).analyze
val optimized = optimizer.execute(originalQuery)
comparePlans(originalQuery, optimized)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SetOperationSuite extends PlanTest {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Union Pushdown", Once,
Batch("Union Pushdown", FixedPoint(5),
CombineUnions,
PushProjectionThroughUnion,
PushDownPredicate,
Expand All @@ -44,8 +44,8 @@ class SetOperationSuite extends PlanTest {
val testUnion = Union(testRelation :: testRelation2 :: testRelation3 :: Nil)

test("union: combine unions into one unions") {
val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation)
val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation))
val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation3)
val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation3))
val unionOptimized1 = Optimize.execute(unionQuery1.analyze)
val unionOptimized2 = Optimize.execute(unionQuery2.analyze)

Expand Down Expand Up @@ -93,7 +93,7 @@ class SetOperationSuite extends PlanTest {
val unionQuery1 = Distinct(Union(Distinct(Union(query1, query2)), query3)).analyze
val optimized1 = Optimize.execute(unionQuery1)
val distinctUnionCorrectAnswer1 =
Distinct(Union(query1 :: query2 :: query3 :: Nil)).analyze
Distinct(Union(query1 :: query2 :: query3 :: Nil))
comparePlans(distinctUnionCorrectAnswer1, optimized1)

// query1
Expand All @@ -107,7 +107,7 @@ class SetOperationSuite extends PlanTest {
Distinct(Union(query2, query3)))).analyze
val optimized2 = Optimize.execute(unionQuery2)
val distinctUnionCorrectAnswer2 =
Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil)).analyze
Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil))
comparePlans(distinctUnionCorrectAnswer2, optimized2)
}

Expand Down

0 comments on commit f176dd3

Please sign in to comment.