Separate task between app listing and compaction
HeartSaVioR committed Jan 19, 2020
1 parent 8500453 commit ec3ffd9
Showing 1 changed file with 50 additions and 39 deletions.
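
In short: the listing pass no longer runs compaction inline. Once a log's listing task finishes, a separate compaction task for the same path is submitted to the executor, so a slow compaction cannot delay application listing. Below is a minimal, self-contained sketch of that hand-off. The names `replayExecutor`, `processing`, and `endProcessing` mirror identifiers in the diff; everything else (the object, the `println` stand-ins, the key-set guard) is illustrative scaffolding, not Spark's actual code.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

object ListingThenCompaction {
  // Stand-in for FsHistoryProvider's replayExecutor.
  private val replayExecutor = Executors.newFixedThreadPool(2)
  // Paths currently being processed; models the processing/endProcessing guard.
  private val inProgress = ConcurrentHashMap.newKeySet[String]()

  private def processing(path: String): Unit = inProgress.add(path)
  private def endProcessing(path: String): Unit = inProgress.remove(path)

  def mergeApplicationListing(path: String): Unit = {
    try {
      println(s"listing $path") // stand-in for parsing the event log
    } finally {
      endProcessing(path)
      // Listing is done and visible; compaction runs as its own task.
      try {
        processing(path)
        val task: Runnable = () => compact(path)
        replayExecutor.submit(task)
      } catch {
        case e: Exception =>
          // If the executor refuses the submission, release the path guard.
          println(s"could not submit compaction for $path: $e")
          endProcessing(path)
      }
    }
  }

  private def compact(path: String): Unit = {
    try println(s"compacting $path") // stand-in for fileCompactor.compact(...)
    finally endProcessing(path)
  }

  def main(args: Array[String]): Unit = {
    processing("app-1")
    mergeApplicationListing("app-1")
    replayExecutor.shutdown()
  }
}
```

Note the ordering: `endProcessing` runs before the compaction task is submitted, so the path is re-marked as in progress for the duration of the compaction task only.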
@@ -571,35 +571,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  /**
-   * Returns a tuple containing two values. Each element means:
-   * - 1st (Boolean): true if the list of event log files are changed, false otherwise.
-   * - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
-   *   value indicates the last event log index to try compaction. None otherwise.
-   */
-  private def compact(reader: EventLogFileReader): (Boolean, Option[Long]) = {
-    reader.lastIndex match {
-      case Some(lastIndex) =>
-        try {
-          val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
-          if (info.lastEvaluatedForCompaction.isEmpty ||
-              info.lastEvaluatedForCompaction.get < lastIndex) {
-            // haven't tried compaction for this index, do compaction
-            val result = fileCompactor.compact(reader.listEventLogFiles)
-            (result.code == CompactionResultCode.SUCCESS, Some(lastIndex))
-          } else {
-            (false, info.lastEvaluatedForCompaction)
-          }
-        } catch {
-          case _: NoSuchElementException =>
-            // this should exist, but ignoring doesn't hurt much
-            (false, None)
-        }
-
-      case None => (false, None) // This is not applied to single event log file.
-    }
-  }
-
   private[history] def shouldReloadLog(info: LogInfo, reader: EventLogFileReader): Boolean = {
     if (info.isComplete != reader.completed) {
       true
@@ -694,14 +665,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       enableOptimizations: Boolean): Unit = {
     val rootPath = reader.rootPath
     try {
-      val (shouldReload, lastCompactionIndex) = compact(reader)
-      val newReader = if (shouldReload) {
-        EventLogFileReader(fs, rootPath).get
-      } else {
-        reader
+      val lastEvaluatedForCompaction: Option[Long] = try {
+        listing.read(classOf[LogInfo], rootPath.toString).lastEvaluatedForCompaction
+      } catch {
+        case _: NoSuchElementException => None
       }
 
       pendingReplayTasksCount.incrementAndGet()
-      doMergeApplicationListing(newReader, scanTime, enableOptimizations, lastCompactionIndex)
+      doMergeApplicationListing(reader, scanTime, enableOptimizations, lastEvaluatedForCompaction)
       if (conf.get(CLEANER_ENABLED)) {
         checkAndCleanLog(rootPath.toString)
       }
@@ -720,6 +691,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       endProcessing(rootPath)
       pendingReplayTasksCount.decrementAndGet()
+
+      // triggering another task for compaction task
+      try {
+        processing(rootPath)
+        val task: Runnable = () => compact(reader)
+        replayExecutor.submit(task)
+      } catch {
+        // let the iteration over the updated entries break, since an exception on
+        // replayExecutor.submit (..) indicates the ExecutorService is unable
+        // to take any more submissions at this time
+        case e: Exception =>
+          logError(s"Exception while submitting task for compaction", e)
+          endProcessing(rootPath)
+      }
     }
   }

@@ -731,7 +716,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean,
-      lastCompactionIndex: Option[Long]): Unit = {
+      lastEvaluatedForCompaction: Option[Long]): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
       eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -810,7 +795,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         addListing(app)
         listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id),
           app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, reader.lastIndex,
-          lastCompactionIndex, reader.completed))
+          lastEvaluatedForCompaction, reader.completed))
 
         // For a finished log, remove the corresponding "in progress" entry from the listing DB if
         // the file is really gone.
@@ -835,15 +820,41 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // re-parse the whole log.
         logInfo(s"Reparsing $logPath since end event was not found.")
         doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
-          lastCompactionIndex)
+          lastEvaluatedForCompaction)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
        // listing db, with an empty ID. This will make the log eligible for deletion if the app
        // does not make progress after the configured max log age.
        listing.write(
          LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
-           reader.fileSizeForLastIndex, reader.lastIndex, lastCompactionIndex, reader.completed))
+           reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
+           reader.completed))
     }
   }
+
+  private def compact(reader: EventLogFileReader): Unit = {
+    val rootPath = reader.rootPath
+    try {
+      reader.lastIndex match {
+        case Some(lastIndex) =>
+          try {
+            val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+            if (info.lastEvaluatedForCompaction.isEmpty ||
+                info.lastEvaluatedForCompaction.get < lastIndex) {
+              // haven't tried compaction for this index, do compaction
+              fileCompactor.compact(reader.listEventLogFiles)
+              listing.write(info.copy(lastEvaluatedForCompaction = Some(lastIndex)))
+            }
+          } catch {
+            case _: NoSuchElementException =>
+              // this should exist, but ignoring doesn't hurt much
+          }
+
+        case None => // This is not applied to single event log file.
+      }
+    } finally {
+      endProcessing(rootPath)
+    }
+  }
 