From 326d759c0a407a2ca9ea8a946ff84764031929b3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Jul 2015 19:33:50 -0700 Subject: [PATCH 1/4] Fix SPARK-9293 --- .../sql/catalyst/analysis/CheckAnalysis.scala | 6 ++++ .../catalyst/analysis/HiveTypeCoercion.scala | 14 +++----- .../plans/logical/basicOperators.scala | 36 +++++++++---------- .../analysis/AnalysisErrorSuite.scala | 17 +++++++++ 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index c203fcecf20fb..4154777c8af1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -98,6 +98,12 @@ trait CheckAnalysis { aggregateExprs.foreach(checkValidAggregateExpression) + case s @ SetOperation(left, right) if left.output.length != right.output.length => + failAnalysis( + s"${s.nodeName} can only be performed on tables with the same number of columns, " + + s"but the left table has ${left.output.length} columns and the right has " + + s"${right.output.length}") + case _ => // Fallbacks to the following checks } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index d56ceeadc9e85..eef76615f1fe8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -181,6 +181,7 @@ object HiveTypeCoercion { planName: String, left: LogicalPlan, right: LogicalPlan): (LogicalPlan, LogicalPlan) = { + require(left.output.length == right.output.length) val castedTypes = left.output.zip(right.output).map { case (lhs, rhs) if lhs.dataType != rhs.dataType => @@ -218,15 +219,10 @@ object HiveTypeCoercion { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case u @ Union(left, right) if u.childrenResolved && !u.resolved => - val (newLeft, newRight) = widenOutputTypes(u.nodeName, left, right) - Union(newLeft, newRight) - case e @ Except(left, right) if e.childrenResolved && !e.resolved => - val (newLeft, newRight) = widenOutputTypes(e.nodeName, left, right) - Except(newLeft, newRight) - case i @ Intersect(left, right) if i.childrenResolved && !i.resolved => - val (newLeft, newRight) = widenOutputTypes(i.nodeName, left, right) - Intersect(newLeft, newRight) + case s @ SetOperation(left, right) if s.childrenResolved + && left.output.length == right.output.length && !s.resolved => + val (newLeft, newRight) = widenOutputTypes(s.nodeName, left, right) + s.makeCopy(Array(newLeft, newRight)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 6aefa9f67556a..f2a49194de8d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -89,13 +89,21 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output } -case class Union(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { +abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { // TODO: These aren't really the same attributes as nullability etc might change. - override def output: Seq[Attribute] = left.output + final override def output: Seq[Attribute] = left.output - override lazy val resolved: Boolean = + final override lazy val resolved: Boolean = childrenResolved && - left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } + left.output.length == right.output.length && + left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } +} + +private[sql] object SetOperation { + def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right)) +} + +case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { override def statistics: Statistics = { val sizeInBytes = left.statistics.sizeInBytes + right.statistics.sizeInBytes @@ -103,6 +111,10 @@ case class Union(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { } } +case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) + +case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) + case class Join( left: LogicalPlan, right: LogicalPlan, @@ -139,15 +151,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output } - -case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - override def output: Seq[Attribute] = left.output - - override lazy val resolved: Boolean = - childrenResolved && - left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } -} - case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], @@ -440,10 +443,3 @@ case object OneRowRelation extends LeafNode { override def statistics: Statistics = Statistics(sizeInBytes = 1) } -case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - override def output: Seq[Attribute] = left.output - - override lazy val resolved: Boolean = - childrenResolved && - left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index dca8c881f21ab..bc2c696f33c14 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -147,6 +147,23 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter { UnresolvedTestPlan(), "unresolved" :: Nil) + errorTest( + "union with unequal number of columns", + testRelation.unionAll(testRelation2), + "union" :: "number of columns" :: testRelation2.output.length.toString :: + testRelation.output.length.toString :: Nil) + + errorTest( + "intersect with unequal number of columns", + testRelation.intersect(testRelation2), + "intersect" :: "number of columns" :: testRelation2.output.length.toString :: + testRelation.output.length.toString :: Nil) + + errorTest( + "except with unequal number of columns", + testRelation.except(testRelation2), + "except" :: "number of columns" :: testRelation2.output.length.toString :: + testRelation.output.length.toString :: Nil) test("SPARK-6452 regression test") { // CheckAnalysis should throw AnalysisException when Aggregate contains missing attribute(s) From a0bd50c064d76893696746f3e01db4e099403205 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 22 Aug 2015 15:26:00 -0700 Subject: [PATCH 2/4] Declare empty output for InsertIntoHiveTable --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 12c667e6e92da..62efda613a176 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -61,7 +61,7 @@ case class InsertIntoHiveTable( serializer } - def output: Seq[Attribute] = child.output + def output: Seq[Attribute] = Seq.empty def saveAsHiveFile( rdd: RDD[InternalRow], From a036dddef5090df3808e5931b47a51bd27440b20 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 22 Aug 2015 15:35:08 -0700 Subject: [PATCH 3/4] Also update logical plan --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index bbe8c1911bf86..98d21aa76d64e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -751,7 +751,7 @@ private[hive] case class InsertIntoHiveTable( extends LogicalPlan { override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = child.output + override def output: Seq[Attribute] = Seq.empty val numDynamicPartitions = partition.values.count(_.isEmpty) From 362e7490329df6fa39550f1479a7cb41fcbcfdd3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 22 Aug 2015 15:36:41 -0700 Subject: [PATCH 4/4] Also update InsertIntoTable --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d2613f9143029..722f69cdca827 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -163,7 +163,7 @@ case class InsertIntoTable( extends LogicalPlan { override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = child.output + override def output: Seq[Attribute] = Seq.empty assert(overwrite || !ifNotExists) override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {