From 842589d34a2c2b97180d392d35eaa985edbfd671 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 13 Oct 2016 15:30:59 -0700 Subject: [PATCH 01/12] SHS-NG M2: Store FsHistoryProvider listing data in a KVStore. The application listing is still generated from event logs, but is now stored in a KVStore instance. By default an in-memory store is used, but a new config allows setting a local disk path to store the data, in which case a LevelDB store will be created. The provider stores things internally using the public REST API types; I believe this is better going forward since it will make it easier to get rid of the internal history server API which is mostly redundant at this point. I also added a finalizer to LevelDBIterator, to make sure that resources are eventually released. This helps when code iterates but does not exhaust the iterator, thus not triggering the auto-close code. HistoryServerSuite was modified to not re-start the history server unnecessarily; this makes the json validation tests run more quickly. --- .../spark/util/kvstore/LevelDBIterator.java | 14 + core/pom.xml | 5 + .../history/ApplicationHistoryProvider.scala | 8 + .../deploy/history/FsHistoryProvider.scala | 636 ++++++++++-------- .../apache/spark/deploy/history/config.scala | 46 ++ .../org/apache/spark/status/api/v1/api.scala | 15 +- .../history/FsHistoryProviderSuite.scala | 57 +- .../deploy/history/HistoryServerSuite.scala | 17 +- docs/monitoring.md | 7 + .../launcher/AbstractCommandBuilder.java | 1 + scalastyle-config.xml | 2 +- 11 files changed, 483 insertions(+), 325 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/config.scala diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index a2181f3874f86..48f9d10b681a2 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -191,6 +191,20 @@ public synchronized void close() throws IOException { } } + /** + * Because it's tricky to expose closeable iterators through many internal APIs, especially + * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by + * the iterator will eventually be released. + */ + @Override + protected void finalize() throws Throwable { + try { + close(); + } catch (Exception e) { + // Ignore error here, db may have been closed already. + } + } + private byte[] loadNext() { if (count >= max) { return null; diff --git a/core/pom.xml b/core/pom.xml index bc6b1c4f01117..5c2aee91485e6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -67,6 +67,11 @@ spark-launcher_${scala.binary.version} ${project.version} + + org.apache.spark + spark-kvstore_${scala.binary.version} + ${project.version} + org.apache.spark spark-network-common_${scala.binary.version} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 5cb48ca3e60b0..2e1aa44775d12 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -75,6 +75,14 @@ private[history] case class LoadedAppUI( private[history] abstract class ApplicationHistoryProvider { + /** + * The number of applications available for listing. 
Separate method in case it's cheaper + * to get a count than to calculate the whole listing. + * + * @return The number of available applications. + */ + def getAppCount(): Int = getListing().size + /** * Returns the count of application event logs that the provider is currently still processing. * History Server UI can use this to indicate to a user that the application listing on the UI diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 687fd2d3ffe64..5aae6e5e1175c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,14 +17,17 @@ package org.apache.spark.deploy.history -import java.io.{FileNotFoundException, IOException, OutputStream} -import java.util.UUID -import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit} +import java.io.{File, FileNotFoundException, IOException} +import java.util.{Date, UUID} +import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -35,11 +38,14 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ /** * A class that provides application history from event logs stored in the file system. @@ -78,6 +84,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) this(conf, new SystemClock()) } + import config._ import FsHistoryProvider._ // Interval between safemode checks. @@ -94,8 +101,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS, Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) - private val logDir = conf.getOption("spark.history.fs.logDirectory") - .getOrElse(DEFAULT_LOG_DIR) + private val logDir = conf.get(EVENT_LOG_DIR) private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false) private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "") @@ -117,17 +123,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. 
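The store-selection logic above reduces to a simple rule: when `spark.history.store.path` is set, the listing lives in a LevelDB database under that directory (and so survives history server restarts); otherwise it is kept in a plain in-memory store and rebuilt from the event logs on startup. A minimal sketch of that decision, using a hypothetical `openListingStore` helper and the stock `KVStoreSerializer` (the real code additionally validates the stored metadata and uses a Scala-aware serializer):

```scala
import java.io.File

import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, KVStoreSerializer, LevelDB}

object ListingStoreSketch {
  // Choose the backing store for the application listing. With a configured local
  // path the data is persisted on disk; with None everything stays in memory.
  def openListingStore(storePath: Option[String]): KVStore = storePath match {
    case Some(dir) => new LevelDB(new File(dir, "listing.ldb"), new KVStoreSerializer())
    case None => new InMemoryStore()
  }
}
```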
- @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] - = new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) - // List of application logs to be deleted by event log cleaner. - private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val listing = storePath.map { path => + val dbPath = new File(path, "listing.ldb") - private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) + def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) + + try { + val db = openDB() + val meta = db.getMetadata(classOf[KVStoreMetadata]) + + if (meta == null) { + db.setMetadata(new KVStoreMetadata(CURRENT_VERSION, logDir.toString())) + db + } else if (meta.version != CURRENT_VERSION || !logDir.toString().equals(meta.logDir)) { + logInfo("Detected mismatched config in existing DB, deleting...") + db.close() + Utils.deleteRecursively(dbPath) + openDB() + } else { + db + } + } catch { + case _: UnsupportedStoreVersionException => + logInfo("Detected incompatible DB versions, deleting...") + Utils.deleteRecursively(dbPath) + openDB() + } + }.getOrElse(new InMemoryStore()) /** * Return a runnable that performs the given operation on the event logs. @@ -229,10 +255,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { + listing.view(classOf[ApplicationInfoWrapper]) + .index("endTime") + .reverse() + .iterator() + .asScala + .map(_.toAppHistoryInfo) + } - override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = { - applications.get(appId) + override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = { + try { + Some(load(appId).toAppHistoryInfo()) + } catch { + case e: NoSuchElementException => + None + } } override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() @@ -241,42 +279,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts + .find { attempt => attempt.info.attemptId == attemptId } + .map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), - attempt.startTime) + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { - ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view 
acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) - } else { - None - } - + ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") + ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") + ui.getSecurityManager.setAdminAcls(adminAcls) + ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, + appListener.viewAcls.getOrElse("")) + val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + appListener.adminAclsGroups.getOrElse("") + ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) + ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) + LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) } - } } catch { - case e: FileNotFoundException => None + case _: NoSuchElementException => None } } @@ -301,6 +335,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): Unit = { + listing.close() if (initThread != null && initThread.isAlive()) { initThread.interrupt() initThread.join() @@ -316,25 +351,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq.empty[FileStatus]) + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = statusList .filter { entry => - val fileInfo = fileToAppInfo.get(entry.getPath()) - val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. 
!entry.getPath().getName().startsWith(".") && - prevFileSize < entry.getLen() && - SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) + SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && + recordedFileSize(entry.getPath()) < entry.getLen() } .flatMap { entry => Some(entry) } .sortWith { case (entry1, entry2) => - entry1.getModificationTime() >= entry2.getModificationTime() - } + entry1.getModificationTime() > entry2.getModificationTime() + } if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") @@ -422,207 +454,100 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - applications.get(appId) match { - case Some(appInfo) => - try { - // If no attempt is specified, or there is no attemptId for attempts, return all attempts - appInfo.attempts.filter { attempt => - attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get - }.foreach { attempt => - val logPath = new Path(logDir, attempt.logPath) - zipFileToStream(logPath, attempt.logPath, zipStream) - } - } finally { - zipStream.close() + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + throw new SparkException(s"Logs for $appId not found.") + } + + try { + // If no attempt is specified, or there is no attemptId for attempts, return all attempts + attemptId + .map { id => app.attempts.filter(_.info.attemptId == Some(id)) } + .getOrElse(app.attempts) + .map(_.logPath) + .foreach { log => + zipFileToStream(new Path(logDir, log), log, zipStream) } - case None => throw new SparkException(s"Logs for $appId not found.") + } finally { + zipStream.close() } } /** - * Replay the log files in the list and merge the list of old applications with new ones + * Replay the given log file, saving the application in the listing db. */ protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { - val newAttempts = try { - val eventsFilter: ReplayEventsFilter = { eventString => - eventString.startsWith(APPL_START_EVENT_PREFIX) || - eventString.startsWith(APPL_END_EVENT_PREFIX) || - eventString.startsWith(LOG_START_EVENT_PREFIX) - } - - val logPath = fileStatus.getPath() - val appCompleted = isApplicationCompleted(fileStatus) - - // Use loading time as lastUpdated since some filesystems don't update modifiedTime - // each time file is updated. However use modifiedTime for completed jobs so lastUpdated - // won't change whenever HistoryServer restarts and reloads the file. - val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() - - val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) - - // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. - if (appListener.appId.isDefined) { - val attemptInfo = new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - lastUpdated, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted, - fileStatus.getLen(), - appListener.appSparkVersion.getOrElse("") - ) - fileToAppInfo.put(logPath, attemptInfo) - logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") - Some(attemptInfo) - } else { - logWarning(s"Failed to load application log ${fileStatus.getPath}. 
" + - "The application may have not started.") - None - } - - } catch { - case e: Exception => - logError( - s"Exception encountered when attempting to load application log ${fileStatus.getPath}", - e) - None - } - - if (newAttempts.isEmpty) { - return + val eventsFilter: ReplayEventsFilter = { eventString => + eventString.startsWith(APPL_START_EVENT_PREFIX) || + eventString.startsWith(APPL_END_EVENT_PREFIX) || + eventString.startsWith(LOG_START_EVENT_PREFIX) } - // Build a map containing all apps that contain new attempts. The app information in this map - // contains both the new app attempt, and those that were already loaded in the existing apps - // map. If an attempt has been updated, it replaces the old attempt in the list. - val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]() - - applications.synchronized { - newAttempts.foreach { attempt => - val appInfo = newAppMap.get(attempt.appId) - .orElse(applications.get(attempt.appId)) - .map { app => - val attempts = - app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt) - new FsApplicationHistoryInfo(attempt.appId, attempt.name, - attempts.sortWith(compareAttemptInfo)) - } - .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt))) - newAppMap(attempt.appId) = appInfo - } - - // Merge the new app list with the existing one, maintaining the expected ordering (descending - // end time). Maintaining the order is important to avoid having to sort the list every time - // there is a request for the log list. - val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo) - val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { - if (!mergedApps.contains(info.id)) { - mergedApps += (info.id -> info) - } - } + val logPath = fileStatus.getPath() + logInfo(s"Replaying log path: $logPath") - val newIterator = newApps.iterator.buffered - val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { - if (newAppMap.contains(oldIterator.head.id)) { - oldIterator.next() - } else if (compareAppInfo(newIterator.head, oldIterator.head)) { - addIfAbsent(newIterator.next()) - } else { - addIfAbsent(oldIterator.next()) - } - } - newIterator.foreach(addIfAbsent) - oldIterator.foreach(addIfAbsent) + val bus = new ReplayListenerBus() + val listener = new AppListingListener(fileStatus, clock) + bus.addListener(listener) - applications = mergedApps - } + replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) + listener.applicationInfo.foreach(addListing) + listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } /** * Delete event logs from the log directory according to the clean policy defined by the user. */ private[history] def cleanLogs(): Unit = { + var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None try { - val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000 - - val now = clock.getTimeMillis() - val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - - def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { - now - attempt.lastUpdated > maxAge - } - - // Scan all logs from the log directory. - // Only completed applications older than the specified max age will be deleted. 
- applications.values.foreach { app => - val (toClean, toRetain) = app.attempts.partition(shouldClean) - attemptsToClean ++= toClean - - if (toClean.isEmpty) { - appsToRetain += (app.id -> app) - } else if (toRetain.nonEmpty) { - appsToRetain += (app.id -> - new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList)) + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + + // Iterate descending over all applications whose oldest attempt is older than the maxAge. + iterator = Some(listing.view(classOf[ApplicationInfoWrapper]) + .index("oldestAttempt") + .reverse() + .first(maxTime) + .closeableIterator()) + + iterator.get.asScala.foreach { app => + val (remaining, toDelete) = app.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } + if (remaining.nonEmpty) { + val newApp = new ApplicationInfoWrapper(app.info, remaining) + listing.write(newApp) + } else { + listing.delete(app.getClass(), app.id) } - } - - applications = appsToRetain - val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] - attemptsToClean.foreach { attempt => - try { - fs.delete(new Path(logDir, attempt.logPath), true) - } catch { - case e: AccessControlException => - logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") - case t: IOException => - logError(s"IOException in cleaning ${attempt.logPath}", t) - leftToClean += attempt + toDelete.foreach { attempt => + val logPath = new Path(logDir, attempt.logPath) + try { + listing.delete(classOf[LogInfo], logPath.toString()) + } catch { + case _: NoSuchElementException => + logDebug(s"Log info entry for $logPath not found.") + } + try { + fs.delete(logPath, true) + } catch { + case e: AccessControlException => + logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") + case t: IOException => + logError(s"IOException in cleaning ${attempt.logPath}", t) + } } } - - attemptsToClean = leftToClean } catch { - case t: Exception => logError("Exception in cleaning logs", t) + case t: Exception => logError("Exception while cleaning logs", t) + } finally { + iterator.foreach(_.close()) } } - /** - * Comparison function that defines the sort order for the application listing. - * - * @return Whether `i1` should precede `i2`. - */ - private def compareAppInfo( - i1: FsApplicationHistoryInfo, - i2: FsApplicationHistoryInfo): Boolean = { - val a1 = i1.attempts.head - val a2 = i2.attempts.head - if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime - } - - /** - * Comparison function that defines the sort order for application attempts within the same - * application. Order is: attempts are sorted by descending start time. - * Most recent attempt state matches with current state of the app. - * - * Normally applications should have a single running attempt; but failure to call sc.stop() - * may cause multiple running attempts to show up. - * - * @return Whether `a1` should precede `a2`. - */ - private def compareAttemptInfo( - a1: FsApplicationAttemptInfo, - a2: FsApplicationAttemptInfo): Boolean = { - a1.startTime >= a2.startTime - } - /** * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns * an `ApplicationEventListener` instance with event data captured from the replay. 
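The rewritten cleaner above leans on the KVStore's secondary indices instead of a full scan: it walks the `oldestAttempt` index downwards starting at the expiration cutoff, so it only ever visits candidates for deletion, and it closes the iterator explicitly because the LevelDB-backed iterator holds native resources. A self-contained sketch of the same view pattern against the in-memory store, with an invented `AttemptRecord` type standing in for `ApplicationInfoWrapper`:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// "id" is the natural key; "oldest" plays the role of the "oldestAttempt" index.
class AttemptRecord(val appId: String, val oldest: Long) {
  @KVIndex def id: String = appId
  @KVIndex("oldest") def oldestIndex: Long = oldest
}

object CleanerSketch {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()
    Seq("app1" -> 10L, "app2" -> 20L, "app3" -> 30L).foreach { case (id, t) =>
      store.write(new AttemptRecord(id, t))
    }

    // Descending over the "oldest" index, starting at the cutoff: visits the
    // records with oldest <= 25 (app2, then app1) and never touches app3.
    val cutoff = 25L
    val it = store.view(classOf[AttemptRecord])
      .index("oldest")
      .reverse()
      .first(cutoff)
      .closeableIterator()
    val expired = try it.asScala.map(_.appId).toList finally it.close()

    expired.foreach(id => store.delete(classOf[AttemptRecord], id))
    println(s"expired=$expired, remaining=${store.count(classOf[AttemptRecord])}")
    store.close()
  }
}
```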
@@ -647,6 +572,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appListener = new ApplicationEventListener bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) + logInfo(s"Finished replaying $logPath") appListener } finally { logInput.close() @@ -683,26 +609,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * @return a summary of the component state */ override def toString: String = { - val header = s""" - | FsHistoryProvider: logdir=$logDir, - | last scan time=$lastScanTime - | Cached application count =${applications.size}} - """.stripMargin - val sb = new StringBuilder(header) - applications.foreach(entry => sb.append(entry._2).append("\n")) - sb.toString - } - - /** - * Look up an application attempt - * @param appId application ID - * @param attemptId Attempt ID, if set - * @return the matching attempt, if found - */ - def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId) - } + val count = listing.count(classOf[ApplicationInfoWrapper]) + s"""|FsHistoryProvider{logdir=$logDir, + | storedir=$storePath, + | last scan time=$lastScanTime + | application count=$count}""".stripMargin } /** @@ -720,19 +631,64 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { - lookup(appId, attemptId) match { - case None => - logDebug(s"Application Attempt $appId/$attemptId not found") - false - case Some(latest) => - prevFileSize < latest.fileSize + try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize + } catch { + case _: NoSuchElementException => false } } + + private def recordedFileSize(log: Path): Long = { + try { + listing.read(classOf[LogInfo], log.toString()).fileSize + } catch { + case _: NoSuchElementException => 0L + } + } + + private def load(appId: String): ApplicationInfoWrapper = { + listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { + val attempt = app.attempts.head + + val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) + } catch { + case _: NoSuchElementException => + app + } + + def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() + } + + val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + val oldestAttempt = attempts.map(_.info.lastUpdated.getTime()).min + + val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) + listing.write(newAppInfo) + } + + /** For testing. Returns internal data about a single attempt. 
*/ + private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { + load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( + throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) + } + } private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" - private val NOT_STARTED = "" private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" @@ -742,53 +698,145 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + private val CURRENT_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - * the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +case class KVStoreMetadata( + val version: Long, + val logDir: String) + +case class LogInfo( + @KVIndexParam val logPath: String, + val fileSize: Long) + +private[history] class AttemptInfoWrapper( + val info: v1.ApplicationAttemptInfo, val logPath: String, - val name: String, - val appId: String, - attemptId: Option[String], - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - completed: Boolean, - val fileSize: Long, - appSparkVersion: String) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { - s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" + val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { + ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed, info.appSparkVersion) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( - id: String, - override val name: String, - override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( + val info: v1.ApplicationInfo, + val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { + ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + + def toApiInfo(): v1.ApplicationInfo = { + new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, + info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + app.id = event.appId.orNull + app.name = event.appName + + attempt.attemptId = event.appAttemptId + attempt.startTime = new Date(event.time) + attempt.lastUpdated = new Date(clock.getTimeMillis()) + attempt.sparkUser = event.sparkUser + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + attempt.endTime = new Date(event.time) + attempt.lastUpdated = new Date(log.getModificationTime()) + attempt.duration = event.time - attempt.startTime.getTime() + attempt.completed = true + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case SparkListenerLogStart(sparkVersion) => + attempt.appSparkVersion = sparkVersion + case _ => + } + + def applicationInfo: Option[ApplicationInfoWrapper] = { + if (app.id != null) { + Some(app.toView(List(attempt.toView()))) + } else { + None + } + } + + private class MutableApplicationInfo { + var id: String = null + var name: String = null + var coresGranted: Option[Int] = None + var maxCores: Option[Int] = None + var coresPerExecutor: Option[Int] = None + var memoryPerExecutorMB: Option[Int] = None + + def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = { + val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, + memoryPerExecutorMB, Nil) + new ApplicationInfoWrapper(apiInfo, attempts) + } + + } + + private class MutableAttemptInfo(logPath: String, fileSize: Long) { + var attemptId: Option[String] = None + var startTime = new Date(-1) + var endTime = new Date(-1) + var lastUpdated = new Date(-1) + var duration = 0L + var sparkUser: String = null + var completed = false + var appSparkVersion = "" + + def toView(): AttemptInfoWrapper = { + val apiInfo = new v1.ApplicationAttemptInfo( + attemptId, + startTime, + endTime, + lastUpdated, + duration, + sparkUser, + completed, + appSparkVersion) + new AttemptInfoWrapper( + apiInfo, + logPath, + fileSize) + } + + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala new file mode 100644 index 0000000000000..669d54ad6e913 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") + .stringConf + .createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") + .stringConf + .createOptional + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 05948f2661056..31659b25db318 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -20,6 +20,8 @@ import java.util.Date import scala.collection.Map +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + import org.apache.spark.JobExecutionStatus class ApplicationInfo private[spark]( @@ -31,6 +33,9 @@ class ApplicationInfo private[spark]( val memoryPerExecutorMB: Option[Int], val attempts: Seq[ApplicationAttemptInfo]) +@JsonIgnoreProperties( + value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"), + allowGetters = true) class ApplicationAttemptInfo private[spark]( val attemptId: Option[String], val startTime: Date, @@ -40,9 +45,13 @@ class ApplicationAttemptInfo private[spark]( val sparkUser: String, val completed: Boolean = false, val appSparkVersion: String) { - def getStartTimeEpoch: Long = startTime.getTime - def getEndTimeEpoch: Long = endTime.getTime - def getLastUpdatedEpoch: Long = lastUpdated.getTime + + def getStartTimeEpoch: Long = startTime.getTime + + def getEndTimeEpoch: Long = endTime.getTime + + def getLastUpdatedEpoch: Long = lastUpdated.getTime + } class ExecutorStageSummary private[spark]( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 7109146ece371..38921fc0f0844 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -36,6 +36,7 @@ import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.history.config._ 
import org.apache.spark.internal.Logging import org.apache.spark.io._ import org.apache.spark.scheduler._ @@ -54,6 +55,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc Utils.deleteRecursively(testDir) } + private def newProvider( + conf: SparkConf, + clock: Clock = null): FsHistoryProvider = { + if (clock == null) { + new FsHistoryProvider(conf) + } else { + new FsHistoryProvider(conf, clock) + } + } + /** Create a fake log file using the new log format used in Spark 1.3+ */ private def newLogFile( appId: String, @@ -68,7 +79,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("Parse application logs") { val clock = new ManualClock(12345678) - val provider = new FsHistoryProvider(createTestConf(), clock) + val provider = newProvider(createTestConf(), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -163,7 +174,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("history file is renamed from inprogress to completed") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -172,20 +183,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should endWith(EventLoggingListener.IN_PROGRESS) } logFile1.renameTo(newLogFile("app1", None, inProgress = false)) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should not endWith(EventLoggingListener.IN_PROGRESS) } } test("Parse logs that application is not started") { - val provider = new FsHistoryProvider((createTestConf())) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -197,7 +206,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-5582: empty log directory") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -213,7 +222,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("apps with multiple attempts with order") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, @@ -274,7 +283,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("log cleaner") { val maxAge = TimeUnit.SECONDS.toMillis(10) val clock = new ManualClock(maxAge / 2) - val provider = new FsHistoryProvider( + val provider = newProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) @@ -320,7 +329,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val secondFileModifiedTime = 
TimeUnit.SECONDS.toMillis(20) val maxAge = TimeUnit.SECONDS.toMillis(40) val clock = new ManualClock(0) - val provider = new FsHistoryProvider( + val provider = newProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("inProgressApp1", None, inProgress = true) @@ -342,23 +351,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.checkForLogs() // This should not trigger any cleanup - updateAndCheck(provider)(list => list.size should be(2)) + updateAndCheck(provider) { list => + list.size should be(2) + } // Should trigger cleanup for first file but not second one clock.setTime(firstFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(1)) + updateAndCheck(provider) { list => + list.size should be(1) + } assert(!log1.exists()) assert(log2.exists()) // Should cleanup the second file as well. clock.setTime(secondFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(0)) + updateAndCheck(provider) { list => + list.size should be(0) + } assert(!log1.exists()) assert(!log2.exists()) } test("Event log copy") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) writeFile(log, true, None, @@ -393,7 +408,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-8372: new logs with no app ID are ignored") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) @@ -407,7 +422,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("provider correctly checks whether fs is in safe mode") { - val provider = spy(new FsHistoryProvider(createTestConf())) + val provider = spy(newProvider(createTestConf())) val dfs = mock(classOf[DistributedFileSystem]) // Asserts that safe mode is false because we can't really control the return value of the mock, // since the API is different between hadoop 1 and 2. 
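The tests above drive the provider directly, and the common setup (which the `createTestConf` change further down also adopts) is always the same: point `spark.history.fs.logDirectory` at a scratch directory, point the new `spark.history.store.path` at another one, then scan and inspect by hand. A condensed sketch of that setup; it has to live in the `org.apache.spark.deploy.history` package because the provider and its config entries are package-private, and the directories are throwaway temp dirs:

```scala
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.config._
import org.apache.spark.util.Utils

object ProviderSetupSketch {
  def main(args: Array[String]): Unit = {
    val eventLogDir = Utils.createTempDir()   // would normally hold event log files
    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", eventLogDir.getAbsolutePath())
      .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())

    val provider = new FsHistoryProvider(conf)
    provider.checkForLogs()                   // scan the (empty) log directory
    println(s"applications listed: ${provider.getListing().size}")
    provider.stop()
  }
}
```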
@@ -479,7 +494,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc SparkListenerApplicationEnd(5L) ) - val provider = new FsHistoryProvider(createTestConf()) + val provider = newProvider(createTestConf()) updateAndCheck(provider) { list => list.size should be (1) list(0).name should be ("real-app") @@ -496,7 +511,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var provider: FsHistoryProvider = null try { - provider = new FsHistoryProvider(conf) + provider = newProvider(conf) val log = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log, true, None, SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), @@ -624,7 +639,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } private def createTestConf(): SparkConf = { - new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) } private class SafeModeTestProvider(conf: SparkConf, clock: Clock) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 95acb9a54440f..4277b82faa277 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -43,6 +43,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.selenium.WebBrowser import org.apache.spark._ +import org.apache.spark.deploy.history.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.JobUIData import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -64,6 +65,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private val logDir = getTestResourcePath("spark-events") private val expRoot = getTestResourceFile("HistoryServerExpectations") + private val storeDir = Utils.createTempDir(namePrefix = "history") private var provider: FsHistoryProvider = null private var server: HistoryServer = null @@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) conf.setAll(extraConf) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -87,14 +90,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers def stop(): Unit = { server.stop() + server = null } before { - init() - } - - after{ - stop() + if (server == null) { + init() + } } val cases = Seq( @@ -296,6 +298,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -372,6 +375,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { + server.stop() implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -388,6 +392,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers .set("spark.history.fs.update.interval", "1s") .set("spark.eventLog.enabled", "true") .set("spark.history.cache.window", "250ms") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .remove("spark.testing") val provider = new FsHistoryProvider(myConf) val securityManager = HistoryServer.createSecurityManager(myConf) @@ -413,8 +418,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - // stop the server with the old config, and start the new one - server.stop() server = new HistoryServer(myConf, provider, securityManager, 18080) server.initialize() server.bind() diff --git a/docs/monitoring.md b/docs/monitoring.md index 3e577c5f36778..6bbd3e45be54e 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -220,6 +220,13 @@ The history server can be configured as follows: Number of threads that will be used by history server to process event logs. + + spark.history.store.path + /var/lib/spark-history + + Local directory where history server will cache application history data. + + Note that in all of these UIs, the tables are sortable by clicking their headers, diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 860ab35852331..09b295a886dd2 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -144,6 +144,7 @@ List buildClassPath(String appClassPath) throws IOException { if (prependClasses || isTesting) { String scala = getScalaVersion(); List projects = Arrays.asList( + "common/kvstore", "common/network-common", "common/network-shuffle", "common/network-yarn", diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 0a4073b03957c..853e66158a2d2 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -86,7 +86,7 @@ This file is divided into 3 sections: - + From 1f08bd7d7f5a57d27a4748baff0353f4a9f285da Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 8 Aug 2017 16:31:11 -0700 Subject: [PATCH 02/12] SHS-NG M2: Re-work LevelDBIterator.finalize(). Make sure the db is still open before trying to close the iterator, otherwise it may cause a JVM crash. --- .../apache/spark/util/kvstore/LevelDB.java | 33 ++++++++++++++----- .../spark/util/kvstore/LevelDBIterator.java | 6 +--- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 310febc352ef8..ff48b155fab31 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -213,17 +213,32 @@ public long count(Class type, String index, Object indexedValue) throws Excep @Override public void close() throws IOException { - DB _db = this._db.getAndSet(null); - if (_db == null) { - return; + synchronized (this._db) { + DB _db = this._db.getAndSet(null); + if (_db == null) { + return; + } + + try { + _db.close(); + } catch (IOException ioe) { + throw ioe; + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } } + } - try { - _db.close(); - } catch (IOException ioe) { - throw ioe; - } catch (Exception e) { - throw new IOException(e.getMessage(), e); + /** + * Closes the given iterator if the DB is still open. 
Trying to close a JNI LevelDB handle + * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. + */ + void closeIterator(LevelDBIterator it) throws IOException { + synchronized (this._db) { + DB _db = this._db.get(); + if (_db != null) { + it.close(); + } } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index 48f9d10b681a2..b3ba76ba58052 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -198,11 +198,7 @@ public synchronized void close() throws IOException { */ @Override protected void finalize() throws Throwable { - try { - close(); - } catch (Exception e) { - // Ignore error here, db may have been closed already. - } + db.closeIterator(this); } private byte[] loadNext() { From 1ec1a672673eafa9f04ad81c1fff3feadfcddd2c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 8 Aug 2017 18:13:24 -0700 Subject: [PATCH 03/12] Add private where needed. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5aae6e5e1175c..081a2ac6b0668 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -714,11 +714,11 @@ private class KVStoreScalaSerializer extends KVStoreSerializer { } -case class KVStoreMetadata( +private[history] case class KVStoreMetadata( val version: Long, val logDir: String) -case class LogInfo( +private[history] case class LogInfo( @KVIndexParam val logPath: String, val fileSize: Long) From b696f963ee001687ecd9266b9115ba2aa2867740 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 14 Aug 2017 09:53:24 -0700 Subject: [PATCH 04/12] Feedback. --- .../deploy/history/ApplicationHistoryProvider.scala | 8 -------- .../spark/deploy/history/FsHistoryProvider.scala | 11 ++++++----- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 2e1aa44775d12..5cb48ca3e60b0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -75,14 +75,6 @@ private[history] case class LoadedAppUI( private[history] abstract class ApplicationHistoryProvider { - /** - * The number of applications available for listing. Separate method in case it's cheaper - * to get a count than to calculate the whole listing. - * - * @return The number of available applications. - */ - def getAppCount(): Int = getListing().size - /** * Returns the count of application event logs that the provider is currently still processing. 
* History Server UI can use this to indicate to a user that the application listing on the UI diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 081a2ac6b0668..cae8587bdc1fe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -137,9 +137,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val meta = db.getMetadata(classOf[KVStoreMetadata]) if (meta == null) { - db.setMetadata(new KVStoreMetadata(CURRENT_VERSION, logDir.toString())) + db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir.toString())) db - } else if (meta.version != CURRENT_VERSION || !logDir.toString().equals(meta.logDir)) { + } else if (meta.version != CURRENT_LISTING_VERSION || + !logDir.toString().equals(meta.logDir)) { logInfo("Detected mismatched config in existing DB, deleting...") db.close() Utils.deleteRecursively(dbPath) @@ -505,7 +506,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 - // Iterate descending over all applications whose oldest attempt is older than the maxAge. + // Iterate descending over all applications whose oldest attempt happended before maxTime. iterator = Some(listing.view(classOf[ApplicationInfoWrapper]) .index("oldestAttempt") .reverse() @@ -672,7 +673,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ List(attempt) - val oldestAttempt = attempts.map(_.info.lastUpdated.getTime()).min val newAppInfo = new ApplicationInfoWrapper( app.info, @@ -699,7 +699,8 @@ private[history] object FsHistoryProvider { private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" - private val CURRENT_VERSION = 1L + /** Current version of the data written to the listing database. */ + private val CURRENT_LISTING_VERSION = 1L } /** From 519dab056964dae71309f65bcadee8ec08366284 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 15 Aug 2017 15:27:39 -0700 Subject: [PATCH 05/12] Feedback. --- .../spark/deploy/history/FsHistoryProvider.scala | 12 +++++------- docs/monitoring.md | 6 ++++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index cae8587bdc1fe..2d33a91b38cd7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -56,11 +56,10 @@ import org.apache.spark.util.kvstore._ * * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any * entries in the log dir whose modification time is greater than the last scan time - * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]] - * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list - * of applications. + * are considered new or updated. These are replayed to create a new attempt info entry + * and update or create a matching application info element in the list of applications. 
* - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the - * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size. + * attempt is replaced by another one with a larger log size. * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]] * instance is out of date, the log size of the cached instance is checked against the app last * loaded by [[checkForLogs]]. @@ -137,10 +136,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val meta = db.getMetadata(classOf[KVStoreMetadata]) if (meta == null) { - db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir.toString())) + db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir)) db - } else if (meta.version != CURRENT_LISTING_VERSION || - !logDir.toString().equals(meta.logDir)) { + } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) { logInfo("Detected mismatched config in existing DB, deleting...") db.close() Utils.deleteRecursively(dbPath) diff --git a/docs/monitoring.md b/docs/monitoring.md index 6bbd3e45be54e..f20cb22dd11e1 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -222,9 +222,11 @@ The history server can be configured as follows: spark.history.store.path - /var/lib/spark-history + (none) - Local directory where history server will cache application history data. + Local directory where to cache application history data. If set, the history + server will store application data on disk instead of keeping it in memory. The data + written to disk will be re-used in the event of a history server restart. From dc642bd70042da965387916656747ae78acdc192 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 16 Aug 2017 13:26:00 -0700 Subject: [PATCH 06/12] Feedback. 
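A recurring piece in the diffs above is the metadata check that protects the on-disk listing from stale data: the LevelDB database stores a small version-plus-log-directory record, and if either the listing schema version or the configured log directory no longer matches, the database is discarded and rebuilt from the event logs. A hedged, self-contained sketch of that open-or-invalidate cycle; `ListingMetadata` and `openOrInvalidate` are invented stand-ins for `KVStoreMetadata` and the provider's inline logic, and the small serializer mirrors `KVStoreScalaSerializer`:

```scala
import java.io.File

import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.util.kvstore.{KVStoreSerializer, LevelDB, UnsupportedStoreVersionException}

// Jackson needs the Scala module registered to round-trip Scala case classes.
class ScalaAwareSerializer extends KVStoreSerializer {
  mapper.registerModule(DefaultScalaModule)
}

case class ListingMetadata(version: Long, logDir: String)

object ListingDbSketch {
  val CURRENT_LISTING_VERSION = 1L

  def openOrInvalidate(dbPath: File, logDir: String): LevelDB = {
    def openDB(): LevelDB = new LevelDB(dbPath, new ScalaAwareSerializer())

    def deleteRecursively(f: File): Unit = {
      Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
      f.delete()
    }

    try {
      val db = openDB()
      val meta = db.getMetadata(classOf[ListingMetadata])
      if (meta == null) {
        // Fresh database: stamp it with the current schema version and log dir.
        db.setMetadata(ListingMetadata(CURRENT_LISTING_VERSION, logDir))
        db
      } else if (meta.version != CURRENT_LISTING_VERSION || meta.logDir != logDir) {
        // Schema or configuration changed: throw the data away and start over.
        db.close()
        deleteRecursively(dbPath)
        openOrInvalidate(dbPath, logDir)
      } else {
        db
      }
    } catch {
      case _: UnsupportedStoreVersionException =>
        // The kvstore layer itself changed its on-disk format.
        deleteRecursively(dbPath)
        openOrInvalidate(dbPath, logDir)
    }
  }
}
```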
--- .../deploy/history/FsHistoryProvider.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 2d33a91b38cd7..55a929d28c566 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -126,7 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val storePath = conf.get(LOCAL_STORE_DIR) - private val listing = storePath.map { path => + private val listing: KVStore = storePath.map { path => val dbPath = new File(path, "listing.ldb") def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) @@ -334,10 +334,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): Unit = { - listing.close() - if (initThread != null && initThread.isAlive()) { - initThread.interrupt() - initThread.join() + try { + if (initThread != null && initThread.isAlive()) { + initThread.interrupt() + initThread.join() + } + } finally { + listing.close() } } @@ -362,7 +365,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && recordedFileSize(entry.getPath()) < entry.getLen() } - .flatMap { entry => Some(entry) } .sortWith { case (entry1, entry2) => entry1.getModificationTime() > entry2.getModificationTime() } @@ -788,7 +790,7 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends def applicationInfo: Option[ApplicationInfoWrapper] = { if (app.id != null) { - Some(app.toView(List(attempt.toView()))) + Some(app.toView()) } else { None } @@ -802,10 +804,10 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends var coresPerExecutor: Option[Int] = None var memoryPerExecutorMB: Option[Int] = None - def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = { + def toView(): ApplicationInfoWrapper = { val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, memoryPerExecutorMB, Nil) - new ApplicationInfoWrapper(apiInfo, attempts) + new ApplicationInfoWrapper(apiInfo, List(attempt.toView())) } } From 9020184bba90fc1c7394ae8ab91877efe0699914 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 12 Sep 2017 16:24:31 -0700 Subject: [PATCH 07/12] More feedback. --- .../deploy/history/FsHistoryProvider.scala | 21 +++++++++++++------ .../apache/spark/deploy/history/config.scala | 2 ++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 55a929d28c566..7a32fdf77e3d8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -255,12 +255,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def getListing(): Iterator[ApplicationHistoryInfo] = { + // Return the listing in end time descending order. 
listing.view(classOf[ApplicationInfoWrapper]) .index("endTime") .reverse() .iterator() .asScala - .map(_.toAppHistoryInfo) + .map(_.toAppHistoryInfo()) } override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = { @@ -641,6 +642,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ private def recordedFileSize(log: Path): Long = { try { listing.read(classOf[LogInfo], log.toString()).fileSize @@ -699,7 +704,11 @@ private[history] object FsHistoryProvider { private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" - /** Current version of the data written to the listing database. */ + /** + * Current version of the data written to the listing database. When opening an existing + * db, if the version does not match this value, the FsHistoryProvider will throw away + * all data and re-generate the listing data from the event logs. + */ private val CURRENT_LISTING_VERSION = 1L } @@ -716,12 +725,12 @@ private class KVStoreScalaSerializer extends KVStoreSerializer { } private[history] case class KVStoreMetadata( - val version: Long, - val logDir: String) + version: Long, + logDir: String) private[history] case class LogInfo( - @KVIndexParam val logPath: String, - val fileSize: Long) + @KVIndexParam logPath: String, + fileSize: Long) private[history] class AttemptInfoWrapper( val info: v1.ApplicationAttemptInfo, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala index 669d54ad6e913..fb9e997def0dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -40,6 +40,8 @@ private[spark] object config { .createWithDefaultString("7d") val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") + .doc("Local directory where to cache application history information. By default this is " + + "not set, meaning all history information will be kept in memory.") .stringConf .createOptional From 29da2347cfba910c2b66734907fda422d163481a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Sep 2017 09:16:32 -0700 Subject: [PATCH 08/12] Cleanup. 
--- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 7a32fdf77e3d8..55d1695bae2fb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -496,7 +496,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) listener.applicationInfo.foreach(addListing) - listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) + listing.write(LogInfo(logPath.toString(), fileStatus.getLen())) } /** @@ -694,8 +694,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } private[history] object FsHistoryProvider { - private val NOT_STARTED = "" - private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" From 56b68a06e5fc41e423a10ed767c1eb50d62395f5 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Sep 2017 09:50:25 -0700 Subject: [PATCH 09/12] Add test for CURRENT_LISTING_VERSION. --- .../deploy/history/FsHistoryProvider.scala | 5 ++-- .../history/FsHistoryProviderSuite.scala | 29 ++++++++++++++++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index faf85e8a5d259..cf7f45317f9a3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -126,7 +126,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val storePath = conf.get(LOCAL_STORE_DIR) - private val listing: KVStore = storePath.map { path => + // Visible for testing. + private[history] val listing: KVStore = storePath.map { path => val dbPath = new File(path, "listing.ldb") def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) @@ -707,7 +708,7 @@ private[history] object FsHistoryProvider { * db, if the version does not match this value, the FsHistoryProvider will throw away * all data and re-generate the listing data from the event logs. 
*/ - private val CURRENT_LISTING_VERSION = 1L + private[history] val CURRENT_LISTING_VERSION = 1L } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 38921fc0f0844..bcaaeca516a87 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -595,7 +595,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc securityManager.checkUIViewPermissions("user4") should be (false) securityManager.checkUIViewPermissions("user5") should be (false) } - } + } + + test("mismatched version discards old listing") { + val conf = createTestConf() + val oldProvider = newProvider(conf) + + val logFile1 = newLogFile("app1", None, inProgress = false) + writeFile(logFile1, true, None, + SparkListenerLogStart("2.3"), + SparkListenerApplicationStart("test", Some("test"), 1L, "test", None), + SparkListenerApplicationEnd(5L) + ) + + updateAndCheck(oldProvider) { list => + list.size should be (1) + } + assert(oldProvider.listing.count(classOf[ApplicationInfoWrapper]) === 1) + + // Manually overwrite the version in the listing db; this should cause the new provider to + // discard all data because the versions don't match. + val meta = new KVStoreMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1, + conf.get(LOCAL_STORE_DIR).get) + oldProvider.listing.setMetadata(meta) + oldProvider.stop() + + val mistatchedVersionProvider = newProvider(conf) + assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0) + } /** * Asks the provider to check for logs and calls a function to perform checks on the updated From 67550488c08db72915b486dbb93578788b36426c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 25 Sep 2017 09:46:28 -0700 Subject: [PATCH 10/12] Feedback. --- .../deploy/history/FsHistoryProvider.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index cf7f45317f9a3..8156bd2fc40d0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -508,7 +508,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 - // Iterate descending over all applications whose oldest attempt happended before maxTime. + // Iterate descending over all applications whose oldest attempt happened before maxTime. iterator = Some(listing.view(classOf[ApplicationInfoWrapper]) .index("oldestAttempt") .reverse() @@ -516,14 +516,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .closeableIterator()) iterator.get.asScala.foreach { app => + // Applications may have multiple attempts, some of which may not need to be deleted yet. 
val (remaining, toDelete) = app.attempts.partition { attempt => attempt.info.lastUpdated.getTime() >= maxTime } + if (remaining.nonEmpty) { val newApp = new ApplicationInfoWrapper(app.info, remaining) listing.write(newApp) - } else { - listing.delete(app.getClass(), app.id) } toDelete.foreach { attempt => @@ -543,6 +543,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logError(s"IOException in cleaning ${attempt.logPath}", t) } } + + if (remaining.isEmpty) { + listing.delete(app.getClass(), app.id) + } } } catch { case t: Exception => logError("Exception while cleaning logs", t) @@ -639,7 +643,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) recordedFileSize(logPath) > prevFileSize } catch { - case _: NoSuchElementException => false + case _: NoSuchElementException => + logDebug(s"Application Attempt $appId/$attemptId not found") + false } } @@ -761,11 +767,6 @@ private[history] class ApplicationInfoWrapper( ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) } - def toApiInfo(): v1.ApplicationInfo = { - new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, - info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) - } - } private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { From cd2172a027cba92755c89dbaa5f010917749a98a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 25 Sep 2017 10:05:04 -0700 Subject: [PATCH 11/12] Run single test with in-memory store. --- .../history/FsHistoryProviderSuite.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index bcaaeca516a87..d4d64b81069f0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -77,9 +77,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new File(logPath) } - test("Parse application logs") { + Seq(true, false).foreach { inMemory => + test(s"Parse application logs (inMemory = $inMemory)") { + testAppLogParsing(inMemory) + } + } + + private def testAppLogParsing(inMemory: Boolean) { val clock = new ManualClock(12345678) - val provider = newProvider(createTestConf(), clock) + val provider = newProvider(createTestConf(inMemory = inMemory), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -665,10 +671,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new FileOutputStream(file).close() } - private def createTestConf(): SparkConf = { - new SparkConf() + private def createTestConf(inMemory: Boolean = false): SparkConf = { + val conf = new SparkConf() .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) - .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) + + if (!inMemory) { + conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) + } + + conf } private class SafeModeTestProvider(conf: SparkConf, clock: Clock) From 5eff2c5f0891c6ea3fcfbbc7dacbbaf56c1d1788 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 26 Sep 2017 10:31:45 -0700 Subject: [PATCH 12/12] Get rid of `newProvider`. 
--- .../deploy/history/FsHistoryProvider.scala | 9 +++-- .../history/FsHistoryProviderSuite.scala | 38 +++++++------------ 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 79a62d4bdd741..3889dd097ee59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -284,7 +284,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val appInfo = load(appId) appInfo.attempts - .find { attempt => attempt.info.attemptId == attemptId } + .find(_.info.attemptId == attemptId) .map { attempt => val replayBus = new ReplayListenerBus() val ui = { @@ -299,6 +299,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) + assert(appListener.appId.isDefined) ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) // make sure to set admin acls before view acls so they are properly picked up @@ -313,6 +314,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)) } } catch { + case _: FileNotFoundException => None case _: NoSuchElementException => None } } @@ -357,9 +359,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos = statusList + val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. 
Accidentally @@ -675,7 +676,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val attempt = app.attempts.head val oldApp = try { - listing.read(classOf[ApplicationInfoWrapper], app.id) + load(app.id) } catch { case _: NoSuchElementException => app diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d4d64b81069f0..2141934c92640 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -55,16 +55,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc Utils.deleteRecursively(testDir) } - private def newProvider( - conf: SparkConf, - clock: Clock = null): FsHistoryProvider = { - if (clock == null) { - new FsHistoryProvider(conf) - } else { - new FsHistoryProvider(conf, clock) - } - } - /** Create a fake log file using the new log format used in Spark 1.3+ */ private def newLogFile( appId: String, @@ -85,7 +75,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc private def testAppLogParsing(inMemory: Boolean) { val clock = new ManualClock(12345678) - val provider = newProvider(createTestConf(inMemory = inMemory), clock) + val provider = new FsHistoryProvider(createTestConf(inMemory = inMemory), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -180,7 +170,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("history file is renamed from inprogress to completed") { - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -200,7 +190,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("Parse logs that application is not started") { - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -212,7 +202,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-5582: empty log directory") { - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -228,7 +218,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("apps with multiple attempts with order") { - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, @@ -289,7 +279,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("log cleaner") { val maxAge = TimeUnit.SECONDS.toMillis(10) val clock = new ManualClock(maxAge / 2) - val provider = newProvider( + val provider = new FsHistoryProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) @@ -335,7 +325,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val secondFileModifiedTime = 
TimeUnit.SECONDS.toMillis(20) val maxAge = TimeUnit.SECONDS.toMillis(40) val clock = new ManualClock(0) - val provider = newProvider( + val provider = new FsHistoryProvider( createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) val log1 = newLogFile("inProgressApp1", None, inProgress = true) @@ -379,7 +369,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("Event log copy") { - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) writeFile(log, true, None, @@ -414,7 +404,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-8372: new logs with no app ID are ignored") { - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) @@ -428,7 +418,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("provider correctly checks whether fs is in safe mode") { - val provider = spy(newProvider(createTestConf())) + val provider = spy(new FsHistoryProvider(createTestConf())) val dfs = mock(classOf[DistributedFileSystem]) // Asserts that safe mode is false because we can't really control the return value of the mock, // since the API is different between hadoop 1 and 2. @@ -500,7 +490,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc SparkListenerApplicationEnd(5L) ) - val provider = newProvider(createTestConf()) + val provider = new FsHistoryProvider(createTestConf()) updateAndCheck(provider) { list => list.size should be (1) list(0).name should be ("real-app") @@ -517,7 +507,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var provider: FsHistoryProvider = null try { - provider = newProvider(conf) + provider = new FsHistoryProvider(conf) val log = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log, true, None, SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), @@ -605,7 +595,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("mismatched version discards old listing") { val conf = createTestConf() - val oldProvider = newProvider(conf) + val oldProvider = new FsHistoryProvider(conf) val logFile1 = newLogFile("app1", None, inProgress = false) writeFile(logFile1, true, None, @@ -626,7 +616,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc oldProvider.listing.setMetadata(meta) oldProvider.stop() - val mistatchedVersionProvider = newProvider(conf) + val mistatchedVersionProvider = new FsHistoryProvider(conf) assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0) }
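
A minimal usage sketch of the listing store introduced by this series, mirroring what FsHistoryProviderSuite does above. The directory paths are examples only (not defaults), and because FsHistoryProvider is private[history], code like this only compiles inside the org.apache.spark.deploy.history package, as the test suite is.

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.history.FsHistoryProvider

    val conf = new SparkConf()
      // Existing config: directory the provider scans for event logs.
      .set("spark.history.fs.logDirectory", "/tmp/spark-events")
      // New config from this series: if set, the application listing is kept
      // in a LevelDB store under <path>/listing.ldb instead of in memory.
      .set("spark.history.store.path", "/tmp/spark-history-store")

    val provider = new FsHistoryProvider(conf)
    // The on-disk listing is re-used across restarts; it is thrown away and
    // regenerated from the event logs only when CURRENT_LISTING_VERSION or
    // the configured log directory no longer matches the stored metadata.
    val apps = provider.getListing().toList
    provider.stop()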