Skip to content

Commit

Permalink
fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
viper-kun committed Jan 29, 2015
1 parent adcfe86 commit 70c28d6
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
value
}

private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] = {
conf.getOption(key).map(warnUpdateInterval(key, _))
}

// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
.orElse(conf.getOption("spark.history.fs.updateInterval")
.map(warnUpdateInterval("spark.history.fs.updateInterval", _)))
.orElse(conf.getOption("spark.history.updateInterval")
.map(warnUpdateInterval("spark.history.updateInterval", _)))
.orElse(getDeprecatedConfig(conf, "spark.history.fs.updateInterval"))
.orElse(getDeprecatedConfig(conf, "spark.history.updateInterval"))
.map(_.toInt)
.getOrElse(10) * 1000

Expand All @@ -79,8 +81,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))

// The schedule thread pool size must be one, otherwise it will have concurrent issues about fs
// and applications between check task and clean task..
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
// and applications between check task and clean task.
private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("spark-history-task-%d").setDaemon(true).build())

Expand Down Expand Up @@ -129,12 +131,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
"Logging directory specified is not a directory: %s".format(logDir))
}

// A task that periodically checks for event log updates on disk.
pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS)
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS)

if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS, TimeUnit.MILLISECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
TimeUnit.MILLISECONDS)
}
}
}

Expand Down

0 comments on commit 70c28d6

Please sign in to comment.