diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala index 80a0a7067a4e4..8558f765175fc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.SparkConf import org.apache.spark.deploy.history.EventFilter.FilterStatistics import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} import org.apache.spark.scheduler.ReplayListenerBus import org.apache.spark.util.Utils @@ -49,9 +48,11 @@ import org.apache.spark.util.Utils class EventLogFileCompactor( sparkConf: SparkConf, hadoopConf: Configuration, - fs: FileSystem) extends Logging { - private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN) - private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD) + fs: FileSystem, + maxFilesToRetain: Int, + compactionThresholdScore: Double) extends Logging { + + require(maxFilesToRetain > 0, "Max event log files to retain should be higher than 0.") /** * Compacts the old event log files into one compact file, and clean old event log files being diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 54c50006c33b7..2c023b00e6549 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -158,6 +158,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) new HistoryServerDiskManager(conf, path, listing, clock) } + private val 
fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs, + conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN), conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD)) + // Used to store the paths, which are being processed. This enable the replay log tasks execute // asynchronously and make sure that checkForLogs would not process a path repeatedly. private val processing = ConcurrentHashMap.newKeySet[String] @@ -475,10 +478,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } if (shouldReloadLog(info, reader)) { - // ignore fastInProgressParsing when the status of application is changed from - // in-progress to completed, which is needed for rolling event log. - if (info.appId.isDefined && (info.isComplete == reader.completed) && - fastInProgressParsing) { + // ignore fastInProgressParsing when rolling event log is enabled on the log path, + // to ensure proceeding compaction even fastInProgressParsing is turned on. + if (info.appId.isDefined && reader.lastIndex.isEmpty && fastInProgressParsing) { // When fast in-progress parsing is on, we don't need to re-parse when the // size changes, but we do need to invalidate any existing UIs. // Also, we need to update the `lastUpdated time` to display the updated time in @@ -518,7 +520,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // to parse it. This will allow the cleaner code to detect the file as stale later on // if it was not possible to parse it. 
listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EventLogs, - None, None, reader.fileSizeForLastIndex, reader.lastIndex, + None, None, reader.fileSizeForLastIndex, reader.lastIndex, None, reader.completed)) reader.fileSizeForLastIndex > 0 } @@ -532,16 +534,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } updated.foreach { entry => - processing(entry.rootPath) - try { - val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true) - replayExecutor.submit(task) - } catch { - // let the iteration over the updated entries break, since an exception on - // replayExecutor.submit (..) indicates the ExecutorService is unable - // to take any more submissions at this time - case e: Exception => - logError(s"Exception while submitting event log for replay", e) + submitLogProcessTask(entry.rootPath) { () => + mergeApplicationListing(entry, newLastScanTime, true) } } @@ -661,27 +655,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) reader: EventLogFileReader, scanTime: Long, enableOptimizations: Boolean): Unit = { + val rootPath = reader.rootPath try { + val lastEvaluatedForCompaction: Option[Long] = try { + listing.read(classOf[LogInfo], rootPath.toString).lastEvaluatedForCompaction + } catch { + case _: NoSuchElementException => None + } + pendingReplayTasksCount.incrementAndGet() - doMergeApplicationListing(reader, scanTime, enableOptimizations) + doMergeApplicationListing(reader, scanTime, enableOptimizations, lastEvaluatedForCompaction) if (conf.get(CLEANER_ENABLED)) { - checkAndCleanLog(reader.rootPath.toString) + checkAndCleanLog(rootPath.toString) } } catch { case e: InterruptedException => throw e case e: AccessControlException => // We don't have read permissions on the log file - logWarning(s"Unable to read log ${reader.rootPath}", e) - blacklist(reader.rootPath) + logWarning(s"Unable to read log $rootPath", e) + blacklist(rootPath) // SPARK-28157 We should remove 
this blacklisted entry from the KVStore // to handle permission-only changes with the same file sizes later. - listing.delete(classOf[LogInfo], reader.rootPath.toString) + listing.delete(classOf[LogInfo], rootPath.toString) case e: Exception => logError("Exception while merging application listings", e) } finally { - endProcessing(reader.rootPath) + endProcessing(rootPath) pendingReplayTasksCount.decrementAndGet() + + // triggering another task for compaction task + submitLogProcessTask(rootPath) { () => compact(reader) } } } @@ -692,7 +696,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] def doMergeApplicationListing( reader: EventLogFileReader, scanTime: Long, - enableOptimizations: Boolean): Unit = { + enableOptimizations: Boolean, + lastEvaluatedForCompaction: Option[Long]): Unit = { val eventsFilter: ReplayEventsFilter = { eventString => eventString.startsWith(APPL_START_EVENT_PREFIX) || eventString.startsWith(APPL_END_EVENT_PREFIX) || @@ -770,8 +775,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) invalidateUI(app.info.id, app.attempts.head.info.attemptId) addListing(app) listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id), - app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, - reader.lastIndex, reader.completed)) + app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, reader.lastIndex, + lastEvaluatedForCompaction, reader.completed)) // For a finished log, remove the corresponding "in progress" entry from the listing DB if // the file is really gone. @@ -795,7 +800,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. 
logInfo(s"Reparsing $logPath since end event was not found.") - doMergeApplicationListing(reader, scanTime, enableOptimizations = false) + doMergeApplicationListing(reader, scanTime, enableOptimizations = false, + lastEvaluatedForCompaction) case _ => // If the app hasn't written down its app ID to the logs, still record the entry in the @@ -803,7 +809,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // does not make progress after the configured max log age. listing.write( LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None, - reader.fileSizeForLastIndex, reader.lastIndex, reader.completed)) + reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction, + reader.completed)) + } + } + + private def compact(reader: EventLogFileReader): Unit = { + val rootPath = reader.rootPath + try { + reader.lastIndex match { + case Some(lastIndex) => + try { + val info = listing.read(classOf[LogInfo], reader.rootPath.toString) + if (info.lastEvaluatedForCompaction.isEmpty || + info.lastEvaluatedForCompaction.get < lastIndex) { + // haven't tried compaction for this index, do compaction + fileCompactor.compact(reader.listEventLogFiles) + listing.write(info.copy(lastEvaluatedForCompaction = Some(lastIndex))) + } + } catch { + case _: NoSuchElementException => + // this should exist, but ignoring doesn't hurt much + } + + case None => // This is not applied to single event log file. 
+ } + } finally { + endProcessing(rootPath) } } @@ -962,7 +994,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: NoSuchElementException => // For every new driver log file discovered, create a new entry in listing listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None, - None, f.getLen(), None, false)) + None, f.getLen(), None, None, false)) false } if (deleteFile) { @@ -989,9 +1021,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Rebuilds the application state store from its event log. + * Rebuilds the application state store from its event log. Exposed for testing. */ - private def rebuildAppStore( + private[spark] def rebuildAppStore( store: KVStore, reader: EventLogFileReader, lastUpdated: Long): Unit = { @@ -1010,8 +1042,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } replayBus.addListener(listener) try { + val eventLogFiles = reader.listEventLogFiles logInfo(s"Parsing ${reader.rootPath} to re-build UI...") - parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed) + parseAppEventLogs(eventLogFiles, replayBus, !reader.completed) trackingStore.close(false) logInfo(s"Finished parsing ${reader.rootPath}") } catch { @@ -1122,30 +1155,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // At this point the disk data either does not exist or was deleted because it failed to // load, so the event log needs to be replayed. 
- val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), - attempt.lastIndex) - val isCompressed = reader.compressionCodec.isDefined - logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") - val lease = dm.lease(reader.totalSize, isCompressed) - val newStorePath = try { - Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store => - rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + var retried = false + var newStorePath: File = null + while (newStorePath == null) { + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined + logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + val lease = dm.lease(reader.totalSize, isCompressed) + try { + Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store => + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + } + newStorePath = lease.commit(appId, attempt.info.attemptId) + } catch { + case _: IOException if !retried => + // compaction may touch the file(s) which app rebuild wants to read + // compaction wouldn't run in short interval, so try again... 
+ logWarning(s"Exception occurred while rebuilding app $appId - trying again...") + lease.rollback() + retried = true + + case e: Exception => + lease.rollback() + throw e } - lease.commit(appId, attempt.info.attemptId) - } catch { - case e: Exception => - lease.rollback() - throw e } KVUtils.open(newStorePath, metadata) } private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { - val store = new InMemoryStore() - val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), - attempt.lastIndex) - rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + var retried = false + var store: KVStore = null + while (store == null) { + try { + val s = new InMemoryStore() + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime()) + store = s + } catch { + case _: IOException if !retried => + // compaction may touch the file(s) which app rebuild wants to read + // compaction wouldn't run in short interval, so try again... + logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " + + "trying again...") + retried = true + + case e: Exception => + throw e + } + } + store } @@ -1175,6 +1237,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } deleted } + + /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */ + private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = { + try { + processing(rootPath) + replayExecutor.submit(task) + } catch { + // let the iteration over the updated entries break, since an exception on + // replayExecutor.submit (..) 
indicates the ExecutorService is unable + // to take any more submissions at this time + case e: Exception => + logError(s"Exception while submitting task", e) + endProcessing(rootPath) + } + } } private[history] object FsHistoryProvider { @@ -1218,6 +1295,8 @@ private[history] case class LogInfo( fileSize: Long, @JsonDeserialize(contentAs = classOf[JLong]) lastIndex: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) + lastEvaluatedForCompaction: Option[Long], isComplete: Boolean) private[history] class AttemptInfoWrapper( diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index ca9af316dffd0..17fb55d9db860 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -84,6 +84,22 @@ private[spark] object History { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("1m") + private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = + ConfigBuilder("spark.history.fs.eventLog.rolling.maxFilesToRetain") + .doc("The maximum number of event log files which will be retained as non-compacted. " + + "By default, all event log files will be retained. 
Please set the configuration " + + s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " + + "the overall size of event log files.") + .intConf + .checkValue(_ > 0, "Max event log files to retain should be higher than 0.") + .createWithDefault(Integer.MAX_VALUE) + + private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD = + ConfigBuilder("spark.history.fs.eventLog.rolling.compaction.score.threshold") + .internal() + .doubleConf + .createWithDefault(0.7d) + val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled") .fallbackConf(CLEANER_ENABLED) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 110198815c255..40b05cf96d1e3 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -195,24 +195,6 @@ package object config { "configured to be at least 10 MiB.") .createWithDefaultString("128m") - private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = - ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain") - // TODO: remove this when integrating compactor with FsHistoryProvider - .internal() - .doc("The maximum number of event log files which will be retained as non-compacted. " + - "By default, all event log files will be retained. 
Please set the configuration " + - s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " + - "the overall size of event log files.") - .intConf - .checkValue(_ > 0, "Max event log files to retain should be higher than 0.") - .createWithDefault(Integer.MAX_VALUE) - - private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD = - ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold") - .internal() - .doubleConf - .createWithDefault(0.7d) - private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala index 866e610aab980..2a914023ec821 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite, Success} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.EventLogTestHelper.writeEventsToRollingWriter import org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.status.ListenerEventsTestHelper._ @@ -35,13 +34,16 @@ import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{JsonProtocol, Utils} class EventLogFileCompactorSuite extends SparkFunSuite { - private val sparkConf = testSparkConf() + import EventLogFileCompactorSuite._ + + private val sparkConf = new SparkConf() private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) test("No event log files") { withTempDir { dir => val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) - val 
compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertNoCompaction(fs, Seq.empty, compactor.compact(Seq.empty), CompactionResultCode.NOT_ENOUGH_FILES) @@ -54,7 +56,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, (1 to 2).map(_ => testEvent): _*) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), CompactionResultCode.NOT_ENOUGH_FILES) } @@ -66,7 +69,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, (1 to 5).map(_ => testEvent): _*) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), expectedNumOfFilesCompacted = 2) } @@ -85,7 +89,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { assert(fs.rename(fileToCompact, compactedPath)) val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), CompactionResultCode.NOT_ENOUGH_FILES) } @@ -104,7 +109,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { assert(fs.rename(fileToCompact, compactedPath)) val 
newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), CompactionResultCode.NOT_ENOUGH_FILES) } @@ -123,7 +129,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { assert(fs.rename(fileToCompact, compactedPath)) val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), expectedNumOfFilesCompacted = 7) } @@ -145,7 +152,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { testEvent, testEvent) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), expectedNumOfFilesCompacted = 2) @@ -166,7 +174,6 @@ class EventLogFileCompactorSuite extends SparkFunSuite { test("Don't compact file if score is lower than threshold") { withTempDir { dir => val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) - val newConf = sparkConf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.7d) // job 1 having 4 tasks val rddsForStage1 = createRddsWithId(1 to 2) @@ -180,7 +187,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite { // here job 1 is finished and job 2 is still live, hence half of total tasks are considered // as live - val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, newConf, hadoopConf, + val fileStatuses = 
writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, Seq(SparkListenerJobStart(1, 0, Seq(stage1)), SparkListenerStageSubmitted(stage1)), tasks, Seq(SparkListenerJobStart(2, 0, Seq(stage2)), SparkListenerStageSubmitted(stage2)), @@ -190,7 +197,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { testEvent, testEvent) - val compactor = new EventLogFileCompactor(newConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, 0.7d) assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), CompactionResultCode.LOW_SCORE_FOR_COMPACTION) } @@ -260,7 +268,8 @@ class EventLogFileCompactorSuite extends SparkFunSuite { val filters = Seq(new TestEventFilter1, new TestEventFilter2) val logPath = new Path(writer.logPath) - val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs, + TEST_ROLLING_MAX_FILES_TO_RETAIN, TEST_COMPACTION_SCORE_THRESHOLD) val newPath = compactor.rewrite(filters, Seq(fs.getFileStatus(logPath))) assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED) @@ -315,12 +324,12 @@ class EventLogFileCompactorSuite extends SparkFunSuite { private def testEvent: Seq[SparkListenerEvent] = Seq(SparkListenerApplicationStart("app", Some("app"), 0, "user", None)) +} - private def testSparkConf(): SparkConf = { - new SparkConf() - .set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3) - // to simplify the tests, we set the score threshold as 0.0d - // individual test can override the value to verify the functionality - .set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) - } +object EventLogFileCompactorSuite { + val TEST_ROLLING_MAX_FILES_TO_RETAIN = 3 + + // To simplify the tests, we set the score threshold as 0.0d. + // Individual test can use the other value to verify the functionality. 
+ val TEST_COMPACTION_SCORE_THRESHOLD = 0.0d } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index a96667ffacd26..c2f34fc3a95ed 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -37,7 +37,9 @@ import org.mockito.Mockito.{doThrow, mock, spy, verify, when} import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ -import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf, SparkFunSuite} +import org.apache.spark.{JobExecutionStatus, SecurityManager, SPARK_VERSION, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR import org.apache.spark.internal.config.History._ @@ -50,10 +52,10 @@ import org.apache.spark.status.AppStatusStore import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} +import org.apache.spark.util.kvstore.InMemoryStore import org.apache.spark.util.logging.DriverLogger class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { - private var testDir: File = null override def beforeEach(): Unit = { @@ -164,8 +166,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { override private[history] def doMergeApplicationListing( reader: EventLogFileReader, lastSeen: Long, - enableSkipToEnd: Boolean): Unit = { - super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd) + enableSkipToEnd: Boolean, + lastCompactionIndex: Option[Long]): Unit = { + super.doMergeApplicationListing(reader, 
lastSeen, enableSkipToEnd, lastCompactionIndex) doMergeApplicationListingCall += 1 } } @@ -1167,7 +1170,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { var fileStatus = new FileStatus(200, false, 0, 0, 0, path) when(mockedFs.getFileStatus(path)).thenReturn(fileStatus) var logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, None, false) + Some("attemptId"), 100, None, None, false) var reader = EventLogFileReader(mockedFs, path) assert(reader.isDefined) assert(mockedProvider.shouldReloadLog(logInfo, reader.get)) @@ -1177,14 +1180,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { when(mockedFs.getFileStatus(path)).thenReturn(fileStatus) // DFSInputStream.getFileLength is more than logInfo fileSize logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, None, false) + Some("attemptId"), 100, None, None, false) reader = EventLogFileReader(mockedFs, path) assert(reader.isDefined) assert(mockedProvider.shouldReloadLog(logInfo, reader.get)) // DFSInputStream.getFileLength is equal to logInfo fileSize logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 200, None, false) + Some("attemptId"), 200, None, None, false) reader = EventLogFileReader(mockedFs, path) assert(reader.isDefined) assert(!mockedProvider.shouldReloadLog(logInfo, reader.get)) @@ -1292,11 +1295,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val serializer = new KVStoreScalaSerializer() val logInfoWithIndexAsNone = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, None, false) + Some("attemptId"), 100, None, None, false) assertSerDe(serializer, logInfoWithIndexAsNone) val logInfoWithIndex = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, Some(3), false) + Some("attemptId"), 100, Some(3), None, false) 
assertSerDe(serializer, logInfoWithIndex) } @@ -1362,6 +1365,111 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } } + test("compact event log files") { + def verifyEventLogFiles( + fs: FileSystem, + rootPath: String, + expectedIndexForCompact: Option[Long], + expectedIndicesForNonCompact: Seq[Long]): Unit = { + val reader = EventLogFileReader(fs, new Path(rootPath)).get + var logFiles = reader.listEventLogFiles + + expectedIndexForCompact.foreach { idx => + val headFile = logFiles.head + assert(EventLogFileWriter.isCompacted(headFile.getPath)) + assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName)) + logFiles = logFiles.drop(1) + } + + assert(logFiles.size === expectedIndicesForNonCompact.size) + + logFiles.foreach { logFile => + assert(RollingEventLogFilesWriter.isEventLogFile(logFile)) + assert(!EventLogFileWriter.isCompacted(logFile.getPath)) + } + + val indices = logFiles.map { logFile => + RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName) + } + assert(expectedIndicesForNonCompact === indices) + } + + withTempDir { dir => + val conf = createTestConf() + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1) + conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val provider = new FsHistoryProvider(conf) + + val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf) + writer.start() + + // writing event log file 1 - don't compact for now + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app", Some("app"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + + updateAndCheck(provider) { _ => + verifyEventLogFiles(fs, writer.logPath, None, Seq(1)) + val info = provider.listing.read(classOf[LogInfo], writer.logPath) + 
assert(info.lastEvaluatedForCompaction === Some(1)) + } + + // writing event log file 2 - compact the event log file 1 into 1.compact + writeEventsToRollingWriter(writer, Seq.empty, rollFile = true) + writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1), + SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false) + + updateAndCheck(provider) { _ => + verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2)) + val info = provider.listing.read(classOf[LogInfo], writer.logPath) + assert(info.lastEvaluatedForCompaction === Some(2)) + } + + // writing event log file 3 - compact two files - 1.compact & 2 into one, 2.compact + writeEventsToRollingWriter(writer, Seq.empty, rollFile = true) + writeEventsToRollingWriter(writer, Seq( + SparkListenerExecutorAdded(3, "exec1", new ExecutorInfo("host1", 1, Map.empty)), + SparkListenerJobStart(2, 4, Seq.empty), + SparkListenerJobEnd(2, 5, JobSucceeded)), rollFile = false) + + writer.stop() + + updateAndCheck(provider) { _ => + verifyEventLogFiles(fs, writer.logPath, Some(2), Seq(3)) + + val info = provider.listing.read(classOf[LogInfo], writer.logPath) + assert(info.lastEvaluatedForCompaction === Some(3)) + + val store = new InMemoryStore + val appStore = new AppStatusStore(store) + + val reader = EventLogFileReader(fs, new Path(writer.logPath)).get + provider.rebuildAppStore(store, reader, 0L) + + // replayed store doesn't have any job, as events for job are removed while compacting + intercept[NoSuchElementException] { + appStore.job(1) + } + + // but other events should be available even they were in original files to compact + val appInfo = appStore.applicationInfo() + assert(appInfo.id === "app") + assert(appInfo.name === "app") + + // All events in retained file(s) should be available, including events which would have + // been filtered out if compaction is applied. e.g. finished jobs, removed executors, etc. 
+ val exec1 = appStore.executorSummary("exec1") + assert(exec1.hostPort === "host1") + val job2 = appStore.job(2) + assert(job2.status === JobExecutionStatus.SUCCEEDED) + } + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: diff --git a/docs/monitoring.md b/docs/monitoring.md index 090178f5b37eb..31bf1ebdecad3 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -300,7 +300,26 @@ Security options for the Spark History Server are covered more detail in the Even this is set to `true`, this configuration has no effect on a live application, it only affects the history server. - + + spark.history.fs.eventLog.rolling.maxFilesToRetain + Int.MaxValue + + The maximum number of event log files which will be retained as non-compacted. By default, + all event log files will be retained.
+ Please note that compaction happens in the Spark History Server, which means this configuration + must be set in the configuration of the Spark History Server, and the same value is applied + to all applications which are loaded by the Spark History Server. This also means that compaction + and cleanup require the Spark History Server to be running.
+ Please set this configuration in the Spark History Server, and spark.eventLog.rolling.maxFileSize + in each application, accordingly if you want to control the overall size of event log files. + The event log files older than these retained files will be compacted into a single file and + deleted afterwards.
+ NOTE: The Spark History Server may not compact the old event log files if it determines + that compaction would not reclaim much space. For streaming queries + (including Structured Streaming) compaction is normally expected to run, but for + batch queries compaction will not run in many cases. + + Note that in all of these UIs, the tables are sortable by clicking their headers,