Skip to content

Commit

Permalink
Fix HDFSMetadataLogSuite too
Browse files Browse the repository at this point in the history
  • Loading branch information
HyukjinKwon committed Dec 19, 2016
1 parent 16ad4e1 commit e309285
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.execution.streaming.FakeFileSystem._
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.UninterruptibleThread
import org.apache.spark.util.{UninterruptibleThread, Utils}

class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {

Expand Down Expand Up @@ -88,14 +88,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://$temp")
val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(0) === Some("batch0"))
assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))


val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://$temp")
val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
assert(metadataLog2.get(0) === Some("batch0"))
assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
Expand Down Expand Up @@ -209,14 +209,21 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}

// Open and delete
val f1 = fm.open(path)
fm.delete(path)
assert(!fm.exists(path))
intercept[IOException] {
fm.open(path)
// Open and delete
if (Utils.isWindows) {
Utils.tryWithResource(fm.open(path))(_ => ())
fm.delete(path)
fm.delete(path) // should not throw exception
} else {
Utils.tryWithResource(fm.open(path)) { _ =>
fm.delete(path)
assert(!fm.exists(path))
intercept[IOException] {
fm.open(path)
}
fm.delete(path) // should not throw exception
}
}
fm.delete(path) // should not throw exception
f1.close()

// Rename
val path1 = new Path(s"$dir/file1")
Expand Down

0 comments on commit e309285

Please sign in to comment.