Skip to content

Commit

Permalink
truncate processing db when initial
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Sep 15, 2019
1 parent 4bfb418 commit fa89be3
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)

private def getKVStore(path: File, dbName: String): KVStore = {
private def getKVStore(path: File, dbName: String, truncate: Boolean = false): KVStore = {
if (truncate) {
Files.deleteIfExists(new File(path, s"${dbName}.ldb").toPath)
}
val dbPath = Files.createDirectories(new File(path, s"${dbName}.ldb").toPath()).toFile()
Utils.chmod700(dbPath)

Expand Down Expand Up @@ -162,7 +165,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}.getOrElse(new InMemoryStore())

private[history] val processing: KVStore = storePath.map { path =>
getKVStore(path, "processing")
// We should truncate process db when initial.
getKVStore(path, "processing", truncate = true)
}.getOrElse(new InMemoryStore())

private val diskManager = storePath.map { path =>
Expand Down

0 comments on commit fa89be3

Please sign in to comment.