From ca269a4ef12496975e00c2a6f4bdbd04fd41568e Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 31 May 2016 19:41:20 +0800 Subject: [PATCH 1/2] Throw exception if columns number of outputs mismatch the inputs --- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 7 +++++++ .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b3896484da178..cc2b6376b8eda 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -170,6 +170,13 @@ case class InsertIntoHiveTable( val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) + // All column names in the format of "<column name 1>,<column name 2>,..." 
+ val columnsCnt = tableDesc.getProperties.getProperty("columns").split(",").size + + if (columnsCnt + numDynamicPartitions != child.output.size) { + throw new SparkException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg) + } + // By this time, the partition map must match the table's partition columns if (partitionColumnNames.toSet != partition.keySet) { throw new SparkException( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e0f6ccf04dd33..4075a50dc90e3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1057,7 +1057,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("SET hive.exec.dynamic.partition.mode=nonstrict") sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart") - sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src") + sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value, 'a', 'b' FROM src") .queryExecution.analyzed } From c46e5d0d2d04737ea51e39c8c0f239882f1e4f57 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 1 Jun 2016 00:26:07 +0800 Subject: [PATCH 2/2] more specified error message and a unit test --- .../sql/hive/execution/InsertIntoHiveTable.scala | 5 ++++- .../spark/sql/hive/execution/HiveQuerySuite.scala | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index cc2b6376b8eda..2d0dfae5c4892 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -174,7 +174,10 @@ case 
class InsertIntoHiveTable( val columnsCnt = tableDesc.getProperties.getProperty("columns").split(",").size if (columnsCnt + numDynamicPartitions != child.output.size) { - throw new SparkException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg) + throw new SparkException(s"Cannot insert into target table ${tableDesc.getTableName} " + + s"because column number are different: target table has $columnsCnt column(s) and " + + s"$numDynamicPartitions dynamic partition column(s), but input has ${child.output.size} " + + s"column(s).") } // By this time, the partition map must match the table's partition columns diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 4075a50dc90e3..21e52117b8a05 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1068,6 +1068,19 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } } + test("SPARK-15667: Check column number matching with static & dynamic partition insert") { + intercept[SparkException] { + loadTestTable("srcpart") + sql("DROP TABLE IF EXISTS withparts") + sql("CREATE TABLE withparts LIKE srcpart") + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart") + sql("INSERT INTO TABLE withparts PARTITION(ds='a', hr) SELECT key, value FROM src") + .queryExecution.sparkPlan + } + } + test("parse HQL set commands") { // Adapted from its SQL counterpart. val testKey = "spark.sql.key.usedfortestonly"