From 1190ffcb109025bd62c909059b0cf16e6a748de9 Mon Sep 17 00:00:00 2001
From: Rong Tang
Date: Mon, 17 Sep 2018 15:00:23 -0700
Subject: [PATCH] Implement incremental loading and add a flag to control
 loading of incomplete applications

---
 .../deploy/history/FsHistoryProvider.scala   | 33 +++++++++++++++----
 .../apache/spark/deploy/history/config.scala |  9 +++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 44d23908146c7..c1db82526ac1e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -106,6 +106,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val logDir = conf.get(EVENT_LOG_DIR)
 
+  private val appRefreshNum = conf.get(HISTORY_APP_REFRESH_NUM)
+  private val loadIncomplete = conf.get(HISTORY_APP_LOAD_INCOMPLETE)
+
   private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
   private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
   private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
@@ -440,6 +443,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             !entry.getPath().getName().startsWith(".") &&
             !isBlacklisted(entry.getPath)
         }
+        .filter { entry =>
+          if (entry.getPath().getName().endsWith(".inprogress")) {
+            loadIncomplete
+          } else {
+            true
+          }
+        }
         .filter { entry =>
           try {
             val info = listing.read(classOf[LogInfo], entry.getPath().toString())
@@ -465,20 +475,31 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             }
           } catch {
             case _: NoSuchElementException =>
-              // If the file is currently not being tracked by the SHS, add an entry for it and try
-              // to parse it. This will allow the cleaner code to detect the file as stale later on
-              // if it was not possible to parse it.
-              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
-                entry.getLen()))
               entry.getLen() > 0
           }
         }
         .sortWith { case (entry1, entry2) =>
           entry1.getModificationTime() > entry2.getModificationTime()
         }
+        .take(appRefreshNum)
+
+      updated.foreach {
+        entry =>
+          try {
+            listing.read(classOf[LogInfo], entry.getPath().toString())
+          } catch {
+            case _: NoSuchElementException =>
+              // If the file is currently not being tracked by the SHS, add an entry for it and
+              // try to parse it. This will allow the cleaner code to detect the file as stale
+              // later on if it was not possible to parse it.
+              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
+                entry.getLen()))
+          }
+      }
+
 
     if (updated.nonEmpty) {
-      logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
+      logDebug(s"Attempts to update: ${updated.size} ${updated.map(_.getPath)}")
     }
 
     val tasks = updated.flatMap { entry =>

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
index 25ba9edb9e014..942b1b46e8c82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -64,4 +64,13 @@ private[spark] object config {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefaultString("1m")
 
+  val HISTORY_APP_REFRESH_NUM = ConfigBuilder("spark.history.fs.appRefreshNum")
+    .doc("Maximum number of applications to refresh in each scan.")
+    .intConf
+    .createWithDefault(3000)
+
+  val HISTORY_APP_LOAD_INCOMPLETE = ConfigBuilder("spark.history.fs.load.incomplete")
+    .doc("Whether to load event logs of incomplete (in-progress) applications.")
+    .booleanConf
+    .createWithDefault(true)
 }
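
Usage note: with this patch, each scan of the event-log directory refreshes at
most spark.history.fs.appRefreshNum applications (newest first, by modification
time), and spark.history.fs.load.incomplete controls whether ".inprogress"
event logs are listed at all. A minimal sketch of how a deployment might
override the two new keys (illustrative values; the SparkConf API is standard
Spark, and the key names and defaults come from this patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Refresh at most 1000 applications per scan instead of the default 3000.
      .set("spark.history.fs.appRefreshNum", "1000")
      // Hide applications whose event logs still end in ".inprogress" (default: true).
      .set("spark.history.fs.load.incomplete", "false")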