From 81dbeb19e61a67a287a5762e391517eb55a20721 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 27 Oct 2016 09:29:16 +0000
Subject: [PATCH 1/4] Drop partition before insert overwrite to Hive table.

---
 .../hive/execution/InsertIntoHiveTable.scala  | 23 +++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala    | 33 +++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c3c4e2925b90c..010a34282f2df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.AlterTableAddPartitionCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -258,6 +259,28 @@ case class InsertIntoHiveTable(
           partitionSpec)
 
       if (oldPart.isEmpty || !ifNotExists) {
+        // SPARK-18107: Insert overwrite runs much slower than hive-client.
+        // Newer Hive largely improves insert overwrite performance. Since Spark uses an older
+        // Hive version and may not want to catch up with new Hive versions every time, we
+        // delete the Hive partition first and then load the data file into it.
+        if (oldPart.nonEmpty && overwrite) {
+          externalCatalog.dropPartitions(
+            table.catalogTable.database,
+            table.catalogTable.identifier.table,
+            parts = Seq(partitionSpec),
+            ignoreIfNotExists = true,
+            purge = false)
+          // Although `externalCatalog.loadPartition` will create the partition,
+          // the old partition may use custom properties such as location, so we need to create
+          // it manually.
+          val partitionSpecAndLocation = Seq((partitionSpec, oldPart.get.storage.locationUri))
+          AlterTableAddPartitionCommand(
+            tableName = table.catalogTable.identifier,
+            partitionSpecsAndLocs = partitionSpecAndLocation,
+            ifNotExists = true
+          ).run(sqlContext.sparkSession)
+        }
+
         // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
         // which is currently considered as a Hive native command.
         val inheritTableSpecs = true
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 495b4f874a1d6..bd45b66a3da0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1947,6 +1947,39 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("Insert overwrite with partition") {
+    withTable("tableWithPartition") {
+      sql(
+        """
+          |CREATE TABLE tableWithPartition (key int, value STRING)
+          |PARTITIONED BY (part STRING)
+        """.stripMargin)
+      sql(
+        """
+          |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+          |SELECT * FROM default.src
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT part, key, value FROM tableWithPartition"),
+        sql("SELECT '1' AS part, key, value FROM default.src")
+      )
+
+      sql(
+        """
+          |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+          |SELECT * FROM VALUES (1, "one"), (2, "two"), (3, null) AS data(key, value)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT part, key, value FROM tableWithPartition"),
+        sql(
+          """
+            |SELECT '1' AS part, key, value FROM VALUES
+            |(1, "one"), (2, "two"), (3, null) AS data(key, value)
+          """.stripMargin)
+      )
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0
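Patch 1 works at the catalog level: drop the existing partition, then re-create it at its old location so that a custom partition location survives the overwrite. Roughly the same sequence can be expressed as plain SQL; the sketch below uses a made-up table `t` and a made-up custom location, and illustrates the idea rather than the patch's internal code path.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch; `t`, its columns, and the location are hypothetical.
val spark = SparkSession.builder()
  .appName("drop-then-recreate-sketch")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS t (key INT) PARTITIONED BY (p STRING)")
// A partition with a custom location, which a plain load alone would not preserve.
spark.sql("ALTER TABLE t ADD IF NOT EXISTS PARTITION (p = '1') LOCATION '/tmp/custom/p=1'")

// What the patch does internally before loading the new data:
// drop the partition, then re-add it at the remembered old location.
spark.sql("ALTER TABLE t DROP IF EXISTS PARTITION (p = '1')")
spark.sql("ALTER TABLE t ADD IF NOT EXISTS PARTITION (p = '1') LOCATION '/tmp/custom/p=1'")
```

The `ADD PARTITION ... LOCATION` step mirrors the `AlterTableAddPartitionCommand` call above: `loadPartition` would re-create the partition anyway, but at the default location, losing the custom one.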
From 5257d7fed6d3a002912754d7845cfc0069291881 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 30 Oct 2016 02:21:42 +0000
Subject: [PATCH 2/4] Reset overwrite flag.

---
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 010a34282f2df..2fead156e49b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -258,6 +258,8 @@ case class InsertIntoHiveTable(
           table.catalogTable.identifier.table,
           partitionSpec)
 
+      var doOverwrite = overwrite
+
       if (oldPart.isEmpty || !ifNotExists) {
         // SPARK-18107: Insert overwrite runs much slower than hive-client.
         // Newer Hive largely improves insert overwrite performance. Since Spark uses an older
@@ -279,6 +281,9 @@ case class InsertIntoHiveTable(
             partitionSpecsAndLocs = partitionSpecAndLocation,
             ifNotExists = true
           ).run(sqlContext.sparkSession)
+
+          // Don't let Hive do overwrite operation since it is slower.
+          doOverwrite = false
         }
 
         // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
@@ -289,7 +294,7 @@ case class InsertIntoHiveTable(
         table.catalogTable.identifier.table,
         outputPath.toString,
         partitionSpec,
-        isOverwrite = overwrite,
+        isOverwrite = doOverwrite,
         holdDDLTime = holdDDLTime,
         inheritTableSpecs = inheritTableSpecs)
     }
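Patch 2 threads one decision through to `loadPartition`: if we cleared the partition ourselves, Hive should do a plain (fast) load rather than its slow overwrite. A standalone model of that decision, with hypothetical names, for illustration:

```scala
// Reduced model of the control flow patch 2 introduces.
// `LoadDecision` and `decideLoad` are illustrative, not actual class members.
case class LoadDecision(preDeletePartition: Boolean, hiveOverwrite: Boolean)

def decideLoad(partitionExists: Boolean, overwrite: Boolean): LoadDecision = {
  if (partitionExists && overwrite) {
    // We drop the old data ourselves, so Hive can do a plain (fast) load.
    LoadDecision(preDeletePartition = true, hiveOverwrite = false)
  } else {
    // Nothing to pre-delete; pass the caller's overwrite flag through.
    LoadDecision(preDeletePartition = false, hiveOverwrite = overwrite)
  }
}

assert(decideLoad(partitionExists = true, overwrite = true) ==
  LoadDecision(preDeletePartition = true, hiveOverwrite = false))
assert(decideLoad(partitionExists = false, overwrite = true) ==
  LoadDecision(preDeletePartition = false, hiveOverwrite = true))
```

Keeping the flag in one variable avoids the failure mode of pre-deleting the data and then still paying for Hive's overwrite path.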
From 74f0f3e6308908c3d5c2a6262f675b0ddbacad71 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 31 Oct 2016 03:04:35 +0000
Subject: [PATCH 3/4] Fix test.

---
 .../hive/execution/InsertIntoHiveTable.scala | 35 +++++++++----------
 1 file changed, 16 insertions(+), 19 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 2fead156e49b1..c767ad3385b65 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.command.AlterTableAddPartitionCommand
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -266,24 +266,21 @@ case class InsertIntoHiveTable(
         // Hive version and may not want to catch up with new Hive versions every time, we
         // delete the Hive partition first and then load the data file into it.
         if (oldPart.nonEmpty && overwrite) {
-          externalCatalog.dropPartitions(
-            table.catalogTable.database,
-            table.catalogTable.identifier.table,
-            parts = Seq(partitionSpec),
-            ignoreIfNotExists = true,
-            purge = false)
-          // Although `externalCatalog.loadPartition` will create the partition,
-          // the old partition may use custom properties such as location, so we need to create
-          // it manually.
-          val partitionSpecAndLocation = Seq((partitionSpec, oldPart.get.storage.locationUri))
-          AlterTableAddPartitionCommand(
-            tableName = table.catalogTable.identifier,
-            partitionSpecsAndLocs = partitionSpecAndLocation,
-            ifNotExists = true
-          ).run(sqlContext.sparkSession)
-
-          // Don't let Hive do overwrite operation since it is slower.
-          doOverwrite = false
+          oldPart.get.storage.locationUri.map { uri =>
+            val partitionPath = new Path(uri)
+            val fs = partitionPath.getFileSystem(hadoopConf)
+            if (fs.exists(partitionPath)) {
+              val pathPermission = fs.getFileStatus(partitionPath).getPermission()
+              if (!fs.delete(partitionPath, true)) {
+                throw new RuntimeException(
+                  "Cannot remove partition directory '" + partitionPath.toString + "'")
+              } else {
+                fs.mkdirs(partitionPath, pathPermission)
+              }
+              // Don't let Hive do overwrite operation since it is slower.
+              doOverwrite = false
+            }
+          }
         }
 
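Patch 3 swaps the catalog round-trip for direct filesystem deletion: the partition and its metadata stay registered, and only the directory contents are removed. A self-contained sketch of that pattern with the Hadoop `FileSystem` API, against a placeholder local path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Standalone sketch of the deletion pattern; the path below is a placeholder.
val hadoopConf = new Configuration()
val partitionPath = new Path("file:///tmp/warehouse/t/p=1")
val fs = partitionPath.getFileSystem(hadoopConf)

if (fs.exists(partitionPath)) {
  // Remember the permission so the recreated directory matches the old one.
  val pathPermission = fs.getFileStatus(partitionPath).getPermission
  if (!fs.delete(partitionPath, true)) {
    throw new RuntimeException(s"Cannot remove partition directory '$partitionPath'")
  }
  // Patch 3 recreates the directory with the old permission; patch 4 drops
  // this step and lets Hive's loadPartition recreate the directory instead.
  fs.mkdirs(partitionPath, pathPermission)
}
```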
From bd22150823ff9ce6a0b80ae61fae6477ad135ef8 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 1 Nov 2016 00:51:51 +0000
Subject: [PATCH 4/4] No need to mkdir.

---
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c767ad3385b65..2843100fb3b36 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -258,7 +258,7 @@ case class InsertIntoHiveTable(
           table.catalogTable.identifier.table,
           partitionSpec)
 
-      var doOverwrite = overwrite
+      var doHiveOverwrite = overwrite
 
       if (oldPart.isEmpty || !ifNotExists) {
         // SPARK-18107: Insert overwrite runs much slower than hive-client.
@@ -270,15 +270,12 @@ case class InsertIntoHiveTable(
             val partitionPath = new Path(uri)
             val fs = partitionPath.getFileSystem(hadoopConf)
             if (fs.exists(partitionPath)) {
-              val pathPermission = fs.getFileStatus(partitionPath).getPermission()
               if (!fs.delete(partitionPath, true)) {
                 throw new RuntimeException(
                   "Cannot remove partition directory '" + partitionPath.toString + "'")
-              } else {
-                fs.mkdirs(partitionPath, pathPermission)
               }
               // Don't let Hive do overwrite operation since it is slower.
-              doOverwrite = false
+              doHiveOverwrite = false
             }
           }
@@ -291,7 +288,7 @@ case class InsertIntoHiveTable(
         table.catalogTable.identifier.table,
         outputPath.toString,
         partitionSpec,
-        isOverwrite = doOverwrite,
+        isOverwrite = doHiveOverwrite,
         holdDDLTime = holdDDLTime,
         inheritTableSpecs = inheritTableSpecs)
     }
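End to end, none of the four revisions changes user-visible semantics; they only change how the old partition data is cleared. A usage example in the spirit of the new test (the table name `demo` is made up):

```scala
import org.apache.spark.sql.SparkSession

// Overwriting the same partition twice should leave only the second result.
val spark = SparkSession.builder()
  .appName("insert-overwrite-example")
  .enableHiveSupport()
  .getOrCreate()

spark.sql(
  """CREATE TABLE IF NOT EXISTS demo (key INT, value STRING)
    |PARTITIONED BY (part STRING)""".stripMargin)
spark.sql(
  """INSERT OVERWRITE TABLE demo PARTITION (part = '1')
    |SELECT * FROM VALUES (1, 'one'), (2, 'two') AS data(key, value)""".stripMargin)
spark.sql(
  """INSERT OVERWRITE TABLE demo PARTITION (part = '1')
    |SELECT * FROM VALUES (3, 'three') AS data(key, value)""".stripMargin)

// Only the second insert's row should remain in partition part=1.
spark.sql("SELECT part, key, value FROM demo").show()
```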