[SPARK-20653][CORE] Add cleaning of old elements from the status store.
This change restores the functionality that keeps only a limited number of
elements of each type (jobs, stages, etc.), depending on configuration, so
that the store does not grow indefinitely over time.

The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be registered for when the number
of elements of a given type crosses a configured threshold. Triggers are not
limited to deleting elements, but the current API is set up in a way that
makes that use case the easiest to implement.

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19751 from vanzin/SPARK-20653.
Marcelo Vanzin authored and squito committed Dec 18, 2017
1 parent fb3636b commit 772e464
Showing 22 changed files with 713 additions and 81 deletions.
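
Before the per-file diff, here is a minimal usage sketch of the trigger API described in the commit message, modeled on the call sites visible in the AppStatusListener changes below (addTrigger, onFlush, and close). ElementTrackingStore is private[spark], so a snippet like this only compiles inside Spark's own packages; TrackedThing, the threshold value, and the object name are illustrative placeholders, not part of the commit.

import org.apache.spark.SparkConf
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

// Placeholder for a tracked type such as JobDataWrapper or StageDataWrapper.
case class TrackedThing(id: Long)

object ElementTrackingStoreSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Wrap any KVStore; the wrapper tracks how many elements of each registered class exist.
    val store = new ElementTrackingStore(new InMemoryStore(), conf)

    // Run an action whenever the number of TrackedThing elements exceeds the threshold.
    // In AppStatusListener below the action deletes old elements, but any callback is allowed.
    store.addTrigger(classOf[TrackedThing], 1000) { count =>
      // e.g. delete roughly (count - 1000) of the oldest elements via store.view / store.delete
    }

    // Register work to run when the store is flushed or closed, e.g. writing partial live state.
    store.onFlush {
      // flush partial state here
    }

    // Full close. The FsHistoryProvider change below instead calls close(false) after replay,
    // which flushes listener state while leaving the wrapped store open for the UI.
    store.close()
  }
}
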
@@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
@@ -304,6 +305,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val (kvstore, needReplay) = uiStorePath match {
case Some(path) =>
try {
// The store path is not guaranteed to exist - maybe it hasn't been created, or was
// invalidated because changes to the event log were detected. Need to replay in that
// case.
val _replay = !path.isDirectory()
(createDiskStore(path, conf), _replay)
} catch {
@@ -318,24 +322,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
(new InMemoryStore(), true)
}

val trackingStore = new ElementTrackingStore(kvstore, conf)
if (needReplay) {
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(kvstore, conf, false,
val listener = new AppStatusListener(trackingStore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(listener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
}
try {
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
listener.flush()
trackingStore.close(false)
} catch {
case e: Exception =>
try {
kvstore.close()
} catch {
case _e: Exception => logInfo("Error closing store.", _e)
Utils.tryLogNonFatalError {
trackingStore.close()
}
uiStorePath.foreach(Utils.deleteRecursively)
if (e.isInstanceOf[FileNotFoundException]) {
@@ -240,11 +240,6 @@ package object config {
.stringConf
.createOptional

// To limit memory usage, we only track information for a fixed number of tasks
private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
.intConf
.createWithDefault(100000)

// To limit how many applications are shown in the History Server summary ui
private[spark] val HISTORY_UI_MAX_APPS =
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
188 changes: 176 additions & 12 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -32,7 +32,6 @@ import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
import org.apache.spark.util.kvstore.KVStore

/**
* A Spark listener that writes application information to a data store. The types written to the
@@ -42,7 +41,7 @@ import org.apache.spark.util.kvstore.KVStore
* unfinished tasks can be more accurately calculated (see SPARK-21922).
*/
private[spark] class AppStatusListener(
kvstore: KVStore,
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
@@ -51,13 +50,15 @@ private[spark] class AppStatusListener(

private var sparkVersion = SPARK_VERSION
private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1

// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
// operations that we can live without when rapidly processing incoming task events.
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

// Keep track of live entities, so that task metrics can be efficiently updated (without
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
// Keep the active executor count as a separate variable to avoid having to do synchronization
// around liveExecutors.
@volatile private var activeExecutorCount = 0

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(version) => sparkVersion = version
case _ =>
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }

kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
cleanupJobs(count)
}

kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
cleanupStages(count)
}

kvstore.onFlush {
if (!live) {
flush()
}
}

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
@@ -97,6 +113,7 @@ private[spark] class AppStatusListener(
Seq(attempt))

kvstore.write(new ApplicationInfoWrapper(appInfo))
kvstore.write(appSummary)
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
@@ -158,10 +175,11 @@ private[spark] class AppStatusListener(
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
liveExecutors.remove(event.executorId).foreach { exec =>
val now = System.nanoTime()
activeExecutorCount = math.max(0, activeExecutorCount - 1)
exec.isActive = false
exec.removeTime = new Date(event.time)
exec.removeReason = event.reason
update(exec, now)
update(exec, now, last = true)

// Remove all RDD distributions that reference the removed executor, in case there wasn't
// a corresponding event.
@@ -290,8 +308,11 @@ private[spark] class AppStatusListener(
}

job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
update(job, now)
update(job, now, last = true)
}

appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
kvstore.write(appSummary)
}

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -350,6 +371,13 @@ private[spark] class AppStatusListener(
job.activeTasks += 1
maybeUpdate(job, now)
}

if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
}

liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -449,6 +477,13 @@ private[spark] class AppStatusListener(
esummary.metrics.update(metricsDelta)
}
maybeUpdate(esummary, now)

if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
}

liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -516,8 +551,11 @@ private[spark] class AppStatusListener(
}

stage.executorSummaries.values.foreach(update(_, now))
update(stage, now)
update(stage, now, last = true)
}

appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}

override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
@@ -573,7 +611,7 @@ private[spark] class AppStatusListener(
}

/** Flush all live entities' data to the underlying store. */
def flush(): Unit = {
private def flush(): Unit = {
val now = System.nanoTime()
liveStages.values.asScala.foreach { stage =>
update(stage, now)
@@ -708,7 +746,10 @@ private[spark] class AppStatusListener(
}

private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
liveExecutors.getOrElseUpdate(executorId, {
activeExecutorCount += 1
new LiveExecutor(executorId, addTime)
})
}

private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
@@ -754,8 +795,8 @@ private[spark] class AppStatusListener(
}
}

private def update(entity: LiveEntity, now: Long): Unit = {
entity.write(kvstore, now)
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
entity.write(kvstore, now, checkTriggers = last)
}

/** Update a live entity only if it hasn't been updated in the last configured period. */
@@ -772,4 +813,127 @@ private[spark] class AppStatusListener(
}
}

private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
val dead = count - activeExecutorCount

if (dead > threshold) {
val countToDelete = calculateNumberToRemove(dead, threshold)
val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
.max(countToDelete).first(false).last(false).asScala.toSeq
toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
}
}

private def cleanupJobs(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
if (countToDelete <= 0L) {
return
}

val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}

private def cleanupStages(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
if (countToDelete <= 0L) {
return
}

val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
countToDelete.toInt) { s =>
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}

stages.foreach { s =>
val key = s.id
kvstore.delete(s.getClass(), key)

val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
.index("stage")
.first(key)
.last(key)
.asScala
.toSeq
execSummaries.foreach { e =>
kvstore.delete(e.getClass(), e.id)
}

val tasks = kvstore.view(classOf[TaskDataWrapper])
.index("stage")
.first(key)
.last(key)
.asScala

tasks.foreach { t =>
kvstore.delete(t.getClass(), t.info.taskId)
}

// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
.index("stageId")
.first(s.stageId)
.last(s.stageId)
.closeableIterator()

val hasMoreAttempts = try {
remainingAttempts.asScala.exists { other =>
other.info.attemptId != s.info.attemptId
}
} finally {
remainingAttempts.close()
}

if (!hasMoreAttempts) {
kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
}
}
}

private def cleanupTasks(stage: LiveStage): Unit = {
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
val stageKey = Array(stage.info.stageId, stage.info.attemptId)
val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
.last(stageKey)

// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
!live || t.info.status != TaskState.RUNNING.toString()
}
toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
stage.savedTasks.addAndGet(-toDelete.size)

// If there are more running tasks than the configured limit, delete running tasks. This
// should be extremely rare since the limit should generally far exceed the number of tasks
// that can run in parallel.
val remaining = countToDelete - toDelete.size
if (remaining > 0) {
val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
stage.savedTasks.addAndGet(-remaining)
}
}
stage.cleaning = false
}

/**
* Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
* asynchronously, this method may return 0 in case enough items have been deleted already.
*/
private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
if (dataSize > retainedSize) {
math.max(retainedSize / 10L, dataSize - retainedSize)
} else {
0L
}
}

}
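
As a quick sanity check of the retention arithmetic above: once the limit is exceeded, calculateNumberToRemove deletes in batches of at least 10% of the retained size rather than one element per new write, which is the "reduce friction" the doc comment refers to. A standalone copy of the function with illustrative values (not part of the diff):

object RetentionMathSketch {
  // Copied from the AppStatusListener change above.
  def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long =
    if (dataSize > retainedSize) math.max(retainedSize / 10L, dataSize - retainedSize) else 0L

  def main(args: Array[String]): Unit = {
    assert(calculateNumberToRemove(1000L, 1000L) == 0L)   // at the limit: nothing to delete yet
    assert(calculateNumberToRemove(1001L, 1000L) == 100L) // just over: delete a 10% batch, not 1 element
    assert(calculateNumberToRemove(1500L, 1000L) == 500L) // far over: delete back down to the limit
  }
}
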
@@ -48,7 +48,7 @@ private[spark] trait AppStatusPlugin {
*/
def setupListeners(
conf: SparkConf,
store: KVStore,
store: ElementTrackingStore,
addListenerFn: SparkListener => Unit,
live: Boolean): Unit

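
The signature change above means plugins now receive the ElementTrackingStore rather than a bare KVStore, so they can register their own retention triggers the same way the core listener does. A hedged sketch of a hypothetical plugin follows; MyData, MyListener, the threshold, and the plugin name are placeholders, setupUI is the trait's other member in this version of Spark (not shown in the hunk), and since the trait is private[spark] this only compiles inside Spark's own packages.

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

// Placeholder tracked type and listener; not part of the commit.
case class MyData(id: Long)
class MyListener(store: ElementTrackingStore) extends SparkListener

class MyStatusPlugin extends AppStatusPlugin {

  override def setupListeners(
      conf: SparkConf,
      store: ElementTrackingStore,
      addListenerFn: SparkListener => Unit,
      live: Boolean): Unit = {
    // Bound the plugin's own data: keep roughly 100 MyData elements in the store.
    store.addTrigger(classOf[MyData], 100) { count =>
      // delete about (count - 100) old MyData elements via store.view / store.delete
    }
    addListenerFn(new MyListener(store))
  }

  override def setupUI(ui: SparkUI): Unit = ()
}
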
@@ -330,6 +330,10 @@ private[spark] class AppStatusStore(
store.read(classOf[PoolData], name)
}

def appSummary(): AppSummary = {
store.read(classOf[AppSummary], classOf[AppSummary].getName())
}

def close(): Unit = {
store.close()
}
@@ -347,7 +351,7 @@ private[spark] object AppStatusStore {
* @param addListenerFn Function to register a listener with a bus.
*/
def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
val store = new InMemoryStore()
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true)
addListenerFn(listener)
AppStatusPlugin.loadPlugins().foreach { p =>
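
Finally, since the commit message says the old cleanup configurations are active again, here is a hedged example of tuning them for a live application. The MAX_RETAINED_* constants read by the listener are defined in core/src/main/scala/org/apache/spark/status/config.scala, which this page does not show in full; the spark.ui.* key names below are assumed to carry over from the pre-existing UI options.

import org.apache.spark.SparkConf

object RetentionConfSketch {
  // Assumed key names; the authoritative keys and defaults live in org.apache.spark.status.config.
  val conf: SparkConf = new SparkConf()
    .set("spark.ui.retainedJobs", "500")          // backs MAX_RETAINED_JOBS
    .set("spark.ui.retainedStages", "500")        // backs MAX_RETAINED_STAGES
    .set("spark.ui.retainedTasks", "50000")       // backs MAX_RETAINED_TASKS_PER_STAGE
    .set("spark.ui.retainedDeadExecutors", "50")  // backs MAX_RETAINED_DEAD_EXECUTORS
}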
