From 83b2e6e066611b972e1dddb0b3eb1ae3f8389753 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 3 Jun 2016 08:20:44 +0000 Subject: [PATCH 1/4] Move Analyzer stuff to Analyzer from DataFrameWriter. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++++++++++++++--- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +----------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 02966796afdd7..7d0aebbdf5618 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,6 +452,17 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => + // A partitioned relation's schema can be different from the input logicalPlan, since + // partition columns are all moved after data columns. We Project to adjust the ordering. + val inputWhenPartsNonEmpty = if (parts.nonEmpty) { + val (inputPartCols, inputDataCols) = child.output.partition { attr => + parts.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, child) + } else { + child + } + val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -467,8 +478,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Assume partition columns are correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table)) + // Partition columns are already correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table), child = inputWhenPartsNonEmpty) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order. @@ -486,7 +497,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => - i.copy(table = EliminateSubqueryAliases(table)) + i.copy(table = EliminateSubqueryAliases(table), child = inputWhenPartsNonEmpty) } case u: UnresolvedRelation => val table = u.tableIdentifier diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1dd8818dedb2e..d6550176a0d41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -435,21 +435,11 @@ final class DataFrameWriter private[sql](df: DataFrame) { val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) val overwrite = mode == SaveMode.Overwrite - // A partitioned relation's schema can be different from the input logicalPlan, since - // partition columns are all moved after data columns. We Project to adjust the ordering. - // TODO: this belongs to the analyzer. - val input = normalizedParCols.map { parCols => - val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => - parCols.contains(attr.name) - } - Project(inputDataCols ++ inputPartCols, df.logicalPlan) - }.getOrElse(df.logicalPlan) - df.sparkSession.sessionState.executePlan( InsertIntoTable( UnresolvedRelation(tableIdent), partitions.getOrElse(Map.empty[String, Option[String]]), - input, + df.logicalPlan, overwrite, ifNotExists = false)).toRdd } From c809cc618b3fc30e75345b5d013f58a51c8e91cf Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 3 Jun 2016 14:11:36 +0000 Subject: [PATCH 2/4] Fix test. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7d0aebbdf5618..a6aef7f50dca1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -458,7 +458,11 @@ class Analyzer( val (inputPartCols, inputDataCols) = child.output.partition { attr => parts.contains(attr.name) } - Project(inputDataCols ++ inputPartCols, child) + if (child.output == (inputDataCols ++ inputPartCols)) { + child + } else { + Project(inputDataCols ++ inputPartCols, child) + } } else { child } From 6888e0aef0da0946e057daee02078df9f0eac549 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Jun 2016 18:20:07 +0000 Subject: [PATCH 3/4] Address comments. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +----- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a6aef7f50dca1..7d0aebbdf5618 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -458,11 +458,7 @@ class Analyzer( val (inputPartCols, inputDataCols) = child.output.partition { attr => parts.contains(attr.name) } - if (child.output == (inputDataCols ++ inputPartCols)) { - child - } else { - Project(inputDataCols ++ inputPartCols, child) - } + Project(inputDataCols ++ inputPartCols, child) } else { child } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e0f6ccf04dd33..0a2bab4f5d1e1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1042,7 +1042,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .queryExecution.analyzed } - assertResult(1, "Duplicated project detected\n" + analyzedPlan) { + assertResult(2, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { case _: Project => () }.size @@ -1061,7 +1061,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .queryExecution.analyzed } - assertResult(1, "Duplicated project detected\n" + analyzedPlan) { + assertResult(2, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { case _: Project => () }.size From 40a7f315297d13e16aea4c370963867545c3408f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Jun 2016 22:59:11 +0000 Subject: [PATCH 4/4] Address comment. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7d0aebbdf5618..d5a25e127f79d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -454,7 +454,7 @@ class Analyzer( case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => // A partitioned relation's schema can be different from the input logicalPlan, since // partition columns are all moved after data columns. We Project to adjust the ordering. - val inputWhenPartsNonEmpty = if (parts.nonEmpty) { + val input = if (parts.nonEmpty) { val (inputPartCols, inputDataCols) = child.output.partition { attr => parts.contains(attr.name) } @@ -479,7 +479,7 @@ class Analyzer( |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } // Partition columns are already correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table), child = inputWhenPartsNonEmpty) + i.copy(table = EliminateSubqueryAliases(table), child = input) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order. @@ -497,7 +497,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => - i.copy(table = EliminateSubqueryAliases(table), child = inputWhenPartsNonEmpty) + i.copy(table = EliminateSubqueryAliases(table), child = input) } case u: UnresolvedRelation => val table = u.tableIdentifier