From 772e4648d95bda3353723337723543c741ea8476 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Dec 2017 14:08:48 -0600 Subject: [PATCH] [SPARK-20653][CORE] Add cleaning of old elements from the status store. This change restores the functionality that keeps a limited number of different types (jobs, stages, etc) depending on configuration, to avoid the store growing indefinitely over time. The feature is implemented by creating a new type (ElementTrackingStore) that wraps a KVStore and allows triggers to be set up for when elements of a certain type meet a certain threshold. Triggers don't need to necessarily only delete elements, but the current API is set up in a way that makes that use case easier. The new store also has a trigger for the "close" call, which makes it easier for listeners to register code for cleaning things up and flushing partial state to the store. The old configurations for cleaning up the stored elements from the core and SQL UIs are now active again, and the old unit tests are re-enabled. Author: Marcelo Vanzin Closes #19751 from vanzin/SPARK-20653. --- .../deploy/history/FsHistoryProvider.scala | 17 +- .../spark/internal/config/package.scala | 5 - .../spark/status/AppStatusListener.scala | 188 ++++++++++++++++-- .../apache/spark/status/AppStatusPlugin.scala | 2 +- .../apache/spark/status/AppStatusStore.scala | 6 +- .../spark/status/ElementTrackingStore.scala | 160 +++++++++++++++ .../org/apache/spark/status/KVUtils.scala | 14 ++ .../org/apache/spark/status/LiveEntity.scala | 13 +- .../spark/status/api/v1/StagesResource.scala | 10 +- .../org/apache/spark/status/config.scala | 20 ++ .../org/apache/spark/status/storeTypes.scala | 16 ++ .../scala/org/apache/spark/ui/SparkUI.scala | 2 - .../apache/spark/ui/jobs/AllJobsPage.scala | 8 +- .../apache/spark/ui/jobs/AllStagesPage.scala | 8 +- .../deploy/history/HistoryServerSuite.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 148 +++++++++++--- .../status/ElementTrackingStoreSuite.scala | 91 +++++++++ .../org/apache/spark/ui/UISeleniumSuite.scala | 8 +- .../spark/sql/internal/StaticSQLConf.scala | 7 + .../execution/ui/SQLAppStatusListener.scala | 33 ++- .../sql/execution/ui/SQLAppStatusStore.scala | 7 +- .../sql/execution/ui/SQLListenerSuite.scala | 29 +-- 22 files changed, 713 insertions(+), 81 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala create mode 100644 core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a8e1becc56ab7..fa2c5194aa41b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} +import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.kvstore._ @@ -304,6 +305,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val (kvstore, needReplay) = uiStorePath match { case Some(path) => try { + // The store path is not guaranteed to exist - maybe it hasn't been created, or was + // invalidated because changes to the event log were detected. Need to replay in that + // case. val _replay = !path.isDirectory() (createDiskStore(path, conf), _replay) } catch { @@ -318,24 +322,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) (new InMemoryStore(), true) } + val trackingStore = new ElementTrackingStore(kvstore, conf) if (needReplay) { val replayBus = new ReplayListenerBus() - val listener = new AppStatusListener(kvstore, conf, false, + val listener = new AppStatusListener(trackingStore, conf, false, lastUpdateTime = Some(attempt.info.lastUpdated.getTime())) replayBus.addListener(listener) AppStatusPlugin.loadPlugins().foreach { plugin => - plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false) + plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false) } try { val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - listener.flush() + trackingStore.close(false) } catch { case e: Exception => - try { - kvstore.close() - } catch { - case _e: Exception => logInfo("Error closing store.", _e) + Utils.tryLogNonFatalError { + trackingStore.close() } uiStorePath.foreach(Utils.deleteRecursively) if (e.isInstanceOf[FileNotFoundException]) { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 172ba85359da7..eb12ddf961314 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -240,11 +240,6 @@ package object config { .stringConf .createOptional - // To limit memory usage, we only track information for a fixed number of tasks - private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks") - .intConf - .createWithDefault(100000) - // To limit how many applications are shown in the History Server summary ui private[spark] val HISTORY_UI_MAX_APPS = ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 6da44cbc44c4d..1fb7b76d43d04 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -32,7 +32,6 @@ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ -import org.apache.spark.util.kvstore.KVStore /** * A Spark listener that writes application information to a data store. The types written to the @@ -42,7 +41,7 @@ import org.apache.spark.util.kvstore.KVStore * unfinished tasks can be more accurately calculated (see SPARK-21922). */ private[spark] class AppStatusListener( - kvstore: KVStore, + kvstore: ElementTrackingStore, conf: SparkConf, live: Boolean, lastUpdateTime: Option[Long] = None) extends SparkListener with Logging { @@ -51,6 +50,7 @@ private[spark] class AppStatusListener( private var sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null + private var appSummary = new AppSummary(0, 0) private var coresPerTask: Int = 1 // How often to update live entities. -1 means "never update" when replaying applications, @@ -58,6 +58,7 @@ private[spark] class AppStatusListener( // operations that we can live without when rapidly processing incoming task events. private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) // Keep track of live entities, so that task metrics can be efficiently updated (without @@ -68,10 +69,25 @@ private[spark] class AppStatusListener( private val liveTasks = new HashMap[Long, LiveTask]() private val liveRDDs = new HashMap[Int, LiveRDD]() private val pools = new HashMap[String, SchedulerPool]() + // Keep the active executor count as a separate variable to avoid having to do synchronization + // around liveExecutors. + @volatile private var activeExecutorCount = 0 - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerLogStart(version) => sparkVersion = version - case _ => + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) + { count => cleanupExecutors(count) } + + kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count => + cleanupJobs(count) + } + + kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count => + cleanupStages(count) + } + + kvstore.onFlush { + if (!live) { + flush() + } } override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { @@ -97,6 +113,7 @@ private[spark] class AppStatusListener( Seq(attempt)) kvstore.write(new ApplicationInfoWrapper(appInfo)) + kvstore.write(appSummary) } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { @@ -158,10 +175,11 @@ private[spark] class AppStatusListener( override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { liveExecutors.remove(event.executorId).foreach { exec => val now = System.nanoTime() + activeExecutorCount = math.max(0, activeExecutorCount - 1) exec.isActive = false exec.removeTime = new Date(event.time) exec.removeReason = event.reason - update(exec, now) + update(exec, now, last = true) // Remove all RDD distributions that reference the removed executor, in case there wasn't // a corresponding event. @@ -290,8 +308,11 @@ private[spark] class AppStatusListener( } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None - update(job, now) + update(job, now, last = true) } + + appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) + kvstore.write(appSummary) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -350,6 +371,13 @@ private[spark] class AppStatusListener( job.activeTasks += 1 maybeUpdate(job, now) } + + if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) { + stage.cleaning = true + kvstore.doAsync { + cleanupTasks(stage) + } + } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -449,6 +477,13 @@ private[spark] class AppStatusListener( esummary.metrics.update(metricsDelta) } maybeUpdate(esummary, now) + + if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) { + stage.cleaning = true + kvstore.doAsync { + cleanupTasks(stage) + } + } } liveExecutors.get(event.taskInfo.executorId).foreach { exec => @@ -516,8 +551,11 @@ private[spark] class AppStatusListener( } stage.executorSummaries.values.foreach(update(_, now)) - update(stage, now) + update(stage, now, last = true) } + + appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) + kvstore.write(appSummary) } override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { @@ -573,7 +611,7 @@ private[spark] class AppStatusListener( } /** Flush all live entities' data to the underlying store. */ - def flush(): Unit = { + private def flush(): Unit = { val now = System.nanoTime() liveStages.values.asScala.foreach { stage => update(stage, now) @@ -708,7 +746,10 @@ private[spark] class AppStatusListener( } private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = { - liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime)) + liveExecutors.getOrElseUpdate(executorId, { + activeExecutorCount += 1 + new LiveExecutor(executorId, addTime) + }) } private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = { @@ -754,8 +795,8 @@ private[spark] class AppStatusListener( } } - private def update(entity: LiveEntity, now: Long): Unit = { - entity.write(kvstore, now) + private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = { + entity.write(kvstore, now, checkTriggers = last) } /** Update a live entity only if it hasn't been updated in the last configured period. */ @@ -772,4 +813,127 @@ private[spark] class AppStatusListener( } } + private def cleanupExecutors(count: Long): Unit = { + // Because the limit is on the number of *dead* executors, we need to calculate whether + // there are actually enough dead executors to be deleted. + val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS) + val dead = count - activeExecutorCount + + if (dead > threshold) { + val countToDelete = calculateNumberToRemove(dead, threshold) + val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active") + .max(countToDelete).first(false).last(false).asScala.toSeq + toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) } + } + } + + private def cleanupJobs(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS)) + if (countToDelete <= 0L) { + return + } + + val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]), + countToDelete.toInt) { j => + j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN + } + toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) } + } + + private def cleanupStages(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES)) + if (countToDelete <= 0L) { + return + } + + val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]), + countToDelete.toInt) { s => + s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING + } + + stages.foreach { s => + val key = s.id + kvstore.delete(s.getClass(), key) + + val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper]) + .index("stage") + .first(key) + .last(key) + .asScala + .toSeq + execSummaries.foreach { e => + kvstore.delete(e.getClass(), e.id) + } + + val tasks = kvstore.view(classOf[TaskDataWrapper]) + .index("stage") + .first(key) + .last(key) + .asScala + + tasks.foreach { t => + kvstore.delete(t.getClass(), t.info.taskId) + } + + // Check whether there are remaining attempts for the same stage. If there aren't, then + // also delete the RDD graph data. + val remainingAttempts = kvstore.view(classOf[StageDataWrapper]) + .index("stageId") + .first(s.stageId) + .last(s.stageId) + .closeableIterator() + + val hasMoreAttempts = try { + remainingAttempts.asScala.exists { other => + other.info.attemptId != s.info.attemptId + } + } finally { + remainingAttempts.close() + } + + if (!hasMoreAttempts) { + kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId) + } + } + } + + private def cleanupTasks(stage: LiveStage): Unit = { + val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt + if (countToDelete > 0) { + val stageKey = Array(stage.info.stageId, stage.info.attemptId) + val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey) + .last(stageKey) + + // Try to delete finished tasks only. + val toDelete = KVUtils.viewToSeq(view, countToDelete) { t => + !live || t.info.status != TaskState.RUNNING.toString() + } + toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) } + stage.savedTasks.addAndGet(-toDelete.size) + + // If there are more running tasks than the configured limit, delete running tasks. This + // should be extremely rare since the limit should generally far exceed the number of tasks + // that can run in parallel. + val remaining = countToDelete - toDelete.size + if (remaining > 0) { + val runningTasksToDelete = view.max(remaining).iterator().asScala.toList + runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) } + stage.savedTasks.addAndGet(-remaining) + } + } + stage.cleaning = false + } + + /** + * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done + * asynchronously, this method may return 0 in case enough items have been deleted already. + */ + private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = { + if (dataSize > retainedSize) { + math.max(retainedSize / 10L, dataSize - retainedSize) + } else { + 0L + } + } + } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala index 69ca02ec76293..4cada5c7b0de4 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala @@ -48,7 +48,7 @@ private[spark] trait AppStatusPlugin { */ def setupListeners( conf: SparkConf, - store: KVStore, + store: ElementTrackingStore, addListenerFn: SparkListener => Unit, live: Boolean): Unit diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 22d768b3cb990..9987419b170f6 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -330,6 +330,10 @@ private[spark] class AppStatusStore( store.read(classOf[PoolData], name) } + def appSummary(): AppSummary = { + store.read(classOf[AppSummary], classOf[AppSummary].getName()) + } + def close(): Unit = { store.close() } @@ -347,7 +351,7 @@ private[spark] object AppStatusStore { * @param addListenerFn Function to register a listener with a bus. */ def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = { - val store = new InMemoryStore() + val store = new ElementTrackingStore(new InMemoryStore(), conf) val listener = new AppStatusListener(store, conf, true) addListenerFn(listener) AppStatusPlugin.loadPlugins().foreach { p => diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala new file mode 100644 index 0000000000000..863b0967f765e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import com.google.common.util.concurrent.MoreExecutors + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). + * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on a separate thread by default; they can be forced to run on + * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker") + } else { + MoreExecutors.sameThreadExecutor() + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { + val existing = triggers.getOrElse(klass, Seq()) + triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. + */ + def onFlush(action: => Unit): Unit = { + flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. The task will run on the calling thread if + * `ASYNC_TRACKING_ENABLED` is `false`. + */ + def doAsync(fn: => Unit): Unit = { + executor.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } + }) + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally checking for whether to fire triggers. */ + def write(value: Any, checkTriggers: Boolean): Unit = { + write(value) + + if (checkTriggers && !stopped) { + triggers.get(value.getClass()).foreach { list => + doAsync { + val count = store.count(value.getClass()) + list.foreach { t => + if (count > t.threshold) { + t.action(count) + } + } + } + } + } + } + + override def delete(klass: Class[_], naturalKey: Any): Unit = store.delete(klass, naturalKey) + + override def getMetadata[T](klass: Class[T]): T = store.getMetadata(klass) + + override def setMetadata(value: Any): Unit = store.setMetadata(value) + + override def view[T](klass: Class[T]): KVStoreView[T] = store.view(klass) + + override def count(klass: Class[_]): Long = store.count(klass) + + override def count(klass: Class[_], index: String, indexedValue: Any): Long = { + store.count(klass, index, indexedValue) + } + + override def close(): Unit = { + close(true) + } + + /** A close() method that optionally leaves the parent store open. */ + def close(closeParent: Boolean): Unit = synchronized { + if (stopped) { + return + } + + stopped = true + executor.shutdown() + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow() + } + + flushTriggers.foreach { trigger => + Utils.tryLog(trigger()) + } + + if (closeParent) { + store.close() + } + } + + private case class Trigger[T]( + threshold: Long, + action: Long => Unit) + +} diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index 4638511944c61..99b1843d8e1c0 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.status import java.io.File import scala.annotation.meta.getter +import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} @@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging { db } + /** Turns a KVStoreView into a Scala sequence, applying a filter. */ + def viewToSeq[T]( + view: KVStoreView[T], + max: Int) + (filter: T => Boolean): Seq[T] = { + val iter = view.closeableIterator() + try { + iter.asScala.filter(filter).take(max).toList + } finally { + iter.close() + } + } + private[spark] class MetadataMismatchException extends Exception } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 983c58a607aa8..52e83f250d34e 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -18,6 +18,7 @@ package org.apache.spark.status import java.util.Date +import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.HashMap @@ -38,10 +39,12 @@ import org.apache.spark.util.kvstore.KVStore */ private[spark] abstract class LiveEntity { - var lastWriteTime = 0L + var lastWriteTime = -1L - def write(store: KVStore, now: Long): Unit = { - store.write(doUpdate()) + def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = { + // Always check triggers on the first write, since adding an element to the store may + // cause the maximum count for the element type to be exceeded. + store.write(doUpdate(), checkTriggers || lastWriteTime == -1L) lastWriteTime = now } @@ -403,6 +406,10 @@ private class LiveStage extends LiveEntity { val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() + // Used for cleanup of tasks after they reach the configured limit. Not written to the store. + @volatile var cleaning = false + var savedTasks = new AtomicInteger(0) + def executorSummary(executorId: String): LiveExecutorStageSummary = { executorSummaries.getOrElseUpdate(executorId, new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId)) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index b3561109bc636..3b879545b3d2e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -59,7 +59,15 @@ private[v1] class StagesResource extends BaseAppResource { ui.store.stageAttempt(stageId, stageAttemptId, details = details) } catch { case _: NoSuchElementException => - throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.") + // Change the message depending on whether there are any attempts for the requested stage. + val all = ui.store.stageData(stageId) + val msg = if (all.nonEmpty) { + val ids = all.map(_.attemptId) + s"unknown attempt for stage $stageId. Found attempts: [${ids.mkString(",")}]" + } else { + s"unknown stage: $stageId" + } + throw new NotFoundException(msg) } } diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala index 7af9dff977a86..67801b8f046f4 100644 --- a/core/src/main/scala/org/apache/spark/status/config.scala +++ b/core/src/main/scala/org/apache/spark/status/config.scala @@ -23,10 +23,30 @@ import org.apache.spark.internal.config._ private[spark] object config { + val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable") + .booleanConf + .createWithDefault(true) + val LIVE_ENTITY_UPDATE_PERIOD = ConfigBuilder("spark.ui.liveUpdate.period") .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("100ms") + val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs") + .intConf + .createWithDefault(1000) + + val MAX_RETAINED_STAGES = ConfigBuilder("spark.ui.retainedStages") + .intConf + .createWithDefault(1000) + + val MAX_RETAINED_TASKS_PER_STAGE = ConfigBuilder("spark.ui.retainedTasks") + .intConf + .createWithDefault(100000) + + val MAX_RETAINED_DEAD_EXECUTORS = ConfigBuilder("spark.ui.retainedDeadExecutors") + .intConf + .createWithDefault(100) + val MAX_RETAINED_ROOT_NODES = ConfigBuilder("spark.ui.dagGraph.retainedRootRDDs") .intConf .createWithDefault(Int.MaxValue) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index c1ea87542d6cc..d9ead0071d3bf 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -112,6 +112,9 @@ private[spark] class TaskDataWrapper( Array(stageId: JInteger, stageAttemptId: JInteger, info.launchTime.getTime(): JLong) } + @JsonIgnore @KVIndex("active") + def active: Boolean = info.duration.isEmpty + } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { @@ -187,3 +190,16 @@ private[spark] class RDDOperationGraphWrapper( private[spark] class PoolData( @KVIndexParam val name: String, val stageIds: Set[Int]) + +/** + * A class with information about an app, to be used by the UI. There's only one instance of + * this summary per application, so its ID in the store is the class name. + */ +private[spark] class AppSummary( + val numCompletedJobs: Int, + val numCompletedStages: Int) { + + @KVIndex + def id: String = classOf[AppSummary].getName() + +} diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 35da3c3bfd1a2..b44ac0ea1febc 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -154,8 +154,6 @@ private[spark] object SparkUI { val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" val DEFAULT_POOL_NAME = "default" - val DEFAULT_RETAINED_STAGES = 1000 - val DEFAULT_RETAINED_JOBS = 1000 def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index b60d39b21b4bf..37e3b3b304a63 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -300,7 +300,13 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We val shouldShowCompletedJobs = completedJobs.nonEmpty val shouldShowFailedJobs = failedJobs.nonEmpty - val completedJobNumStr = s"${completedJobs.size}" + val appSummary = store.appSummary() + val completedJobNumStr = if (completedJobs.size == appSummary.numCompletedJobs) { + s"${completedJobs.size}" + } else { + s"${appSummary.numCompletedJobs}, only showing ${completedJobs.size}" + } + val schedulingMode = store.environmentInfo().sparkProperties.toMap .get("spark.scheduler.mode") .map { mode => SchedulingMode.withName(mode).toString } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index e4cf99e7b9e04..b1e343451e28e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -39,7 +39,6 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val completedStages = allStages.filter(_.status == StageStatus.COMPLETE) val failedStages = allStages.filter(_.status == StageStatus.FAILED).reverse - val numCompletedStages = completedStages.size val numFailedStages = failedStages.size val subPath = "stages" @@ -69,10 +68,11 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val shouldShowCompletedStages = completedStages.nonEmpty val shouldShowFailedStages = failedStages.nonEmpty - val completedStageNumStr = if (numCompletedStages == completedStages.size) { - s"$numCompletedStages" + val appSummary = parent.store.appSummary() + val completedStageNumStr = if (appSummary.numCompletedStages == completedStages.size) { + s"${appSummary.numCompletedStages}" } else { - s"$numCompletedStages, only showing ${completedStages.size}" + s"${appSummary.numCompletedStages}, only showing ${completedStages.size}" } val summary: NodeSeq = diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 3a9790cd57270..3738f85da5831 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -264,7 +264,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1") badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND) - badStageAttemptId._3 should be (Some("unknown attempt 1 for stage 1.")) + badStageAttemptId._3 should be (Some("unknown attempt for stage 1. Found attempts: [0]")) val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam") badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 88fe6bd70a14e..9cf4f7efb24a8 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -39,16 +39,20 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { import config._ - private val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + private val conf = new SparkConf() + .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + .set(ASYNC_TRACKING_ENABLED, false) private var time: Long = _ private var testDir: File = _ - private var store: KVStore = _ + private var store: ElementTrackingStore = _ + private var taskIdTracker = -1L before { time = 0L testDir = Utils.createTempDir() - store = KVUtils.open(testDir, getClass().getName()) + store = new ElementTrackingStore(KVUtils.open(testDir, getClass().getName()), conf) + taskIdTracker = -1L } after { @@ -185,22 +189,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Start tasks from stage 1 time += 1 - var _taskIdTracker = -1L - def nextTaskId(): Long = { - _taskIdTracker += 1 - _taskIdTracker - } - - def createTasks(count: Int, time: Long): Seq[TaskInfo] = { - (1 to count).map { id => - val exec = execIds(id.toInt % execIds.length) - val taskId = nextTaskId() - new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com", - TaskLocality.PROCESS_LOCAL, id % 2 == 0) - } - } - val s1Tasks = createTasks(4, time) + val s1Tasks = createTasks(4, execIds) s1Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task)) } @@ -419,7 +409,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Start and fail all tasks of stage 2. time += 1 - val s2Tasks = createTasks(4, time) + val s2Tasks = createTasks(4, execIds) s2Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task)) } @@ -470,7 +460,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) assert(store.count(classOf[StageDataWrapper]) === 3) - val newS2Tasks = createTasks(4, time) + val newS2Tasks = createTasks(4, execIds) newS2Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task)) @@ -526,7 +516,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(store.count(classOf[StageDataWrapper]) === 5) time += 1 - val j2s2Tasks = createTasks(4, time) + val j2s2Tasks = createTasks(4, execIds) j2s2Tasks.foreach { task => listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId, @@ -587,8 +577,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } // Stop executors. - listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test")) - listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test")) + time += 1 + listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test")) + listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test")) Seq("1", "2").foreach { id => check[ExecutorSummaryWrapper](id) { exec => @@ -851,6 +842,103 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("eviction of old data") { + val testConf = conf.clone() + .set(MAX_RETAINED_JOBS, 2) + .set(MAX_RETAINED_STAGES, 2) + .set(MAX_RETAINED_TASKS_PER_STAGE, 2) + .set(MAX_RETAINED_DEAD_EXECUTORS, 1) + val listener = new AppStatusListener(store, testConf, true) + + // Start 3 jobs, all should be kept. Stop one, it should be evicted. + time += 1 + listener.onJobStart(SparkListenerJobStart(1, time, Nil, null)) + listener.onJobStart(SparkListenerJobStart(2, time, Nil, null)) + listener.onJobStart(SparkListenerJobStart(3, time, Nil, null)) + assert(store.count(classOf[JobDataWrapper]) === 3) + + time += 1 + listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded)) + assert(store.count(classOf[JobDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[JobDataWrapper], 2) + } + + // Start 3 stages, all should be kept. Stop 2 of them, the stopped one with the lowest id should + // be deleted. Start a new attempt of the second stopped one, and verify that the stage graph + // data is not deleted. + time += 1 + val stages = Seq( + new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"), + new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2"), + new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")) + + // Graph data is generated by the job start event, so fire it. + listener.onJobStart(SparkListenerJobStart(4, time, stages, null)) + + stages.foreach { s => + time += 1 + s.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(s, new Properties())) + } + + assert(store.count(classOf[StageDataWrapper]) === 3) + assert(store.count(classOf[RDDOperationGraphWrapper]) === 3) + + stages.drop(1).foreach { s => + time += 1 + s.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(s)) + } + + assert(store.count(classOf[StageDataWrapper]) === 2) + assert(store.count(classOf[RDDOperationGraphWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(2, 0)) + } + + val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3") + time += 1 + attempt2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(attempt2, new Properties())) + + assert(store.count(classOf[StageDataWrapper]) === 2) + assert(store.count(classOf[RDDOperationGraphWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(2, 0)) + } + intercept[NoSuchElementException] { + store.read(classOf[StageDataWrapper], Array(3, 0)) + } + store.read(classOf[StageDataWrapper], Array(3, 1)) + + // Start 2 tasks. Finish the second one. + time += 1 + val tasks = createTasks(2, Array("1")) + tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task)) + } + assert(store.count(classOf[TaskDataWrapper]) === 2) + + // Start a 3rd task. The finished tasks should be deleted. + createTasks(1, Array("1")).foreach { task => + listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task)) + } + assert(store.count(classOf[TaskDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[TaskDataWrapper], tasks.last.id) + } + + // Start a 4th task. The first task should be deleted, even if it's still running. + createTasks(1, Array("1")).foreach { task => + listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task)) + } + assert(store.count(classOf[TaskDataWrapper]) === 2) + intercept[NoSuchElementException] { + store.read(classOf[TaskDataWrapper], tasks.head.id) + } + } + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = { @@ -864,6 +952,20 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) } + private def createTasks(count: Int, execs: Array[String]): Seq[TaskInfo] = { + (1 to count).map { id => + val exec = execs(id.toInt % execs.length) + val taskId = nextTaskId() + new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, id % 2 == 0) + } + } + + private def nextTaskId(): Long = { + taskIdTracker += 1 + taskIdTracker + } + private case class RddBlock( rddId: Int, partId: Int, diff --git a/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala new file mode 100644 index 0000000000000..07a7b58404c29 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import org.mockito.Mockito._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.util.kvstore._ + +class ElementTrackingStoreSuite extends SparkFunSuite { + + import config._ + + test("tracking for multiple types") { + val store = mock(classOf[KVStore]) + val tracking = new ElementTrackingStore(store, new SparkConf() + .set(ASYNC_TRACKING_ENABLED, false)) + + var type1 = 0L + var type2 = 0L + var flushed = false + + tracking.addTrigger(classOf[Type1], 100) { count => + type1 = count + } + tracking.addTrigger(classOf[Type2], 1000) { count => + type2 = count + } + tracking.onFlush { + flushed = true + } + + when(store.count(classOf[Type1])).thenReturn(1L) + tracking.write(new Type1, true) + assert(type1 === 0L) + assert(type2 === 0L) + + when(store.count(classOf[Type1])).thenReturn(100L) + tracking.write(new Type1, true) + assert(type1 === 0L) + assert(type2 === 0L) + + when(store.count(classOf[Type1])).thenReturn(101L) + tracking.write(new Type1, true) + assert(type1 === 101L) + assert(type2 === 0L) + + when(store.count(classOf[Type1])).thenReturn(200L) + tracking.write(new Type1, true) + assert(type1 === 200L) + assert(type2 === 0L) + + when(store.count(classOf[Type2])).thenReturn(500L) + tracking.write(new Type2, true) + assert(type1 === 200L) + assert(type2 === 0L) + + when(store.count(classOf[Type2])).thenReturn(1000L) + tracking.write(new Type2, true) + assert(type1 === 200L) + assert(type2 === 0L) + + when(store.count(classOf[Type2])).thenReturn(2000L) + tracking.write(new Type2, true) + assert(type1 === 200L) + assert(type2 === 2000L) + + tracking.close(false) + assert(flushed) + verify(store, never()).close() + } + + private class Type1 + private class Type2 + +} diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index df5f0b5335e82..326546787ab6c 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -42,6 +42,7 @@ import org.apache.spark.deploy.history.HistoryServerSuite import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus} +import org.apache.spark.status.config._ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler { @@ -525,14 +526,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } - ignore("stage & job retention") { + test("stage & job retention") { val conf = new SparkConf() .setMaster("local") .setAppName("test") .set("spark.ui.enabled", "true") .set("spark.ui.port", "0") - .set("spark.ui.retainedStages", "3") - .set("spark.ui.retainedJobs", "2") + .set(MAX_RETAINED_STAGES, 3) + .set(MAX_RETAINED_JOBS, 2) + .set(ASYNC_TRACKING_ENABLED, false) val sc = new SparkContext(conf) assert(sc.ui.isDefined) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index c018fc8a332fa..fe0ad39c29025 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -95,4 +95,11 @@ object StaticSQLConf { .stringConf .toSequence .createOptional + + val UI_RETAINED_EXECUTIONS = + buildStaticConf("spark.sql.ui.retainedExecutions") + .doc("Number of executions to retain in the Spark UI.") + .intConf + .createWithDefault(1000) + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 43cec4807ae4d..cf0000c6393a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -27,14 +27,15 @@ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric._ -import org.apache.spark.status.LiveEntity +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.kvstore.KVStore private[sql] class SQLAppStatusListener( conf: SparkConf, - kvstore: KVStore, + kvstore: ElementTrackingStore, live: Boolean, ui: Option[SparkUI] = None) extends SparkListener with Logging { @@ -51,6 +52,23 @@ private[sql] class SQLAppStatusListener( private var uiInitialized = false + kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count => + cleanupExecutions(count) + } + + kvstore.onFlush { + if (!live) { + val now = System.nanoTime() + liveExecutions.values.asScala.foreach { exec => + // This saves the partial aggregated metrics to the store; this works currently because + // when the SHS sees an updated event log, all old data for the application is thrown + // away. + exec.metricsValues = aggregateMetrics(exec) + exec.write(kvstore, now) + } + } + } + override def onJobStart(event: SparkListenerJobStart): Unit = { val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) if (executionIdString == null) { @@ -317,6 +335,17 @@ private[sql] class SQLAppStatusListener( } } + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS) + if (countToDelete <= 0) { + return + } + + val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]), + countToDelete.toInt) { e => e.completionTime.isDefined } + toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } + } + } private class LiveExecutionData(val executionId: Long) extends LiveEntity { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 586d3ae411c74..7fd5f7395cdf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.scheduler.SparkListener -import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore} import org.apache.spark.status.KVUtils.KVIndexParam import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils @@ -84,7 +84,7 @@ private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { override def setupListeners( conf: SparkConf, - store: KVStore, + store: ElementTrackingStore, addListenerFn: SparkListener => Unit, live: Boolean): Unit = { // For live applications, the listener is installed in [[setupUI]]. This also avoids adding @@ -100,7 +100,8 @@ private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { case Some(sc) => // If this is a live application, then install a listener that will enable the SQL // tab as soon as there's a SQL event posted to the bus. - val listener = new SQLAppStatusListener(sc.conf, ui.store.store, true, Some(ui)) + val listener = new SQLAppStatusListener(sc.conf, + ui.store.store.asInstanceOf[ElementTrackingStore], true, Some(ui)) sc.listenerBus.addToStatusQueue(listener) case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index eba8d55daad58..932950687942c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.status.ElementTrackingStore import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} import org.apache.spark.util.kvstore.InMemoryStore @@ -43,7 +44,9 @@ import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + override protected def sparkConf = { + super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L).set(ASYNC_TRACKING_ENABLED, false) + } private def createTestDataFrame: DataFrame = { Seq( @@ -107,10 +110,12 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest private def sqlStoreTest(name: String) (fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = { test(name) { - val store = new InMemoryStore() + val conf = sparkConf + val store = new ElementTrackingStore(new InMemoryStore(), conf) val bus = new ReplayListenerBus() - val listener = new SQLAppStatusListener(sparkConf, store, true) + val listener = new SQLAppStatusListener(conf, store, true) bus.addListener(listener) + store.close(false) val sqlStore = new SQLAppStatusStore(store, Some(listener)) fn(sqlStore, bus) } @@ -491,15 +496,15 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe class SQLListenerMemoryLeakSuite extends SparkFunSuite { - // TODO: this feature is not yet available in SQLAppStatusStore. - ignore("no memory leak") { - quietly { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test") - .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly - .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly - withSpark(new SparkContext(conf)) { sc => + test("no memory leak") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly + .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly + .set(ASYNC_TRACKING_ENABLED, false) + withSpark(new SparkContext(conf)) { sc => + quietly { val spark = new SparkSession(sc) import spark.implicits._ // Run 100 successful executions and 100 failed executions.