From 329886e54d3a70e2314d67b6b6060fc33cef9b8d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 8 Feb 2017 01:28:23 +0800 Subject: [PATCH 1/2] renaming partition should not leave useless directories --- .../catalog/ExternalCatalogUtils.scala | 17 +++--- .../spark/sql/hive/HiveExternalCatalog.scala | 53 ++++++++++++++++--- .../PartitionProviderCompatibilitySuite.scala | 29 ++++++++++ 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 4331841fbffb4..58ced549bafe9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -108,18 +108,21 @@ object ExternalCatalogUtils { partitionColumnNames: Seq[String], tablePath: Path): Path = { val partitionPathStrings = partitionColumnNames.map { col => - val partitionValue = spec(col) - val partitionString = if (partitionValue == null) { - DEFAULT_PARTITION_NAME - } else { - escapePathName(partitionValue) - } - escapePathName(col) + "=" + partitionString + getPartitionPathString(col, spec(col)) } partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) => new Path(totalPath, nextPartPath) } } + + def getPartitionPathString(col: String, value: String): String = { + val partitionString = if (value == null) { + DEFAULT_PARTITION_NAME + } else { + escapePathName(value) + } + escapePathName(col) + "=" + partitionString + } } object CatalogUtils { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 208c8c9d5d0cf..44d6b0f40a4e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -892,21 +892,58 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col) if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) { val tablePath = new Path(tableMeta.location) + val fs = tablePath.getFileSystem(hadoopConf) val newParts = newSpecs.map { spec => + val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec) val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec)) - val wrongPath = new Path(partition.location) - val rightPath = ExternalCatalogUtils.generatePartitionPath( - spec, partitionColumnNames, tablePath) + partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString))) + } + alterPartitions(db, table, newParts) + } + } + + /** + * Rename the partition directory w.r.t. the actual partition columns. + * + * It will recursively rename the partition directory from the first partition column, to be most + * compatible with different file systems. e.g. in some file systems, renaming `a=1/b=2` to + * `A=1/B=2` will result in `a=1/B=2`, while in some other file systems, the renaming works, but + * will leave an empty directory `a=1`. + */ + private def renamePartitionDirectory( + fs: FileSystem, + tablePath: Path, + partCols: Seq[String], + newSpec: TablePartitionSpec): Path = { + import ExternalCatalogUtils.getPartitionPathString + + var totalPath = tablePath + partCols.foreach { col => + val partValue = newSpec(col) + val expectedPartitionString = getPartitionPathString(col, partValue) + val expectedPartitionPath = new Path(totalPath, expectedPartitionString) + + if (fs.exists(expectedPartitionPath)) { + // It is possible that some parental partition directories already exist or doesn't need to + // be renamed. e.g. 
the partition columns are `a` and `B`, then we don't need to rename + // `/table_path/a=1`. Or we already have a partition directory `A=1/B=2`, and we rename + // another partition to `A=1/B=3`, then we will have `A=1/B=2` and `a=1/b=3`, and we should + // just move `a=1/b=3` into `A=1` with new name `B=3`. + } else { + val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue) + val actualPartitionPath = new Path(totalPath, actualPartitionString) try { - tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath) + fs.rename(actualPartitionPath, expectedPartitionPath) } catch { - case e: IOException => throw new SparkException( - s"Unable to rename partition path from $wrongPath to $rightPath", e) + case e: IOException => + throw new SparkException("Unable to rename partition path from " + + s"$actualPartitionPath to $expectedPartitionPath", e) } - partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString))) } - alterPartitions(db, table, newParts) + totalPath = expectedPartitionPath } + + totalPath } override def alterPartitions( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index dca207a72d889..96385961c9a52 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -19,8 +19,11 @@ package org.apache.spark.sql.hive import java.io.File +import org.apache.hadoop.fs.Path + import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -481,4 +484,30 @@ class 
PartitionProviderCompatibilitySuite assert(spark.sql("show partitions test").count() == 5) } } + + test("SPARK-19359: renaming partition should not leave useless directories") { + withTable("t", "t1") { + Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t") + spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)") + + var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + var tablePath = new Path(table.location) + val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf()) + // the `A=2` directory is still there, we follow this behavior from hive. + assert(fs.listStatus(tablePath) + .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1) + assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1) + + + Seq((1, 2, 3, 4)).toDF("id", "A", "b", "C").write.partitionBy("A", "b", "C").saveAsTable("t1") + spark.sql("alter table t1 partition(A=2, b=3, C=4) rename to partition(A=4, b=5, C=6)") + table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + tablePath = new Path(table.location) + // the `A=2` directory is still there, we follow this behavior from hive. 
+ assert(fs.listStatus(tablePath) + .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1) + assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1) + assert(fs.listStatus(new Path(new Path(tablePath, "A=4"), "b=5")).count(_.isDirectory) == 1) + } + } } From 2c4d3c71e971bd039936c43143718ba7091d6113 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 9 Feb 2017 09:21:16 +0800 Subject: [PATCH 2/2] address comments --- .../apache/spark/sql/hive/HiveExternalCatalog.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 44d6b0f40a4e0..1fc8e8ea9fa3b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -917,11 +917,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat newSpec: TablePartitionSpec): Path = { import ExternalCatalogUtils.getPartitionPathString - var totalPath = tablePath + var currentFullPath = tablePath partCols.foreach { col => val partValue = newSpec(col) val expectedPartitionString = getPartitionPathString(col, partValue) - val expectedPartitionPath = new Path(totalPath, expectedPartitionString) + val expectedPartitionPath = new Path(currentFullPath, expectedPartitionString) if (fs.exists(expectedPartitionPath)) { // It is possible that some parental partition directories already exist or doesn't need to @@ -931,7 +931,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // just move `a=1/b=3` into `A=1` with new name `B=3`. 
} else { val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue) - val actualPartitionPath = new Path(totalPath, actualPartitionString) + val actualPartitionPath = new Path(currentFullPath, actualPartitionString) try { fs.rename(actualPartitionPath, expectedPartitionPath) } catch { @@ -940,10 +940,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat s"$actualPartitionPath to $expectedPartitionPath", e) } } - totalPath = expectedPartitionPath + currentFullPath = expectedPartitionPath } - totalPath + currentFullPath } override def alterPartitions(