From 95afce5f02f86c1aa3cf22f81c79d8d9e46c1e0a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 6 Feb 2016 16:52:48 -0800 Subject: [PATCH 1/4] "intersect all" --- .../spark/sql/catalyst/parser/SparkSqlParser.g | 7 +++++-- .../apache/spark/sql/catalyst/CatalystQl.scala | 6 ++++-- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/HiveTypeCoercion.scala | 13 +++++++++---- .../apache/spark/sql/catalyst/dsl/package.scala | 3 ++- .../spark/sql/catalyst/optimizer/Optimizer.scala | 15 ++++++++++----- .../catalyst/plans/logical/basicOperators.scala | 5 ++++- .../catalyst/analysis/HiveTypeCoercionSuite.scala | 8 ++++---- .../catalyst/optimizer/ReplaceOperatorSuite.scala | 2 +- .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../main/scala/org/apache/spark/sql/Dataset.scala | 5 +++-- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 9 ++++++++- 13 files changed, 53 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g index 9935678ca2ca2..e0931f8e30f20 100644 --- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g +++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g @@ -89,7 +89,8 @@ TOK_SORTBY; TOK_UNIONALL; TOK_UNIONDISTINCT; TOK_EXCEPT; -TOK_INTERSECT; +TOK_INTERSECTALL; +TOK_INTERSECTDISTINCT; TOK_JOIN; TOK_LEFTOUTERJOIN; TOK_RIGHTOUTERJOIN; @@ -2222,7 +2223,9 @@ setOperator : KW_UNION KW_ALL -> ^(TOK_UNIONALL) | KW_UNION KW_DISTINCT? -> ^(TOK_UNIONDISTINCT) | KW_EXCEPT -> ^(TOK_EXCEPT) - | KW_INTERSECT -> ^(TOK_INTERSECT) + | KW_INTERSECT (all=KW_ALL | distinct=KW_DISTINCT)? + -> {$all == null}? ^(TOK_INTERSECTDISTINCT) + -> ^(TOK_INTERSECTALL) ; queryStatementExpression[boolean topLevel] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala index a42360d5629f8..a4fdfdf5926ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala @@ -439,8 +439,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C Distinct(Union(nodeToPlan(left), nodeToPlan(right))) case Token("TOK_EXCEPT", left :: right :: Nil) => Except(nodeToPlan(left), nodeToPlan(right)) - case Token("TOK_INTERSECT", left :: right :: Nil) => - Intersect(nodeToPlan(left), nodeToPlan(right)) + case Token("TOK_INTERSECTALL", left :: right :: Nil) => + Intersect(nodeToPlan(left), nodeToPlan(right), distinct = false) + case Token("TOK_INTERSECTDISTINCT", left :: right :: Nil) => + Intersect(nodeToPlan(left), nodeToPlan(right), distinct = true) case _ => noParseRule("Plan", node) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b59eb12419c45..77cbcd2f81f50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -451,7 +451,7 @@ class Analyzer( // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _) if !j.duplicateResolved => j.copy(right = dedupRight(left, right)) - case i @ Intersect(left, right) if !i.duplicateResolved => + case i @ Intersect(left, right, _) if !i.duplicateResolved => i.copy(right = dedupRight(left, right)) // When resolve `SortOrder`s in Sort based on child, don't report errors as diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 57bdb164e1a0d..c7f6e505318dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -24,7 +24,6 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types._ @@ -223,16 +222,22 @@ object HiveTypeCoercion { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case p if p.analyzed => p - case s @ SetOperation(left, right) if s.childrenResolved && + case s @ Except(left, right) if s.childrenResolved && left.output.length == right.output.length && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil) assert(newChildren.length == 2) - s.makeCopy(Array(newChildren.head, newChildren.last)) + Except(newChildren.head, newChildren.last) + + case s @ Intersect(left, right, distinct) if s.childrenResolved && + left.output.length == right.output.length && !s.resolved => + val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil) + assert(newChildren.length == 2) + Intersect(newChildren.head, newChildren.last, distinct) case s: Union if s.childrenResolved && s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children) - s.makeCopy(Array(newChildren)) + Union(newChildren) } /** Build new children with the widest types for each attribute among all the children */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 5ac1984043d87..5ef87cd3a72df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -272,7 +272,8 @@ package object dsl { def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan) - def intersect(otherPlan: LogicalPlan): LogicalPlan = Intersect(logicalPlan, otherPlan) + def intersect(otherPlan: LogicalPlan): LogicalPlan = + Intersect(logicalPlan, otherPlan, distinct = true) def unionAll(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, otherPlan) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a1ac93073916c..bf019c05bc5e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1059,19 +1059,24 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] { * {{{ * SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2 * ==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2 + * SELECT a1, a2 FROM Tab1 INTERSECT ALL SELECT b1, b2 FROM Tab2 + * ==> SELECT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2 * }}} * - * Note: - * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL. - * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated + * Note: This rule has to be done after de-duplicating the attributes; otherwise, the generated * join conditions will be incorrect. */ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case Intersect(left, right) => + case Intersect(left, right, distinct) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } - Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))) + if (distinct) { + Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))) + } + else { + Join(left, right, LeftSemi, joinCond.reduceLeftOption(And)) + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 03a79520cbd3a..438bb904f41d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -113,7 +113,10 @@ private[sql] object SetOperation { def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right)) } -case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { +case class Intersect( + left: LogicalPlan, + right: LogicalPlan, + distinct: Boolean) extends SetOperation(left, right) { def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala index c30434a0063b0..5fc11a575af3d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala @@ -409,7 +409,7 @@ class HiveTypeCoercionSuite extends PlanTest { val expectedTypes = Seq(StringType, DecimalType.SYSTEM_DEFAULT, FloatType, DoubleType) val r1 = wt(Except(firstTable, secondTable)).asInstanceOf[Except] - val r2 = wt(Intersect(firstTable, secondTable)).asInstanceOf[Intersect] + val r2 = wt(Intersect(firstTable, secondTable, distinct = true)).asInstanceOf[Intersect] checkOutput(r1.left, expectedTypes) checkOutput(r1.right, expectedTypes) checkOutput(r2.left, expectedTypes) @@ -486,7 +486,7 @@ class HiveTypeCoercionSuite extends PlanTest { val r1 = dp(Union(left1, right1)).asInstanceOf[Union] val r2 = dp(Except(left1, right1)).asInstanceOf[Except] - val r3 = dp(Intersect(left1, right1)).asInstanceOf[Intersect] + val r3 = dp(Intersect(left1, right1, distinct = true)).asInstanceOf[Intersect] checkOutput(r1.children.head, expectedType1) checkOutput(r1.children.last, expectedType1) @@ -507,7 +507,7 @@ class HiveTypeCoercionSuite extends PlanTest { val r1 = dp(Union(plan1, plan2)).asInstanceOf[Union] val r2 = dp(Except(plan1, plan2)).asInstanceOf[Except] - val r3 = dp(Intersect(plan1, plan2)).asInstanceOf[Intersect] + val r3 = dp(Intersect(plan1, plan2, distinct = true)).asInstanceOf[Intersect] checkOutput(r1.children.last, Seq(expectedType)) checkOutput(r2.right, Seq(expectedType)) @@ -515,7 +515,7 @@ class HiveTypeCoercionSuite extends PlanTest { val r4 = dp(Union(plan2, plan1)).asInstanceOf[Union] val r5 = dp(Except(plan2, plan1)).asInstanceOf[Except] - val r6 = dp(Intersect(plan2, plan1)).asInstanceOf[Intersect] + val r6 = dp(Intersect(plan2, plan1, distinct = true)).asInstanceOf[Intersect] checkOutput(r4.children.last, Seq(expectedType)) checkOutput(r5.left, Seq(expectedType)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index f8ae5d9be2084..f69dabf8fa71b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -36,7 +36,7 @@ class ReplaceOperatorSuite extends PlanTest { val table1 = LocalRelation('a.int, 'b.int) val table2 = LocalRelation('c.int, 'd.int) - val query = Intersect(table1, table2) + val query = Intersect(table1, table2, distinct = true) val optimized = Optimize.execute(query.analyze) val correctAnswer = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f15b926bd27cf..be59f39235e90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1016,7 +1016,7 @@ class DataFrame private[sql]( * @since 1.3.0 */ def intersect(other: DataFrame): DataFrame = withPlan { - Intersect(logicalPlan, other.logicalPlan) + Intersect(logicalPlan, other.logicalPlan, distinct = true) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f182270a08729..a4776df7d6d91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -594,8 +594,9 @@ class Dataset[T] private[sql]( * and thus is not affected by a custom `equals` function defined on `T`. * @since 1.6.0 */ - def intersect(other: Dataset[T]): Dataset[T] = withPlan[T](other)(Intersect) - + def intersect(other: Dataset[T]): Dataset[T] = withPlan[T](other){ (left, right) => + Intersect(left, right, distinct = true) + } /** * Returns a new [[Dataset]] that contains the elements of both this and the `other` [[Dataset]] * combined. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 9293e55141757..3ac4a41df6c8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -298,7 +298,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Distinct(child) => throw new IllegalStateException( "logical distinct operator should have been replaced by aggregate in the optimizer") - case logical.Intersect(left, right) => + case logical.Intersect(left, right, _) => throw new IllegalStateException( "logical intersect operator should have been replaced by semi-join in the optimizer") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8ef7b61314a56..a2a5c995ae63b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -931,7 +931,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData"), Nil) } - test("INTERSECT") { + test("INTERSECT ALL/DISTINCT") { checkAnswer( sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"), Row(1, "a") :: @@ -940,6 +940,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Row(4, "d") :: Nil) checkAnswer( sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil) + + checkAnswer( + sql("SELECT a FROM testData2 INTERSECT DISTINCT SELECT a FROM testData2"), + Row(1) :: Row(2) :: Row(3) :: Nil) + checkAnswer( + sql("SELECT a FROM testData2 INTERSECT ALL SELECT a FROM testData2"), + Row(1) :: Row(1) :: Row(2) :: Row(2) :: Row(3) :: Row(3) ::Nil) } test("SET commands semantics using sql()") { From 796e725955e95505a5c2108ee3691be8beecd8a7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 7 Feb 2016 17:01:02 -0800 Subject: [PATCH 2/4] address comments. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index bf019c05bc5e4..1ab4586b46792 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1070,7 +1070,7 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Intersect(left, right, distinct) => assert(left.output.size == right.output.size) - val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } + val joinCond = left.output.zip(right.output).map(EqualNullSafe.tupled) if (distinct) { Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a4776df7d6d91..0fddafbee611c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -594,7 +594,7 @@ class Dataset[T] private[sql]( * and thus is not affected by a custom `equals` function defined on `T`. * @since 1.6.0 */ - def intersect(other: Dataset[T]): Dataset[T] = withPlan[T](other){ (left, right) => + def intersect(other: Dataset[T]): Dataset[T] = withPlan[T](other) { (left, right) => Intersect(left, right, distinct = true) } /** @@ -605,7 +605,7 @@ class Dataset[T] private[sql]( * duplicate items. As such, it is analogous to `UNION ALL` in SQL. * @since 1.6.0 */ - def union(other: Dataset[T]): Dataset[T] = withPlan[T](other){ (left, right) => + def union(other: Dataset[T]): Dataset[T] = withPlan[T](other) { (left, right) => // This breaks caching, but it's usually ok because it addresses a very specific use case: // using union to union many files or partitions. CombineUnions(Union(left, right)) From afd1725e815087244c753abe4198fa1746b00f9f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 8 Feb 2016 10:20:33 -0800 Subject: [PATCH 3/4] addressed the comments. --- .../org/apache/spark/sql/catalyst/parser/SparkSqlParser.g | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g index e0931f8e30f20..5d4601c3e12de 100644 --- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g +++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g @@ -2223,9 +2223,8 @@ setOperator : KW_UNION KW_ALL -> ^(TOK_UNIONALL) | KW_UNION KW_DISTINCT? -> ^(TOK_UNIONDISTINCT) | KW_EXCEPT -> ^(TOK_EXCEPT) - | KW_INTERSECT (all=KW_ALL | distinct=KW_DISTINCT)? - -> {$all == null}? ^(TOK_INTERSECTDISTINCT) - -> ^(TOK_INTERSECTALL) + | KW_INTERSECT KW_ALL -> ^(TOK_INTERSECTALL) + | KW_INTERSECT KW_DISTINCT? -> ^(TOK_INTERSECTDISTINCT) ; queryStatementExpression[boolean topLevel] From 8a0a0b23ad1b553ad558adf61c3d73774b89e7db Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 9 Feb 2016 06:58:16 -0800 Subject: [PATCH 4/4] addressed the comments. --- .../spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 10 ++++------ .../spark/sql/catalyst/optimizer/Optimizer.scala | 6 +++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index c7f6e505318dc..de7c912452933 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -224,15 +224,13 @@ object HiveTypeCoercion { case s @ Except(left, right) if s.childrenResolved && left.output.length == right.output.length && !s.resolved => - val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil) - assert(newChildren.length == 2) - Except(newChildren.head, newChildren.last) + val Seq(newLeft, newRight) = buildNewChildrenWithWiderTypes(left :: right :: Nil) + Except(newLeft, newRight) case s @ Intersect(left, right, distinct) if s.childrenResolved && left.output.length == right.output.length && !s.resolved => - val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil) - assert(newChildren.length == 2) - Intersect(newChildren.head, newChildren.last, distinct) + val Seq(newLeft, newRight) = buildNewChildrenWithWiderTypes(left :: right :: Nil) + Intersect(newLeft, newRight, distinct) case s: Union if s.childrenResolved && s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1ab4586b46792..4a08fe3998831 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1070,12 +1070,12 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Intersect(left, right, distinct) => assert(left.output.size == right.output.size) - val joinCond = left.output.zip(right.output).map(EqualNullSafe.tupled) + val joinCond = left.output.zip(right.output).map(EqualNullSafe.tupled).reduceLeftOption(And) if (distinct) { - Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))) + Distinct(Join(left, right, LeftSemi, joinCond)) } else { - Join(left, right, LeftSemi, joinCond.reduceLeftOption(And)) + Join(left, right, LeftSemi, joinCond) } } }