Skip to content


[SPARK-30481][CORE] Integrate event log compactor into Spark History …
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This patch addresses remaining functionality on event log compaction: integrate compaction into FsHistoryProvider.

This patch is next task of SPARK-30479 (#27164), please refer the description of PR #27085 to see overall rationalization of this patch.

### Why are the changes needed?

One of major goal of SPARK-28594 is to prevent the event logs to become too huge, and SPARK-29779 achieves the goal. We've got another approach in prior, but the old approach required models in both KVStore and live entities to guarantee compatibility, while they're not designed to do so.

### Does this PR introduce any user-facing change?


### How was this patch tested?

Added UT.

Closes #27208 from HeartSaVioR/SPARK-30481.

Authored-by: Jungtaek Lim (HeartSaVioR) <>
Signed-off-by: Marcelo Vanzin <>
  • Loading branch information
HeartSaVioR authored and vanzin committed Jan 29, 2020
1 parent 580c2b7 commit a2fe73b
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 99 deletions.
Expand Up @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.EventFilter.FilterStatistics
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.util.Utils

Expand All @@ -49,9 +48,11 @@ import org.apache.spark.util.Utils
class EventLogFileCompactor(
sparkConf: SparkConf,
hadoopConf: Configuration,
fs: FileSystem) extends Logging {
private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN)
private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD)
fs: FileSystem,
maxFilesToRetain: Int,
compactionThresholdScore: Double) extends Logging {

require(maxFilesToRetain > 0, "Max event log files to retain should be higher than 0.")

* Compacts the old event log files into one compact file, and clean old event log files being
Expand Down
Expand Up @@ -158,6 +158,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)

private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs,

// Used to store the paths, which are being processed. This enable the replay log tasks execute
// asynchronously and make sure that checkForLogs would not process a path repeatedly.
private val processing = ConcurrentHashMap.newKeySet[String]
Expand Down Expand Up @@ -475,10 +478,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

if (shouldReloadLog(info, reader)) {
// ignore fastInProgressParsing when the status of application is changed from
// in-progress to completed, which is needed for rolling event log.
if (info.appId.isDefined && (info.isComplete == reader.completed) &&
fastInProgressParsing) {
// ignore fastInProgressParsing when rolling event log is enabled on the log path,
// to ensure proceeding compaction even fastInProgressParsing is turned on.
if (info.appId.isDefined && reader.lastIndex.isEmpty && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
// Also, we need to update the `lastUpdated time` to display the updated time in
Expand Down Expand Up @@ -518,7 +520,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// to parse it. This will allow the cleaner code to detect the file as stale later on
// if it was not possible to parse it.
listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EventLogs,
None, None, reader.fileSizeForLastIndex, reader.lastIndex,
None, None, reader.fileSizeForLastIndex, reader.lastIndex, None,
reader.fileSizeForLastIndex > 0
Expand All @@ -532,16 +534,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

updated.foreach { entry =>
try {
val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting event log for replay", e)
submitLogProcessTask(entry.rootPath) { () =>
mergeApplicationListing(entry, newLastScanTime, true)

Expand Down Expand Up @@ -661,27 +655,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
reader: EventLogFileReader,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
val rootPath = reader.rootPath
try {
val lastEvaluatedForCompaction: Option[Long] = try {[LogInfo], rootPath.toString).lastEvaluatedForCompaction
} catch {
case _: NoSuchElementException => None

doMergeApplicationListing(reader, scanTime, enableOptimizations)
doMergeApplicationListing(reader, scanTime, enableOptimizations, lastEvaluatedForCompaction)
if (conf.get(CLEANER_ENABLED)) {
} catch {
case e: InterruptedException =>
throw e
case e: AccessControlException =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log ${reader.rootPath}", e)
logWarning(s"Unable to read log $rootPath", e)
// SPARK-28157 We should remove this blacklisted entry from the KVStore
// to handle permission-only changes with the same file sizes later.
listing.delete(classOf[LogInfo], reader.rootPath.toString)
listing.delete(classOf[LogInfo], rootPath.toString)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {

// triggering another task for compaction task
submitLogProcessTask(rootPath) { () => compact(reader) }

Expand All @@ -692,7 +696,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] def doMergeApplicationListing(
reader: EventLogFileReader,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
enableOptimizations: Boolean,
lastEvaluatedForCompaction: Option[Long]): Unit = {
val eventsFilter: ReplayEventsFilter = { eventString =>
eventString.startsWith(APPL_START_EVENT_PREFIX) ||
eventString.startsWith(APPL_END_EVENT_PREFIX) ||
Expand Down Expand Up @@ -770,8 +775,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(,, reader.fileSizeForLastIndex,
reader.lastIndex, reader.completed)), reader.fileSizeForLastIndex, reader.lastIndex,
lastEvaluatedForCompaction, reader.completed))

// For a finished log, remove the corresponding "in progress" entry from the listing DB if
// the file is really gone.
Expand All @@ -795,15 +800,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
doMergeApplicationListing(reader, scanTime, enableOptimizations = false,

case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,

private def compact(reader: EventLogFileReader): Unit = {
val rootPath = reader.rootPath
try {
reader.lastIndex match {
case Some(lastIndex) =>
try {
val info =[LogInfo], reader.rootPath.toString)
if (info.lastEvaluatedForCompaction.isEmpty ||
info.lastEvaluatedForCompaction.get < lastIndex) {
// haven't tried compaction for this index, do compaction
listing.write(info.copy(lastEvaluatedForCompaction = Some(lastIndex)))
} catch {
case _: NoSuchElementException =>
// this should exist, but ignoring doesn't hurt much

case None => // This is not applied to single event log file.
} finally {

Expand Down Expand Up @@ -962,7 +994,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: NoSuchElementException =>
// For every new driver log file discovered, create a new entry in listing
listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None,
None, f.getLen(), None, false))
None, f.getLen(), None, None, false))
if (deleteFile) {
Expand All @@ -989,9 +1021,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

* Rebuilds the application state store from its event log.
* Rebuilds the application state store from its event log. Exposed for testing.
private def rebuildAppStore(
private[spark] def rebuildAppStore(
store: KVStore,
reader: EventLogFileReader,
lastUpdated: Long): Unit = {
Expand All @@ -1010,8 +1042,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} replayBus.addListener(listener)

try {
val eventLogFiles = reader.listEventLogFiles
logInfo(s"Parsing ${reader.rootPath} to re-build UI...")
parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed)
parseAppEventLogs(eventLogFiles, replayBus, !reader.completed)
logInfo(s"Finished parsing ${reader.rootPath}")
} catch {
Expand Down Expand Up @@ -1122,30 +1155,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.

val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${}...")
val lease =, isCompressed)
val newStorePath = try {
Utils.tryWithResource(, metadata)) { store =>
rebuildAppStore(store, reader,
var retried = false
var newStorePath: File = null
while (newStorePath == null) {
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${}...")
val lease =, isCompressed)
try {
Utils.tryWithResource(, metadata)) { store =>
rebuildAppStore(store, reader,
newStorePath = lease.commit(appId,
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
retried = true

case e: Exception =>
throw e
} catch {
case e: Exception =>
throw e
}, metadata)

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
val store = new InMemoryStore()
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
rebuildAppStore(store, reader,
var retried = false
var store: KVStore = null
while (store == null) {
try {
val s = new InMemoryStore()
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
rebuildAppStore(s, reader,
store = s
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
"trying again...")
retried = true

case e: Exception =>
throw e


Expand Down Expand Up @@ -1175,6 +1237,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
try {
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting task", e)

private[history] object FsHistoryProvider {
Expand Down Expand Up @@ -1218,6 +1295,8 @@ private[history] case class LogInfo(
fileSize: Long,
@JsonDeserialize(contentAs = classOf[JLong])
lastIndex: Option[Long],
@JsonDeserialize(contentAs = classOf[JLong])
lastEvaluatedForCompaction: Option[Long],
isComplete: Boolean)

private[history] class AttemptInfoWrapper(
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/History.scala
Expand Up @@ -84,6 +84,22 @@ private[spark] object History {

.doc("The maximum number of event log files which will be retained as non-compacted. " +
"By default, all event log files will be retained. Please set the configuration " +
s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " +
"the overall size of event log files.")
.checkValue(_ > 0, "Max event log files to retain should be higher than 0.")


val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")

Expand Down
18 changes: 0 additions & 18 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Expand Up @@ -195,24 +195,6 @@ package object config {
"configured to be at least 10 MiB.")

// TODO: remove this when integrating compactor with FsHistoryProvider
.doc("The maximum number of event log files which will be retained as non-compacted. " +
"By default, all event log files will be retained. Please set the configuration " +
s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " +
"the overall size of event log files.")
.checkValue(_ > 0, "Max event log files to retain should be higher than 0.")


private[spark] val EXECUTOR_ID =

Expand Down

0 comments on commit a2fe73b

Please sign in to comment.