From ff0e7c7c84c12e2cd5277440538f453f019a6f8e Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 27 Sep 2016 11:25:37 -0700
Subject: [PATCH 1/2] fix.

---
 .../org/apache/spark/sql/types/StructType.scala     |  9 +++++++++
 .../datasources/DataSourceStrategy.scala            |  5 +++--
 .../spark/sql/execution/datasources/rules.scala     |  3 ++-
 .../apache/spark/sql/sources/InsertSuite.scala      | 17 +++++++++++++++++
 4 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index dd4c88c4c43bc..18f6faf9bc13d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -384,6 +384,15 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     StructType(newFields)
   }
 
+  private[sql] def cleanNullableAndMetadata: StructType = {
+    val newFields = fields.map {
+      case StructField(name, dataType, nullable, _) =>
+        StructField(name, dataType.asNullable, nullable = true)
+    }
+
+    StructType(newFields)
+  }
+
   override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
     f(this) || fields.exists(field => field.dataType.existsRecursively(f))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 63f01c5bb9e3c..6017f96cbcf65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -159,8 +159,9 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
     case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
-        if query.resolved && t.schema.asNullable == query.schema.asNullable =>
+        l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
+        if query.resolved &&
+          t.schema.cleanNullableAndMetadata == query.schema.cleanNullableAndMetadata =>
 
       // Sanity checks
       if (t.location.paths.size != 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index bd6eb6e0535ab..c1059fc0a3aac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -243,7 +243,6 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
     }
   }
 
-  // TODO: do we really need to rename?
   def castAndRenameChildOutput(
       insert: InsertIntoTable,
       expectedOutput: Seq[Attribute]): InsertIntoTable = {
@@ -252,6 +251,8 @@
       if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name) {
         actual
       } else {
+        // Renaming is needed for handling the INSERT statement like
+        // INSERT INTO TABLE tab1 SELECT 1, 2
         Alias(Cast(actual, expected.dataType), expected.name)()
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5eb54643f204f..f14fd02d327d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -185,6 +185,23 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     )
   }
 
+  test("INSERT INTO TABLE with Comment in columns") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      sql(
+        s"""
+           |CREATE TABLE $tabName(col1 int COMMENT 'a', col2 int)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tabName SELECT 1, 2")
+
+      checkAnswer(
+        sql(s"SELECT col1, col2 FROM $tabName"),
+        Row(1, 2) :: Nil
+      )
+    }
+  }
+
   test("it is not allowed to write to a table while querying it.") {
     val message = intercept[AnalysisException] {
       sql(

From 3557b261e65738f7a93eac401e33530a52538da8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 2 Oct 2016 00:23:55 -0700
Subject: [PATCH 2/2] test case for struct

---
 .../apache/spark/sql/sources/InsertSuite.scala | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index f14fd02d327d6..4a85b5975ea53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -202,6 +202,31 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     }
   }
 
+  test("INSERT INTO TABLE - complex type but different names") {
+    val tab1 = "tab1"
+    val tab2 = "tab2"
+    withTable(tab1, tab2) {
+      sql(
+        s"""
+           |CREATE TABLE $tab1 (s struct<a: string, b: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab1 SELECT named_struct('col1','1','col2','2')")
+
+      sql(
+        s"""
+           |CREATE TABLE $tab2 (p struct<c1: string, c2: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab2 SELECT * FROM $tab1")
+
+      checkAnswer(
+        spark.table(tab1),
+        spark.table(tab2)
+      )
+    }
+  }
+
   test("it is not allowed to write to a table while querying it.") {
     val message = intercept[AnalysisException] {
       sql(
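
Why the guard change in PATCH 1/2 is needed: StructField is a case class
whose equality includes its metadata field, and a column COMMENT is kept
exactly there, so a table schema with comments never equals the query
schema under the old `t.schema.asNullable == query.schema.asNullable`
check. The sketch below reproduces the mismatch using only public Spark
types; the `strip` helper and the object name are stand-ins of ours that
mirror (but do not reuse) the patch's cleanNullableAndMetadata:

    import org.apache.spark.sql.types._

    object SchemaCompareSketch {
      // Recursively drop nullability and metadata, as cleanNullableAndMetadata
      // does inside the patch (this helper is illustrative, not Spark API).
      def strip(dt: DataType): DataType = dt match {
        case StructType(fields) =>
          StructType(fields.map(f => StructField(f.name, strip(f.dataType))))
        case ArrayType(et, _)   => ArrayType(strip(et), containsNull = true)
        case MapType(k, v, _)   => MapType(strip(k), strip(v), valueContainsNull = true)
        case other              => other
      }

      def main(args: Array[String]): Unit = {
        // Table side: col1 carries COMMENT 'a', which Spark keeps in metadata.
        val comment = new MetadataBuilder().putString("comment", "a").build()
        val tableSchema = StructType(Seq(
          StructField("col1", IntegerType, nullable = true, comment),
          StructField("col2", IntegerType)))
        // Query side: same names and types, but no metadata.
        val querySchema = StructType(Seq(
          StructField("col1", IntegerType),
          StructField("col2", IntegerType)))

        println(tableSchema == querySchema)               // false: metadata differs
        println(strip(tableSchema) == strip(querySchema)) // true: comment ignored
      }
    }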
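The comment added in rules.scala points at the mechanism behind both new
tests: when a query column differs from the table column in name (the
literals of `SELECT 1, 2` come out named "1" and "2"), castAndRenameChildOutput
wraps it in a Cast plus an Alias carrying the table's column name. A short
spark-shell-style illustration of that wrapping, built on Catalyst's
internal expression API with a made-up column name and type:

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Literal}
    import org.apache.spark.sql.types.LongType

    // What the target table expects for its first column...
    val expected = AttributeReference("col1", LongType)()
    // ...and what `SELECT 1, 2` actually produces for it.
    val actual = Literal(1)

    // Keep the value, cast it to the table's type, and rename it -- the
    // same Alias(Cast(...)) shape the patch annotates.
    val fixed = Alias(Cast(actual, expected.dataType), expected.name)()
    println(fixed.sql) // roughly: CAST(1 AS BIGINT) AS `col1`

The same wrapping is why PATCH 2/2's struct test passes: a Cast between
struct types only requires positionally compatible field types, so the
field names produced by named_struct are cast and renamed to the target
table's struct field names.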