Fixed more tests
tdas committed Apr 12, 2018
1 parent f9965f1 commit 1b23492
Showing 3 changed files with 57 additions and 51 deletions.

CheckpointFileManager.scala
@@ -94,7 +94,7 @@ object CheckpointFileManager extends Logging {
*/
sealed trait RenameHelperMethods { self => CheckpointFileManager
/** Create a file with overwrite. */
def create(path: Path): FSDataOutputStream
def createTempFile(path: Path): FSDataOutputStream

/**
* Rename a file.
@@ -107,7 +107,7 @@ object CheckpointFileManager extends Logging {
* implementation must not overwrite if the file already exists and
* must throw `FileAlreadyExistsException` in that case.
*/
def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
}

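Taken together, createTempFile and renameTempFile define the temp-file protocol that RenameBasedFSDataOutputStream (next hunk) builds on: write everything to a temp file, then promote it via rename. A minimal caller-side sketch, assuming an instance fm of CheckpointFileManager with RenameHelperMethods (fm and both paths are illustrative, not part of this commit):

// Sketch only: fm and both paths are assumed values.
val tmp = new Path("/checkpoint/.offsets.10.tmp") // temp file, created with overwrite
val dst = new Path("/checkpoint/offsets/10")      // final destination
val out = fm.createTempFile(tmp)
out.writeBytes("batch metadata")
out.close()
// With overwriteIfPossible = false, this must throw FileAlreadyExistsException
// if dst already exists; with true, it should replace dst if the filesystem allows.
fm.renameTempFile(tmp, dst, overwriteIfPossible = false)
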
/**
@@ -131,7 +131,7 @@ object CheckpointFileManager extends Logging {
finalPath: Path,
tempPath: Path,
overwriteIfPossible: Boolean)
extends CancellableFSDataOutputStream(fm.create(tempPath)) {
extends CancellableFSDataOutputStream(fm.createTempFile(tempPath)) {

def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
this(fm, path, generateTempPath(path), overwrite)
@@ -143,8 +143,15 @@ object CheckpointFileManager extends Logging {
override def close(): Unit = synchronized {
try {
if (terminated) return
super.close()
fm.rename(tempPath, finalPath, overwriteIfPossible)
underlyingStream.close()
try {
fm.renameTempFile(tempPath, finalPath, overwriteIfPossible)
} catch {
case fe: FileAlreadyExistsException =>
logWarning(
s"Failed to rename temp file $tempPath to $finalPath because file exists", fe)
if (!overwriteIfPossible) throw fe
}
logInfo(s"Renamed temp file $tempPath to $finalPath")
} finally {
terminated = true
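
The substantive change in close(): a rename failure caused by an existing destination is now tolerated, rethrowing FileAlreadyExistsException only if overwriteIfPossible = false, so two attempts racing to write the same file no longer fail the later close(). A sketch of the intended caller-side pattern, assuming a CheckpointFileManager instance fm (illustrative, not from this commit):

val out = fm.createAtomic(new Path("/checkpoint/offsets/10"), overwriteIfPossible = true)
try {
  out.write("batch-10".getBytes(java.nio.charset.StandardCharsets.UTF_8))
  out.close()    // closes the temp-file stream, then renames temp -> final
} catch {
  case scala.util.control.NonFatal(e) =>
    out.cancel() // best-effort cleanup: abandon and delete the temp file
    throw e
}
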
@@ -208,9 +215,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration

protected val fs = path.getFileSystem(hadoopConf)

fs.setVerifyChecksum(false)
fs.setWriteChecksum(false)

override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
fs.listStatus(path, filter)
}
@@ -219,7 +223,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
fs.mkdirs(path, FsPermission.getDirDefault)
}

override def create(path: Path): FSDataOutputStream = {
override def createTempFile(path: Path): FSDataOutputStream = {
fs.create(path, true)
}

Expand All @@ -236,33 +240,29 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
fs.exists(path)
}

override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
if (!overwriteIfPossible && fs.exists(dstPath)) {
throw new FileAlreadyExistsException(
s"Failed to rename $srcPath to $dstPath as destination already exists")
}

try {
if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) {
if (fs.exists(dstPath)) {
// Some implementations of FileSystem may not throw FileAlreadyExistsException but
// only return false if file already exists. Explicitly throw the error.
// Note that this is definitely not atomic, so this is only a best-effort attempt
// to throw the most appropriate exception when rename returned false.
if (!fs.rename(srcPath, dstPath)) {
// If overwriteIfPossible = false, then we want to find out why the rename failed and
// try to throw the right error.
if (fs.exists(dstPath)) {
// Some implementations of FileSystem may only return false instead of throwing
// FileAlreadyExistsException. In that case, explicitly throw the error
// if overwriteIfPossible = false. Note that this is definitely not atomic.
// This is only a best-effort attempt to identify the situation when rename returned
// false.
if (!overwriteIfPossible) {
throw new FileAlreadyExistsException(s"$dstPath already exists")
} else {
val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
logWarning(msg)
throw new IOException(msg)
}
} else {
val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
logWarning(msg)
throw new IOException(msg)
}
} catch {
case fe: FileAlreadyExistsException =>
// Some implementations of FileSystem may directly throw FileAlreadyExistsException if the file
// already exists. Ignore the error if overwriteIfPossible = true as it is expected to be
// best effort.
logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe)
if (!overwriteIfPossible) throw fe
}
}

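FileSystem.rename() can report an existing destination either by returning false or by throwing FileAlreadyExistsException, depending on the implementation, which is why both paths appear above. The behavior the code shown here implements, summarized (a best-effort reading of the hunk, not text from the commit):

  rename outcome               overwriteIfPossible    result
  returns true                 any                    success
  returns false, dst exists    false                  FileAlreadyExistsException
  returns false, dst exists    true                   IOException (rename unexpectedly failed)
  returns false, dst missing   any                    IOException
  throws the exception         false                  exception rethrown
  throws the exception         true                   warning logged, error swallowed
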
@@ -303,9 +303,11 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
fc.mkdir(path, FsPermission.getDirDefault, true)
}

override def create(path: Path): FSDataOutputStream = {
override def createTempFile(path: Path): FSDataOutputStream = {
import CreateFlag._
fc.create(path, EnumSet.of(CREATE, OVERWRITE))
import Options._
fc.create(
path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
}

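With the blanket fs.setVerifyChecksum(false)/fs.setWriteChecksum(false) calls removed from FileSystemBasedCheckpointFileManager above, the FileContext-based manager instead disables checksums per temp file here, so no .crc sidecar files are written next to them. A self-contained sketch of the same FileContext call pattern (the path is illustrative):

import java.util.EnumSet
import org.apache.hadoop.fs.{CreateFlag, FileContext, Path}
import org.apache.hadoop.fs.Options.{ChecksumOpt, CreateOpts}

val fc = FileContext.getLocalFSFileContext()
// CREATE | OVERWRITE, with checksum (.crc) generation disabled.
val out = fc.create(
  new Path("/tmp/example-temp-file"),
  EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
  CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
out.close()
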
override def createAtomic(
@@ -321,7 +323,7 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
fc.util.exists(path)
}

override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
import Options.Rename._
fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
}
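
FileContext.rename() with Options.Rename.NONE throws FileAlreadyExistsException itself when the destination exists, so unlike the FileSystem-based manager above, no explicit exists-check or return-value inspection is needed here.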

CheckpointFileManagerSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
@@ -61,9 +62,11 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
assert(!fm.exists(path))
fm.createAtomic(path, overwriteIfPossible = false).close()
assert(fm.exists(path))
intercept[IOException] {
// should throw exception since file exists and overwrite is false
fm.createAtomic(path, overwriteIfPossible = false).close()
quietly {
intercept[IOException] {
// should throw exception since file exists and overwrite is false
fm.createAtomic(path, overwriteIfPossible = false).close()
}
}

// Create atomic with overwrite if possible
@@ -107,22 +110,22 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {

test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
import FakeFileSystem.scheme
spark.conf.set(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))


val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
assert(metadataLog2.get(0) === Some("batch0"))
assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName)
quietly {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))


val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
assert(metadataLog2.get(0) === Some("batch0"))
assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))

}
}
}
}
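
quietly is the helper imported above from org.apache.spark.sql.catalyst.util; it suppresses log output while the block runs, since the FileContext-to-FileSystem fallback is expected to log warnings that would otherwise clutter the test output.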

HDFSMetadataLogSuite.scala
@@ -85,7 +85,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {

// There should be exactly one file, called "2", in the metadata directory.
// This check also tests for regressions of SPARK-17475
val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
.filter(!_.getName.startsWith(".")).toSeq
assert(allFiles.size == 1)
assert(allFiles(0).getName() == "2")
}
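
The new filter skips hidden files, such as the .crc checksum sidecars a local Hadoop filesystem may leave behind, so the assertion counts only the real batch files.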
@@ -136,7 +137,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("HDFSMetadataLog: metadata directory collision") {
testQuietly("HDFSMetadataLog: metadata directory collision") {
withTempDir { temp =>
val waiter = new Waiter
val maxBatchId = 100
