From 1b23492017b8209b6198e6886e06a5cd9d59b9db Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 11 Apr 2018 19:09:52 -0700
Subject: [PATCH] Fixed more tests

---
 .../streaming/CheckpointFileManager.scala     | 64 ++++++++++---------
 .../CheckpointFileManagerSuite.scala          | 39 +++++------
 .../streaming/HDFSMetadataLogSuite.scala      |  5 +-
 3 files changed, 57 insertions(+), 51 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 331c85365f78e..5d3faed525770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -94,7 +94,7 @@ object CheckpointFileManager extends Logging {
    */
  sealed trait RenameHelperMethods { self: CheckpointFileManager =>
    /** Create a file with overwrite. */
-    def create(path: Path): FSDataOutputStream
+    def createTempFile(path: Path): FSDataOutputStream
    /**
     * Rename a file.
     *
@@ -107,7 +107,7 @@ object CheckpointFileManager extends Logging {
     * implementation must not overwrite if the file already exists and
     * must throw `FileAlreadyExistsException` in that case.
     */
-    def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
+    def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
  }

  /**
@@ -131,7 +131,7 @@ object CheckpointFileManager extends Logging {
      finalPath: Path,
      tempPath: Path,
      overwriteIfPossible: Boolean)
-    extends CancellableFSDataOutputStream(fm.create(tempPath)) {
+    extends CancellableFSDataOutputStream(fm.createTempFile(tempPath)) {
    def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
      this(fm, path, generateTempPath(path), overwrite)
    }
@@ -143,8 +143,15 @@ object CheckpointFileManager extends Logging {
    override def close(): Unit = synchronized {
      try {
        if (terminated) return
-        super.close()
-        fm.rename(tempPath, finalPath, overwriteIfPossible)
+        underlyingStream.close()
+        try {
+          fm.renameTempFile(tempPath, finalPath, overwriteIfPossible)
+        } catch {
+          case fe: FileAlreadyExistsException =>
+            logWarning(
+              s"Failed to rename temp file $tempPath to $finalPath because file exists", fe)
+            if (!overwriteIfPossible) throw fe
+        }
        logInfo(s"Renamed temp file $tempPath to $finalPath")
      } finally {
        terminated = true
@@ -208,9 +215,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration

  protected val fs = path.getFileSystem(hadoopConf)

-  fs.setVerifyChecksum(false)
-  fs.setWriteChecksum(false)
-
  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    fs.listStatus(path, filter)
  }
@@ -219,7 +223,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
    fs.mkdirs(path, FsPermission.getDirDefault)
  }

-  override def create(path: Path): FSDataOutputStream = {
+  override def createTempFile(path: Path): FSDataOutputStream = {
    fs.create(path, true)
  }

@@ -236,33 +240,29 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
    fs.exists(path)
  }

-  override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
    if (!overwriteIfPossible && fs.exists(dstPath)) {
      throw new FileAlreadyExistsException(
        s"Failed to rename $srcPath to $dstPath as destination already exists")
    }
-    try {
-      if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) {
-        if (fs.exists(dstPath)) {
-          // Some implementations of FileSystem may not throw FileAlreadyExistsException but
-          // only return false if file already exists. Explicitly throw the error.
-          // Note that this is definitely not atomic, so this is only a best-effort attempt
-          // to throw the most appropriate exception when rename returned false.
+    if (!fs.rename(srcPath, dstPath)) {
+      // The rename failed, so find out why it failed and try to throw the most
+      // appropriate error.
+      if (fs.exists(dstPath)) {
+        // Some implementations of FileSystem may only return false instead of throwing
+        // FileAlreadyExistsException. In that case, explicitly throw the error
+        // if overwriteIfPossible = false. Note that this is definitely not atomic.
+        // This is only a best-effort attempt to identify the situation when rename returned
+        // false.
+        if (!overwriteIfPossible) {
          throw new FileAlreadyExistsException(s"$dstPath already exists")
-        } else {
-          val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
-          logWarning(msg)
-          throw new IOException(msg)
        }
+      } else {
+        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
+        logWarning(msg)
+        throw new IOException(msg)
      }
-    } catch {
-      case fe: FileAlreadyExistsException =>
-        // Some implementation of FileSystem can directly throw FileAlreadyExistsException if file
-        // already exists. Ignore the error if overwriteIfPossible = true as it is expected to be
-        // best effort.
-        logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe)
-        if (!overwriteIfPossible) throw fe
    }
  }

@@ -303,9 +303,11 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
    fc.mkdir(path, FsPermission.getDirDefault, true)
  }

-  override def create(path: Path): FSDataOutputStream = {
+  override def createTempFile(path: Path): FSDataOutputStream = {
    import CreateFlag._
-    fc.create(path, EnumSet.of(CREATE, OVERWRITE))
+    import Options._
+    fc.create(
+      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
  }

  override def createAtomic(
@@ -321,7 +323,7 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
    fc.util.exists(path)
  }

-  override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
    import Options.Rename._
    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index 185b96c83e285..83f42f9a59ff3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._

 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
@@ -61,9 +62,11 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
    assert(!fm.exists(path))
    fm.createAtomic(path, overwriteIfPossible = false).close()
    assert(fm.exists(path))
-    intercept[IOException] {
-      // should throw exception since file exists and overwrite is false
-      fm.createAtomic(path, overwriteIfPossible = false).close()
+    quietly {
+      intercept[IOException] {
+        // should throw exception since file exists and overwrite is false
+        fm.createAtomic(path, overwriteIfPossible = false).close()
+      }
    }

    // Create atomic with overwrite if possible
@@ -107,22 +110,22 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {

  test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
    import FakeFileSystem.scheme
-    spark.conf.set(
-      s"fs.$scheme.impl",
-      classOf[FakeFileSystem].getName)
-    withTempDir { temp =>
-      val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
-      assert(metadataLog.add(0, "batch0"))
-      assert(metadataLog.getLatest() === Some(0 -> "batch0"))
-      assert(metadataLog.get(0) === Some("batch0"))
-      assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))
-
-
-      val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
-      assert(metadataLog2.get(0) === Some("batch0"))
-      assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
-      assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
+    spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName)
+    quietly {
+      withTempDir { temp =>
+        val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
+        assert(metadataLog.add(0, "batch0"))
+        assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+        assert(metadataLog.get(0) === Some("batch0"))
+        assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))
+
+        val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
+        assert(metadataLog2.get(0) === Some("batch0"))
+        assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
+        assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
+      }
    }
  }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 610f0cab955b0..157227f2c4545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -85,7 +85,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {

      // There should be exactly one file, called "2", in the metadata directory.
      // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
+        .filter(!_.getName.startsWith(".")).toSeq
      assert(allFiles.size == 1)
      assert(allFiles(0).getName() == "2")
    }
@@ -136,7 +137,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
    }
  }

-  test("HDFSMetadataLog: metadata directory collision") {
+  testQuietly("HDFSMetadataLog: metadata directory collision") {
    withTempDir { temp =>
      val waiter = new Waiter
      val maxBatchId = 100
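
The core pattern this patch formalizes, writing to a hidden temp file and then
committing it with a single rename, can be illustrated outside Hadoop's APIs.
Below is a minimal, self-contained Scala sketch using java.nio; the names
AtomicRenameSketch, writeAtomic, and tempPathFor are illustrative only (not
Spark or Hadoop APIs), and the error handling merely mirrors the semantics of
RenameBasedFSDataOutputStream.close() in the patch.

  import java.nio.charset.StandardCharsets
  import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}
  import java.util.UUID

  object AtomicRenameSketch {
    // Like generateTempPath in the patch: a hidden sibling file with a unique name.
    private def tempPathFor(finalPath: Path): Path =
      finalPath.resolveSibling(s".${finalPath.getFileName}.${UUID.randomUUID()}.tmp")

    def writeAtomic(finalPath: Path, contents: String, overwriteIfPossible: Boolean): Unit = {
      val tempPath = tempPathFor(finalPath)
      // Step 1: write all data to the temp file; readers never observe partial content.
      Files.write(tempPath, contents.getBytes(StandardCharsets.UTF_8))
      // Step 2: commit with a single rename so the file appears at finalPath all at once.
      val opts: Seq[StandardCopyOption] =
        if (overwriteIfPossible) {
          Seq(StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
        } else {
          Seq(StandardCopyOption.ATOMIC_MOVE)
        }
      try {
        Files.move(tempPath, finalPath, opts: _*)
      } catch {
        case fe: FileAlreadyExistsException =>
          // Mirrors RenameBasedFSDataOutputStream.close(): losing the rename race to a
          // concurrent writer is an error only when exclusive creation was required.
          Files.deleteIfExists(tempPath)
          if (!overwriteIfPossible) throw fe
      }
    }
  }

Calling writeAtomic twice on the same path with overwriteIfPossible = false makes
the second call fail with FileAlreadyExistsException (a subclass of IOException),
which is the same contract the intercept[IOException] assertions above exercise
against createAtomic.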