From 7d8e37d7ba4aaaf06d0f8be5f6a047c43c91f8aa Mon Sep 17 00:00:00 2001 From: "moshe.good" Date: Fri, 4 Mar 2016 11:49:01 -0500 Subject: [PATCH] FileInputDStream should read old files when newFilesOnly is set to false --- .../spark/streaming/dstream/FileInputDStream.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 7fba2e8ec0e7a..d99f8a923b919 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -103,6 +103,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Initial ignore threshold based on which old, existing files in the directory (at the time of // starting the streaming application) will be ignored or considered private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L + private var initialFileScan = true /* * Make sure that the information of files selected in the last few batches are remembered. @@ -189,10 +190,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( lastNewFileFindingTime = clock.getTimeMillis() // Calculate ignore threshold - val modTimeIgnoreThreshold = math.max( - initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting - currentTime - durationToRemember.milliseconds // trailing end of the remember window - ) + val modTimeIgnoreThreshold = if (initialFileScan) { + initialFileScan = false + initialModTimeIgnoreThreshold // initial threshold based on newFilesOnly setting + } else { + currentTime - durationToRemember.milliseconds // trailing end of the remember window + } + logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") val filter = new PathFilter {