Commit

Add test case.
viirya committed Oct 31, 2019
1 parent 0d11021 commit 27047da
Showing 2 changed files with 34 additions and 2 deletions.
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.util.{Date, UUID}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileAlreadyExistsException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.sql.Date
 
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -735,4 +737,35 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       assert(msg.contains("Cannot write nullable values to non-null column 's'"))
     }
   }
+
+  test("SPARK-29649: Stop task set if FileAlreadyExistsException was thrown") {
+    withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int) using parquet
+            |partitioned by (part1)
+          """.stripMargin)
+
+        val df = Seq((1, 1)).toDF("i", "part1")
+        val err = intercept[SparkException] {
+          df.write.mode("overwrite").format("parquet").insertInto("t")
+        }
+        assert(err.getCause.getMessage.contains("can not write to output file: " +
+          "org.apache.hadoop.fs.FileAlreadyExistsException"))
+      }
+    }
+  }
 }
+
+class FileExistingTestFileSystem extends RawLocalFileSystem {
+  override def create(
+      f: Path,
+      overwrite: Boolean,
+      bufferSize: Int,
+      replication: Short,
+      blockSize: Long): FSDataOutputStream = {
+    throw new FileAlreadyExistsException(s"${f.toString} already exists")
+  }
+}
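The new test works by pointing the local file:// scheme at a FileSystem whose create() always throws, so the very first attempt to write a partition file fails with FileAlreadyExistsException and the suite can assert that the write surfaces it through a SparkException rather than succeeding on a retry. Below is a minimal standalone sketch of that Hadoop override mechanism, outside Spark; the names AlwaysExistsFileSystem and FsOverrideSketch, and the concrete arguments passed to create(), are illustrative assumptions and not part of the commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, FileSystem, Path, RawLocalFileSystem}

// Stand-in for FileExistingTestFileSystem above: every create() call fails as if
// the target file already existed.
class AlwaysExistsFileSystem extends RawLocalFileSystem {
  override def create(
      f: Path,
      overwrite: Boolean,
      bufferSize: Int,
      replication: Short,
      blockSize: Long): FSDataOutputStream = {
    throw new FileAlreadyExistsException(s"${f.toString} already exists")
  }
}

object FsOverrideSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Route the file:// scheme to the throwing implementation and bypass the
    // FileSystem cache, mirroring the two confs the test sets via withSQLConf.
    conf.set("fs.file.impl", classOf[AlwaysExistsFileSystem].getName)
    conf.setBoolean("fs.file.impl.disable.cache", true)

    val path = new Path("file:///tmp/fs-override-sketch/part-00000")
    val fs = FileSystem.get(path.toUri, conf)
    try {
      // Hits the overridden create(Path, Boolean, Int, Short, Long) overload.
      fs.create(path, true, 4096, 1.toShort, 32L * 1024 * 1024)
      println("unexpected: create succeeded")
    } catch {
      case e: FileAlreadyExistsException =>
        println(s"got expected FileAlreadyExistsException: ${e.getMessage}")
    }
  }
}

Setting fs.file.impl.disable.cache to true matters because Hadoop caches FileSystem instances per scheme; without it, a previously cached LocalFileSystem could be returned and the overridden create() would never run.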
