Add FileStreamSource.VERSION instead of using spark version
zsxwing committed Feb 5, 2016
1 parent 9fd21c2 commit 9a1042c
Showing 2 changed files with 16 additions and 20 deletions.
FileStreamSource.scala
@@ -43,7 +43,6 @@ class FileStreamSource(
     providerName: String,
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
-  private val version = sqlContext.sparkContext.version
   private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
   private var maxBatchId = -1
   private val seenFiles = new OpenHashSet[String]
@@ -181,7 +180,7 @@ class FileStreamSource(
     val writer = new PrintWriter(new OutputStreamWriter(output, UTF_8))
     try {
       // scalastyle:off println
-      writer.println(version)
+      writer.println(FileStreamSource.VERSION)
       writer.println(FileStreamSource.START_TAG)
       files.foreach(file => writer.println("-" + file))
       writer.println(FileStreamSource.END_TAG)
@@ -207,6 +206,7 @@ object FileStreamSource {
 
   private val START_TAG = "START"
   private val END_TAG = "END"
+  val VERSION = "FILESTREAM1"
 
   /**
    * Parse a metadata file and return the content. If the metadata file is corrupted, it will return
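For context, the metadata format this hunk writes is: the FileStreamSource.VERSION marker ("FILESTREAM1") on the first line, where the Spark version string used to go, then the START tag, one "-"-prefixed path per file in the batch, and the END tag. Pinning a dedicated marker instead of sqlContext.sparkContext.version means the header only changes when the file format itself changes, not on every Spark upgrade. A minimal, self-contained sketch of that serialization (the object and method names below are illustrative, not part of the patch):

import java.io.{ByteArrayOutputStream, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets.UTF_8

// Illustrative only: mirrors the serialization shown in the hunk above.
object MetadataFormatExample {
  val VERSION = "FILESTREAM1" // same value as FileStreamSource.VERSION in this commit
  val START_TAG = "START"
  val END_TAG = "END"

  // Render a batch's file list in the new metadata format.
  def writeExample(files: Seq[String]): String = {
    val output = new ByteArrayOutputStream()
    val writer = new PrintWriter(new OutputStreamWriter(output, UTF_8))
    try {
      writer.println(VERSION)
      writer.println(START_TAG)
      files.foreach(file => writer.println("-" + file))
      writer.println(END_TAG)
    } finally {
      writer.close()
    }
    new String(output.toByteArray, UTF_8)
  }

  def main(args: Array[String]): Unit = {
    // Prints FILESTREAM1, START, -/a/b/c, -/e/f/g, END, one token per line.
    print(writeExample(Seq("/a/b/c", "/e/f/g")))
  }
}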
FileStreamSourceSuite.scala
@@ -249,10 +249,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     assert(new File(src, "_metadata").mkdirs())
     stringToFile(
       new File(src, "_metadata/0"),
-      s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
-    stringToFile(
-      new File(src, "_metadata/1"),
-      s"${sqlContext.sparkContext.version}\nSTART\n-")
+      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
+    stringToFile(new File(src, "_metadata/1"), s"${FileStreamSource.VERSION}\nSTART\n-")
 
     val textSource = createFileStreamSource("text", src.getCanonicalPath)
     // the metadata file of batch is corrupted, so currentOffset should be 0
@@ -266,10 +264,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     assert(new File(src, "_metadata").mkdirs())
     stringToFile(
       new File(src, "_metadata/0"),
-      s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
+      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
     stringToFile(
       new File(src, "_metadata/1"),
-      s"${sqlContext.sparkContext.version}\nSTART\n-/x/y/z\nEND\n")
+      s"${FileStreamSource.VERSION}\nSTART\n-/x/y/z\nEND\n")
 
     val textSource = createFileStreamSource("text", src.getCanonicalPath)
     assert(textSource.currentOffset() === LongOffset(1))
@@ -282,23 +280,21 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
 
     // Invalid metadata
     assert(readBatch(stringToStream("")) === Nil)
-    assert(readBatch(stringToStream(sqlContext.sparkContext.version)) === Nil)
-    assert(readBatch(stringToStream(s"${sqlContext.sparkContext.version}\n")) === Nil)
-    assert(readBatch(stringToStream(s"${sqlContext.sparkContext.version}\nSTART")) === Nil)
-    assert(readBatch(stringToStream(s"${sqlContext.sparkContext.version}\nSTART\n-")) === Nil)
-    assert(readBatch(stringToStream(s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c")) === Nil)
-    assert(
-      readBatch(stringToStream(s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\n")) === Nil)
-    assert(
-      readBatch(stringToStream(s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\nEN")) === Nil)
+    assert(readBatch(stringToStream(FileStreamSource.VERSION)) === Nil)
+    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\n")) === Nil)
+    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART")) === Nil)
+    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-")) === Nil)
+    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c")) === Nil)
+    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n")) === Nil)
+    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEN")) === Nil)
 
     // Valid metadata
     assert(readBatch(stringToStream(
-      s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\nEND")) === Seq("/a/b/c"))
+      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND")) === Seq("/a/b/c"))
     assert(readBatch(stringToStream(
-      s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\nEND\n")) === Seq("/a/b/c"))
+      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND\n")) === Seq("/a/b/c"))
     assert(readBatch(stringToStream(
-      s"${sqlContext.sparkContext.version}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n"))
+      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n"))
       === Seq("/a/b/c", "/e/f/g"))
   }
 }
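The assertions above pin down the contract readBatch is tested against: a batch file parses only when it begins with the VERSION marker, followed by START, zero or more "-"-prefixed paths, and a terminating END; anything truncated or malformed yields Nil. The patch does not show readBatch itself, so the following is a hypothetical re-implementation of that contract for illustration only (ReadBatchSketch, stringToStream, and the parsing details are assumptions, not the actual Spark code):

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source

// Hypothetical sketch: NOT the actual FileStreamSource.readBatch, just a
// restatement of the behaviour the suite asserts.
object ReadBatchSketch {
  val VERSION = "FILESTREAM1"

  def readBatch(input: InputStream): Seq[String] = {
    val lines = Source.fromInputStream(input, UTF_8.name()).getLines().toVector
    // Header must be the version marker followed by the START tag.
    if (lines.length < 2 || lines(0) != VERSION || lines(1) != "START") return Nil
    val body = lines.drop(2)
    val endIndex = body.indexOf("END")
    // Reject truncated files (no END) and any entry that is not a "-"-prefixed path.
    if (endIndex < 0 || !body.take(endIndex).forall(_.startsWith("-"))) return Nil
    body.take(endIndex).map(_.stripPrefix("-"))
  }

  def stringToStream(s: String): InputStream = new ByteArrayInputStream(s.getBytes(UTF_8))

  def main(args: Array[String]): Unit = {
    // Truncated END tag is rejected; a complete file yields the listed paths.
    assert(readBatch(stringToStream(s"$VERSION\nSTART\n-/a/b/c\nEN")) == Nil)
    assert(readBatch(stringToStream(s"$VERSION\nSTART\n-/a/b/c\nEND\n")) == Seq("/a/b/c"))
  }
}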
