SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields.

* Changed the property name to spark.streaming.minRememberDurationMin to reflect the unit of value (minutes).
* Deleted the constant MIN_REMEMBER_DURATION, because now minRememberDurationMin is serving the same purpose.
emres committed Apr 9, 2015
1 parent 43cc1ce commit daccc82
Showing 1 changed file with 6 additions and 5 deletions.
@@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* the streaming app.
* - If a file is to be visible in the directory listings, it must be visible within a certain
* duration of the mod time of the file. This duration is the "remember window", which is set to
- * 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
+ * 1 minute (see `FileInputDStream.minRememberDurationMin`). Otherwise, the file will never be
* selected as the mod time will be less than the ignore threshold when it becomes visible.
* - Once a file is visible, the mod time cannot change. If it does due to appends, then the
* processing semantics are undefined.
@@ -336,16 +336,17 @@ object FileInputDStream {
* Files with mod times older than this "window" of remembering will be ignored. So if new
* files are visible within this window, then the file will get selected in the next batch.
*/
- private val minRememberDuration = new SparkConf().get("spark.streaming.minRememberDuration", "1")
- private val MIN_REMEMBER_DURATION = Minutes(minRememberDuration.toLong)
+ private val minRememberDurationMin = Minutes(new SparkConf()
+ .get("spark.streaming.minRememberDurationMin", "1")
+ .toLong)

def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

/**
* Calculate the number of last batches to remember, such that all the files selected in
- * at least last MIN_REMEMBER_DURATION duration can be remembered.
+ * at least last minRememberDurationMin duration can be remembered.
*/
def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
- math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
+ math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt
}
}
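
For reviewers who want to check the arithmetic, here is a minimal standalone sketch of how the renamed property feeds the batch count. It is not part of this commit and does not use Spark: `RememberWindowSketch`, `rememberMinutes`, and the plain `Map` standing in for `SparkConf` are hypothetical names chosen for illustration. Only the property name, its default of "1", and the ceil-division come from the diff above.

```scala
// Illustrative sketch only; not Spark code. A plain Map stands in for SparkConf.
object RememberWindowSketch {
  // Property name as introduced in the diff above.
  val PropName = "spark.streaming.minRememberDurationMin"

  /** Reads the remember window in minutes, falling back to "1" as the diff does. */
  def rememberMinutes(conf: Map[String, String]): Long =
    conf.getOrElse(PropName, "1").toLong

  /** Number of batches needed to cover the window, rounded up (same ceil-division as the diff). */
  def numBatchesToRemember(windowMinutes: Long, batchMillis: Long): Int = {
    require(batchMillis > 0, "batch duration must be positive")
    val windowMillis = windowMinutes * 60L * 1000L
    math.ceil(windowMillis.toDouble / batchMillis).toInt
  }

  def main(args: Array[String]): Unit = {
    val window = rememberMinutes(Map.empty)
    // Default 1-minute window with 10-second batches divides evenly.
    println(numBatchesToRemember(window, 10000L)) // 6
    // 7-second batches do not divide 60 seconds evenly, so the count rounds up.
    println(numBatchesToRemember(window, 7000L))  // 9
  }
}
```

Because the value is parsed with `toLong`, a non-numeric setting would fail with a `NumberFormatException` in this sketch; the diff applies the same `toLong` conversion to the configured string.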
