Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Dec 14, 2019
1 parent c6ac35e commit e9ebb6f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
pendingReplayTasksCount.incrementAndGet()
doMergeApplicationListing(reader, scanTime, enableOptimizations)
if (conf.get(CLEANER_ENABLED)) {
checkAndCleanLog(reader.rootPath.toString)
}
} catch {
case e: InterruptedException =>
throw e
Expand All @@ -680,9 +683,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} finally {
endProcessing(reader.rootPath)
pendingReplayTasksCount.decrementAndGet()
if (conf.get(CLEANER_ENABLED)) {
checkAndCleanLog(reader.rootPath.toString)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1322,21 +1322,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}

test("SPARK-29043: clean up specified event log") {
def getLogPath(logFile: File): String = {
val uri = logFile.toURI
uri.getScheme + File.pathSeparator + uri.getPath
}

val clock = new ManualClock()
val conf = createTestConf().set(MAX_LOG_AGE_S.key, "2d")
val conf = createTestConf().set(MAX_LOG_AGE_S.key, "0").set(CLEANER_ENABLED.key, "true")
val provider = new FsHistoryProvider(conf, clock)

// create an invalid application log file
val nonValidLogFile = newLogFile("NonValidLogFile", None, inProgress = true)
nonValidLogFile.createNewFile()
writeFile(nonValidLogFile, None,
SparkListenerApplicationStart(nonValidLogFile.getName, None, 1L, "test", None))
nonValidLogFile.setLastModified(clock.getTimeMillis())
val inValidLogFile = newLogFile("inValidLogFile", None, inProgress = true)
inValidLogFile.createNewFile()
writeFile(inValidLogFile, None,
SparkListenerApplicationStart(inValidLogFile.getName, None, 1L, "test", None))
inValidLogFile.setLastModified(clock.getTimeMillis())

// create a valid application log file
val validLogFile = newLogFile("validLogFile", None, inProgress = true)
Expand All @@ -1346,12 +1341,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
validLogFile.setLastModified(clock.getTimeMillis())

provider.checkForLogs()
clock.advance(TimeUnit.DAYS.toMillis(2))
provider.checkAndCleanLog(getLogPath(nonValidLogFile))
// The invalid application log file would be cleaned by checkAndCleanLog().
assert(new File(testDir.toURI).listFiles().size === 1)

clock.advance(1)
provider.checkAndCleanLog(getLogPath(validLogFile))
// cleanLogs() would clean the valid application log file.
provider.cleanLogs()
assert(new File(testDir.toURI).listFiles().size === 0)
}

Expand Down

0 comments on commit e9ebb6f

Please sign in to comment.