diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0b7e695efa870..1a338febbfdc5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -25,7 +25,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
@@ -161,6 +160,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  // Stores the paths that are currently being processed. This enables the replay log tasks to
+  // execute asynchronously and ensures that checkForLogs does not process a path repeatedly.
+  private val processing = ConcurrentHashMap.newKeySet[String]
+
+  private def isProcessing(path: Path): Boolean = {
+    processing.contains(path.getName)
+  }
+
+  private def isProcessing(info: LogInfo): Boolean = {
+    processing.contains(info.logPath.split("/").last)
+  }
+
+  private def processing(path: Path): Unit = {
+    processing.add(path.getName)
+  }
+
+  private def endProcessing(path: Path): Unit = {
+    processing.remove(path.getName)
+  }
+
   private val blacklist = new ConcurrentHashMap[String, Long]
 
   // Visible for testing
@@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
       .filter { entry => !isBlacklisted(entry.getPath) }
+      .filter { entry => !isProcessing(entry.getPath) }
       .flatMap { entry => EventLogFileReader(fs, entry) }
       .filter { reader =>
         try {
@@ -512,11 +532,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
     }
 
-    val tasks = updated.flatMap { entry =>
+    updated.foreach { entry =>
+      processing(entry.rootPath)
       try {
-        val task: Future[Unit] = replayExecutor.submit(
-          () => mergeApplicationListing(entry, newLastScanTime, true))
-        Some(task -> entry.rootPath)
+        val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
+        replayExecutor.submit(task)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit (..) indicates the ExecutorService is unable
@@ -527,31 +547,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    pendingReplayTasksCount.addAndGet(tasks.size)
-
-    // Wait for all tasks to finish. This makes sure that checkForLogs
-    // is not scheduled again while some tasks are already running in
-    // the replayExecutor.
-    tasks.foreach { case (task, path) =>
-      try {
-        task.get()
-      } catch {
-        case e: InterruptedException =>
-          throw e
-        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
-          // We don't have read permissions on the log file
-          logWarning(s"Unable to read log $path", e.getCause)
-          blacklist(path)
-          // SPARK-28157 We should remove this blacklisted entry from the KVStore
-          // to handle permission-only changes with the same file sizes later.
-          listing.delete(classOf[LogInfo], path.toString)
-        case e: Exception =>
-          logError("Exception while merging application listings", e)
-      } finally {
-        pendingReplayTasksCount.decrementAndGet()
-      }
-    }
-
     // Delete all information about applications whose log files disappeared from storage.
     // This is done by identifying the event logs which were not touched by the current
     // directory scan.
@@ -563,7 +558,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .last(newLastScanTime - 1)
         .asScala
         .toList
-      stale.foreach { log =>
+      stale.filterNot(isProcessing).foreach { log =>
        log.appId.foreach { appId =>
          cleanAppData(appId, log.attemptId, log.logPath)
          listing.delete(classOf[LogInfo], log.logPath)
@@ -664,10 +659,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def mergeApplicationListing(
+      reader: EventLogFileReader,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
+    try {
+      pendingReplayTasksCount.incrementAndGet()
+      doMergeApplicationListing(reader, scanTime, enableOptimizations)
+    } catch {
+      case e: InterruptedException =>
+        throw e
+      case e: AccessControlException =>
+        // We don't have read permissions on the log file
+        logWarning(s"Unable to read log ${reader.rootPath}", e)
+        blacklist(reader.rootPath)
+        // SPARK-28157 We should remove this blacklisted entry from the KVStore
+        // to handle permission-only changes with the same file sizes later.
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+      case e: Exception =>
+        logError("Exception while merging application listings", e)
+    } finally {
+      endProcessing(reader.rootPath)
+      pendingReplayTasksCount.decrementAndGet()
+
+      val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
+      if (isExpired) {
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+        deleteLog(fs, reader.rootPath)
+      }
+    }
+  }
+
   /**
    * Replay the given log file, saving the application in the listing db.
+   * Visible for testing
    */
-  protected def mergeApplicationListing(
+  private[history] def doMergeApplicationListing(
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
@@ -773,7 +800,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           // mean the end event is before the configured threshold, so call the method again to
           // re-parse the whole log.
           logInfo(s"Reparsing $logPath since end event was not found.")
-          mergeApplicationListing(reader, scanTime, enableOptimizations = false)
+          doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
@@ -827,7 +854,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType == null || l.logType == LogType.EventLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
         deleteLog(fs, new Path(log.logPath))
@@ -935,7 +962,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
       deleteLog(driverLogFs, new Path(log.logPath))
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 281e6935de375..4097224184812 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -160,13 +160,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     assume(!Utils.isWindows)
 
     class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
-      var mergeApplicationListingCall = 0
-      override protected def mergeApplicationListing(
+      var doMergeApplicationListingCall = 0
+      override private[history] def doMergeApplicationListing(
          reader: EventLogFileReader,
          lastSeen: Long,
          enableSkipToEnd: Boolean): Unit = {
-        super.mergeApplicationListing(reader, lastSeen, enableSkipToEnd)
-        mergeApplicationListingCall += 1
+        super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd)
+        doMergeApplicationListingCall += 1
       }
     }
     val provider = new TestFsHistoryProvider
@@ -187,7 +187,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
       list.size should be (1)
     }
 
-    provider.mergeApplicationListingCall should be (1)
+    provider.doMergeApplicationListingCall should be (1)
   }
 
   test("history file is renamed from inprogress to completed") {