Skip to content

Commit

Permalink
add some comment
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Sep 20, 2019
1 parent 1c36bfe commit 12f6300
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

// Used to store the paths, which are being processed. This enable the replay log tasks execute
// asynchronously and make sure that checkForLogs would not process a path repeatedly.
private val processing = ConcurrentHashMap.newKeySet[String]

private def isProcessing(path: Path): Boolean = {
Expand All @@ -170,7 +172,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
processing.add(path.getName)
}

private def endProcess(path: Path): Unit = {
private def endProcessing(path: Path): Unit = {
processing.remove(path.getName)
}

Expand Down Expand Up @@ -685,7 +687,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
endProcess(fileStatus.getPath)
endProcessing(fileStatus.getPath)
pendingReplayTasksCount.decrementAndGet()
}
}
Expand Down

0 comments on commit 12f6300

Please sign in to comment.