From f73af34cabb8f4e7e993e6c6d88d4de603776b8e Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 23 Nov 2016 13:59:35 -0800
Subject: [PATCH 1/3] [SPARK-20644][core] Initial ground work for kvstore UI backend.

There are two somewhat unrelated things going on in this patch, but both are
meant to make integration of individual UI pages later on much easier.

The first part is some tweaking of the code in the listener so that it does
fewer updates of the kvstore for data that changes fast; for example, it avoids
writing changes down to the store for every task-related event, since those can
arrive very quickly at times. Instead, for these kinds of events, it only
flushes things if a certain interval has passed. The interval is based on how
often the current spark-shell code updates the progress bar for jobs, so that
users can still get reasonably accurate data. The code also delays hitting the
underlying kvstore as much as possible when replaying apps in the history
server, to avoid unnecessary writes to disk.

The second set of changes prepares the history server and SparkUI for
integrating with the kvstore. A new class, AppStatusStore, translates between
the stored data and the types used in the UI / API. The SHS now populates a
kvstore with data loaded from event logs when an application UI is requested.
Because this store can hold references to disk-based resources, the code was
modified to retrieve data from the store under a read lock. This allows the SHS
to detect when the store is still being used, and to only update it (e.g.
because an updated event log was detected) when no other thread is using the
store. This change ended up creating a lot of churn in the ApplicationCache
code, which was cleaned up considerably in the process.

I also removed some metrics which don't make much sense with the new code.

Tested with existing and added unit tests, and by making sure the SHS still
works on a real cluster.
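To illustrate the write-throttling idea described above, here is a stand-alone
Scala sketch. The method names mirror the patch (update / maybeUpdate /
liveUpdate / flush and liveUpdatePeriodNs), but KVStoreLike, LiveEntitySketch
and ThrottledWriter are simplified placeholders for illustration only, not the
actual classes touched by this change:

    import java.util.concurrent.TimeUnit

    // Placeholder for the kvstore; only the write path matters for this sketch.
    trait KVStoreLike {
      def write(value: AnyRef): Unit
    }

    // Simplified stand-in for LiveEntity: remembers when it was last written.
    abstract class LiveEntitySketch {
      var lastWriteTime: Long = -1L
      protected def doUpdate(): AnyRef
      def write(store: KVStoreLike): Unit = {
        store.write(doUpdate())
        lastWriteTime = System.nanoTime()
      }
    }

    class ThrottledWriter(store: KVStoreLike, live: Boolean, updatePeriodMs: Long) {
      // -1 means "never write on intermediate events"; used when replaying event
      // logs, where only the final flush() needs to hit the store.
      private val liveUpdatePeriodNs =
        if (live) TimeUnit.MILLISECONDS.toNanos(updatePeriodMs) else -1L

      // Unconditional write, for events that must always be persisted.
      def update(entity: LiveEntitySketch): Unit = entity.write(store)

      // Write only if the entity hasn't been written within the configured
      // period; task-related events go through this path.
      def maybeUpdate(entity: LiveEntitySketch): Unit = {
        if (liveUpdatePeriodNs >= 0 &&
            System.nanoTime() - entity.lastWriteTime > liveUpdatePeriodNs) {
          update(entity)
        }
      }

      // Write only for live applications; a no-op while replaying logs.
      def liveUpdate(entity: LiveEntitySketch): Unit = if (live) update(entity)

      // Push everything to the store, e.g. at the end of a replay.
      def flush(entities: Iterable[LiveEntitySketch]): Unit = entities.foreach(update)
    }

Similarly, the following sketch shows the read-lock protocol the SHS uses to
keep a disk-backed UI store from being closed while it is serving requests.
The types are again simplified stand-ins; the real ApplicationCache retries
with a freshly loaded entry when it finds an invalidated UI, rather than
failing the request as this sketch does:

    import java.util.concurrent.locks.ReentrantReadWriteLock

    class LoadedUISketch {
      val lock = new ReentrantReadWriteLock()
      @volatile private var _valid = true
      def valid: Boolean = _valid

      // Called when a newer event log for the attempt is detected; blocks until
      // no reader holds the lock, so the backing store can then be closed safely.
      def invalidate(): Unit = {
        lock.writeLock().lock()
        try {
          _valid = false
        } finally {
          lock.writeLock().unlock()
        }
      }

      // Serve a request while holding the read lock, so the store cannot be
      // closed out from under it.
      def withUI[T](fn: => T): T = {
        lock.readLock().lock()
        try {
          require(valid, "UI was invalidated; the caller should reload it.")
          fn
        } finally {
          lock.readLock().unlock()
        }
      }
    }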
--- .../scala/org/apache/spark/SparkContext.scala | 17 +- .../deploy/history/ApplicationCache.scala | 400 +++++------------- .../history/ApplicationHistoryProvider.scala | 44 +- .../deploy/history/FsHistoryProvider.scala | 266 ++++++++---- .../spark/deploy/history/HistoryServer.scala | 18 +- .../scheduler/ApplicationEventListener.scala | 67 --- .../spark/status/AppStatusListener.scala | 87 ++-- .../apache/spark/status/AppStatusStore.scala | 239 +++++++++++ .../org/apache/spark/status/LiveEntity.scala | 5 +- .../spark/status/api/v1/ApiRootResource.scala | 19 +- .../org/apache/spark/status/config.scala | 30 ++ .../org/apache/spark/status/storeTypes.scala | 27 +- .../scala/org/apache/spark/ui/SparkUI.scala | 73 +--- .../history/ApplicationCacheSuite.scala | 194 ++------- .../history/FsHistoryProviderSuite.scala | 40 +- .../deploy/history/HistoryServerSuite.scala | 28 +- .../spark/status/AppStatusListenerSuite.scala | 19 +- project/MimaExcludes.scala | 2 + 18 files changed, 836 insertions(+), 739 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala create mode 100644 core/src/main/scala/org/apache/spark/status/AppStatusStore.scala create mode 100644 core/src/main/scala/org/apache/spark/status/config.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6f25d346e6e54..861c02858a5bd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend +import org.apache.spark.status.AppStatusStore import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -213,6 +214,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _statusStore: AppStatusStore = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -421,6 +423,10 @@ class SparkContext(config: SparkConf) extends Logging { _jobProgressListener = new JobProgressListener(_conf) listenerBus.addToStatusQueue(jobProgressListener) + // Initialize the app status store and listener before SparkEnv is created so that it gets + // all events. 
+ _statusStore = AppStatusStore.createLiveStore(conf, listenerBus) + // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) @@ -442,8 +448,12 @@ class SparkContext(config: SparkConf) extends Logging { _ui = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener, - _env.securityManager, appName, startTime = startTime)) + Some(SparkUI.create(Some(this), _statusStore, _conf, + l => listenerBus.addToStatusQueue(l), + _env.securityManager, + appName, + "", + startTime)) } else { // For tests, do not enable the UI None @@ -1939,6 +1949,9 @@ class SparkContext(config: SparkConf) extends Logging { } SparkEnv.set(null) } + if (_statusStore != null) { + _statusStore.close() + } // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this // `SparkContext` is stopped. localProperties.remove() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index a370526c46f3d..60c24cb0a38bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.history import java.util.NoSuchElementException +import java.util.concurrent.ExecutionException import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} @@ -26,6 +27,7 @@ import scala.util.control.NonFatal import com.codahale.metrics.{Counter, MetricRegistry, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} +import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.FilterHolder import org.apache.spark.internal.Logging @@ -40,11 +42,6 @@ import org.apache.spark.util.Clock * Incompleted applications have their update time checked on every * retrieval; if the cached entry is out of date, it is refreshed. * - * @note there must be only one instance of [[ApplicationCache]] in a - * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]] - * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs - * can probe the current cache to see if the attempts have changed. - * * Creating multiple instances will break this routing. 
* @param operations implementation of record access operations * @param retainedApplications number of retained applications @@ -80,7 +77,7 @@ private[history] class ApplicationCache( metrics.evictionCount.inc() val key = rm.getKey logDebug(s"Evicting entry ${key}") - operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui) + operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui) } } @@ -89,7 +86,7 @@ private[history] class ApplicationCache( * * Tagged as `protected` so as to allow subclasses in tests to access it directly */ - protected val appCache: LoadingCache[CacheKey, CacheEntry] = { + private val appCache: LoadingCache[CacheKey, CacheEntry] = { CacheBuilder.newBuilder() .maximumSize(retainedApplications) .removalListener(removalListener) @@ -101,130 +98,46 @@ private[history] class ApplicationCache( */ val metrics = new CacheMetrics("history.cache") - init() - - /** - * Perform any startup operations. - * - * This includes declaring this instance as the cache to use in the - * [[ApplicationCacheCheckFilterRelay]]. - */ - private def init(): Unit = { - ApplicationCacheCheckFilterRelay.setApplicationCache(this) - } - - /** - * Stop the cache. - * This will reset the relay in [[ApplicationCacheCheckFilterRelay]]. - */ - def stop(): Unit = { - ApplicationCacheCheckFilterRelay.resetApplicationCache() - } - - /** - * Get an entry. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def get(appAndAttempt: String): SparkUI = { - val parts = splitAppAndAttemptKey(appAndAttempt) - get(parts._1, parts._2) - } - - /** - * Get the Spark UI, converting a lookup failure from an exception to `None`. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def getSparkUI(appAndAttempt: String): Option[SparkUI] = { + def get(appId: String, attemptId: Option[String] = None): CacheEntry = { try { - val ui = get(appAndAttempt) - Some(ui) + appCache.get(new CacheKey(appId, attemptId)) } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - None - case cause: Exception => throw cause - } + case e @ (_: ExecutionException | _: UncheckedExecutionException) => + throw Option(e.getCause()).getOrElse(e) } } /** - * Get the associated spark UI. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appId application ID - * @param attemptId optional attempt ID - * @return the entry + * Run a closure while holding an application's UI read lock. This prevents the history server + * from closing the UI data store while it's being used. */ - def get(appId: String, attemptId: Option[String]): SparkUI = { - lookupAndUpdate(appId, attemptId)._1.ui - } + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + var entry = get(appId, attemptId) - /** - * Look up the entry; update it if needed. 
- * @param appId application ID - * @param attemptId optional attempt ID - * @return the underlying cache entry -which can have its timestamp changed, and a flag to - * indicate that the entry has changed - */ - private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = { - metrics.lookupCount.inc() - val cacheKey = CacheKey(appId, attemptId) - var entry = appCache.getIfPresent(cacheKey) - var updated = false - if (entry == null) { - // no entry, so fetch without any post-fetch probes for out-of-dateness - // this will trigger a callback to loadApplicationEntry() - entry = appCache.get(cacheKey) - } else if (!entry.completed) { - val now = clock.getTimeMillis() - log.debug(s"Probing at time $now for updated application $cacheKey -> $entry") - metrics.updateProbeCount.inc() - updated = time(metrics.updateProbeTimer) { - entry.updateProbe() + // If the entry exists, we need to make sure we run the closure with a valid entry. So + // we need to re-try until we can lock a valid entry for read. + entry.loadedUI.lock.readLock().lock() + try { + while (!entry.loadedUI.valid) { + entry.loadedUI.lock.readLock().unlock() + entry = null + try { + appCache.invalidate(new CacheKey(appId, attemptId)) + entry = get(appId, attemptId) + metrics.loadCount.inc() + } finally { + if (entry != null) { + entry.loadedUI.lock.readLock().lock() + } + } } - if (updated) { - logDebug(s"refreshing $cacheKey") - metrics.updateTriggeredCount.inc() - appCache.refresh(cacheKey) - // and repeat the lookup - entry = appCache.get(cacheKey) - } else { - // update the probe timestamp to the current time - entry.probeTime = now + + fn(entry.loadedUI.ui) + } finally { + if (entry != null) { + entry.loadedUI.lock.readLock().unlock() } } - (entry, updated) - } - - /** - * This method is visible for testing. - * - * It looks up the cached entry *and returns a clone of it*. - * This ensures that the cached entries never leak - * @param appId application ID - * @param attemptId optional attempt ID - * @return a new entry with shared SparkUI, but copies of the other fields. - */ - def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = { - val entry = lookupAndUpdate(appId, attemptId)._1 - new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime) - } - - /** - * Probe for an application being updated. - * @param appId application ID - * @param attemptId attempt ID - * @return true if an update has been triggered - */ - def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = { - val (entry, updated) = lookupAndUpdate(appId, attemptId) - updated } /** @@ -272,27 +185,15 @@ private[history] class ApplicationCache( * @throws NoSuchElementException if there is no matching element */ @throws[NoSuchElementException] - def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { - + private def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { logDebug(s"Loading application Entry $appId/$attemptId") metrics.loadCount.inc() - time(metrics.loadTimer) { + val loadedUI = time(metrics.loadTimer) { + metrics.lookupCount.inc() operations.getAppUI(appId, attemptId) match { - case Some(LoadedAppUI(ui, updateState)) => - val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed) - if (completed) { - // completed spark UIs are attached directly - operations.attachSparkUI(appId, attemptId, ui, completed) - } else { - // incomplete UIs have the cache-check filter put in front of them. 
- ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId) - operations.attachSparkUI(appId, attemptId, ui, completed) - } - // build the cache entry - val now = clock.getTimeMillis() - val entry = new CacheEntry(ui, completed, updateState, now) - logDebug(s"Loaded application $appId/$attemptId -> $entry") - entry + case Some(loadedUI) => + logDebug(s"Loaded application $appId/$attemptId") + loadedUI case None => metrics.lookupFailureCount.inc() // guava's cache logs via java.util log, so is of limited use. Hence: our own message @@ -301,32 +202,20 @@ private[history] class ApplicationCache( attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id")) } } - } - - /** - * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces. - * - * @param appAndAttempt combined key - * @return a tuple of the application ID and, if present, the attemptID - */ - def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = { - val parts = appAndAttempt.split("/") - require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt") - val appId = parts(0) - val attemptId = if (parts.length > 1) Some(parts(1)) else None - (appId, attemptId) - } - - /** - * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`. - * - * If there is an `attemptId`; `applicationId` if not. - * @param appId application ID - * @param attemptId optional attempt ID - * @return a unified string - */ - def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = { - appId + attemptId.map { id => s"/$id" }.getOrElse("") + try { + val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed) + if (!completed) { + // incomplete UIs have the cache-check filter put in front of them. + registerFilter(new CacheKey(appId, attemptId), loadedUI, this) + } + operations.attachSparkUI(appId, attemptId, loadedUI.ui, completed) + new CacheEntry(loadedUI, completed) + } catch { + case e: Exception => + logWarning(s"Failed to initialize application UI for $appId/$attemptId", e) + operations.detachSparkUI(appId, attemptId, loadedUI.ui) + throw e + } } /** @@ -347,6 +236,26 @@ private[history] class ApplicationCache( sb.append("----\n") sb.toString() } + + /** + * Register a filter for the web UI which checks for updates to the given app/attempt + * @param ui Spark UI to attach filters to + * @param appId application ID + * @param attemptId attempt ID + */ + def registerFilter(key: CacheKey, loadedUI: LoadedAppUI, cache: ApplicationCache): Unit = { + require(loadedUI != null) + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) + val filter = new ApplicationCacheCheckFilter(key, loadedUI, cache) + val holder = new FilterHolder(filter) + require(loadedUI.ui.getHandlers != null, "null handlers") + loadedUI.ui.getHandlers.foreach { handler => + handler.addFilter(holder, "/*", enumDispatcher) + } + } + + def invalidate(key: CacheKey): Unit = appCache.invalidate(key) + } /** @@ -360,14 +269,12 @@ private[history] class ApplicationCache( * @param probeTime Times in milliseconds when the probe was last executed. 
*/ private[history] final class CacheEntry( - val ui: SparkUI, - val completed: Boolean, - val updateProbe: () => Boolean, - var probeTime: Long) { + val loadedUI: LoadedAppUI, + val completed: Boolean) { /** string value is for test assertions */ override def toString: String = { - s"UI $ui, completed=$completed, probeTime=$probeTime" + s"UI ${loadedUI.ui}, completed=$completed" } } @@ -396,23 +303,17 @@ private[history] class CacheMetrics(prefix: String) extends Source { val evictionCount = new Counter() val loadCount = new Counter() val loadTimer = new Timer() - val updateProbeCount = new Counter() - val updateProbeTimer = new Timer() - val updateTriggeredCount = new Counter() /** all the counters: for registration and string conversion. */ private val counters = Seq( ("lookup.count", lookupCount), ("lookup.failure.count", lookupFailureCount), ("eviction.count", evictionCount), - ("load.count", loadCount), - ("update.probe.count", updateProbeCount), - ("update.triggered.count", updateTriggeredCount)) + ("load.count", loadCount)) /** all metrics, including timers */ private val allMetrics = counters ++ Seq( - ("load.timer", loadTimer), - ("update.probe.timer", updateProbeTimer)) + ("load.timer", loadTimer)) /** * Name of metric source @@ -498,23 +399,11 @@ private[history] trait ApplicationCacheOperations { * Implementation note: there's some abuse of a shared global entry here because * the configuration data passed to the servlet is just a string:string map. */ -private[history] class ApplicationCacheCheckFilter() extends Filter with Logging { - - import ApplicationCacheCheckFilterRelay._ - var appId: String = _ - var attemptId: Option[String] = _ - - /** - * Bind the app and attempt ID, throwing an exception if no application ID was provided. - * @param filterConfig configuration - */ - override def init(filterConfig: FilterConfig): Unit = { - - appId = Option(filterConfig.getInitParameter(APP_ID)) - .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID")) - attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID)) - logDebug(s"initializing filter $this") - } +private[history] class ApplicationCacheCheckFilter( + key: CacheKey, + loadedUI: LoadedAppUI, + cache: ApplicationCache) + extends Filter with Logging { /** * Filter the request. @@ -543,123 +432,24 @@ private[history] class ApplicationCacheCheckFilter() extends Filter with Logging // if the request is for an attempt, check to see if it is in need of delete/refresh // and have the cache update the UI if so - if (operation=="HEAD" || operation=="GET" - && checkForUpdates(requestURI, appId, attemptId)) { - // send a redirect back to the same location. This will be routed - // to the *new* UI - logInfo(s"Application Attempt $appId/$attemptId updated; refreshing") + loadedUI.lock.readLock().lock() + if (loadedUI.valid) { + try { + chain.doFilter(request, response) + } finally { + loadedUI.lock.readLock.unlock() + } + } else { + loadedUI.lock.readLock.unlock() + cache.invalidate(key) val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) httpResponse.sendRedirect(redirectUrl) - } else { - chain.doFilter(request, response) } } - override def destroy(): Unit = { - } - - override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId" -} - -/** - * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache - * probes to the cache. 
- * - * This is an ugly workaround for the limitation of servlets and filters in the Java servlet - * API; they are still configured on the model of a list of classnames and configuration - * strings in a `web.xml` field, rather than a chain of instances wired up by hand or - * via an injection framework. There is no way to directly configure a servlet filter instance - * with a reference to the application cache which is must use: some global state is needed. - * - * Here, [[ApplicationCacheCheckFilter]] is that global state; it relays all requests - * to the singleton [[ApplicationCache]] - * - * The field `applicationCache` must be set for the filters to work - - * this is done during the construction of [[ApplicationCache]], which requires that there - * is only one cache serving requests through the WebUI. - * - * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic - * will break: filters may not find instances. Tests must not do that. - * - */ -private[history] object ApplicationCacheCheckFilterRelay extends Logging { - // name of the app ID entry in the filter configuration. Mandatory. - val APP_ID = "appId" - - // name of the attempt ID entry in the filter configuration. Optional. - val ATTEMPT_ID = "attemptId" - - // name of the filter to register - val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter" - - /** the application cache to relay requests to */ - @volatile - private var applicationCache: Option[ApplicationCache] = None - - /** - * Set the application cache. Logs a warning if it is overwriting an existing value - * @param cache new cache - */ - def setApplicationCache(cache: ApplicationCache): Unit = { - applicationCache.foreach( c => logWarning(s"Overwriting application cache $c")) - applicationCache = Some(cache) - } - - /** - * Reset the application cache - */ - def resetApplicationCache(): Unit = { - applicationCache = None - } - - /** - * Check to see if there has been an update - * @param requestURI URI the request came in on - * @param appId application ID - * @param attemptId attempt ID - * @return true if an update was loaded for the app/attempt - */ - def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = { + override def init(config: FilterConfig): Unit = { } - logDebug(s"Checking $appId/$attemptId from $requestURI") - applicationCache match { - case Some(cache) => - try { - cache.checkForUpdates(appId, attemptId) - } catch { - case ex: Exception => - // something went wrong. 
Keep going with the existing UI - logWarning(s"When checking for $appId/$attemptId from $requestURI", ex) - false - } + override def destroy(): Unit = { } - case None => - logWarning("No application cache instance defined") - false - } - } - - - /** - * Register a filter for the web UI which checks for updates to the given app/attempt - * @param ui Spark UI to attach filters to - * @param appId application ID - * @param attemptId attempt ID - */ - def registerFilter( - ui: SparkUI, - appId: String, - attemptId: Option[String] ): Unit = { - require(ui != null) - val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) - val holder = new FilterHolder() - holder.setClassName(FILTER_NAME) - holder.setInitParameter(APP_ID, appId) - attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id)) - require(ui.getHandlers != null, "null handlers") - ui.getHandlers.foreach { handler => - handler.addFilter(holder, "/*", enumDispatcher) - } - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 5cb48ca3e60b0..96a80c9a6665c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.history +import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.ZipOutputStream import scala.xml.Node @@ -47,31 +48,30 @@ private[spark] case class ApplicationHistoryInfo( } } -/** - * A probe which can be invoked to see if a loaded Web UI has been updated. - * The probe is expected to be relative purely to that of the UI returned - * in the same [[LoadedAppUI]] instance. That is, whenever a new UI is loaded, - * the probe returned with it is the one that must be used to check for it - * being out of date; previous probes must be discarded. - */ -private[history] abstract class HistoryUpdateProbe { - /** - * Return true if the history provider has a later version of the application - * attempt than the one against this probe was constructed. - * @return - */ - def isUpdated(): Boolean -} - /** * All the information returned from a call to `getAppUI()`: the new UI * and any required update state. * @param ui Spark UI * @param updateProbe probe to call to check on the update state of this application attempt */ -private[history] case class LoadedAppUI( - ui: SparkUI, - updateProbe: () => Boolean) +private[history] case class LoadedAppUI(ui: SparkUI) { + + val lock = new ReentrantReadWriteLock() + + @volatile private var _valid = true + + def valid: Boolean = _valid + + def invalidate(): Unit = { + lock.writeLock().lock() + try { + _valid = false + } finally { + lock.writeLock().unlock() + } + } + +} private[history] abstract class ApplicationHistoryProvider { @@ -145,4 +145,10 @@ private[history] abstract class ApplicationHistoryProvider { * @return html text to display when the application list is empty */ def getEmptyListingHtml(): Seq[Node] = Seq.empty + + /** + * Called when an application UI is unloaded from the history server. 
+ */ + def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index cf97597b484d8..5d595325a23ad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} -import java.util.{Date, UUID} +import java.util.{Date, ServiceLoader, UUID} import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -26,8 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node -import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} -import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -42,6 +41,7 @@ import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.{AppStatusListener, AppStatusStore, AppStatusStoreMetadata, KVUtils} import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI @@ -125,23 +125,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - private val storePath = conf.get(LOCAL_STORE_DIR) + private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_)) // Visible for testing. private[history] val listing: KVStore = storePath.map { path => + require(path.isDirectory(), s"Configured store directory ($path) does not exist.") val dbPath = new File(path, "listing.ldb") - val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, logDir.toString()) + val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, + AppStatusStore.CURRENT_VERSION, logDir.toString()) try { open(new File(path, "listing.ldb"), metadata) } catch { + // If there's an error, remove the listing database and any existing UI database + // from the store directory, since it's extremely likely that they'll all contain + // incompatible information. case _: UnsupportedStoreVersionException | _: MetadataMismatchException => logInfo("Detected incompatible DB versions, deleting...") - Utils.deleteRecursively(dbPath) + path.listFiles().foreach(Utils.deleteRecursively) open(new File(path, "listing.ldb"), metadata) } }.getOrElse(new InMemoryStore()) + private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]() + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -165,7 +172,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - // Conf option used for testing the initialization code. 
val initThread = initialize() private[history] def initialize(): Thread = { @@ -268,42 +274,100 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getLastUpdatedTime(): Long = lastScanTime.get() override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { - try { - val appInfo = load(appId) - appInfo.attempts - .find(_.info.attemptId == attemptId) - .map { attempt => - val replayBus = new ReplayListenerBus() - val ui = { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, - HistoryServer.getAttemptURI(appId, attempt.info.attemptId), - Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) - // Do not call ui.bind() to avoid creating a new server for each application - } + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + return None + } + + val attempt = app.attempts.find(_.info.attemptId == attemptId).orNull + if (attempt == null) { + return None + } - val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) - - val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - assert(appListener.appId.isDefined) - ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, - appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)) + val conf = this.conf.clone() + val secManager = new SecurityManager(conf) + + secManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + secManager.setAdminAcls(HISTORY_UI_ADMIN_ACLS + "," + attempt.adminAcls.getOrElse("")) + secManager.setViewAcls(attempt.info.sparkUser, attempt.viewAcls.getOrElse("")) + secManager.setAdminAclsGroups(HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + attempt.adminAclsGroups.getOrElse("")) + secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse("")) + + val replayBus = new ReplayListenerBus() + + val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) } + + val (kvstore, needReplay) = uiStorePath match { + case Some(path) => + try { + val _replay = !path.isDirectory() + (createDiskStore(path, conf), _replay) + } catch { + case e: Exception => + // Get rid of the old data and re-create it. The store is either old or corrupted. 
+ logWarning(s"Failed to load disk store $uiStorePath for $appId.", e) + Utils.deleteRecursively(path) + (createDiskStore(path, conf), true) } + + case _ => + (new InMemoryStore(), true) + } + + val listener = if (needReplay) { + val _listener = new AppStatusListener(kvstore, conf, false) + replayBus.addListener(_listener) + Some(_listener) + } else { + None + } + + val loadedUI = { + val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, + l => replayBus.addListener(l), + secManager, + app.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime(), + appSparkVersion = attempt.info.appSparkVersion) + LoadedAppUI(ui) + } + + try { + val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], + Utils.getContextOrSparkClassLoader).asScala + listenerFactories.foreach { listenerFactory => + val listeners = listenerFactory.createListeners(conf, loadedUI.ui) + listeners.foreach(replayBus.addListener) + } + + val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) + replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) + listener.foreach(_.flush()) } catch { - case _: FileNotFoundException => None - case _: NoSuchElementException => None + case e: Exception => + try { + kvstore.close() + } catch { + case _e: Exception => logInfo("Error closing store.", _e) + } + uiStorePath.foreach(Utils.deleteRecursively) + if (e.isInstanceOf[FileNotFoundException]) { + return None + } else { + throw e + } + } + + synchronized { + activeUIs((appId, attemptId)) = loadedUI } + + Some(loadedUI) } override def getEmptyListingHtml(): Seq[Node] = { @@ -332,11 +396,40 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) initThread.interrupt() initThread.join() } + Seq(pool, replayExecutor).foreach { executor => + executor.shutdown() + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow() + } + } } finally { + activeUIs.foreach { case (_, loadedUI) => loadedUI.ui.store.close() } + activeUIs.clear() listing.close() } } + override def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { + val uiOption = synchronized { + activeUIs.remove((appId, attemptId)) + } + uiOption.foreach { loadedUI => + loadedUI.lock.writeLock().lock() + try { + loadedUI.ui.store.close() + } finally { + loadedUI.lock.writeLock().unlock() + } + + // If the UI is not valid, delete its files from disk, if any. This relies on the fact that + // ApplicationCache will never call this method concurrently with getAppUI() for the same + // appId / attemptId. + if (!loadedUI.valid && storePath.isDefined) { + Utils.deleteRecursively(getStorePath(storePath.get, appId, attemptId)) + } + } + } + /** * Builds the application list based on the current contents of the log directory. 
* Tries to reuse as much of the data already in memory as possible, by not reading @@ -475,7 +568,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val eventsFilter: ReplayEventsFilter = { eventString => eventString.startsWith(APPL_START_EVENT_PREFIX) || eventString.startsWith(APPL_END_EVENT_PREFIX) || - eventString.startsWith(LOG_START_EVENT_PREFIX) + eventString.startsWith(LOG_START_EVENT_PREFIX) || + eventString.startsWith(ENV_UPDATE_EVENT_PREFIX) } val logPath = fileStatus.getPath() @@ -486,8 +580,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus.addListener(listener) replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) - listener.applicationInfo.foreach(addListing) - listing.write(LogInfo(logPath.toString(), fileStatus.getLen())) + listener.applicationInfo.foreach { app => + // Invalidate the existing UI for the reloaded app attempt, if any. Note that this does + // not remove the UI from the active list; that has to be done in onUIDetached, so that + // cleanup of files can be done in a thread-safe manner. It does mean the UI will remain + // in memory for longer than it should. + synchronized { + activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui => + ui.invalidate() + ui.ui.store.close() + } + } + + addListing(app) + } + listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } /** @@ -546,16 +653,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns - * an `ApplicationEventListener` instance with event data captured from the replay. - * `ReplayEventsFilter` determines what events are replayed and can therefore limit the - * data captured in the returned `ApplicationEventListener` instance. + * Replays the events in the specified log file on the supplied `ReplayListenerBus`. + * `ReplayEventsFilter` determines what events are replayed. */ private def replay( eventLog: FileStatus, appCompleted: Boolean, bus: ReplayListenerBus, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = { + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, @@ -566,11 +671,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // after it's created, so we get a file size that is no bigger than what is actually read. val logInput = EventLoggingListener.openEventLog(logPath, fs) try { - val appListener = new ApplicationEventListener - bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) logInfo(s"Finished replaying $logPath") - appListener } finally { logInput.close() } @@ -613,32 +715,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | application count=$count}""".stripMargin } - /** - * Return true iff a newer version of the UI is available. The check is based on whether the - * fileSize for the currently loaded UI is smaller than the file size the last time - * the logs were loaded. - * - * This is a very cheap operation -- the work of loading the new attempt was already done - * by [[checkForLogs]]. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param prevFileSize the file size of the logs for the currently displayed UI - */ - private def updateProbe( - appId: String, - attemptId: Option[String], - prevFileSize: Long)(): Boolean = { - try { - val attempt = getAttempt(appId, attemptId) - val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) - recordedFileSize(logPath) > prevFileSize - } catch { - case _: NoSuchElementException => - logDebug(s"Application Attempt $appId/$attemptId not found") - false - } - } - /** * Return the last known size of the given event log, recorded the last time the file * system scanner detected a change in the file. @@ -682,6 +758,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.write(newAppInfo) } + private def createDiskStore(path: File, conf: SparkConf): KVStore = { + val metadata = new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION) + KVUtils.open(path, metadata) + } + + private def getStorePath(path: File, appId: String, attemptId: Option[String]): File = { + val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb" + new File(path, fileName) + } + /** For testing. Returns internal data about a single attempt. */ private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( @@ -699,6 +785,8 @@ private[history] object FsHistoryProvider { private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + private val ENV_UPDATE_EVENT_PREFIX = "{\"Event\":\"SparkListenerEnvironmentUpdate\"," + /** * Current version of the data written to the listing database. When opening an existing * db, if the version does not match this value, the FsHistoryProvider will throw away @@ -709,6 +797,7 @@ private[history] object FsHistoryProvider { private[history] case class FsHistoryProviderMetadata( version: Long, + uiVersion: Long, logDir: String) private[history] case class LogInfo( @@ -718,7 +807,11 @@ private[history] case class LogInfo( private[history] class AttemptInfoWrapper( val info: v1.ApplicationAttemptInfo, val logPath: String, - val fileSize: Long) { + val fileSize: Long, + val adminAcls: Option[String], + val viewAcls: Option[String], + val adminAclsGroups: Option[String], + val viewAclsGroups: Option[String]) { def toAppAttemptInfo(): ApplicationAttemptInfo = { ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), @@ -769,6 +862,14 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends attempt.completed = true } + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + val allProperties = event.environmentDetails("Spark Properties").toMap + attempt.viewAcls = allProperties.get("spark.ui.view.acls") + attempt.adminAcls = allProperties.get("spark.admin.acls") + attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") + attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups") + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerLogStart(sparkVersion) => attempt.appSparkVersion = sparkVersion @@ -809,6 +910,11 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends var completed = false var appSparkVersion = "" + var adminAcls: Option[String] = None + var viewAcls: Option[String] = None + var adminAclsGroups: Option[String] = None + var viewAclsGroups: Option[String] = None + def 
toView(): AttemptInfoWrapper = { val apiInfo = new v1.ApplicationAttemptInfo( attemptId, @@ -822,7 +928,11 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends new AttemptInfoWrapper( apiInfo, logPath, - fileSize) + fileSize, + adminAcls, + viewAcls, + adminAclsGroups, + viewAclsGroups) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d9c8fda99ef97..b822a48e98e91 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -106,8 +106,8 @@ class HistoryServer( } } - def getSparkUI(appKey: String): Option[SparkUI] = { - appCache.getSparkUI(appKey) + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + appCache.withSparkUI(appId, attemptId)(fn) } initialize() @@ -140,7 +140,6 @@ class HistoryServer( override def stop() { super.stop() provider.stop() - appCache.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -158,6 +157,7 @@ class HistoryServer( override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) + provider.onUIDetached(appId, attemptId, ui) } /** @@ -224,15 +224,13 @@ class HistoryServer( */ private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = { try { - appCache.get(appId, attemptId) + appCache.withSparkUI(appId, attemptId) { _ => + // Do nothing, just force the UI to load. + } true } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - false - - case cause: Exception => throw cause - } + case NonFatal(e: NoSuchElementException) => + false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala deleted file mode 100644 index 6da8865cd10d3..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -/** - * A simple listener for application events. - * - * This listener expects to hear events from a single application only. If events - * from multiple applications are seen, the behavior is unspecified. 
- */ -private[spark] class ApplicationEventListener extends SparkListener { - var appName: Option[String] = None - var appId: Option[String] = None - var appAttemptId: Option[String] = None - var sparkUser: Option[String] = None - var startTime: Option[Long] = None - var endTime: Option[Long] = None - var viewAcls: Option[String] = None - var adminAcls: Option[String] = None - var viewAclsGroups: Option[String] = None - var adminAclsGroups: Option[String] = None - var appSparkVersion: Option[String] = None - - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - appName = Some(applicationStart.appName) - appId = applicationStart.appId - appAttemptId = applicationStart.appAttemptId - startTime = Some(applicationStart.time) - sparkUser = Some(applicationStart.sparkUser) - } - - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - endTime = Some(applicationEnd.time) - } - - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - val environmentDetails = environmentUpdate.environmentDetails - val allProperties = environmentDetails("Spark Properties").toMap - viewAcls = allProperties.get("spark.ui.view.acls") - adminAcls = allProperties.get("spark.admin.acls") - viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") - adminAclsGroups = allProperties.get("spark.admin.acls.groups") - } - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerLogStart(sparkVersion) => - appSparkVersion = Some(sparkVersion) - case _ => - } -} diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f120685c941df..b96987e3d2bdc 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -34,12 +34,22 @@ import org.apache.spark.util.kvstore.KVStore * A Spark listener that writes application information to a data store. The types written to the * store are defined in the `storeTypes.scala` file and are based on the public REST API. */ -private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging { +private[spark] class AppStatusListener( + kvstore: KVStore, + conf: SparkConf, + live: Boolean) extends SparkListener with Logging { + + import config._ private var sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null private var coresPerTask: Int = 1 + // How often to update live entities. -1 means "never update" when replaying applications, + // meaning only the last write will happen. For live applications, this avoids a few + // operations that we can live without when rapidly processing incoming task events. + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + // Keep track of live entities, so that task metrics can be efficiently updated (without // causing too many writes to the underlying store, and other expensive operations). 
private val liveStages = new HashMap[(Int, Int), LiveStage]() @@ -110,7 +120,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log exec.totalCores = event.executorInfo.totalCores exec.maxTasks = event.executorInfo.totalCores / coresPerTask exec.executorLogs = event.executorInfo.logUrlMap - update(exec) + liveUpdate(exec) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { @@ -139,7 +149,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted - update(exec) + liveUpdate(exec) } } @@ -148,7 +158,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log liveExecutors.values.foreach { exec => if (exec.hostname == host) { exec.isBlacklisted = blacklisted - update(exec) + liveUpdate(exec) } } } @@ -178,7 +188,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log jobGroup, numTasks) liveJobs.put(event.jobId, job) - update(job) + liveUpdate(job) event.stageInfos.foreach { stageInfo => // A new job submission may re-use an existing stage, so this code needs to do an update @@ -186,7 +196,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log val stage = getOrCreateStage(stageInfo) stage.jobs :+= job stage.jobIds += event.jobId - update(stage) + liveUpdate(stage) } } @@ -218,38 +228,38 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log stage.jobs.foreach { job => job.completedStages = job.completedStages - event.stageInfo.stageId job.activeStages += 1 - update(job) + liveUpdate(job) } event.stageInfo.rddInfos.foreach { info => if (info.storageLevel.isValid) { - update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info))) + liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info))) } } - update(stage) + liveUpdate(stage) } override def onTaskStart(event: SparkListenerTaskStart): Unit = { val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId) liveTasks.put(event.taskInfo.taskId, task) - update(task) + liveUpdate(task) liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage => stage.activeTasks += 1 stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime) - update(stage) + maybeUpdate(stage) stage.jobs.foreach { job => job.activeTasks += 1 - update(job) + maybeUpdate(job) } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => exec.activeTasks += 1 exec.totalTasks += 1 - update(exec) + maybeUpdate(exec) } } @@ -257,7 +267,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log // Call update on the task so that the "getting result" time is written to the store; the // value is part of the mutable TaskInfo state that the live entity already references. 
liveTasks.get(event.taskInfo.taskId).foreach { task => - update(task) + maybeUpdate(task) } } @@ -301,13 +311,13 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log stage.activeTasks -= 1 stage.completedTasks += completedDelta stage.failedTasks += failedDelta - update(stage) + maybeUpdate(stage) stage.jobs.foreach { job => job.activeTasks -= 1 job.completedTasks += completedDelta job.failedTasks += failedDelta - update(job) + maybeUpdate(job) } val esummary = stage.executorSummary(event.taskInfo.executorId) @@ -317,7 +327,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log if (metricsDelta != null) { esummary.metrics.update(metricsDelta) } - update(esummary) + maybeUpdate(esummary) } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -333,7 +343,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log exec.completedTasks += completedDelta exec.failedTasks += failedDelta exec.totalDuration += event.taskInfo.duration - update(exec) + maybeUpdate(exec) } } @@ -349,7 +359,6 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log case _ if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE case _ => v1.StageStatus.SKIPPED } - update(stage) stage.jobs.foreach { job => stage.status match { @@ -362,7 +371,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log job.failedStages += 1 } job.activeStages -= 1 - update(job) + liveUpdate(job) } stage.executorSummaries.values.foreach(update) @@ -381,7 +390,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } exec.isActive = true exec.maxMemory = event.maxMem - update(exec) + liveUpdate(exec) } override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { @@ -398,15 +407,15 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log liveTasks.get(taskId).foreach { task => val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) val delta = task.updateMetrics(metrics) - update(task) + maybeUpdate(task) liveStages.get((sid, sAttempt)).foreach { stage => stage.metrics.update(delta) - update(stage) + maybeUpdate(stage) val esummary = stage.executorSummary(event.execId) esummary.metrics.update(delta) - update(esummary) + maybeUpdate(esummary) } } } @@ -419,6 +428,15 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } } + /** Flush all live entities' data to the underlying store. */ + def flush(): Unit = { + liveStages.values.foreach(update) + liveJobs.values.foreach(update) + liveExecutors.values.foreach(update) + liveTasks.values.foreach(update) + liveRDDs.values.foreach(update) + } + private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { val executorId = event.blockUpdatedInfo.blockManagerId.executorId @@ -508,9 +526,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) exec.diskUsed = newValue(exec.diskUsed, diskDelta) exec.rddBlocks += rddBlocksDelta - if (exec.hasMemoryInfo || rddBlocksDelta != 0) { - update(exec) - } + maybeUpdate(exec) } } @@ -528,4 +544,21 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log entity.write(kvstore) } + /** Update a live entity only if it hasn't been updated in the last configured period. 
*/ + private def maybeUpdate(entity: LiveEntity): Unit = { + if (liveUpdatePeriodNs >= 0) { + val now = System.nanoTime() + if (now - entity.lastWriteTime > liveUpdatePeriodNs) { + update(entity) + } + } + } + + /** Update an entity only if in a live app; avoids redundant writes when replaying logs. */ + private def liveUpdate(entity: LiveEntity): Unit = { + if (live) { + update(entity) + } + } + } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala new file mode 100644 index 0000000000000..2927a3227cbef --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Arrays, List => JList} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.status.api.v1 +import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} + +/** + * A wrapper around a KVStore that provides methods for accessing the API data stored within. 
+ */ +private[spark] class AppStatusStore(store: KVStore) { + + def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { + val it = store.view(classOf[JobDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty()) { + it.filter { job => statuses.contains(job.status) }.toSeq + } else { + it.toSeq + } + } + + def job(jobId: Int): v1.JobData = { + store.read(classOf[JobDataWrapper], jobId).info + } + + def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { + store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse().first(true) + .last(true).asScala.map(_.info).toSeq + } + + def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { + val it = store.view(classOf[StageDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty) { + it.filter { s => statuses.contains(s.status) }.toSeq + } else { + it.toSeq + } + } + + def stageData(stageId: Int): Seq[v1.StageData] = { + store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId) + .asScala.map(_.info).toSeq + } + + def stageAttempt(stageId: Int, stageAttemptId: Int): v1.StageData = { + store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).info + } + + def taskSummary( + stageId: Int, + stageAttemptId: Int, + quantiles: Array[Double]): v1.TaskMetricDistributions = { + + val stage = Array(stageId, stageAttemptId) + + val rawMetrics = store.view(classOf[TaskDataWrapper]) + .index("stage") + .first(stage) + .last(stage) + .asScala + .flatMap(_.info.taskMetrics) + .toList + .view + + def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] = + Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) + + // We need to do a lot of similar munging to nested metrics here. For each one, + // we want (a) extract the values for nested metrics (b) make a distribution for each metric + // (c) shove the distribution into the right field in our return type and (d) only return + // a result if the option is defined for any of the tasks. MetricHelper is a little util + // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just + // implement one "build" method, which just builds the quantiles for each field. 
+ + val inputMetrics = + new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics + + def build: v1.InputMetricDistributions = new v1.InputMetricDistributions( + bytesRead = submetricQuantiles(_.bytesRead), + recordsRead = submetricQuantiles(_.recordsRead) + ) + }.build + + val outputMetrics = + new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics + + def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions( + bytesWritten = submetricQuantiles(_.bytesWritten), + recordsWritten = submetricQuantiles(_.recordsWritten) + ) + }.build + + val shuffleReadMetrics = + new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics = + raw.shuffleReadMetrics + + def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions( + readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead }, + readRecords = submetricQuantiles(_.recordsRead), + remoteBytesRead = submetricQuantiles(_.remoteBytesRead), + remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk), + remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), + localBlocksFetched = submetricQuantiles(_.localBlocksFetched), + totalBlocksFetched = submetricQuantiles { s => + s.localBlocksFetched + s.remoteBlocksFetched + }, + fetchWaitTime = submetricQuantiles(_.fetchWaitTime) + ) + }.build + + val shuffleWriteMetrics = + new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics = + raw.shuffleWriteMetrics + + def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions( + writeBytes = submetricQuantiles(_.bytesWritten), + writeRecords = submetricQuantiles(_.recordsWritten), + writeTime = submetricQuantiles(_.writeTime) + ) + }.build + + new v1.TaskMetricDistributions( + quantiles = quantiles, + executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), + executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime), + executorRunTime = metricQuantiles(_.executorRunTime), + executorCpuTime = metricQuantiles(_.executorCpuTime), + resultSize = metricQuantiles(_.resultSize), + jvmGcTime = metricQuantiles(_.jvmGcTime), + resultSerializationTime = metricQuantiles(_.resultSerializationTime), + memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled), + diskBytesSpilled = metricQuantiles(_.diskBytesSpilled), + inputMetrics = inputMetrics, + outputMetrics = outputMetrics, + shuffleReadMetrics = shuffleReadMetrics, + shuffleWriteMetrics = shuffleWriteMetrics + ) + } + + def taskList( + stageId: Int, + stageAttemptId: Int, + offset: Int, + length: Int, + sortBy: v1.TaskSorting): Seq[v1.TaskData] = { + val stageKey = Array(stageId, stageAttemptId) + val base = store.view(classOf[TaskDataWrapper]) + val indexed = sortBy match { + case v1.TaskSorting.ID => + base.index("stage").first(stageKey).last(stageKey) + case v1.TaskSorting.INCREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(-1L)).last(stageKey ++ Array(Long.MaxValue)) + case v1.TaskSorting.DECREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).last(stageKey ++ Array(-1L)) + .reverse() + } + 
indexed.skip(offset).max(length).asScala.map(_.info).toSeq + } + + def rddList(): Seq[v1.RDDStorageInfo] = { + store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).toSeq + } + + def rdd(rddId: Int): v1.RDDStorageInfo = { + store.read(classOf[RDDStorageInfoWrapper], rddId).info + } + + def close(): Unit = { + store.close() + } + +} + +private[spark] object AppStatusStore { + + val CURRENT_VERSION = 1L + + /** + * Create an in-memory store for a live application. + * + * @param conf Configuration. + * @param bus Where to attach the listener to populate the store. + */ + def createLiveStore(conf: SparkConf, bus: LiveListenerBus): AppStatusStore = { + val store = new InMemoryStore() + val stateStore = new AppStatusStore(store) + bus.addToStatusQueue(new AppStatusListener(store, conf, true)) + stateStore + } + +} + +/** + * Helper for getting distributions from nested metric types. + */ +private abstract class MetricHelper[I, O]( + rawMetrics: Seq[v1.TaskMetrics], + quantiles: Array[Double]) { + + def getSubmetrics(raw: v1.TaskMetrics): I + + def build: O + + val data: Seq[I] = rawMetrics.map(getSubmetrics) + + /** applies the given function to all input metrics, and returns the quantiles */ + def submetricQuantiles(f: I => Double): IndexedSeq[Double] = { + Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles) + } +} diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 63fa36580bc7d..337ef0b3e6c2b 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -37,8 +37,11 @@ import org.apache.spark.util.kvstore.KVStore */ private[spark] abstract class LiveEntity { + var lastWriteTime = 0L + def write(store: KVStore): Unit = { store.write(doUpdate()) + lastWriteTime = System.nanoTime() } /** @@ -204,7 +207,7 @@ private class LiveTask( newAccumulatorInfos(info.accumulables), errorMessage, Option(recordedMetrics)) - new TaskDataWrapper(task) + new TaskDataWrapper(task, stageId, stageAttemptId) } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index f17b637754826..9d3833086172f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -248,7 +248,13 @@ private[spark] object ApiRootResource { * interface needed for them all to expose application info as json. */ private[spark] trait UIRoot { - def getSparkUI(appKey: String): Option[SparkUI] + /** + * Runs some code with the current SparkUI instance for the app / attempt. + * + * @throws NoSuchElementException If the app / attempt pair does not exist. + */ + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T + def getApplicationInfoList: Iterator[ApplicationInfo] def getApplicationInfo(appId: String): Option[ApplicationInfo] @@ -293,15 +299,18 @@ private[v1] trait ApiRequestContext { * to it. 
If there is no such app, throw an appropriate exception */ def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) - uiRoot.getSparkUI(appKey) match { - case Some(ui) => + try { + uiRoot.withSparkUI(appId, attemptId) { ui => val user = httpRequest.getRemoteUser() if (!ui.securityManager.checkUIViewPermissions(user)) { throw new ForbiddenException(raw"""user "$user" is not authorized""") } f(ui) - case None => throw new NotFoundException("no such app: " + appId) + } + } catch { + case _: NoSuchElementException => + val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) + throw new NotFoundException(s"no such app: $appKey") } } } diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala new file mode 100644 index 0000000000000..49144fc883e69 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/config.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.config._ + +private[spark] object config { + + val LIVE_ENTITY_UPDATE_PERIOD = ConfigBuilder("spark.ui.liveUpdate.period") + .timeConf(TimeUnit.NANOSECONDS) + .createWithDefaultString("100ms") + +} diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9579accd2cba7..340d5994a0012 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -17,12 +17,17 @@ package org.apache.spark.status +import java.lang.{Integer => JInteger, Long => JLong} + import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ import org.apache.spark.util.kvstore.KVIndex +private[spark] case class AppStatusStoreMetadata( + val version: Long) + private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { @JsonIgnore @KVIndex @@ -64,13 +69,33 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex def id: Array[Int] = Array(info.stageId, info.attemptId) + @JsonIgnore @KVIndex("stageId") + def stageId: Int = info.stageId + } -private[spark] class TaskDataWrapper(val info: TaskData) { +/** + * The task information is always indexed with the stage ID, since that is how the UI and API + * consume it. That means every indexed value has the stage ID and attempt ID included, aside + * from the actual data being indexed. 
+ */ +private[spark] class TaskDataWrapper( + val info: TaskData, + val stageId: Int, + val stageAttemptId: Int) { @JsonIgnore @KVIndex def id: Long = info.taskId + @JsonIgnore @KVIndex("stage") + def stage: Array[Int] = Array(stageId, stageAttemptId) + + @JsonIgnore @KVIndex("runtime") + def runtime: Array[AnyRef] = { + val _runtime = info.taskMetrics.map(_.executorRunTime).getOrElse(-1L) + Array(stageId: JInteger, stageAttemptId: JInteger, _runtime: JLong) + } + } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 6e94073238a56..ee645f6bf8a7a 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ +import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot} import org.apache.spark.storage.StorageStatusListener @@ -39,6 +40,7 @@ import org.apache.spark.util.Utils * Top level user interface for a Spark application. */ private[spark] class SparkUI private ( + val store: AppStatusStore, val sc: Option[SparkContext], val conf: SparkConf, securityManager: SecurityManager, @@ -51,7 +53,8 @@ private[spark] class SparkUI private ( var appName: String, val basePath: String, val lastUpdateTime: Option[Long] = None, - val startTime: Long) + val startTime: Long, + val appSparkVersion: String) extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging @@ -61,8 +64,6 @@ private[spark] class SparkUI private ( var appId: String = _ - var appSparkVersion = org.apache.spark.SPARK_VERSION - private var streamingJobProgressListener: Option[SparkListener] = None /** Initialize all components of the server. 
*/ @@ -104,8 +105,12 @@ private[spark] class SparkUI private ( logInfo(s"Stopped Spark web UI at $webUrl") } - def getSparkUI(appId: String): Option[SparkUI] = { - if (appId == this.appId) Some(this) else None + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + if (appId == this.appId) { + fn(this) + } else { + throw new NoSuchElementException() + } } def getApplicationInfoList: Iterator[ApplicationInfo] = { @@ -159,63 +164,26 @@ private[spark] object SparkUI { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } - def createLiveUI( - sc: SparkContext, - conf: SparkConf, - jobProgressListener: JobProgressListener, - securityManager: SecurityManager, - appName: String, - startTime: Long): SparkUI = { - create(Some(sc), conf, - sc.listenerBus.addToStatusQueue, - securityManager, appName, jobProgressListener = Some(jobProgressListener), - startTime = startTime) - } - - def createHistoryUI( - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String, - lastUpdateTime: Option[Long], - startTime: Long): SparkUI = { - val sparkUI = create(None, conf, listenerBus.addListener, securityManager, appName, basePath, - lastUpdateTime = lastUpdateTime, startTime = startTime) - - val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], - Utils.getContextOrSparkClassLoader).asScala - listenerFactories.foreach { listenerFactory => - val listeners = listenerFactory.createListeners(conf, sparkUI) - listeners.foreach(listenerBus.addListener) - } - sparkUI - } - /** - * Create a new Spark UI. - * - * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs. - * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the - * web UI will create and register its own JobProgressListener. + * Create a new UI backed by an AppStatusStore. 
*/ - private def create( + def create( sc: Option[SparkContext], + store: AppStatusStore, conf: SparkConf, addListenerFn: SparkListenerInterface => Unit, securityManager: SecurityManager, appName: String, - basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None, + basePath: String, + startTime: Long, lastUpdateTime: Option[Long] = None, - startTime: Long): SparkUI = { + appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = { - val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { + val jobProgressListener = sc.map(_.jobProgressListener).getOrElse { val listener = new JobProgressListener(conf) addListenerFn(listener) listener } - val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) @@ -228,8 +196,9 @@ private[spark] object SparkUI { addListenerFn(storageListener) addListenerFn(operationGraphListener) - new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, lastUpdateTime, startTime) + new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, jobProgressListener, storageListener, operationGraphListener, + appName, basePath, lastUpdateTime, startTime, appSparkVersion) } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index 6e50e84549047..44f9c566a380d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -18,15 +18,11 @@ package org.apache.spark.deploy.history import java.util.{Date, NoSuchElementException} -import javax.servlet.Filter import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.mutable -import scala.collection.mutable.ListBuffer import com.codahale.metrics.Counter -import com.google.common.cache.LoadingCache -import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.ServletContextHandler import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -39,23 +35,10 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{Clock, ManualClock, Utils} +import org.apache.spark.util.ManualClock class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers { - /** - * subclass with access to the cache internals - * @param retainedApplications number of retained applications - */ - class TestApplicationCache( - operations: ApplicationCacheOperations = new StubCacheOperations(), - retainedApplications: Int, - clock: Clock = new ManualClock(0)) - extends ApplicationCache(operations, retainedApplications, clock) { - - def cache(): LoadingCache[CacheKey, CacheEntry] = appCache - } - /** * Stub cache operations. 
* The state is kept in a map of [[CacheKey]] to [[CacheEntry]], @@ -77,8 +60,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { logDebug(s"getAppUI($appId, $attemptId)") getAppUICount += 1 - instances.get(CacheKey(appId, attemptId)).map( e => - LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime))) + instances.get(CacheKey(appId, attemptId)).map { e => e.loadedUI } } override def attachSparkUI( @@ -96,10 +78,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp) - attachSparkUI(appId, attemptId, ui, completed) + ended: Long): LoadedAppUI = { + val ui = putAppUI(appId, attemptId, completed, started, ended) + attachSparkUI(appId, attemptId, ui.ui, completed) ui } @@ -108,23 +89,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = newUI(appId, attemptId, completed, started, ended) - putInstance(appId, attemptId, ui, completed, timestamp) + ended: Long): LoadedAppUI = { + val ui = LoadedAppUI(newUI(appId, attemptId, completed, started, ended)) + instances(CacheKey(appId, attemptId)) = new CacheEntry(ui, completed) ui } - def putInstance( - appId: String, - attemptId: Option[String], - ui: SparkUI, - completed: Boolean, - timestamp: Long): Unit = { - instances += (CacheKey(appId, attemptId) -> - new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp)) - } - /** * Detach a reconstructed UI * @@ -146,23 +116,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attached.get(CacheKey(appId, attemptId)) } - /** - * The update probe. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param updateTime timestamp of this UI load - */ - private[history] def updateProbe( - appId: String, - attemptId: Option[String], - updateTime: Long)(): Boolean = { - updateProbeCount += 1 - logDebug(s"isUpdated($appId, $attemptId, ${updateTime})") - val entry = instances.get(CacheKey(appId, attemptId)).get - val updated = entry.probeTime > updateTime - logDebug(s"entry = $entry; updated = $updated") - updated - } } /** @@ -210,15 +163,13 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val now = clock.getTimeMillis() // add the entry - operations.putAppUI(app1, None, true, now, now, now) + operations.putAppUI(app1, None, true, now, now) // make sure its local operations.getAppUI(app1, None).get operations.getAppUICount = 0 // now expect it to be found - val cacheEntry = cache.lookupCacheEntry(app1, None) - assert(1 === cacheEntry.probeTime) - assert(cacheEntry.completed) + cache.withSparkUI(app1, None) { _ => } // assert about queries made of the operations assert(1 === operations.getAppUICount, "getAppUICount") assert(1 === operations.attachCount, "attachCount") @@ -236,8 +187,8 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar assert(0 === operations.detachCount, "attachCount") // evict the entry - operations.putAndAttach("2", None, true, time2, time2, time2) - operations.putAndAttach("3", None, true, time2, time2, time2) + operations.putAndAttach("2", None, true, time2, time2) + operations.putAndAttach("3", None, true, time2, time2) cache.get("2") cache.get("3") @@ -248,7 +199,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val appId = "app1" val attemptId = Some("_01") val time3 = clock.getTimeMillis() - operations.putAppUI(appId, attemptId, false, time3, 0, time3) + operations.putAppUI(appId, attemptId, false, time3, 0) // expect an error here assertNotFound(appId, None) } @@ -256,10 +207,11 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Test that if an attempt ID is set, it must be used in lookups") { val operations = new StubCacheOperations() val clock = new ManualClock(1) - implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = 10, + clock = clock) val appId = "app1" val attemptId = Some("_01") - operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0) + operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0) assertNotFound(appId, None) } @@ -271,50 +223,29 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Incomplete apps refreshed") { val operations = new StubCacheOperations() val clock = new ManualClock(50) - val window = 500 - implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock) + implicit val cache = new ApplicationCache(operations, 5, clock) val metrics = cache.metrics // add the incomplete app // add the entry val started = clock.getTimeMillis() val appId = "app1" val attemptId = Some("001") - operations.putAppUI(appId, attemptId, false, started, 0, started) - val firstEntry = cache.lookupCacheEntry(appId, attemptId) - assert(started === firstEntry.probeTime, s"timestamp in $firstEntry") - assert(!firstEntry.completed, s"entry is complete: $firstEntry") - assertMetric("lookupCount", metrics.lookupCount, 1) + val 
initialUI = operations.putAndAttach(appId, attemptId, false, started, 0) + val firstUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assertMetric("lookupCount", metrics.lookupCount, 1) assert(0 === operations.updateProbeCount, "expected no update probe on that first get") - val checkTime = window * 2 - clock.setTime(checkTime) - val entry3 = cache.lookupCacheEntry(appId, attemptId) - assert(firstEntry !== entry3, s"updated entry test from $cache") + // Invalidate the first entry to trigger a re-load. + initialUI.invalidate() + + // Update the UI in the stub so that a new one is provided to the cache. + operations.putAppUI(appId, attemptId, true, started, started + 10) + + val updatedUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assert(firstUI !== updatedUI, s"expected updated UI") assertMetric("lookupCount", metrics.lookupCount, 2) - assertMetric("updateProbeCount", metrics.updateProbeCount, 1) - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0) - assert(1 === operations.updateProbeCount, s"refresh count in $cache") - assert(0 === operations.detachCount, s"detach count") - assert(entry3.probeTime === checkTime) - - val updateTime = window * 3 - // update the cached value - val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime) - val endTime = window * 10 - clock.setTime(endTime) - logDebug(s"Before operation = $cache") - val entry5 = cache.lookupCacheEntry(appId, attemptId) - assertMetric("lookupCount", metrics.lookupCount, 3) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) - // the update was triggered - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1) - assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache") - - // at which point, the refreshes stop - clock.setTime(window * 20) - assertCacheEntryEquals(appId, attemptId, entry5) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) + assert(1 === operations.detachCount, s"detach count") } /** @@ -337,27 +268,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } } - /** - * Look up the cache entry and assert that it matches in the expected value. - * This assertion works if the two CacheEntries are different -it looks at the fields. - * UI are compared on object equality; the timestamp and completed flags directly. - * @param appId application ID - * @param attemptId attempt ID - * @param expected expected value - * @param cache app cache - */ - def assertCacheEntryEquals( - appId: String, - attemptId: Option[String], - expected: CacheEntry) - (implicit cache: ApplicationCache): Unit = { - val actual = cache.lookupCacheEntry(appId, attemptId) - val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache" - assert(expected.ui === actual.ui, errorText + " SparkUI reference") - assert(expected.completed === actual.completed, errorText + " -completed flag") - assert(expected.probeTime === actual.probeTime, errorText + " -timestamp") - } - /** * Assert that a key wasn't found in cache or loaded. 
* @@ -370,14 +280,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar appId: String, attemptId: Option[String]) (implicit cache: ApplicationCache): Unit = { - val ex = intercept[UncheckedExecutionException] { + val ex = intercept[NoSuchElementException] { cache.get(appId, attemptId) } - var cause = ex.getCause - assert(cause !== null) - if (!cause.isInstanceOf[NoSuchElementException]) { - throw cause - } } test("Large Scale Application Eviction") { @@ -385,12 +290,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val clock = new ManualClock(0) val size = 5 // only two entries are retained, so we expect evictions to occur on lookups - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = size, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = size, + clock = clock) val attempt1 = Some("01") - val ids = new ListBuffer[String]() + val ids = new mutable.ListBuffer[String]() // build a list of applications val count = 100 for (i <- 1 to count ) { @@ -398,7 +303,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar ids += appId clock.advance(10) val t = clock.getTimeMillis() - operations.putAppUI(appId, attempt1, true, t, t, t) + operations.putAppUI(appId, attempt1, true, t, t) } // now go through them in sequence reading them, expect evictions ids.foreach { id => @@ -413,20 +318,19 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Attempts are Evicted") { val operations = new StubCacheOperations() - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = 4) + implicit val cache = new ApplicationCache(operations, 4, new ManualClock()) val metrics = cache.metrics val appId = "app1" val attempt1 = Some("01") val attempt2 = Some("02") val attempt3 = Some("03") - operations.putAppUI(appId, attempt1, true, 100, 110, 110) - operations.putAppUI(appId, attempt2, true, 200, 210, 210) - operations.putAppUI(appId, attempt3, true, 300, 310, 310) + operations.putAppUI(appId, attempt1, true, 100, 110) + operations.putAppUI(appId, attempt2, true, 200, 210) + operations.putAppUI(appId, attempt3, true, 300, 310) val attempt4 = Some("04") - operations.putAppUI(appId, attempt4, true, 400, 410, 410) + operations.putAppUI(appId, attempt4, true, 400, 410) val attempt5 = Some("05") - operations.putAppUI(appId, attempt5, true, 500, 510, 510) + operations.putAppUI(appId, attempt5, true, 500, 510) def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = { assertMetric("loadCount", metrics.loadCount, expectedLoad) @@ -457,20 +361,14 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } - test("Instantiate Filter") { - // this is a regression test on the filter being constructable - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val instance = clazz.newInstance() - instance shouldBe a [Filter] - } - test("redirect includes query params") { - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter] - filter.appId = "local-123" + val operations = new StubCacheOperations() + val ui = operations.putAndAttach("foo", None, true, 0, 10) val cache = mock[ApplicationCache] - when(cache.checkForUpdates(any(), any())).thenReturn(true) - 
ApplicationCacheCheckFilterRelay.setApplicationCache(cache) + when(cache.operations).thenReturn(operations) + val filter = new ApplicationCacheCheckFilter(new CacheKey("foo", None), ui, cache) + ui.invalidate() + val request = mock[HttpServletRequest] when(request.getMethod()).thenReturn("GET") when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 03bd3eaf579f3..86c8cdf43258c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.security.GroupMappingServiceProvider +import org.apache.spark.status.AppStatusStore import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { @@ -612,7 +613,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Manually overwrite the version in the listing db; this should cause the new provider to // discard all data because the versions don't match. val meta = new FsHistoryProviderMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1, - conf.get(LOCAL_STORE_DIR).get) + AppStatusStore.CURRENT_VERSION, conf.get(LOCAL_STORE_DIR).get) oldProvider.listing.setMetadata(meta) oldProvider.stop() @@ -620,6 +621,43 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0) } + test("invalidate cached UI") { + val provider = new FsHistoryProvider(createTestConf()) + val appId = "new1" + + // Write an incomplete app log. + val appLog = newLogFile(appId, None, inProgress = true) + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None) + ) + provider.checkForLogs() + + // Load the app UI. + val oldUI = provider.getAppUI(appId, None) + assert(oldUI.isDefined) + intercept[NoSuchElementException] { + oldUI.get.ui.store.job(0) + } + + // Add more info to the app log, and trigger the provider to update things. + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) + provider.checkForLogs() + + // Manually detach the old UI; ApplicationCache would do this automatically in a real SHS + // when the app's UI was requested. + provider.onUIDetached(appId, None, oldUI.get.ui) + + // Load the UI again and make sure we can get the new info added to the logs. + val freshUI = provider.getAppUI(appId, None) + assert(freshUI.isDefined) + assert(freshUI != oldUI) + freshUI.get.ui.store.job(0) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. 
Example: diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index c11543a4b3ba2..010a8dd004d4f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -72,6 +72,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private var port: Int = -1 def init(extraConf: (String, String)*): Unit = { + Utils.deleteRecursively(storeDir) + assert(storeDir.mkdir()) val conf = new SparkConf() .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") @@ -292,21 +294,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val uiRoot = "/testwebproxybase" System.setProperty("spark.ui.proxyBase", uiRoot) - server.stop() - - val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir) - .set("spark.history.fs.update.interval", "0") - .set("spark.testing", "true") - .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) - - provider = new FsHistoryProvider(conf) - provider.checkForLogs() - val securityManager = HistoryServer.createSecurityManager(conf) - - server = new HistoryServer(conf, provider, securityManager, 18080) - server.initialize() - server.bind() + stop() + init() val port = server.boundPort @@ -375,8 +364,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { - server.stop() - implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -386,6 +373,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // a new conf is used with the background thread set and running at its fastest // allowed refresh rate (1Hz) + stop() val myConf = new SparkConf() .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) .set("spark.eventLog.dir", logDir.getAbsolutePath) @@ -418,7 +406,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - server = new HistoryServer(myConf, provider, securityManager, 18080) + server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() val port = server.boundPort @@ -464,7 +452,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers rootAppPage should not be empty def getAppUI: SparkUI = { - provider.getAppUI(appId, None).get.ui + server.withSparkUI(appId, None) { ui => ui } } // selenium isn't that useful on failures...add our own reporting @@ -519,7 +507,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers getNumJobs("") should be (1) getNumJobs("/jobs") should be (1) getNumJobsRestful() should be (1) - assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics") + assert(metrics.lookupCount.getCount > 0, s"lookup count too low in $metrics") // dump state before the next bit of test, which is where update // checking really gets stressed diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 6f7a0c14dd684..7ac1ce19f8ddf 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -18,7 +18,8 @@ package 
org.apache.spark.status import java.io.File -import java.util.{Date, Properties} +import java.lang.{Integer => JInteger, Long => JLong} +import java.util.{Arrays, Date, Properties} import scala.collection.JavaConverters._ import scala.reflect.{classTag, ClassTag} @@ -36,6 +37,10 @@ import org.apache.spark.util.kvstore._ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { + import config._ + + private val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + private var time: Long = _ private var testDir: File = _ private var store: KVStore = _ @@ -52,7 +57,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("scheduler events") { - val listener = new AppStatusListener(store) + val listener = new AppStatusListener(store, conf, true) // Start the application. time += 1 @@ -174,6 +179,14 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { s1Tasks.foreach { task => check[TaskDataWrapper](task.taskId) { wrapper => assert(wrapper.info.taskId === task.taskId) + assert(wrapper.stageId === stages.head.stageId) + assert(wrapper.stageAttemptId === stages.head.attemptId) + assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId))) + + val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger, + -1L: JLong) + assert(Arrays.equals(wrapper.runtime, runtime)) + assert(wrapper.info.index === task.index) assert(wrapper.info.attempt === task.attemptNumber) assert(wrapper.info.launchTime === new Date(task.launchTime)) @@ -510,7 +523,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("storage events") { - val listener = new AppStatusListener(store) + val listener = new AppStatusListener(store, conf, true) val maxMemory = 42L // Register a couple of block managers. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 45b8870f3b62f..99cac34c85ebc 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -38,6 +38,8 @@ object MimaExcludes { lazy val v23excludes = v22excludes ++ Seq( // SPARK-18085: Better History Server scalability for many / large applications ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"), + // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable"), From eaf3c857ff4c06c09459f35de79fa84dd3c35b21 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 3 Nov 2017 11:04:49 -0700 Subject: [PATCH 2/3] Call System.nanoTime() only once per event. 
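
Calling System.nanoTime() for every tracked entity adds up when task events
arrive quickly, so each event handler now reads the clock once and threads the
timestamp through update(), maybeUpdate() and liveUpdate(); LiveEntity.write()
records that same timestamp instead of reading the clock again. A minimal,
self-contained sketch of the pattern (the types below are illustrative
stand-ins, not the actual Spark classes):

    // Simplified stand-in for LiveEntity: remembers when it was last written.
    abstract class Entity {
      var lastWriteTime = 0L
      def persist(): Unit
      def write(now: Long): Unit = { persist(); lastWriteTime = now }
    }

    // Simplified stand-in for the listener: one nanoTime() call per event.
    class Listener(liveUpdatePeriodNs: Long) {
      def onEvent(touched: Seq[Entity]): Unit = {
        val now = System.nanoTime()            // read the clock once...
        touched.foreach(maybeUpdate(_, now))   // ...and reuse it everywhere
      }

      private def maybeUpdate(e: Entity, now: Long): Unit = {
        // Only flush if the configured live-update period has elapsed.
        if (liveUpdatePeriodNs >= 0 && now - e.lastWriteTime > liveUpdatePeriodNs) {
          e.write(now)
        }
      }
    }
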
--- .../spark/status/AppStatusListener.scala | 98 ++++++++++--------- .../org/apache/spark/status/LiveEntity.scala | 4 +- 2 files changed, 56 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index b96987e3d2bdc..cd43612fae357 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -120,13 +120,13 @@ private[spark] class AppStatusListener( exec.totalCores = event.executorInfo.totalCores exec.maxTasks = event.executorInfo.totalCores / coresPerTask exec.executorLogs = event.executorInfo.logUrlMap - liveUpdate(exec) + liveUpdate(exec, System.nanoTime()) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { liveExecutors.remove(event.executorId).foreach { exec => exec.isActive = false - update(exec) + update(exec, System.nanoTime()) } } @@ -149,21 +149,25 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted - liveUpdate(exec) + liveUpdate(exec, System.nanoTime()) } } private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = { + val now = System.nanoTime() + // Implicitly (un)blacklist every executor associated with the node. liveExecutors.values.foreach { exec => if (exec.hostname == host) { exec.isBlacklisted = blacklisted - liveUpdate(exec) + liveUpdate(exec, now) } } } override def onJobStart(event: SparkListenerJobStart): Unit = { + val now = System.nanoTime() + // Compute (a potential over-estimate of) the number of tasks that will be run by this job. 
// This may be an over-estimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -188,7 +192,7 @@ private[spark] class AppStatusListener( jobGroup, numTasks) liveJobs.put(event.jobId, job) - liveUpdate(job) + liveUpdate(job, now) event.stageInfos.foreach { stageInfo => // A new job submission may re-use an existing stage, so this code needs to do an update @@ -196,7 +200,7 @@ private[spark] class AppStatusListener( val stage = getOrCreateStage(stageInfo) stage.jobs :+= job stage.jobIds += event.jobId - liveUpdate(stage) + liveUpdate(stage, now) } } @@ -208,11 +212,12 @@ private[spark] class AppStatusListener( } job.completionTime = Some(new Date(event.time)) - update(job) + update(job, System.nanoTime()) } } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + val now = System.nanoTime() val stage = getOrCreateStage(event.stageInfo) stage.status = v1.StageStatus.ACTIVE stage.schedulingPool = Option(event.properties).flatMap { p => @@ -228,38 +233,39 @@ private[spark] class AppStatusListener( stage.jobs.foreach { job => job.completedStages = job.completedStages - event.stageInfo.stageId job.activeStages += 1 - liveUpdate(job) + liveUpdate(job, now) } event.stageInfo.rddInfos.foreach { info => if (info.storageLevel.isValid) { - liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info))) + liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now) } } - liveUpdate(stage) + liveUpdate(stage, now) } override def onTaskStart(event: SparkListenerTaskStart): Unit = { + val now = System.nanoTime() val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId) liveTasks.put(event.taskInfo.taskId, task) - liveUpdate(task) + liveUpdate(task, now) liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage => stage.activeTasks += 1 stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime) - maybeUpdate(stage) + maybeUpdate(stage, now) stage.jobs.foreach { job => job.activeTasks += 1 - maybeUpdate(job) + maybeUpdate(job, now) } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => exec.activeTasks += 1 exec.totalTasks += 1 - maybeUpdate(exec) + maybeUpdate(exec, now) } } @@ -267,7 +273,7 @@ private[spark] class AppStatusListener( // Call update on the task so that the "getting result" time is written to the store; the // value is part of the mutable TaskInfo state that the live entity already references. 
liveTasks.get(event.taskInfo.taskId).foreach { task => - maybeUpdate(task) + maybeUpdate(task, System.nanoTime()) } } @@ -277,6 +283,8 @@ private[spark] class AppStatusListener( return } + val now = System.nanoTime() + val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task => val errorMessage = event.reason match { case Success => @@ -293,7 +301,7 @@ private[spark] class AppStatusListener( } task.errorMessage = errorMessage val delta = task.updateMetrics(event.taskMetrics) - update(task) + update(task, now) delta }.orNull @@ -311,13 +319,13 @@ private[spark] class AppStatusListener( stage.activeTasks -= 1 stage.completedTasks += completedDelta stage.failedTasks += failedDelta - maybeUpdate(stage) + maybeUpdate(stage, now) stage.jobs.foreach { job => job.activeTasks -= 1 job.completedTasks += completedDelta job.failedTasks += failedDelta - maybeUpdate(job) + maybeUpdate(job, now) } val esummary = stage.executorSummary(event.taskInfo.executorId) @@ -327,7 +335,7 @@ private[spark] class AppStatusListener( if (metricsDelta != null) { esummary.metrics.update(metricsDelta) } - maybeUpdate(esummary) + maybeUpdate(esummary, now) } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -343,12 +351,13 @@ private[spark] class AppStatusListener( exec.completedTasks += completedDelta exec.failedTasks += failedDelta exec.totalDuration += event.taskInfo.duration - maybeUpdate(exec) + maybeUpdate(exec, now) } } override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage => + val now = System.nanoTime() stage.info = event.stageInfo // Because of SPARK-20205, old event logs may contain valid stages without a submission time @@ -371,11 +380,11 @@ private[spark] class AppStatusListener( job.failedStages += 1 } job.activeStages -= 1 - liveUpdate(job) + liveUpdate(job, now) } - stage.executorSummaries.values.foreach(update) - update(stage) + stage.executorSummaries.values.foreach(update(_, now)) + update(stage, now) } } @@ -390,7 +399,7 @@ private[spark] class AppStatusListener( } exec.isActive = true exec.maxMemory = event.maxMem - liveUpdate(exec) + liveUpdate(exec, System.nanoTime()) } override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { @@ -403,19 +412,21 @@ private[spark] class AppStatusListener( } override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + val now = System.nanoTime() + event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) => liveTasks.get(taskId).foreach { task => val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) val delta = task.updateMetrics(metrics) - maybeUpdate(task) + maybeUpdate(task, now) liveStages.get((sid, sAttempt)).foreach { stage => stage.metrics.update(delta) - maybeUpdate(stage) + maybeUpdate(stage, now) val esummary = stage.executorSummary(event.execId) esummary.metrics.update(delta) - maybeUpdate(esummary) + maybeUpdate(esummary, now) } } } @@ -430,14 +441,16 @@ private[spark] class AppStatusListener( /** Flush all live entities' data to the underlying store. 
*/ def flush(): Unit = { - liveStages.values.foreach(update) - liveJobs.values.foreach(update) - liveExecutors.values.foreach(update) - liveTasks.values.foreach(update) - liveRDDs.values.foreach(update) + val now = System.nanoTime() + liveStages.values.foreach(update(_, now)) + liveJobs.values.foreach(update(_, now)) + liveExecutors.values.foreach(update(_, now)) + liveTasks.values.foreach(update(_, now)) + liveRDDs.values.foreach(update(_, now)) } private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { + val now = System.nanoTime() val executorId = event.blockUpdatedInfo.blockManagerId.executorId // Whether values are being added to or removed from the existing accounting. @@ -512,7 +525,7 @@ private[spark] class AppStatusListener( } rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta) rdd.diskUsed = newValue(rdd.diskUsed, diskDelta) - update(rdd) + update(rdd, now) } maybeExec.foreach { exec => @@ -526,7 +539,7 @@ private[spark] class AppStatusListener( exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) exec.diskUsed = newValue(exec.diskUsed, diskDelta) exec.rddBlocks += rddBlocksDelta - maybeUpdate(exec) + maybeUpdate(exec, now) } } @@ -540,24 +553,21 @@ private[spark] class AppStatusListener( stage } - private def update(entity: LiveEntity): Unit = { - entity.write(kvstore) + private def update(entity: LiveEntity, now: Long): Unit = { + entity.write(kvstore, now) } /** Update a live entity only if it hasn't been updated in the last configured period. */ - private def maybeUpdate(entity: LiveEntity): Unit = { - if (liveUpdatePeriodNs >= 0) { - val now = System.nanoTime() - if (now - entity.lastWriteTime > liveUpdatePeriodNs) { - update(entity) - } + private def maybeUpdate(entity: LiveEntity, now: Long): Unit = { + if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + update(entity, now) } } /** Update an entity only if in a live app; avoids redundant writes when replaying logs. */ - private def liveUpdate(entity: LiveEntity): Unit = { + private def liveUpdate(entity: LiveEntity, now: Long): Unit = { if (live) { - update(entity) + update(entity, now) } } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 337ef0b3e6c2b..041dfe1ef915e 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -39,9 +39,9 @@ private[spark] abstract class LiveEntity { var lastWriteTime = 0L - def write(store: KVStore): Unit = { + def write(store: KVStore, now: Long): Unit = { store.write(doUpdate()) - lastWriteTime = System.nanoTime() + lastWriteTime = now } /** From 537c7b4bf27e1392fd4868db688f7d2f48baa3a5 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 3 Nov 2017 16:43:09 -0700 Subject: [PATCH 3/3] Style, more comments. 
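
Beyond indentation and dead-comment cleanup in ApplicationCache and
FsHistoryProvider, this change documents the LoadedAppUI lifecycle: the cache
serves requests while holding the UI's read lock, and the provider takes the
write lock before tearing down any state the UI still references. A rough
sketch of that locking protocol, using placeholder types rather than the real
SparkUI / history provider classes:

    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Placeholder for a loaded UI: a validity flag guarded by a read/write lock.
    class LoadedUI {
      val lock = new ReentrantReadWriteLock()
      @volatile var valid = true
      def invalidate(): Unit = { valid = false }
    }

    object Lifecycle {
      // Cache side: serve a request under the read lock so the UI cannot be
      // torn down while it is in use.
      def serve[T](ui: LoadedUI)(fn: => T): T = {
        ui.lock.readLock().lock()
        try fn finally ui.lock.readLock().unlock()
      }

      // Provider side: take the write lock before destructive cleanup, which
      // waits for any in-flight readers to finish.
      def unload(ui: LoadedUI): Unit = {
        ui.lock.writeLock().lock()
        try {
          // close disk-backed stores, delete temporary data, etc.
        } finally {
          ui.lock.writeLock().unlock()
        }
      }
    }
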
--- .../deploy/history/ApplicationCache.scala | 49 +++---------------- .../history/ApplicationHistoryProvider.scala | 21 ++++++-- .../deploy/history/FsHistoryProvider.scala | 19 +++---- .../org/apache/spark/status/storeTypes.scala | 3 +- 4 files changed, 34 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index 60c24cb0a38bd..8c63fa65b40fd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -23,7 +23,6 @@ import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, Servlet import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import com.codahale.metrics.{Counter, MetricRegistry, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} @@ -36,13 +35,11 @@ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Clock /** - * Cache for applications. + * Cache for application UIs. * - * Completed applications are cached for as long as there is capacity for them. - * Incompleted applications have their update time checked on every - * retrieval; if the cached entry is out of date, it is refreshed. + * Applications are cached for as long as there is capacity for them. See [[LoadedAppUI]] for a + * discussion of the UI lifecycle. * - * Creating multiple instances will break this routing. * @param operations implementation of record access operations * @param retainedApplications number of retained applications * @param clock time source @@ -52,9 +49,6 @@ private[history] class ApplicationCache( val retainedApplications: Int, val clock: Clock) extends Logging { - /** - * Services the load request from the cache. - */ private val appLoader = new CacheLoader[CacheKey, CacheEntry] { /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */ @@ -64,9 +58,6 @@ private[history] class ApplicationCache( } - /** - * Handler for callbacks from the cache of entry removal. - */ private val removalListener = new RemovalListener[CacheKey, CacheEntry] { /** @@ -81,11 +72,6 @@ private[history] class ApplicationCache( } } - /** - * The cache of applications. - * - * Tagged as `protected` so as to allow subclasses in tests to access it directly - */ private val appCache: LoadingCache[CacheKey, CacheEntry] = { CacheBuilder.newBuilder() .maximumSize(retainedApplications) @@ -122,7 +108,7 @@ private[history] class ApplicationCache( entry.loadedUI.lock.readLock().unlock() entry = null try { - appCache.invalidate(new CacheKey(appId, attemptId)) + invalidate(new CacheKey(appId, attemptId)) entry = get(appId, attemptId) metrics.loadCount.inc() } finally { @@ -140,25 +126,9 @@ private[history] class ApplicationCache( } } - /** - * Size probe, primarily for testing. - * @return size - */ + /** @return Number of cached UIs. */ def size(): Long = appCache.size() - /** - * Emptiness predicate, primarily for testing. - * @return true if the cache is empty - */ - def isEmpty: Boolean = appCache.size() == 0 - - /** - * Time a closure, returning its output. - * @param t timer - * @param f function - * @tparam T type of return value of time - * @return the result of the function. 
- */ private def time[T](t: Timer)(f: => T): T = { val timeCtx = t.time() try { @@ -206,7 +176,7 @@ private[history] class ApplicationCache( val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed) if (!completed) { // incomplete UIs have the cache-check filter put in front of them. - registerFilter(new CacheKey(appId, attemptId), loadedUI, this) + registerFilter(new CacheKey(appId, attemptId), loadedUI) } operations.attachSparkUI(appId, attemptId, loadedUI.ui, completed) new CacheEntry(loadedUI, completed) @@ -243,10 +213,10 @@ private[history] class ApplicationCache( * @param appId application ID * @param attemptId attempt ID */ - def registerFilter(key: CacheKey, loadedUI: LoadedAppUI, cache: ApplicationCache): Unit = { + private def registerFilter(key: CacheKey, loadedUI: LoadedAppUI): Unit = { require(loadedUI != null) val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) - val filter = new ApplicationCacheCheckFilter(key, loadedUI, cache) + val filter = new ApplicationCacheCheckFilter(key, loadedUI, this) val holder = new FilterHolder(filter) require(loadedUI.ui.getHandlers != null, "null handlers") loadedUI.ui.getHandlers.foreach { handler => @@ -264,9 +234,6 @@ private[history] class ApplicationCache( * @param ui Spark UI * @param completed Flag to indicated that the application has completed (and so * does not need refreshing). - * @param updateProbe function to call to see if the application has been updated and - * therefore that the cached value needs to be refreshed. - * @param probeTime Times in milliseconds when the probe was last executed. */ private[history] final class CacheEntry( val loadedUI: LoadedAppUI, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 96a80c9a6665c..38f0d6f2afa5e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -49,10 +49,25 @@ private[spark] case class ApplicationHistoryInfo( } /** - * All the information returned from a call to `getAppUI()`: the new UI - * and any required update state. + * A loaded UI for a Spark application. + * + * Loaded UIs are valid once created, and can be invalidated once the history provider detects + * changes in the underlying app data (e.g. an updated event log). Invalidating a UI does not + * unload it; it just signals the [[ApplicationCache]] that the UI should not be used to serve + * new requests. + * + * Reloading of the UI with new data requires collaboration between the cache and the provider; + * the provider invalidates the UI when it detects updated information, and the cache invalidates + * the cache entry when it detects the UI has been invalidated. That will trigger a callback + * on the provider to finally clean up any UI state. The cache should hold read locks when + * using the UI, and the provider should grab the UI's write lock before making destructive + * operations. + * + * Note that all this means that an invalidated UI will still stay in-memory, and any resources it + * references will remain open, until the cache either sees that it's invalidated, or evicts it to + * make room for another UI. 
+ * * @param ui Spark UI - * @param updateProbe probe to call to check on the update state of this application attempt */ private[history] case class LoadedAppUI(ui: SparkUI) { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5d595325a23ad..f16dddea9f784 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -61,9 +61,6 @@ import org.apache.spark.util.kvstore._ * and update or create a matching application info element in the list of applications. * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the * attempt is replaced by another one with a larger log size. - * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]] - * instance is out of date, the log size of the cached instance is checked against the app last - * loaded by [[checkForLogs]]. * * The use of log size, rather than simply relying on modification times, is needed to * address the following issues @@ -581,10 +578,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) listener.applicationInfo.foreach { app => - // Invalidate the existing UI for the reloaded app attempt, if any. Note that this does - // not remove the UI from the active list; that has to be done in onUIDetached, so that - // cleanup of files can be done in a thread-safe manner. It does mean the UI will remain - // in memory for longer than it should. + // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a + // discussion on the UI lifecycle. synchronized { activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui => ui.invalidate() @@ -796,13 +791,13 @@ private[history] object FsHistoryProvider { } private[history] case class FsHistoryProviderMetadata( - version: Long, - uiVersion: Long, - logDir: String) + version: Long, + uiVersion: Long, + logDir: String) private[history] case class LogInfo( - @KVIndexParam logPath: String, - fileSize: Long) + @KVIndexParam logPath: String, + fileSize: Long) private[history] class AttemptInfoWrapper( val info: v1.ApplicationAttemptInfo, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 340d5994a0012..a445435809f3a 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -25,8 +25,7 @@ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1._ import org.apache.spark.util.kvstore.KVIndex -private[spark] case class AppStatusStoreMetadata( - val version: Long) +private[spark] case class AppStatusStoreMetadata(version: Long) private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) {