diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 30f9a650a69c9..c061d617fce4b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -188,13 +188,18 @@ class HadoopMapReduceCommitProtocol(
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
+      val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
       if (dynamicPartitionOverwrite) {
-        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
-        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
-        absPartitionPaths.foreach(fs.delete(_, true))
+        logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
+        absParentPaths.foreach(fs.delete(_, true))
       }
+      logDebug(s"Create absolute parent directories: $absParentPaths")
+      absParentPaths.foreach(fs.mkdirs)
       for ((src, dst) <- filesToMove) {
-        fs.rename(new Path(src), new Path(dst))
+        if (!fs.rename(new Path(src), new Path(dst))) {
+          throw new IOException(s"Failed to rename $src to $dst when committing files staged for " +
+            s"absolute locations")
+        }
       }
 
       if (dynamicPartitionOverwrite) {
@@ -213,7 +218,11 @@ class HadoopMapReduceCommitProtocol(
             // a parent that exists, otherwise we may get unexpected result on the rename.
             fs.mkdirs(finalPartPath.getParent)
           }
-          fs.rename(new Path(stagingDir, part), finalPartPath)
+          val stagingPartPath = new Path(stagingDir, part)
+          if (!fs.rename(stagingPartPath, finalPartPath)) {
+            throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
+              s"committing files staged for overwriting dynamic partitions")
+          }
         }
       }
 
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
index 1d3e28b39548f..8f220801f41e3 100644
--- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -57,8 +57,14 @@ object DebugFilesystem extends Logging {
 }
 
 /**
- * DebugFilesystem wraps file open calls to track all open connections. This can be used in tests
- * to check that connections are not leaked.
+ * DebugFilesystem wraps
+ * 1) file open calls to track all open connections. This can be used in tests to check that
+ *    connections are not leaked;
+ * 2) rename calls to return false when the destination's parent path does not exist. When the
+ *    destination's parent does not exist, LocalFileSystem uses FileUtil#copy to copy the file
+ *    and returns true if the copy succeeds, while many other Hadoop file systems (e.g. HDFS,
+ *    S3A) return false without renaming any file. This helps to test that Spark can work with
+ *    the latter file systems.
  */
 // TODO(ekl) we should consider always interposing this to expose num open conns as a metric
 class DebugFilesystem extends LocalFileSystem {
@@ -120,4 +126,8 @@ class DebugFilesystem extends LocalFileSystem {
       override def hashCode(): Int = wrapped.hashCode()
     }
   }
+
+  override def rename(src: Path, dst: Path): Boolean = {
+    exists(dst.getParent) && super.rename(src, dst)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index ab7e49581a9f4..7d29a9ed0ae79 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -499,7 +499,11 @@ class InMemoryCatalog(
           newSpec, partitionColumnNames, tablePath)
         try {
           val fs = tablePath.getFileSystem(hadoopConfig)
-          fs.rename(oldPartPath, newPartPath)
+          fs.mkdirs(newPartPath)
+          if (!fs.rename(oldPartPath, newPartPath)) {
+            throw new IOException(s"Renaming partition path from $oldPartPath to " +
+              s"$newPartPath returned false")
+          }
         } catch {
           case e: IOException =>
             throw QueryExecutionErrors.unableToRenamePartitionPathError(oldPartPath, e)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 9de3b0e954848..d3c21032d0b32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.File
+import java.io.{File, IOException}
 import java.sql.Date
 
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
@@ -950,6 +950,110 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t2"), Nil)
     }
   }
+
+  test("SPARK-35106: insert overwrite with custom partition path") {
+    withTempPath { path =>
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        sql(s"alter table t add partition(part1=1, part2=1) location '${path.getAbsolutePath}'")
+        sql(s"insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-35106: dynamic partition overwrite with custom partition path") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTempPath { path =>
+        withTable("t") {
+          sql(
+            """
+              |create table t(i int, part1 int, part2 int) using parquet
+              |partitioned by (part1, part2)
+            """.stripMargin)
+
+          sql(s"insert into t partition(part1=1, part2=1) select 1")
+          checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+          sql(s"alter table t add partition(part1=1, part2=2) location '${path.getAbsolutePath}'")
+
+          // dynamic partition overwrite to empty custom partition
+          sql(s"insert overwrite table t partition(part1=1, part2=2) select 1")
+          checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(1, 1, 2) :: Nil)
+
+          // dynamic partition overwrite to non-empty custom partition
+          sql("insert overwrite table t partition(part1=1, part2=2) select 2")
+          checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(2, 1, 2) :: Nil)
+        }
+      }
+    }
+  }
+
+  test("SPARK-35106: Throw exception when rename custom partition paths returns false") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempPath { path =>
+        withTable("t") {
+          sql(
+            """
+              |create table t(i int, part1 int, part2 int) using parquet
+              |partitioned by (part1, part2)
+            """.stripMargin)
+
+          sql(s"alter table t add partition(part1=1, part2=1) location '${path.getAbsolutePath}'")
+
+          val e = intercept[SparkException] {
+            sql(s"insert into t partition(part1=1, part2=1) select 1")
+          }.getCause
+          assert(e.isInstanceOf[IOException])
+          assert(e.getMessage.contains("Failed to rename"))
+          assert(e.getMessage.contains("when committing files staged for absolute location"))
+        }
+      }
+    }
+  }
+
+  test("SPARK-35106: Throw exception when rename dynamic partition paths returns false") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true",
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        val e = intercept[SparkException] {
+          sql(s"insert overwrite table t partition(part1, part2) values (1, 1, 1)")
+        }.getCause
+        assert(e.isInstanceOf[IOException])
+        assert(e.getMessage.contains("Failed to rename"))
+        assert(e.getMessage.contains(
+          "when committing files staged for overwriting dynamic partitions"))
+      }
+    }
+  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {
@@ -962,3 +1066,13 @@ class FileExistingTestFileSystem extends RawLocalFileSystem {
     throw new FileAlreadyExistsException(s"${f.toString} already exists")
   }
 }
+
+class RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    (!isSparkStagingDir(src) || isSparkStagingDir(dst)) && super.rename(src, dst)
+  }
+
+  private def isSparkStagingDir(path: Path): Boolean = {
+    path.toString.contains(".spark-staging-")
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7f50d522ff42c..9bcc19b1605ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1073,7 +1073,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // scalastyle:on caselocale
         val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
         try {
-          fs.rename(actualPartitionPath, expectedPartitionPath)
+          fs.mkdirs(expectedPartitionPath)
+          if (!fs.rename(actualPartitionPath, expectedPartitionPath)) {
+            throw new IOException(s"Renaming partition path from $actualPartitionPath to " +
+              s"$expectedPartitionPath returned false")
+          }
         } catch {
           case e: IOException =>
             throw new SparkException("Unable to rename partition path from " +
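
Reviewer note (not part of the patch): all of the production call sites above apply the same pattern, because Hadoop's FileSystem#rename(Path, Path) reports failure through its Boolean result on file systems such as HDFS and S3A rather than by throwing. A minimal standalone sketch of that pattern, with a hypothetical helper name and assuming only the stock Hadoop FileSystem API:

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object RenameOrThrow {
  // Illustrative helper, not part of the patch: ensure the destination's parent exists
  // (several file systems return false from rename when it is missing), then turn a
  // false result into an explicit error instead of silently losing the file.
  def renameOrThrow(fs: FileSystem, src: Path, dst: Path, context: String): Unit = {
    fs.mkdirs(dst.getParent)
    if (!fs.rename(src, dst)) {
      throw new IOException(s"Failed to rename $src to $dst when $context")
    }
  }
}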
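
Reviewer note (not part of the patch): the new tests inject the misbehaving file system purely through Hadoop configuration ("fs.file.impl" plus "fs.file.impl.disable.cache"), so no production code has to be test-aware. A hedged sketch of the same mechanism outside the test framework; the class and object names below are illustrative only:

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

import org.apache.spark.sql.SparkSession

// Illustrative test-only file system in the spirit of
// RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem: every rename reports failure.
class AlwaysFailingRenameFileSystem extends RawLocalFileSystem {
  override def rename(src: Path, dst: Path): Boolean = false
}

object FsOverrideExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      // Route all file:// access through the failing file system and bypass the FS cache,
      // mirroring the withSQLConf settings used by the SPARK-35106 tests.
      .config("spark.hadoop.fs.file.impl", classOf[AlwaysFailingRenameFileSystem].getName)
      .config("spark.hadoop.fs.file.impl.disable.cache", "true")
      .getOrCreate()
    // With this patch, a commit-time rename on such a file system surfaces as
    // IOException("Failed to rename ...") instead of silent data loss.
    spark.stop()
  }
}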