From b08b711662b7853bc4081f85b2fcc265c47de410 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 27 Oct 2016 14:29:41 -0700 Subject: [PATCH 1/6] SHS-NG M3: Add initial listener implementation to collect app state. The initial listener is based on the existing JobProgressListener (and others), and tries to mimin their behavior as much as possible. The change also includes some minor code movement so that some types and methods from the initial history server code code can be reused. The code introduces a few mutable versions of public API types, used internally, to make it easier to update information without ugly copy methods, and also to make certain updates cheaper. Note the code here is not 100% correct. This is meant as a building ground for the UI integration in the next milestones. As different parts of the UI are ported, fixes will be made to the different parts of this code to account for the needed behavior. I also added annotations to API types so that Jackson is able to correctly deserialize options, sequences and maps that store primitive types. --- .../apache/spark/util/kvstore/KVTypeInfo.java | 2 + .../apache/spark/util/kvstore/LevelDB.java | 2 +- .../spark/status/api/v1/StageStatus.java | 3 +- .../deploy/history/FsHistoryProvider.scala | 1 + .../apache/spark/deploy/history/config.scala | 6 - .../spark/status/AppStatusListener.scala | 535 ++++++++++++++ .../org/apache/spark/status/KVUtils.scala | 73 ++ .../org/apache/spark/status/LiveEntity.scala | 522 +++++++++++++ .../status/api/v1/AllStagesResource.scala | 4 +- .../org/apache/spark/status/api/v1/api.scala | 11 +- .../org/apache/spark/status/storeTypes.scala | 98 +++ .../spark/status/AppStatusListenerSuite.scala | 688 ++++++++++++++++++ project/MimaExcludes.scala | 2 + 13 files changed, 1934 insertions(+), 13 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/AppStatusListener.scala create mode 100644 core/src/main/scala/org/apache/spark/status/KVUtils.scala create mode 100644 core/src/main/scala/org/apache/spark/status/LiveEntity.scala create mode 100644 core/src/main/scala/org/apache/spark/status/storeTypes.scala create mode 100644 core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index a2b077e4531ee..870b484f99068 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -46,6 +46,7 @@ public KVTypeInfo(Class type) throws Exception { KVIndex idx = f.getAnnotation(KVIndex.class); if (idx != null) { checkIndex(idx, indices); + f.setAccessible(true); indices.put(idx.value(), idx); f.setAccessible(true); accessors.put(idx.value(), new FieldAccessor(f)); @@ -58,6 +59,7 @@ public KVTypeInfo(Class type) throws Exception { checkIndex(idx, indices); Preconditions.checkArgument(m.getParameterTypes().length == 0, "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName()); + m.setAccessible(true); indices.put(idx.value(), idx); m.setAccessible(true); accessors.put(idx.value(), new MethodAccessor(m)); diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index ff48b155fab31..4f9e10ca20066 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -76,7 +76,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception { this.types = new ConcurrentHashMap<>(); Options options = new Options(); - options.createIfMissing(!path.exists()); + options.createIfMissing(true); this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); byte[] versionData = db().get(STORE_VERSION_KEY); diff --git a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java index 9dbb565aab707..40b5f627369d5 100644 --- a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java +++ b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java @@ -23,7 +23,8 @@ public enum StageStatus { ACTIVE, COMPLETE, FAILED, - PENDING; + PENDING, + SKIPPED; public static StageStatus fromString(String str) { return EnumUtil.parseIgnoreCase(StageStatus.class, str); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3889dd097ee59..f9ef4f53e5c32 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -42,6 +42,7 @@ import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala index fb9e997def0dd..52dedc1a2ed41 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -19,16 +19,10 @@ package org.apache.spark.deploy.history import java.util.concurrent.TimeUnit -import scala.annotation.meta.getter - import org.apache.spark.internal.config.ConfigBuilder -import org.apache.spark.util.kvstore.KVIndex private[spark] object config { - /** Use this to annotate constructor params to be used as KVStore indices. */ - type KVIndexParam = KVIndex @getter - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala new file mode 100644 index 0000000000000..870459a09c108 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.status.api.v1 +import org.apache.spark.storage._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.scope._ +import org.apache.spark.util.kvstore.KVStore + +/** + * A Spark listener that writes application information to a data store. The types written to the + * store are defined in the `storeTypes.scala` file and are based on the public REST API. + */ +private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging { + + private var sparkVersion = SPARK_VERSION + private var appInfo: v1.ApplicationInfo = null + private var coresPerTask: Int = 1 + + // Keep track of live entities, so that task metrics can be efficiently updated (without + // causing too many writes to the underlying store, and other expensive operations). + private val liveStages = new HashMap[(Int, Int), LiveStage]() + private val liveJobs = new HashMap[Int, LiveJob]() + private val liveExecutors = new HashMap[String, LiveExecutor]() + private val liveTasks = new HashMap[Long, LiveTask]() + private val liveRDDs = new HashMap[Int, LiveRDD]() + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case SparkListenerLogStart(version) => sparkVersion = version + case _ => + } + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + assert(event.appId.isDefined, "Application without IDs are not supported.") + + val attempt = new v1.ApplicationAttemptInfo( + event.appAttemptId, + new Date(event.time), + new Date(-1), + new Date(event.time), + -1L, + event.sparkUser, + false, + sparkVersion) + + appInfo = new v1.ApplicationInfo( + event.appId.get, + event.appName, + None, + None, + None, + None, + Seq(attempt)) + + kvstore.write(new ApplicationInfoWrapper(appInfo)) + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + val old = appInfo.attempts.head + val attempt = new v1.ApplicationAttemptInfo( + old.attemptId, + old.startTime, + new Date(event.time), + new Date(event.time), + event.time - old.startTime.getTime(), + old.sparkUser, + true, + old.appSparkVersion) + + appInfo = new v1.ApplicationInfo( + appInfo.id, + appInfo.name, + None, + None, + None, + None, + Seq(attempt)) + kvstore.write(new ApplicationInfoWrapper(appInfo)) + } + + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { + // This needs to be an update in case an executor re-registers after the driver has + // marked it as "dead". + val exec = getOrCreateExecutor(event.executorId) + exec.host = event.executorInfo.executorHost + exec.isActive = true + exec.totalCores = event.executorInfo.totalCores + exec.maxTasks = event.executorInfo.totalCores / coresPerTask + exec.executorLogs = event.executorInfo.logUrlMap + update(exec) + } + + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { + liveExecutors.remove(event.executorId).foreach { exec => + exec.isActive = false + update(exec) + } + } + + override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { + updateBlackListStatus(event.executorId, true) + } + + override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { + updateBlackListStatus(event.executorId, false) + } + + override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { + updateNodeBlackList(event.hostId, true) + } + + override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { + updateNodeBlackList(event.hostId, false) + } + + private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { + liveExecutors.get(execId).foreach { exec => + exec.isBlacklisted = blacklisted + update(exec) + } + } + + private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = { + // Implicitly (un)blacklist every executor associated with the node. + liveExecutors.values.foreach { exec => + if (exec.hostname == host) { + exec.isBlacklisted = blacklisted + update(exec) + } + } + } + + override def onJobStart(event: SparkListenerJobStart): Unit = { + // Compute (a potential underestimate of) the number of tasks that will be run by this job. + // This may be an underestimate because the job start event references all of the result + // stages' transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. + val numTasks = { + val missingStages = event.stageInfos.filter(_.completionTime.isEmpty) + missingStages.map(_.numTasks).sum + } + + val lastStageInfo = event.stageInfos.lastOption + val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") + + val jobGroup = Option(event.properties) + .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) } + + val job = new LiveJob( + event.jobId, + lastStageName, + Option(event.time).filter(_ >= 0).map(new Date(_)), + event.stageIds, + jobGroup, + numTasks) + liveJobs.put(event.jobId, job) + update(job) + + event.stageInfos.foreach { stageInfo => + // A new job submission may re-use an existing stage, so this code needs to do an update + // instead of just a write. + val stage = getOrCreateStage(stageInfo) + stage.jobs = stage.jobs :+ job + stage.jobIds += event.jobId + update(stage) + } + + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + liveJobs.remove(event.jobId).foreach { job => + job.status = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case JobFailed(_) => JobExecutionStatus.FAILED + } + + job.completionTime = if (event.time != -1) Some(new Date(event.time)) else None + update(job) + } + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + val stage = getOrCreateStage(event.stageInfo) + stage.status = v1.StageStatus.ACTIVE + + // Look at all active jobs to find the ones that mention this stage. + stage.jobs = liveJobs.values + .filter(_.stageIds.contains(event.stageInfo.stageId)) + .toSeq + stage.jobIds = stage.jobs.map(_.jobId).toSet + + stage.schedulingPool = Option(event.properties).flatMap { p => + Option(p.getProperty("spark.scheduler.pool")) + }.getOrElse(SparkUI.DEFAULT_POOL_NAME) + + stage.jobs.foreach { job => + job.completedStages = job.completedStages - event.stageInfo.stageId + job.activeStages += 1 + update(job) + } + + event.stageInfo.rddInfos.foreach { info => + if (info.storageLevel.isValid) { + update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info))) + } + } + + update(stage) + } + + override def onTaskStart(event: SparkListenerTaskStart): Unit = { + val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId) + liveTasks.put(event.taskInfo.taskId, task) + update(task) + + liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage => + stage.activeTasks += 1 + stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime) + update(stage) + + stage.jobs.foreach { job => + job.activeTasks += 1 + update(job) + } + } + + liveExecutors.get(event.taskInfo.executorId).foreach { exec => + exec.activeTasks += 1 + exec.totalTasks += 1 + update(exec) + } + } + + override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { + liveTasks.get(event.taskInfo.taskId).foreach { task => + update(task) + } + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { + // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task + // completion event is for. Let's just drop it here. This means we might have some speculation + // tasks on the web ui that are never marked as complete. + if (event.taskInfo == null || event.stageAttemptId == -1) { + return + } + + val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task => + val errorMessage = event.reason match { + case Success => + None + case k: TaskKilled => + Some(k.reason) + case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates + Some(e.toErrorString) + case e: TaskFailedReason => // All other failure cases + Some(e.toErrorString) + case other => + logInfo(s"Unhandled task end reason: $other") + None + } + task.errorMessage = errorMessage + val delta = task.updateMetrics(event.taskMetrics) + update(task) + delta + }.orNull + + val (completedDelta, failedDelta) = event.reason match { + case Success => + (1, 0) + case _ => + (0, 1) + } + + liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage => + if (metricsDelta != null) { + stage.metrics.update(metricsDelta) + } + stage.activeTasks -= 1 + stage.completedTasks += completedDelta + stage.failedTasks += failedDelta + update(stage) + + stage.jobs.foreach { job => + job.activeTasks -= 1 + job.completedTasks += completedDelta + job.failedTasks += failedDelta + update(job) + } + + val esummary = stage.executorSummary(event.taskInfo.executorId) + esummary.taskTime += event.taskInfo.duration + esummary.succeededTasks += completedDelta + esummary.failedTasks += failedDelta + if (metricsDelta != null) { + esummary.metrics.update(metricsDelta) + } + update(esummary) + } + + liveExecutors.get(event.taskInfo.executorId).foreach { exec => + if (event.taskMetrics != null) { + val readMetrics = event.taskMetrics.shuffleReadMetrics + exec.totalGcTime += event.taskMetrics.jvmGCTime + exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead + exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead + exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten + } + + exec.activeTasks -= 1 + exec.completedTasks += completedDelta + exec.failedTasks += failedDelta + exec.totalDuration += event.taskInfo.duration + update(exec) + } + } + + override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { + liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage => + stage.info = event.stageInfo + + // Because of SPARK-20205, old event logs may contain valid stages without a submission time + // in their start event. In those cases, we can only detect whether a stage was skipped by + // waiting until the completion event, at which point the field would have been set. + val skipped = !event.stageInfo.submissionTime.isDefined + stage.status = event.stageInfo.failureReason match { + case Some(_) => v1.StageStatus.FAILED + case None => if (skipped) v1.StageStatus.SKIPPED else v1.StageStatus.COMPLETE + } + update(stage) + + stage.jobs.foreach { job => + stage.status match { + case v1.StageStatus.COMPLETE => + job.completedStages = job.completedStages + event.stageInfo.stageId + case v1.StageStatus.SKIPPED => + job.skippedStages += event.stageInfo.stageId + job.skippedTasks += event.stageInfo.numTasks + case _ => + job.failedStages += 1 + } + job.activeStages -= 1 + update(job) + } + + stage.executorSummaries.values.foreach(update) + update(stage) + } + } + + override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { + // This needs to set fields that are already set by onExecutorAdded because the driver is + // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event. + val exec = getOrCreateExecutor(event.blockManagerId.executorId) + exec.hostPort = event.blockManagerId.hostPort + event.maxOnHeapMem.foreach { _ => + exec.totalOnHeap = event.maxOnHeapMem.get + exec.totalOffHeap = event.maxOffHeapMem.get + } + exec.isActive = true + exec.maxMemory = event.maxMem + update(exec) + } + + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { + // Nothing to do here. Covered by onExecutorRemoved. + } + + override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { + liveRDDs.remove(event.rddId) + kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) => + liveTasks.get(taskId).foreach { task => + val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) + val delta = task.updateMetrics(metrics) + update(task) + + liveStages.get((sid, sAttempt)).foreach { stage => + stage.metrics.update(delta) + update(stage) + + val esummary = stage.executorSummary(event.execId) + esummary.metrics.update(delta) + update(esummary) + } + } + } + } + + override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + event.blockUpdatedInfo.blockId match { + case block: RDDBlockId => updateRDDBlock(event, block) + case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too. + } + } + + private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { + val executorId = event.blockUpdatedInfo.blockManagerId.executorId + + // Whether values are being added to or removed from the existing accounting. + val storageLevel = event.blockUpdatedInfo.storageLevel + val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) + val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) + + // Function to apply a delta to a value, but ensure that it doesn't go negative. + def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) + + val updatedStorageLevel = if (storageLevel.isValid) { + Some(storageLevel.description) + } else { + None + } + + // We need information about the executor to update some memory accounting values in the + // RDD info, so read that beforehand. + val maybeExec = liveExecutors.get(executorId) + var rddBlocksDelta = 0 + + // Update the block entry in the RDD info, keeping track of the deltas above so that we + // can update the executor information too. + liveRDDs.get(block.rddId).foreach { rdd => + val partition = rdd.partition(block.name) + + val executors = if (updatedStorageLevel.isDefined) { + if (!partition.executors.contains(executorId)) { + rddBlocksDelta = 1 + } + partition.executors + executorId + } else { + rddBlocksDelta = -1 + partition.executors - executorId + } + + // Only update the partition if it's still stored in some executor, otherwise get rid of it. + if (executors.nonEmpty) { + if (updatedStorageLevel.isDefined) { + partition.storageLevel = updatedStorageLevel.get + } + partition.memoryUsed = newValue(partition.memoryUsed, memoryDelta) + partition.diskUsed = newValue(partition.diskUsed, diskDelta) + partition.executors = executors + } else { + rdd.removePartition(block.name) + } + + maybeExec.foreach { exec => + if (exec.rddBlocks + rddBlocksDelta > 0) { + val dist = rdd.distribution(exec) + dist.memoryRemaining = newValue(dist.memoryRemaining, -memoryDelta) + dist.memoryUsed = newValue(dist.memoryUsed, memoryDelta) + dist.diskUsed = newValue(dist.diskUsed, diskDelta) + + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + dist.offHeapUsed = newValue(dist.offHeapUsed, memoryDelta) + dist.offHeapRemaining = newValue(dist.offHeapRemaining, -memoryDelta) + } else { + dist.onHeapUsed = newValue(dist.onHeapUsed, memoryDelta) + dist.onHeapRemaining = newValue(dist.onHeapRemaining, -memoryDelta) + } + } + } else { + rdd.removeDistribution(exec) + } + } + + if (updatedStorageLevel.isDefined) { + rdd.storageLevel = updatedStorageLevel.get + } + rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta) + rdd.diskUsed = newValue(rdd.diskUsed, diskDelta) + update(rdd) + } + + maybeExec.foreach { exec => + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta) + } else { + exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta) + } + } + exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) + exec.diskUsed = newValue(exec.diskUsed, diskDelta) + exec.rddBlocks += rddBlocksDelta + if (exec.hasMemoryInfo || rddBlocksDelta != 0) { + update(exec) + } + } + } + + private def getOrCreateExecutor(executorId: String): LiveExecutor = { + liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId)) + } + + private def getOrCreateStage(info: StageInfo): LiveStage = { + val stage = liveStages.getOrElseUpdate((info.stageId, info.attemptId), new LiveStage()) + stage.info = info + stage + } + + private def update(entity: LiveEntity): Unit = { + entity.write(kvstore) + } + +} diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala new file mode 100644 index 0000000000000..a5f0d02e2e6e8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File + +import scala.annotation.meta.getter +import scala.language.implicitConversions +import scala.reflect.{classTag, ClassTag} + +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.spark.internal.Logging +import org.apache.spark.util.kvstore._ + +private[spark] object KVUtils extends Logging { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + /** + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. + */ + class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + + } + + /** + * Open or create a LevelDB store. + * + * @param path Location of the store. + * @param metadata Metadata value to compare to the data in the store. If the store does not + * contain any metadata (e.g. it's a new store), this value is written as + * the store's metadata. + */ + def open[M: ClassTag](path: File, metadata: M): LevelDB = { + require(metadata != null, "Metadata is required.") + + val db = new LevelDB(path, new KVStoreScalaSerializer()) + val dbMeta = db.getMetadata(classTag[M].runtimeClass) + if (dbMeta == null) { + db.setMetadata(metadata) + } else if (dbMeta != metadata) { + db.close() + throw new MetadataMismatchException() + } + + db + } + + class MetadataMismatchException extends Exception + +} diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala new file mode 100644 index 0000000000000..a92c78d6e057c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.Date + +import scala.collection.mutable.HashMap + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo} +import org.apache.spark.status.api.v1 +import org.apache.spark.storage.RDDInfo +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.AccumulatorContext +import org.apache.spark.util.kvstore.KVStore + +/** + * A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live + * entity uses one of these instances to keep track of their evolving state, and periodically + * flush an immutable view of the entity to the app state store. + */ +private[spark] abstract class LiveEntity { + + def write(store: KVStore): Unit = { + store.write(doUpdate()) + } + + protected def doUpdate(): Any + +} + +private class LiveJob( + val jobId: Int, + name: String, + submissionTime: Option[Date], + val stageIds: Seq[Int], + jobGroup: Option[String], + numTasks: Int) extends LiveEntity { + + var activeTasks = 0 + var completedTasks = 0 + var failedTasks = 0 + + var skippedTasks = 0 + var skippedStages = Set[Int]() + + var status = JobExecutionStatus.RUNNING + var completionTime: Option[Date] = None + + var completedStages: Set[Int] = Set() + var activeStages = 0 + var failedStages = 0 + + override protected def doUpdate(): Any = { + val info = new v1.JobData( + jobId, + name, + None, // description is always None? + submissionTime, + completionTime, + stageIds, + jobGroup, + status, + numTasks, + activeTasks, + completedTasks, + skippedTasks, + failedTasks, + activeStages, + completedStages.size, + skippedStages.size, + failedStages) + new JobDataWrapper(info, skippedStages) + } + +} + +private class LiveTask( + info: TaskInfo, + stageId: Int, + stageAttemptId: Int) extends LiveEntity { + + import LiveEntityHelpers._ + + private var recordedMetrics: v1.TaskMetrics = null + + var errorMessage: Option[String] = None + + /** + * Update the metrics for the task and return the difference between the previous and new + * values. + */ + def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + if (metrics != null) { + val old = recordedMetrics + recordedMetrics = new v1.TaskMetrics( + metrics.executorDeserializeTime, + metrics.executorDeserializeCpuTime, + metrics.executorRunTime, + metrics.executorCpuTime, + metrics.resultSize, + metrics.jvmGCTime, + metrics.resultSerializationTime, + metrics.memoryBytesSpilled, + metrics.diskBytesSpilled, + new v1.InputMetrics( + metrics.inputMetrics.bytesRead, + metrics.inputMetrics.recordsRead), + new v1.OutputMetrics( + metrics.outputMetrics.bytesWritten, + metrics.outputMetrics.recordsWritten), + new v1.ShuffleReadMetrics( + metrics.shuffleReadMetrics.remoteBlocksFetched, + metrics.shuffleReadMetrics.localBlocksFetched, + metrics.shuffleReadMetrics.fetchWaitTime, + metrics.shuffleReadMetrics.remoteBytesRead, + metrics.shuffleReadMetrics.remoteBytesReadToDisk, + metrics.shuffleReadMetrics.localBytesRead, + metrics.shuffleReadMetrics.recordsRead), + new v1.ShuffleWriteMetrics( + metrics.shuffleWriteMetrics.bytesWritten, + metrics.shuffleWriteMetrics.writeTime, + metrics.shuffleWriteMetrics.recordsWritten)) + if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics + } else { + null + } + } + + /** + * Return a new TaskMetrics object containing the delta of the various fields of the given + * metrics objects. This is currently targeted at updating stage data, so it does not + * necessarily calculate deltas for all the fields. + */ + private def calculateMetricsDelta( + metrics: v1.TaskMetrics, + old: v1.TaskMetrics): v1.TaskMetrics = { + val shuffleWriteDelta = new v1.ShuffleWriteMetrics( + metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten, + 0L, + metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten) + + val shuffleReadDelta = new v1.ShuffleReadMetrics( + 0L, 0L, 0L, + metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead, + metrics.shuffleReadMetrics.remoteBytesReadToDisk - + old.shuffleReadMetrics.remoteBytesReadToDisk, + metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead, + metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead) + + val inputDelta = new v1.InputMetrics( + metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead, + metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead) + + val outputDelta = new v1.OutputMetrics( + metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten, + metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten) + + new v1.TaskMetrics( + 0L, 0L, + metrics.executorRunTime - old.executorRunTime, + metrics.executorCpuTime - old.executorCpuTime, + 0L, 0L, 0L, + metrics.memoryBytesSpilled - old.memoryBytesSpilled, + metrics.diskBytesSpilled - old.diskBytesSpilled, + inputDelta, + outputDelta, + shuffleReadDelta, + shuffleWriteDelta) + } + + override protected def doUpdate(): Any = { + val task = new v1.TaskData( + info.taskId, + info.index, + info.attemptNumber, + new Date(info.launchTime), + if (info.finished) Some(info.duration) else None, + info.executorId, + info.host, + info.status, + info.taskLocality.toString(), + info.speculative, + newAccumulatorInfos(info.accumulables), + errorMessage, + Option(recordedMetrics)) + new TaskDataWrapper(task) + } + +} + +private class LiveExecutor(val executorId: String) extends LiveEntity { + + var hostPort: String = null + var host: String = null + var isActive = true + var totalCores = 0 + + var rddBlocks = 0 + var memoryUsed = 0L + var diskUsed = 0L + var maxTasks = 0 + var maxMemory = 0L + + var totalTasks = 0 + var activeTasks = 0 + var completedTasks = 0 + var failedTasks = 0 + var totalDuration = 0L + var totalGcTime = 0L + var totalInputBytes = 0L + var totalShuffleRead = 0L + var totalShuffleWrite = 0L + var isBlacklisted = false + + var executorLogs = Map[String, String]() + + // Memory metrics. They may not be recorded (e.g. old event logs) so if totalOnHeap is not + // initialized, the store will not contain this information. + var totalOnHeap = -1L + var totalOffHeap = 0L + var usedOnHeap = 0L + var usedOffHeap = 0L + + def hasMemoryInfo: Boolean = totalOnHeap >= 0L + + def hostname: String = if (host != null) host else hostPort.split(":")(0) + + override protected def doUpdate(): Any = { + val memoryMetrics = if (totalOnHeap >= 0) { + Some(new v1.MemoryMetrics(usedOnHeap, usedOffHeap, totalOnHeap, totalOffHeap)) + } else { + None + } + + val info = new v1.ExecutorSummary( + executorId, + if (hostPort != null) hostPort else host, + isActive, + rddBlocks, + memoryUsed, + diskUsed, + totalCores, + maxTasks, + activeTasks, + failedTasks, + completedTasks, + totalTasks, + totalDuration, + totalGcTime, + totalInputBytes, + totalShuffleRead, + totalShuffleWrite, + isBlacklisted, + maxMemory, + executorLogs, + memoryMetrics) + new ExecutorSummaryWrapper(info) + } + +} + +/** Metrics tracked per stage (both total and per executor). */ +private class MetricsTracker { + var executorRunTime = 0L + var executorCpuTime = 0L + var inputBytes = 0L + var inputRecords = 0L + var outputBytes = 0L + var outputRecords = 0L + var shuffleReadBytes = 0L + var shuffleReadRecords = 0L + var shuffleWriteBytes = 0L + var shuffleWriteRecords = 0L + var memoryBytesSpilled = 0L + var diskBytesSpilled = 0L + + def update(delta: v1.TaskMetrics): Unit = { + executorRunTime += delta.executorRunTime + executorCpuTime += delta.executorCpuTime + inputBytes += delta.inputMetrics.bytesRead + inputRecords += delta.inputMetrics.recordsRead + outputBytes += delta.outputMetrics.bytesWritten + outputRecords += delta.outputMetrics.recordsWritten + shuffleReadBytes += delta.shuffleReadMetrics.localBytesRead + + delta.shuffleReadMetrics.remoteBytesRead + shuffleReadRecords += delta.shuffleReadMetrics.recordsRead + shuffleWriteBytes += delta.shuffleWriteMetrics.bytesWritten + shuffleWriteRecords += delta.shuffleWriteMetrics.recordsWritten + memoryBytesSpilled += delta.memoryBytesSpilled + diskBytesSpilled += delta.diskBytesSpilled + } + +} + +private class LiveExecutorStageSummary( + stageId: Int, + attemptId: Int, + executorId: String) extends LiveEntity { + + var taskTime = 0L + var succeededTasks = 0 + var failedTasks = 0 + var killedTasks = 0 + + val metrics = new MetricsTracker() + + override protected def doUpdate(): Any = { + val info = new v1.ExecutorStageSummary( + taskTime, + failedTasks, + succeededTasks, + metrics.inputBytes, + metrics.outputBytes, + metrics.shuffleReadBytes, + metrics.shuffleWriteBytes, + metrics.memoryBytesSpilled, + metrics.diskBytesSpilled) + new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info) + } + +} + +private class LiveStage extends LiveEntity { + + import LiveEntityHelpers._ + + var jobs = Seq[LiveJob]() + var jobIds = Set[Int]() + + var info: StageInfo = null + var status = v1.StageStatus.PENDING + + var schedulingPool: String = SparkUI.DEFAULT_POOL_NAME + + var activeTasks = 0 + var completedTasks = 0 + var failedTasks = 0 + + var firstLaunchTime = Long.MaxValue + + val metrics = new MetricsTracker() + + val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() + + def executorSummary(executorId: String): LiveExecutorStageSummary = { + executorSummaries.getOrElseUpdate(executorId, + new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId)) + } + + override protected def doUpdate(): Any = { + val update = new v1.StageData( + status, + info.stageId, + info.attemptId, + + activeTasks, + completedTasks, + failedTasks, + + metrics.executorRunTime, + metrics.executorCpuTime, + info.submissionTime.map(new Date(_)), + if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None, + info.completionTime.map(new Date(_)), + + metrics.inputBytes, + metrics.inputRecords, + metrics.outputBytes, + metrics.outputRecords, + metrics.shuffleReadBytes, + metrics.shuffleReadRecords, + metrics.shuffleWriteBytes, + metrics.shuffleWriteRecords, + metrics.memoryBytesSpilled, + metrics.diskBytesSpilled, + + info.name, + info.details, + schedulingPool, + + newAccumulatorInfos(info.accumulables.values), + None, + None) + + new StageDataWrapper(update, jobIds) + } + +} + +private class LiveRDDPartition(val blockName: String) { + + var executors = Set[String]() + var storageLevel: String = null + var memoryUsed = 0L + var diskUsed = 0L + + def toApi(): v1.RDDPartitionInfo = { + new v1.RDDPartitionInfo( + blockName, + storageLevel, + memoryUsed, + diskUsed, + executors.toSeq.sorted) + } + +} + +private class LiveRDDDistribution(val exec: LiveExecutor) { + + var memoryRemaining = exec.maxMemory + var memoryUsed = 0L + var diskUsed = 0L + + var onHeapUsed = 0L + var offHeapUsed = 0L + var onHeapRemaining = 0L + var offHeapRemaining = 0L + + def toApi(): v1.RDDDataDistribution = { + new v1.RDDDataDistribution( + exec.hostPort, + memoryUsed, + memoryRemaining, + diskUsed, + if (exec.hasMemoryInfo) Some(onHeapUsed) else None, + if (exec.hasMemoryInfo) Some(offHeapUsed) else None, + if (exec.hasMemoryInfo) Some(onHeapRemaining) else None, + if (exec.hasMemoryInfo) Some(offHeapRemaining) else None) + } + +} + +private class LiveRDD(info: RDDInfo) extends LiveEntity { + + var storageLevel: String = info.storageLevel.description + var memoryUsed = 0L + var diskUsed = 0L + + private val partitions = new HashMap[String, LiveRDDPartition]() + private val distributions = new HashMap[String, LiveRDDDistribution]() + + def partition(blockName: String): LiveRDDPartition = { + partitions.getOrElseUpdate(blockName, new LiveRDDPartition(blockName)) + } + + def removePartition(blockName: String): Unit = partitions.remove(blockName) + + def distribution(exec: LiveExecutor): LiveRDDDistribution = { + distributions.getOrElseUpdate(exec.hostPort, new LiveRDDDistribution(exec)) + } + + def removeDistribution(exec: LiveExecutor): Unit = { + distributions.remove(exec.hostPort) + } + + override protected def doUpdate(): Any = { + val parts = if (partitions.nonEmpty) { + Some(partitions.values.toList.sortBy(_.blockName).map(_.toApi())) + } else { + None + } + + val dists = if (distributions.nonEmpty) { + Some(distributions.values.toList.sortBy(_.exec.executorId).map(_.toApi())) + } else { + None + } + + val rdd = new v1.RDDStorageInfo( + info.id, + info.name, + info.numPartitions, + partitions.size, + storageLevel, + memoryUsed, + diskUsed, + dists, + parts) + + new RDDStorageInfoWrapper(rdd) + } + +} + +private object LiveEntityHelpers { + + def newAccumulatorInfos(accums: Iterable[AccumulableInfo]): Seq[v1.AccumulableInfo] = { + accums + .filter { acc => + // We don't need to store internal or SQL accumulables as their values will be shown in + // other places, so drop them to reduce the memory usage. + !acc.internal && (!acc.metadata.isDefined || + acc.metadata.get != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) + } + .map { acc => + new v1.AccumulableInfo( + acc.id, + acc.name.map(_.intern()).orNull, + acc.update.map(_.toString()), + acc.value.map(_.toString()).orNull) + } + .toSeq + } + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 4a4ed954d689e..5f69949c618fd 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -71,7 +71,7 @@ private[v1] object AllStagesResource { val taskData = if (includeDetails) { Some(stageUiData.taskData.map { case (k, v) => - k -> convertTaskData(v, stageUiData.lastUpdateTime) }) + k -> convertTaskData(v, stageUiData.lastUpdateTime) }.toMap) } else { None } @@ -88,7 +88,7 @@ private[v1] object AllStagesResource { memoryBytesSpilled = summary.memoryBytesSpilled, diskBytesSpilled = summary.diskBytesSpilled ) - }) + }.toMap) } else { None } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 31659b25db318..bff6f90823f40 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -16,11 +16,11 @@ */ package org.apache.spark.status.api.v1 +import java.lang.{Long => JLong} import java.util.Date -import scala.collection.Map - import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus @@ -129,9 +129,13 @@ class RDDDataDistribution private[spark]( val memoryUsed: Long, val memoryRemaining: Long, val diskUsed: Long, + @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryUsed: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryUsed: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryRemaining: Option[Long]) class RDDPartitionInfo private[spark]( @@ -179,7 +183,8 @@ class TaskData private[spark]( val index: Int, val attempt: Int, val launchTime: Date, - val duration: Option[Long] = None, + @JsonDeserialize(contentAs = classOf[JLong]) + val duration: Option[Long], val executorId: String, val host: String, val status: String, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala new file mode 100644 index 0000000000000..9579accd2cba7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.status.KVUtils._ +import org.apache.spark.status.api.v1._ +import org.apache.spark.util.kvstore.KVIndex + +private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { + + @JsonIgnore @KVIndex + def id: String = info.id + +} + +private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { + + @JsonIgnore @KVIndex + private[this] val id: String = info.id + + @JsonIgnore @KVIndex("active") + private[this] val active: Boolean = info.isActive + + @JsonIgnore @KVIndex("host") + val host: String = info.hostPort.split(":")(0) + +} + +/** + * Keep track of the existing stages when the job was submitted, and those that were + * completed during the job's execution. This allows a more accurate acounting of how + * many tasks were skipped for the job. + */ +private[spark] class JobDataWrapper( + val info: JobData, + val skippedStages: Set[Int]) { + + @JsonIgnore @KVIndex + private[this] val id: Int = info.jobId + +} + +private[spark] class StageDataWrapper( + val info: StageData, + val jobIds: Set[Int]) { + + @JsonIgnore @KVIndex + def id: Array[Int] = Array(info.stageId, info.attemptId) + +} + +private[spark] class TaskDataWrapper(val info: TaskData) { + + @JsonIgnore @KVIndex + def id: Long = info.taskId + +} + +private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { + + @JsonIgnore @KVIndex + def id: Int = info.id + + @JsonIgnore @KVIndex("cached") + def cached: Boolean = info.numCachedPartitions > 0 + +} + +private[spark] class ExecutorStageSummaryWrapper( + val stageId: Int, + val stageAttemptId: Int, + val executorId: String, + val info: ExecutorStageSummary) { + + @JsonIgnore @KVIndex + val id: Array[Any] = Array(stageId, stageAttemptId, executorId) + + @JsonIgnore @KVIndex("stage") + private[this] val stage: Array[Int] = Array(stageId, stageAttemptId) + +} diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala new file mode 100644 index 0000000000000..afb116ac08cb2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Date, Properties} + +import scala.collection.JavaConverters._ +import scala.reflect.{classTag, ClassTag} + +import org.scalatest.BeforeAndAfter + +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster._ +import org.apache.spark.status.api.v1 +import org.apache.spark.storage._ +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore._ + +class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { + + private var time: Long = _ + private var testDir: File = _ + private var store: KVStore = _ + + before { + time = 0L + testDir = Utils.createTempDir() + store = KVUtils.open(testDir, getClass().getName()) + } + + after { + store.close() + Utils.deleteRecursively(testDir) + } + + test("scheduler events") { + val listener = new AppStatusListener(store) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + check[ApplicationInfoWrapper]("id") { app => + assert(app.info.name === "name") + assert(app.info.id === "id") + assert(app.info.attempts.size === 1) + + val attempt = app.info.attempts.head + assert(attempt.attemptId === Some("attempt")) + assert(attempt.startTime === new Date(time)) + assert(attempt.lastUpdated === new Date(time)) + assert(attempt.endTime.getTime() === -1L) + assert(attempt.sparkUser === "user") + assert(!attempt.completed) + } + + // Start a couple of executors. + time += 1 + val execIds = Array("1", "2") + + execIds.foreach { id => + listener.onExecutorAdded(SparkListenerExecutorAdded(time, id, + new ExecutorInfo(s"$id.example.com", 1, Map()))) + } + + execIds.foreach { id => + check[ExecutorSummaryWrapper](id) { exec => + assert(exec.info.id === id) + assert(exec.info.hostPort === s"$id.example.com") + assert(exec.info.isActive) + } + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + val stages = Seq( + new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"), + new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2")) + + val stageProps = new Properties() + stageProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + stageProps.setProperty("spark.scheduler.pool", "schedPool") + + listener.onJobStart(SparkListenerJobStart(1, time, stages, null)) + + check[JobDataWrapper](1) { job => + assert(job.info.jobId === 1) + assert(job.info.name === stages.last.name) + assert(job.info.description === None) + assert(job.info.status === JobExecutionStatus.RUNNING) + assert(job.info.submissionTime === Some(new Date(time))) + } + + stages.foreach { info => + check[StageDataWrapper](key(info)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.jobIds === Set(1)) + } + } + + // Submit stage 1 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, stageProps)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 1) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.status === v1.StageStatus.ACTIVE) + assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get))) + } + + // Start tasks from stage 1 + time += 1 + var _taskIdTracker = -1L + def nextTaskId(): Long = { + _taskIdTracker += 1 + _taskIdTracker + } + + def createTasks(count: Int, time: Long): Seq[TaskInfo] = { + (1 to count).map { id => + val exec = execIds(id.toInt % execIds.length) + val taskId = nextTaskId() + new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, id % 2 == 0) + } + } + + val s1Tasks = createTasks(4, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task)) + } + + assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveTasks === s1Tasks.size) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.numActiveTasks === s1Tasks.size) + assert(stage.info.firstTaskLaunchedTime === Some(new Date(s1Tasks.head.launchTime))) + } + + s1Tasks.foreach { task => + check[TaskDataWrapper](task.taskId) { wrapper => + assert(wrapper.info.taskId === task.taskId) + assert(wrapper.info.index === task.index) + assert(wrapper.info.attempt === task.attemptNumber) + assert(wrapper.info.launchTime === new Date(task.launchTime)) + assert(wrapper.info.executorId === task.executorId) + assert(wrapper.info.host === task.host) + assert(wrapper.info.status === task.status) + assert(wrapper.info.taskLocality === task.taskLocality.toString()) + assert(wrapper.info.speculative === task.speculative) + } + } + + // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code. + s1Tasks.foreach { task => + val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED), + Some(1L), None, true, false, None) + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate( + task.executorId, + Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum))))) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.memoryBytesSpilled === s1Tasks.size) + + val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage") + .first(key(stages.head)).last(key(stages.head)).asScala.toSeq + assert(execs.size > 0) + execs.foreach { exec => + assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2) + } + } + + // Fail one of the tasks, re-start it. + time += 1 + s1Tasks.head.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId, + "taskType", TaskResultLost, s1Tasks.head, null)) + + time += 1 + val reattempt = { + val orig = s1Tasks.head + // Task reattempts have a different ID, but the same index as the original. + new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId, + s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) + } + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, + reattempt)) + + assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1) + + check[JobDataWrapper](1) { job => + assert(job.info.numFailedTasks === 1) + assert(job.info.numActiveTasks === s1Tasks.size) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.numFailedTasks === 1) + assert(stage.info.numActiveTasks === s1Tasks.size) + } + + check[TaskDataWrapper](s1Tasks.head.taskId) { task => + assert(task.info.status === s1Tasks.head.status) + assert(task.info.duration === Some(s1Tasks.head.duration)) + assert(task.info.errorMessage == Some(TaskResultLost.toErrorString)) + } + + check[TaskDataWrapper](reattempt.taskId) { task => + assert(task.info.index === s1Tasks.head.index) + assert(task.info.attempt === reattempt.attemptNumber) + } + + // Succeed all tasks in stage 1. + val pending = s1Tasks.drop(1) ++ Seq(reattempt) + + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + time += 1 + pending.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId, + "taskType", Success, task, s1Metrics)) + } + + check[JobDataWrapper](1) { job => + assert(job.info.numFailedTasks === 1) + assert(job.info.numActiveTasks === 0) + assert(job.info.numCompletedTasks === pending.size) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.numFailedTasks === 1) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === pending.size) + } + + pending.foreach { task => + check[TaskDataWrapper](task.taskId) { wrapper => + assert(wrapper.info.errorMessage === None) + assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L) + assert(wrapper.info.taskMetrics.get.executorRunTime === 4L) + } + } + + assert(store.count(classOf[TaskDataWrapper]) === pending.size + 1) + + // End stage 1. + time += 1 + stages.head.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 0) + assert(job.info.numCompletedStages === 1) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.status === v1.StageStatus.COMPLETE) + assert(stage.info.numFailedTasks === 1) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === pending.size) + } + + // Submit stage 2. + time += 1 + stages.last.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, stageProps)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 1) + } + + check[StageDataWrapper](key(stages.last)) { stage => + assert(stage.info.status === v1.StageStatus.ACTIVE) + assert(stage.info.submissionTime === Some(new Date(stages.last.submissionTime.get))) + } + + // Start and fail all tasks of stage 2. + time += 1 + val s2Tasks = createTasks(4, time) + s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task)) + } + + time += 1 + s2Tasks.foreach { task => + task.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId, + "taskType", TaskResultLost, task, null)) + } + + check[JobDataWrapper](1) { job => + assert(job.info.numFailedTasks === 1 + s2Tasks.size) + assert(job.info.numActiveTasks === 0) + } + + check[StageDataWrapper](key(stages.last)) { stage => + assert(stage.info.numFailedTasks === s2Tasks.size) + assert(stage.info.numActiveTasks === 0) + } + + // Fail stage 2. + time += 1 + stages.last.completionTime = Some(time) + stages.last.failureReason = Some("uh oh") + listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) + + check[JobDataWrapper](1) { job => + assert(job.info.numCompletedStages === 1) + assert(job.info.numFailedStages === 1) + } + + check[StageDataWrapper](key(stages.last)) { stage => + assert(stage.info.status === v1.StageStatus.FAILED) + assert(stage.info.numFailedTasks === s2Tasks.size) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + // - Re-submit stage 2, all tasks, and succeed them and the stage. + val oldS2 = stages.last + val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks, + oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics) + + time += 1 + newS2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, stageProps)) + assert(store.count(classOf[StageDataWrapper]) === 3) + + val newS2Tasks = createTasks(4, time) + + newS2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task)) + } + + time += 1 + newS2Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success, + task, null)) + } + + time += 1 + newS2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS2)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 0) + assert(job.info.numFailedStages === 1) + assert(job.info.numCompletedStages === 2) + } + + check[StageDataWrapper](key(newS2)) { stage => + assert(stage.info.status === v1.StageStatus.COMPLETE) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === newS2Tasks.size) + } + + // End job. + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.status === JobExecutionStatus.SUCCEEDED) + } + + // Submit a second job that re-uses stage 1 and stage 2. Stage 1 won't be re-run, but + // stage 2 will. In any case, the DAGScheduler creates new info structures that are copies + // of the old stages, so mimic that behavior here. The "new" stage 1 is submitted without + // a submission time, which means it is "skipped", and the stage 2 re-execution should not + // change the stats of the already finished job. + time += 1 + val j2Stages = Seq( + new StageInfo(3, 0, "stage1", 4, Nil, Nil, "details1"), + new StageInfo(4, 0, "stage2", 4, Nil, Seq(3), "details2")) + j2Stages.last.submissionTime = Some(time) + listener.onJobStart(SparkListenerJobStart(2, time, j2Stages, null)) + assert(store.count(classOf[JobDataWrapper]) === 2) + + listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.head, stageProps)) + listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.head)) + listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.last, stageProps)) + assert(store.count(classOf[StageDataWrapper]) === 5) + + time += 1 + val j2s2Tasks = createTasks(4, time) + + j2s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId, + task)) + } + + time += 1 + j2s2Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptId, + "taskType", Success, task, null)) + } + + time += 1 + j2Stages.last.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.last)) + + time += 1 + listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.numCompletedStages === 2) + assert(job.info.numCompletedTasks === s1Tasks.size + s2Tasks.size) + } + + check[JobDataWrapper](2) { job => + assert(job.info.status === JobExecutionStatus.SUCCEEDED) + assert(job.info.numCompletedStages === 1) + assert(job.info.numCompletedTasks === j2s2Tasks.size) + assert(job.info.numSkippedStages === 1) + assert(job.info.numSkippedTasks === s1Tasks.size) + } + + // Blacklist an executor. + time += 1 + listener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, "1", 42)) + check[ExecutorSummaryWrapper]("1") { exec => + assert(exec.info.isBlacklisted) + } + + time += 1 + listener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "1")) + check[ExecutorSummaryWrapper]("1") { exec => + assert(!exec.info.isBlacklisted) + } + + // Blacklist a node. + time += 1 + listener.onNodeBlacklisted(SparkListenerNodeBlacklisted(time, "1.example.com", 2)) + check[ExecutorSummaryWrapper]("1") { exec => + assert(exec.info.isBlacklisted) + } + + time += 1 + listener.onNodeUnblacklisted(SparkListenerNodeUnblacklisted(time, "1.example.com")) + check[ExecutorSummaryWrapper]("1") { exec => + assert(!exec.info.isBlacklisted) + } + + // Stop executors. + listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test")) + listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test")) + + Seq("1", "2").foreach { id => + check[ExecutorSummaryWrapper](id) { exec => + assert(exec.info.id === id) + assert(!exec.info.isActive) + } + } + + // End the application. + listener.onApplicationEnd(SparkListenerApplicationEnd(42L)) + + check[ApplicationInfoWrapper]("id") { app => + assert(app.info.name === "name") + assert(app.info.id === "id") + assert(app.info.attempts.size === 1) + + val attempt = app.info.attempts.head + assert(attempt.attemptId === Some("attempt")) + assert(attempt.startTime === new Date(1L)) + assert(attempt.lastUpdated === new Date(42L)) + assert(attempt.endTime === new Date(42L)) + assert(attempt.duration === 41L) + assert(attempt.sparkUser === "user") + assert(attempt.completed) + } + } + + test("storage events") { + val listener = new AppStatusListener(store) + val maxMemory = 42L + + // Register a couple of block managers. + val bm1 = BlockManagerId("1", "1.example.com", 42) + val bm2 = BlockManagerId("2", "2.example.com", 84) + Seq(bm1, bm2).foreach { bm => + listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId, + new ExecutorInfo(bm.host, 1, Map()))) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory)) + check[ExecutorSummaryWrapper](bm.executorId) { exec => + assert(exec.info.maxMemory === maxMemory) + } + } + + val rdd1b1 = RDDBlockId(1, 1) + val level = StorageLevel.MEMORY_AND_DISK + + // Submit a stage and make sure the RDD is recorded. + val rddInfo = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil) + val stage = new StageInfo(1, 0, "stage1", 4, Seq(rddInfo), Nil, "details1") + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.name === rddInfo.name) + assert(wrapper.info.numPartitions === rddInfo.numPartitions) + assert(wrapper.info.storageLevel === rddInfo.storageLevel.description) + } + + // Add partition 1 replicated on two block managers. + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, level, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 1L) + assert(wrapper.info.diskUsed === 1L) + + assert(wrapper.info.dataDistribution.isDefined) + assert(wrapper.info.dataDistribution.get.size === 1) + + val dist = wrapper.info.dataDistribution.get.head + assert(dist.address === bm1.hostPort) + assert(dist.memoryUsed === 1L) + assert(dist.diskUsed === 1L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + assert(wrapper.info.partitions.isDefined) + assert(wrapper.info.partitions.get.size === 1) + + val part = wrapper.info.partitions.get.head + assert(part.blockName === rdd1b1.name) + assert(part.storageLevel === level.description) + assert(part.memoryUsed === 1L) + assert(part.diskUsed === 1L) + assert(part.executors === Seq(bm1.executorId)) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === 1L) + assert(exec.info.diskUsed === 1L) + } + + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, level, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 2L) + assert(wrapper.info.diskUsed === 2L) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 1L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get + assert(dist.memoryUsed === 1L) + assert(dist.diskUsed === 1L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get(0) + assert(part.memoryUsed === 2L) + assert(part.diskUsed === 2L) + assert(part.executors === Seq(bm1.executorId, bm2.executorId)) + } + + check[ExecutorSummaryWrapper](bm2.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === 1L) + assert(exec.info.diskUsed === 1L) + } + + // Add a second partition only to bm 1. + val rdd1b2 = RDDBlockId(1, 2) + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b2, level, + 3L, 3L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 5L) + assert(wrapper.info.diskUsed === 5L) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 2L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get + assert(dist.memoryUsed === 4L) + assert(dist.diskUsed === 4L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get.find(_.blockName === rdd1b2.name).get + assert(part.storageLevel === level.description) + assert(part.memoryUsed === 3L) + assert(part.diskUsed === 3L) + assert(part.executors === Seq(bm1.executorId)) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 2L) + assert(exec.info.memoryUsed === 4L) + assert(exec.info.diskUsed === 4L) + } + + // Remove block 1 from bm 1. + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, + StorageLevel.NONE, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 4L) + assert(wrapper.info.diskUsed === 4L) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 2L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get + assert(dist.memoryUsed === 3L) + assert(dist.diskUsed === 3L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.name).get + assert(part.storageLevel === level.description) + assert(part.memoryUsed === 1L) + assert(part.diskUsed === 1L) + assert(part.executors === Seq(bm2.executorId)) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === 3L) + assert(exec.info.diskUsed === 3L) + } + + // Remove block 2 from bm 2. This should leave only block 2 info in the store. + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, + StorageLevel.NONE, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 3L) + assert(wrapper.info.diskUsed === 3L) + assert(wrapper.info.dataDistribution.get.size === 1L) + assert(wrapper.info.partitions.get.size === 1L) + assert(wrapper.info.partitions.get(0).blockName === rdd1b2.name) + } + + check[ExecutorSummaryWrapper](bm2.executorId) { exec => + assert(exec.info.rddBlocks === 0L) + assert(exec.info.memoryUsed === 0L) + assert(exec.info.diskUsed === 0L) + } + + // Unpersist RDD1. + listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId)) + intercept[NoSuchElementException] { + check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () } + } + + } + + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) + + private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = { + val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T] + fn(value) + } + +} diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dd299e074535e..45b8870f3b62f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { // Exclude rules for 2.3.x lazy val v23excludes = v22excludes ++ Seq( + // SPARK-18085: Better History Server scalability for many / large applications + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable"), From 886903cf2e9cc6d4e6df359892e1e5a4417d9a49 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 29 Sep 2017 14:29:06 -0700 Subject: [PATCH 2/6] Hide a couple of classes a little more. --- core/src/main/scala/org/apache/spark/status/KVUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index a5f0d02e2e6e8..a2c9c9cca8e7a 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -38,7 +38,7 @@ private[spark] object KVUtils extends Logging { * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as * the API serializer. */ - class KVStoreScalaSerializer extends KVStoreSerializer { + private[spark] class KVStoreScalaSerializer extends KVStoreSerializer { mapper.registerModule(DefaultScalaModule) mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) @@ -68,6 +68,6 @@ private[spark] object KVUtils extends Logging { db } - class MetadataMismatchException extends Exception + private[spark]class MetadataMismatchException extends Exception } From dc7bb5cdcbd7f72844c33c7c6fbc36f00c9b2c3b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Oct 2017 11:04:01 -0700 Subject: [PATCH 3/6] Feedback. --- .../spark/status/AppStatusListener.scala | 25 +++++++++---------- .../spark/status/AppStatusListenerSuite.scala | 20 ++++++++------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 870459a09c108..0ee01c0a55380 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -19,7 +19,6 @@ package org.apache.spark.status import java.util.Date -import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark._ @@ -29,7 +28,6 @@ import org.apache.spark.scheduler._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.scope._ import org.apache.spark.util.kvstore.KVStore /** @@ -156,8 +154,8 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } override def onJobStart(event: SparkListenerJobStart): Unit = { - // Compute (a potential underestimate of) the number of tasks that will be run by this job. - // This may be an underestimate because the job start event references all of the result + // Compute (a potential over-estimate of) the number of tasks that will be run by this job. + // This may be an over-estimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their // output is available from earlier runs. // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. @@ -175,19 +173,24 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log val job = new LiveJob( event.jobId, lastStageName, - Option(event.time).filter(_ >= 0).map(new Date(_)), + Some(new Date(event.time)), event.stageIds, jobGroup, numTasks) liveJobs.put(event.jobId, job) update(job) + val schedulingPool = Option(event.properties).flatMap { p => + Option(p.getProperty("spark.scheduler.pool")) + }.getOrElse(SparkUI.DEFAULT_POOL_NAME) + event.stageInfos.foreach { stageInfo => // A new job submission may re-use an existing stage, so this code needs to do an update // instead of just a write. val stage = getOrCreateStage(stageInfo) - stage.jobs = stage.jobs :+ job + stage.jobs :+= job stage.jobIds += event.jobId + stage.schedulingPool = schedulingPool update(stage) } @@ -200,7 +203,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log case JobFailed(_) => JobExecutionStatus.FAILED } - job.completionTime = if (event.time != -1) Some(new Date(event.time)) else None + job.completionTime = Some(new Date(event.time)) update(job) } } @@ -215,10 +218,6 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log .toSeq stage.jobIds = stage.jobs.map(_.jobId).toSet - stage.schedulingPool = Option(event.properties).flatMap { p => - Option(p.getProperty("spark.scheduler.pool")) - }.getOrElse(SparkUI.DEFAULT_POOL_NAME) - stage.jobs.foreach { job => job.completedStages = job.completedStages - event.stageInfo.stageId job.activeStages += 1 @@ -348,10 +347,10 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log // Because of SPARK-20205, old event logs may contain valid stages without a submission time // in their start event. In those cases, we can only detect whether a stage was skipped by // waiting until the completion event, at which point the field would have been set. - val skipped = !event.stageInfo.submissionTime.isDefined stage.status = event.stageInfo.failureReason match { case Some(_) => v1.StageStatus.FAILED - case None => if (skipped) v1.StageStatus.SKIPPED else v1.StageStatus.COMPLETE + case _ if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE + case _ => v1.StageStatus.SKIPPED } update(stage) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index afb116ac08cb2..9fff1297b1d2b 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -101,11 +101,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"), new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2")) - val stageProps = new Properties() - stageProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") - stageProps.setProperty("spark.scheduler.pool", "schedPool") + val jobProps = new Properties() + jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + jobProps.setProperty("spark.scheduler.pool", "schedPool") - listener.onJobStart(SparkListenerJobStart(1, time, stages, null)) + listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) check[JobDataWrapper](1) { job => assert(job.info.jobId === 1) @@ -113,11 +113,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(job.info.description === None) assert(job.info.status === JobExecutionStatus.RUNNING) assert(job.info.submissionTime === Some(new Date(time))) + assert(job.info.jobGroup === Some("jobGroup")) } stages.foreach { info => check[StageDataWrapper](key(info)) { stage => assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.info.schedulingPool === "schedPool") assert(stage.jobIds === Set(1)) } } @@ -125,7 +127,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Submit stage 1 time += 1 stages.head.submissionTime = Some(time) - listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, stageProps)) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps)) check[JobDataWrapper](1) { job => assert(job.info.numActiveStages === 1) @@ -298,7 +300,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Submit stage 2. time += 1 stages.last.submissionTime = Some(time) - listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, stageProps)) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps)) check[JobDataWrapper](1) { job => assert(job.info.numActiveStages === 1) @@ -358,7 +360,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { time += 1 newS2.submissionTime = Some(time) - listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, stageProps)) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) assert(store.count(classOf[StageDataWrapper]) === 3) val newS2Tasks = createTasks(4, time) @@ -411,9 +413,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { listener.onJobStart(SparkListenerJobStart(2, time, j2Stages, null)) assert(store.count(classOf[JobDataWrapper]) === 2) - listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.head, stageProps)) + listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.head, jobProps)) listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.head)) - listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.last, stageProps)) + listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.last, jobProps)) assert(store.count(classOf[StageDataWrapper]) === 5) time += 1 From d1fc7ac1bf30e6a3a21a6dbbc670072d5557435a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Oct 2017 11:15:57 -0700 Subject: [PATCH 4/6] Use more code that was moved to KVUtils. --- .../deploy/history/FsHistoryProvider.scala | 34 +++---------------- .../org/apache/spark/status/KVUtils.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 2 +- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f9ef4f53e5c32..3e44abce19397 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -130,29 +130,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Visible for testing. private[history] val listing: KVStore = storePath.map { path => val dbPath = new File(path, "listing.ldb") + val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, logDir.toString()) def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) try { - val db = openDB() - val meta = db.getMetadata(classOf[KVStoreMetadata]) - - if (meta == null) { - db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir)) - db - } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) { - logInfo("Detected mismatched config in existing DB, deleting...") - db.close() - Utils.deleteRecursively(dbPath) - openDB() - } else { - db - } + open(new File(path, "listing.ldb"), metadata) } catch { - case _: UnsupportedStoreVersionException => + case _: UnsupportedStoreVersionException | _: MetadataMismatchException => logInfo("Detected incompatible DB versions, deleting...") Utils.deleteRecursively(dbPath) - openDB() + open(new File(path, "listing.ldb"), metadata) } }.getOrElse(new InMemoryStore()) @@ -721,19 +709,7 @@ private[history] object FsHistoryProvider { private[history] val CURRENT_LISTING_VERSION = 1L } -/** - * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as - * the API serializer. - */ -private class KVStoreScalaSerializer extends KVStoreSerializer { - - mapper.registerModule(DefaultScalaModule) - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) - mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) - -} - -private[history] case class KVStoreMetadata( +private[history] case class FsHistoryProviderMetadata( version: Long, logDir: String) diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index a2c9c9cca8e7a..4638511944c61 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -68,6 +68,6 @@ private[spark] object KVUtils extends Logging { db } - private[spark]class MetadataMismatchException extends Exception + private[spark] class MetadataMismatchException extends Exception } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 2141934c92640..03bd3eaf579f3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -611,7 +611,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Manually overwrite the version in the listing db; this should cause the new provider to // discard all data because the versions don't match. - val meta = new KVStoreMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1, + val meta = new FsHistoryProviderMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1, conf.get(LOCAL_STORE_DIR).get) oldProvider.listing.setMetadata(meta) oldProvider.stop() From ca4374612abdaab7cd1c449e65d87878c68e15d2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 18 Oct 2017 10:50:10 -0700 Subject: [PATCH 5/6] Add method javadoc. --- core/src/main/scala/org/apache/spark/status/LiveEntity.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index a92c78d6e057c..63fa36580bc7d 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -41,6 +41,10 @@ private[spark] abstract class LiveEntity { store.write(doUpdate()) } + /** + * Returns an updated view of entity data, to be stored in the status store, reflecting the + * latest information collected by the listener. + */ protected def doUpdate(): Any } From 53357a1bcca24149d258f6c1118ee2166a8389bb Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 24 Oct 2017 15:12:23 -0700 Subject: [PATCH 6/6] Feedback. --- .../deploy/history/FsHistoryProvider.scala | 2 -- .../spark/status/AppStatusListener.scala | 21 ++++++++----------- .../spark/status/AppStatusListenerSuite.scala | 14 ++++++------- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3e44abce19397..cf97597b484d8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -132,8 +132,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val dbPath = new File(path, "listing.ldb") val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, logDir.toString()) - def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) - try { open(new File(path, "listing.ldb"), metadata) } catch { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 0ee01c0a55380..f120685c941df 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -180,20 +180,14 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log liveJobs.put(event.jobId, job) update(job) - val schedulingPool = Option(event.properties).flatMap { p => - Option(p.getProperty("spark.scheduler.pool")) - }.getOrElse(SparkUI.DEFAULT_POOL_NAME) - event.stageInfos.foreach { stageInfo => // A new job submission may re-use an existing stage, so this code needs to do an update // instead of just a write. val stage = getOrCreateStage(stageInfo) stage.jobs :+= job stage.jobIds += event.jobId - stage.schedulingPool = schedulingPool update(stage) } - } override def onJobEnd(event: SparkListenerJobEnd): Unit = { @@ -211,6 +205,9 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { val stage = getOrCreateStage(event.stageInfo) stage.status = v1.StageStatus.ACTIVE + stage.schedulingPool = Option(event.properties).flatMap { p => + Option(p.getProperty("spark.scheduler.pool")) + }.getOrElse(SparkUI.DEFAULT_POOL_NAME) // Look at all active jobs to find the ones that mention this stage. stage.jobs = liveJobs.values @@ -257,16 +254,16 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log } override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { + // Call update on the task so that the "getting result" time is written to the store; the + // value is part of the mutable TaskInfo state that the live entity already references. liveTasks.get(event.taskInfo.taskId).foreach { task => update(task) } } override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { - // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task - // completion event is for. Let's just drop it here. This means we might have some speculation - // tasks on the web ui that are never marked as complete. - if (event.taskInfo == null || event.stageAttemptId == -1) { + // TODO: can this really happen? + if (event.taskInfo == null) { return } @@ -357,7 +354,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log stage.jobs.foreach { job => stage.status match { case v1.StageStatus.COMPLETE => - job.completedStages = job.completedStages + event.stageInfo.stageId + job.completedStages += event.stageInfo.stageId case v1.StageStatus.SKIPPED => job.skippedStages += event.stageInfo.stageId job.skippedTasks += event.stageInfo.numTasks @@ -418,7 +415,7 @@ private class AppStatusListener(kvstore: KVStore) extends SparkListener with Log override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { event.blockUpdatedInfo.blockId match { case block: RDDBlockId => updateRDDBlock(event, block) - case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too. + case _ => // TODO: API only covers RDD storage. } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 9fff1297b1d2b..6f7a0c14dd684 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -119,7 +119,6 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { stages.foreach { info => check[StageDataWrapper](key(info)) { stage => assert(stage.info.status === v1.StageStatus.PENDING) - assert(stage.info.schedulingPool === "schedPool") assert(stage.jobIds === Set(1)) } } @@ -136,6 +135,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { check[StageDataWrapper](key(stages.head)) { stage => assert(stage.info.status === v1.StageStatus.ACTIVE) assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get))) + assert(stage.info.schedulingPool === "schedPool") } // Start tasks from stage 1 @@ -196,13 +196,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { check[StageDataWrapper](key(stages.head)) { stage => assert(stage.info.memoryBytesSpilled === s1Tasks.size) + } - val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage") - .first(key(stages.head)).last(key(stages.head)).asScala.toSeq - assert(execs.size > 0) - execs.foreach { exec => - assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2) - } + val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage") + .first(key(stages.head)).last(key(stages.head)).asScala.toSeq + assert(execs.size > 0) + execs.foreach { exec => + assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2) } // Fail one of the tasks, re-start it.