diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index c4fa891a6697c..19965b27c688b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.util.{Date, UUID}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileAlreadyExistsException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 9e33b8aaec5d9..555ee05faa93a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.sql.Date
 
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -735,4 +737,35 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       assert(msg.contains("Cannot write nullable values to non-null column 's'"))
     }
   }
+
+  test("SPARK-29649: Stop task set if FileAlreadyExistsException was thrown") {
+    withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int) using parquet
+            |partitioned by (part1)
+          """.stripMargin)
+
+        val df = Seq((1, 1)).toDF("i", "part1")
+        val err = intercept[SparkException] {
+          df.write.mode("overwrite").format("parquet").insertInto("t")
+        }
+        assert(err.getCause.getMessage.contains("can not write to output file: " +
+          "org.apache.hadoop.fs.FileAlreadyExistsException"))
+      }
+    }
+  }
+}
+
+class FileExistingTestFileSystem extends RawLocalFileSystem {
+  override def create(
+      f: Path,
+      overwrite: Boolean,
+      bufferSize: Int,
+      replication: Short,
+      blockSize: Long): FSDataOutputStream = {
+    throw new FileAlreadyExistsException(s"${f.toString} already exists")
+  }
+}
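
For readers following along: the test injects the failure by registering a Hadoop `FileSystem` for the `file://` scheme whose `create` always throws `FileAlreadyExistsException`, with the FileSystem cache disabled so the override takes effect. Below is a minimal standalone sketch of the same fault-injection pattern outside the test suite; the class name `AlwaysFailingFileSystem`, the object name `Spark29649Demo`, the `local[1]` session, and the output path are illustrative placeholders, not part of this patch.

```scala
import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in mirroring FileExistingTestFileSystem from the patch:
// every create() fails as if a previous task attempt had left the file behind.
class AlwaysFailingFileSystem extends RawLocalFileSystem {
  override def create(
      f: Path,
      overwrite: Boolean,
      bufferSize: Int,
      replication: Short,
      blockSize: Long): FSDataOutputStream = {
    throw new FileAlreadyExistsException(s"${f.toString} already exists")
  }
}

object Spark29649Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("spark-29649-demo")
      // Route the file:// scheme to the failing FileSystem; disabling the
      // FileSystem cache makes Hadoop honor the override immediately.
      .config("spark.hadoop.fs.file.impl", classOf[AlwaysFailingFileSystem].getName)
      .config("spark.hadoop.fs.file.impl.disable.cache", "true")
      .getOrCreate()
    import spark.implicits._

    // Any file write now hits FileAlreadyExistsException inside
    // FileFormatWriter. Before SPARK-29649 the failing task would simply be
    // retried (and keep failing the same way); with the patch the task set
    // is stopped and the job fails with a SparkException.
    val attempt = scala.util.Try {
      Seq((1, 1)).toDF("i", "part1")
        .write.mode("overwrite").parquet("/tmp/spark-29649-demo")
    }
    println(attempt) // expected: Failure(org.apache.spark.SparkException: ...)

    spark.stop()
  }
}
```

Note that overriding `fs.file.impl` globally, as in this sketch, affects every local-file access in the session; the test in the patch scopes the override with `withSQLConf` for exactly that reason.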