From ec3ffd9d37cc68c36918af8682333b22c7b49d3f Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Sun, 19 Jan 2020 20:38:27 +0900
Subject: [PATCH] Separate app listing and compaction into different tasks

Previously, compaction ran inline in mergeApplicationListing(), so updating
the application listing had to wait for compaction to finish. This change
makes compaction its own unit of work: once the listing task completes, it
re-marks the log path as in-progress and submits a separate compaction task
to replayExecutor. Instead of threading the compaction result through
doMergeApplicationListing(), compact() now updates
LogInfo.lastEvaluatedForCompaction in the listing DB itself.

---
 .../deploy/history/FsHistoryProvider.scala | 89 +++++++++++--------
 1 file changed, 50 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 56dfb95701302..be8f918dda076 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -571,35 +571,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  /**
-   * Returns a tuple containing two values. Each element means:
-   * - 1st (Boolean): true if the list of event log files are changed, false otherwise.
-   * - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
-   *   value indicates the last event log index to try compaction. None otherwise.
-   */
-  private def compact(reader: EventLogFileReader): (Boolean, Option[Long]) = {
-    reader.lastIndex match {
-      case Some(lastIndex) =>
-        try {
-          val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
-          if (info.lastEvaluatedForCompaction.isEmpty ||
-              info.lastEvaluatedForCompaction.get < lastIndex) {
-            // haven't tried compaction for this index, do compaction
-            val result = fileCompactor.compact(reader.listEventLogFiles)
-            (result.code == CompactionResultCode.SUCCESS, Some(lastIndex))
-          } else {
-            (false, info.lastEvaluatedForCompaction)
-          }
-        } catch {
-          case _: NoSuchElementException =>
-            // this should exist, but ignoring doesn't hurt much
-            (false, None)
-        }
-
-      case None => (false, None) // This is not applied to single event log file.
-    }
-  }
-
   private[history] def shouldReloadLog(info: LogInfo, reader: EventLogFileReader): Boolean = {
     if (info.isComplete != reader.completed) {
       true
@@ -694,14 +665,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       enableOptimizations: Boolean): Unit = {
     val rootPath = reader.rootPath
     try {
-      val (shouldReload, lastCompactionIndex) = compact(reader)
-      val newReader = if (shouldReload) {
-        EventLogFileReader(fs, rootPath).get
-      } else {
-        reader
+      val lastEvaluatedForCompaction: Option[Long] = try {
+        listing.read(classOf[LogInfo], rootPath.toString).lastEvaluatedForCompaction
+      } catch {
+        case _: NoSuchElementException => None
       }
+
       pendingReplayTasksCount.incrementAndGet()
-      doMergeApplicationListing(newReader, scanTime, enableOptimizations, lastCompactionIndex)
+      doMergeApplicationListing(reader, scanTime, enableOptimizations, lastEvaluatedForCompaction)
       if (conf.get(CLEANER_ENABLED)) {
         checkAndCleanLog(rootPath.toString)
       }
@@ -720,6 +691,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       endProcessing(rootPath)
       pendingReplayTasksCount.decrementAndGet()
+
+      // trigger a separate task for compaction
+      try {
+        processing(rootPath)
+        val task: Runnable = () => compact(reader)
+        replayExecutor.submit(task)
+      } catch {
+        // an exception on replayExecutor.submit(...) indicates the ExecutorService
+        // is unable to take any more submissions at this time; log it and release
+        // the path so it is not left marked as in-progress
+        case e: Exception =>
+          logError("Exception while submitting task for compaction", e)
+          endProcessing(rootPath)
+      }
     }
   }
 
@@ -731,7 +716,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean,
-      lastCompactionIndex: Option[Long]): Unit = {
+      lastEvaluatedForCompaction: Option[Long]): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
       eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -810,7 +795,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         addListing(app)
         listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id),
           app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, reader.lastIndex,
-          lastCompactionIndex, reader.completed))
+          lastEvaluatedForCompaction, reader.completed))
 
         // For a finished log, remove the corresponding "in progress" entry from the listing DB if
         // the file is really gone.
@@ -835,7 +820,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // re-parse the whole log.
         logInfo(s"Reparsing $logPath since end event was not found.")
         doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
-          lastCompactionIndex)
+          lastEvaluatedForCompaction)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
@@ -843,7 +828,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // does not make progress after the configured max log age.
         listing.write(
           LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
-            reader.fileSizeForLastIndex, reader.lastIndex, lastCompactionIndex, reader.completed))
+            reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
+            reader.completed))
     }
   }
+
+  private def compact(reader: EventLogFileReader): Unit = {
+    val rootPath = reader.rootPath
+    try {
+      reader.lastIndex match {
+        case Some(lastIndex) =>
+          try {
+            val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+            if (info.lastEvaluatedForCompaction.isEmpty ||
+                info.lastEvaluatedForCompaction.get < lastIndex) {
+              // compaction has not been tried for this index yet; try it now
+              fileCompactor.compact(reader.listEventLogFiles)
+              listing.write(info.copy(lastEvaluatedForCompaction = Some(lastIndex)))
+            }
+          } catch {
+            case _: NoSuchElementException =>
+              // the entry should exist, but ignoring a missing one doesn't hurt much
+          }
+
+        case None => // compaction is not applied to a single event log file
+      }
+    } finally {
+      endProcessing(rootPath)
+    }
+  }
 
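Note on the hand-off pattern: the listing task finishes, re-marks the log path
as in-progress, and submits compaction as its own unit of work; the subtle part
is releasing the guard on every path, including a failed submit. Below is a
minimal, self-contained Scala sketch of the same pattern under simplified
assumptions; `inFlight`, `processing`, `endProcessing`, and the executor here
are illustrative stand-ins for the corresponding members of FsHistoryProvider,
not actual Spark APIs:

    import java.util.concurrent.{ConcurrentHashMap, Executors}

    object ListingThenCompactionSketch {
      private val executor = Executors.newFixedThreadPool(2)
      // log paths currently owned by some task (stands in for the provider's
      // "processing" bookkeeping)
      private val inFlight = ConcurrentHashMap.newKeySet[String]()

      private def processing(path: String): Unit = inFlight.add(path)
      private def endProcessing(path: String): Unit = inFlight.remove(path)

      def mergeApplicationListing(path: String): Unit = {
        try {
          println(s"listing: $path")
        } finally {
          endProcessing(path)
          // hand the path off to a follow-up task so a slow compaction
          // never delays the listing scan
          try {
            processing(path)
            val task: Runnable = () => compact(path)
            executor.submit(task)
          } catch {
            case e: Exception =>
              // if submission fails, release the guard or the path stays
              // marked as in-progress forever
              println(s"submit failed: $e")
              endProcessing(path)
          }
        }
      }

      private def compact(path: String): Unit = {
        try println(s"compacting: $path")
        finally endProcessing(path) // always release, even if compaction throws
      }

      def main(args: Array[String]): Unit = {
        processing("eventlog-app-1") // the scan thread claims the path first
        mergeApplicationListing("eventlog-app-1")
        executor.shutdown()
      }
    }

The same release-on-failure care shows up in the patch itself: both compact()
and the catch block around replayExecutor.submit call endProcessing, so the
path cannot be stranded in the in-flight set.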