From 90f99fc6e645a65232cc52c81717402df1fd97df Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 26 Jul 2018 23:24:47 +0800
Subject: [PATCH 1/5] Datasource partition table should load empty partitions.

---
 .../InsertIntoHadoopFsRelationCommand.scala |  7 ++++++-
 .../datasources/PartitioningUtils.scala     |  5 +++++
 .../sql/execution/command/DDLSuite.scala    | 21 +++++++++++++++++++
 3 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index dd7ef0d15c140..a21c20090d9ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -166,7 +166,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
       // update metastore partition metadata
-      refreshUpdatedPartitions(updatedPartitionPaths)
+      if (staticPartitions.nonEmpty) {
+        // Make sure an empty static partition can still be loaded.
+        refreshUpdatedPartitions(Set(PartitioningUtils.getPathFragment(staticPartitions)))
+      } else {
+        refreshUpdatedPartitions(updatedPartitionPaths)
+      }
 
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f9a24806953e6..ad3d269ab8971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -284,6 +284,11 @@ object PartitioningUtils {
     }.mkString("/")
   }
 
+  def getPathFragment(partitions: TablePartitionSpec): String = partitions.map {
+    case (k, v) =>
+      escapePathName(k) + "=" + escapePathName(v)
+  }.mkString("/")
+
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ca95aad3976e6..6687eda26271b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2249,6 +2249,27 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("Datasource partition table should load empty partitions") {
+    withTable("t", "t1") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
+          s"USING parquet PARTITIONED BY(b, c) LOCATION '${dir.toURI}'")
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+        spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
+
+        assert(spark.sql("SHOW PARTITIONS t1").count() == 1)
+
+        assert(new File(dir, "b=b/c=c").exists())
+
+        checkAnswer(spark.table("t1"), Nil)
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
     val tcName = if (shouldDelete) "non-existing" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
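Why this helps: when an INSERT writes zero rows into a static partition, no files are produced, `updatedPartitionPaths` stays empty, and the partition is never registered in the metastore. The new `getPathFragment(partitions: TablePartitionSpec)` overload renders the static spec as a Hive-style directory fragment so it can be registered anyway. A minimal standalone sketch of that rendering logic (the simplified `escapePathName` here is an assumption for illustration; Spark's real implementation percent-encodes many more special characters):

    // Stand-in for PartitioningUtils.escapePathName -- illustration only.
    def escapePathName(s: String): String =
      s.flatMap(c => if (c == '/' || c == '=') f"%%${c.toInt}%02X" else c.toString)

    // Same shape as the helper added above: key=value pairs joined with '/'.
    def pathFragment(spec: Map[String, String]): String =
      spec.map { case (k, v) => escapePathName(k) + "=" + escapePathName(v) }.mkString("/")

    pathFragment(Map("b" -> "b", "c" -> "c"))  // "b=b/c=c"

Note that the fragment is emitted in the spec map's iteration order rather than the table's declared partition-column order; patch 4 below addresses exactly that.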
From d684d38c6731f84d568a8e42f6ff0455cf6ba700 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 27 Jul 2018 05:56:03 +0800
Subject: [PATCH 2/5] Fix test error.

---
 .../datasources/InsertIntoHadoopFsRelationCommand.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a21c20090d9ba..f256f6b6b3afd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -166,8 +166,8 @@ case class InsertIntoHadoopFsRelationCommand(
 
       // update metastore partition metadata
-      if (staticPartitions.nonEmpty) {
-        // Make sure an empty static partition can still be loaded.
+      if (staticPartitions.nonEmpty && updatedPartitionPaths.isEmpty) {
+        // Make sure an empty static partition can still be loaded into a datasource table.
         refreshUpdatedPartitions(Set(PartitioningUtils.getPathFragment(staticPartitions)))
       } else {
         refreshUpdatedPartitions(updatedPartitionPaths)
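The fix above narrows the special case: patch 1 took the static-fragment branch whenever any static partition value was present, so an insert that also wrote files into dynamic partitions would skip refreshing the partitions that actually received data. Requiring `updatedPartitionPaths.isEmpty` restricts the workaround to inserts that wrote nothing. A hedged sketch of the resulting decision (parameter names mirror the command's fields):

    // Sketch only: which partition paths should be refreshed after patch 2.
    def partitionsToRefresh(
        updatedPartitionPaths: Set[String],
        staticPartitions: Map[String, String],
        staticFragment: => String): Set[String] = {
      if (staticPartitions.nonEmpty && updatedPartitionPaths.isEmpty) {
        Set(staticFragment)    // zero rows written: register the static partition anyway
      } else {
        updatedPartitionPaths  // normal case: refresh what the write actually touched
      }
    }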
From b44e578cf5cf1ac0e25dab779739ef253786c366 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 27 Jul 2018 09:03:24 +0800
Subject: [PATCH 3/5] Add more test cases

---
 .../InsertIntoHadoopFsRelationCommand.scala |  3 ++-
 .../sql/execution/command/DDLSuite.scala    | 25 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index f256f6b6b3afd..303a967c6b2c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -166,7 +166,8 @@ case class InsertIntoHadoopFsRelationCommand(
 
       // update metastore partition metadata
-      if (staticPartitions.nonEmpty && updatedPartitionPaths.isEmpty) {
+      if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
+        && partitionColumns.length == staticPartitions.size) {
         // Make sure an empty static partition can still be loaded into a datasource table.
         refreshUpdatedPartitions(Set(PartitioningUtils.getPathFragment(staticPartitions)))
       } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6687eda26271b..0355847f90838 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2250,6 +2250,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("Datasource partition table should load empty partitions") {
+    // All static partitions
     withTable("t", "t1") {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
@@ -2259,6 +2260,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
         assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
+        assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
+
         spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 1)
@@ -2268,6 +2271,28 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
         checkAnswer(spark.table("t1"), Nil)
       }
     }
+
+    // Partial dynamic partitions
+    withTable("t", "t1") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
+          s"USING parquet PARTITIONED BY(b, c) LOCATION '${dir.toURI}'")
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+        assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
+
+        spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c) SELECT *, 'c' FROM t WHERE 1 = 0")
+
+        assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
+
+        assert(!new File(dir, "b=b/c=c").exists())
+
+        checkAnswer(spark.table("t1"), Nil)
+      }
+    }
   }
 
   Seq(true, false).foreach { shouldDelete =>
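The extra guard makes the synthesized fragment apply only when every partition column has a static value. With a partial spec such as `PARTITION(b='b', c)` and zero input rows, the dynamic column `c` never receives a value, so there is no complete partition to register; the new "Partial dynamic partitions" test pins that behaviour down. The full predicate, as a standalone sketch:

    // Sketch only: register the empty static partition iff all three conditions hold.
    def shouldRegisterEmptyStaticPartition(
        updatedPartitionPaths: Set[String],
        staticPartitions: Map[String, String],
        numPartitionColumns: Int): Boolean =
      updatedPartitionPaths.isEmpty &&                 // 1. the write produced no files
        staticPartitions.nonEmpty &&                   // 2. static values were given
        numPartitionColumns == staticPartitions.size   // 3. no dynamic columns remain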
From 46bfcc6ebece630a8da3a22240889ac5a8ec9d5f Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 31 Jul 2018 18:45:00 +0800
Subject: [PATCH 4/5] Fix order

---
 .../InsertIntoHadoopFsRelationCommand.scala       |  4 +++-
 .../execution/datasources/PartitioningUtils.scala |  9 ++++-----
 .../spark/sql/execution/command/DDLSuite.scala    | 12 ++++++------
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 303a967c6b2c5..5fd0de802f7a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -169,7 +169,9 @@ case class InsertIntoHadoopFsRelationCommand(
       if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
         && partitionColumns.length == staticPartitions.size) {
         // Make sure an empty static partition can still be loaded into a datasource table.
-        refreshUpdatedPartitions(Set(PartitioningUtils.getPathFragment(staticPartitions)))
+        val staticPathFragment =
+          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
+        refreshUpdatedPartitions(Set(staticPathFragment))
       } else {
         refreshUpdatedPartitions(updatedPartitionPaths)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index ad3d269ab8971..d9a722b74cf5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -30,7 +30,7 @@
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -284,10 +284,9 @@ object PartitioningUtils {
     }.mkString("/")
   }
 
-  def getPathFragment(partitions: TablePartitionSpec): String = partitions.map {
-    case (k, v) =>
-      escapePathName(k) + "=" + escapePathName(v)
-  }.mkString("/")
+  def getPathFragment(spec: TablePartitionSpec, partitionColumns: Seq[Attribute]): String = {
+    getPathFragment(spec, StructType.fromAttributes(partitionColumns))
+  }
 
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0355847f90838..c6e33f89cb70c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2254,8 +2254,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     withTable("t", "t1") {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
-        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
-          s"USING parquet PARTITIONED BY(b, c) LOCATION '${dir.toURI}'")
+        spark.sql("CREATE TABLE t1(a int, c string, b string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
         assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
@@ -2266,7 +2266,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 1)
 
-        assert(new File(dir, "b=b/c=c").exists())
+        assert(new File(dir, "c=c/b=b").exists())
 
         checkAnswer(spark.table("t1"), Nil)
       }
@@ -2277,18 +2277,18 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
         spark.sql("CREATE TABLE t1(a int, b string, c string) " +
-          s"USING parquet PARTITIONED BY(b, c) LOCATION '${dir.toURI}'")
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
         assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
 
-        spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c) SELECT *, 'c' FROM t WHERE 1 = 0")
+        spark.sql("INSERT INTO TABLE t1 PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
 
         assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
 
-        assert(!new File(dir, "b=b/c=c").exists())
+        assert(!new File(dir, "c=c/b=b").exists())
 
         checkAnswer(spark.table("t1"), Nil)
       }
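The ordering fix: the one-argument `getPathFragment` emitted `key=value` pairs in the spec map's iteration order, which need not match the table's declared partition-column order, while the on-disk layout and the metastore expect the declared order. Delegating to the existing `StructType`-based overload lets the partition schema drive the ordering. A small sketch of the difference, assuming partition columns declared as `(c, b)`:

    val spec = Map("b" -> "b", "c" -> "c")

    // Spec iteration order (patches 1-3): wrong whenever it disagrees with the schema.
    spec.map { case (k, v) => s"$k=$v" }.mkString("/")            // "b=b/c=c"

    // Schema-driven order (this patch): the declared columns decide.
    Seq("c", "b").map(col => s"$col=${spec(col)}").mkString("/")  // "c=c/b=b"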
From 536346e60ed24ee447f991aacf58cafe9415a020 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 1 Aug 2018 09:36:41 +0800
Subject: [PATCH 5/5] Add test cases for Hive tables

---
 .../sql/execution/command/DDLSuite.scala | 62 ++++++++++++-------
 1 file changed, 39 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c6e33f89cb70c..78df1db93692b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2249,48 +2249,64 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("Datasource partition table should load empty partitions") {
+  test("Partition table should load empty static partitions") {
     // All static partitions
-    withTable("t", "t1") {
+    withTable("t", "t1", "t2") {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
         spark.sql("CREATE TABLE t1(a int, c string, b string) " +
           s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
 
-        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
-        assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
-
-        spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
+        // datasource table
+        validateStaticPartitionTable("t1")
 
-        assert(spark.sql("SHOW PARTITIONS t1").count() == 1)
-
-        assert(new File(dir, "c=c/b=b").exists())
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validateStaticPartitionTable("t2")
+        }
 
-        checkAnswer(spark.table("t1"), Nil)
+        def validateStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 1)
+          assert(new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
       }
     }
 
     // Partial dynamic partitions
-    withTable("t", "t1") {
+    withTable("t", "t1", "t2") {
       withTempPath { dir =>
         spark.sql("CREATE TABLE t(a int) USING parquet")
         spark.sql("CREATE TABLE t1(a int, b string, c string) " +
           s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
 
-        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
-        assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
-
-        spark.sql("INSERT INTO TABLE t1 PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
+        // datasource table
+        validatePartialStaticPartitionTable("t1")
 
-        assert(spark.sql("SHOW PARTITIONS t1").count() == 0)
-
-        assert(!new File(dir, "c=c/b=b").exists())
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validatePartialStaticPartitionTable("t2")
+        }
 
-        checkAnswer(spark.table("t1"), Nil)
+        def validatePartialStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          assert(!new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
       }
     }
   }
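End to end, the behaviour the series establishes (mirroring the tests above, and assuming the `t`/`t1` tables they create):

    // Inserting zero rows into a fully static partition now registers the partition:
    spark.sql("INSERT INTO TABLE t1 PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
    spark.sql("SHOW PARTITIONS t1").count()  // 1, and <location>/c=c/b=b exists on disk

    // A partially dynamic spec with zero rows still registers nothing:
    spark.sql("INSERT INTO TABLE t1 PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
    spark.sql("SHOW PARTITIONS t1").count()  // unchanged: the dynamic column got no value

With patch 5 the same expectations are also exercised against a Hive table when the suite runs with a Hive metastore (`isUsingHiveMetastore`).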