diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 47d3af47f4122..b9ea84368f963 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1437,7 +1437,7 @@ abstract class RDD[T: ClassTag](
   @transient private[spark] val creationSite = sc.getCallSite()
 
-  /** Dem scopes. Tis null if de scope is not defined'eh. TODO: Make this private[spark]. */
-  @transient val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)
+  /** The scope in which this RDD was created. This is null if the scope is not defined. */
+  @transient private[spark] val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)
 
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 034525b56f59c..9192bc3f9e7d9 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -26,7 +26,9 @@ class RDDInfo(
     val id: Int,
     val name: String,
     val numPartitions: Int,
-    var storageLevel: StorageLevel)
+    var storageLevel: StorageLevel,
+    val scope: String,
+    val parentIds: Seq[Int])
   extends Ordered[RDDInfo] {
 
   var numCachedPartitions = 0
@@ -52,6 +54,7 @@ class RDDInfo(
 private[spark] object RDDInfo {
   def fromRdd(rdd: RDD[_]): RDDInfo = {
     val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
-    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
+    val parentIds = rdd.dependencies.map(_.rdd.id)
+    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, rdd.scope, parentIds)
   }
 }
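Note: the new parentIds field (populated above from rdd.dependencies) is what lets a consumer of RDDInfo reconstruct the RDD DAG without access to the RDDs themselves. A minimal sketch of that reconstruction, using only the fields added in this patch (the edges helper is a hypothetical name, not part of the patch):

    import org.apache.spark.storage.RDDInfo

    // Directed edges (parent RDD ID -> child RDD ID) recovered from a stage's RDDInfos.
    def edges(infos: Seq[RDDInfo]): Seq[(Int, Int)] =
      for {
        info <- infos
        parentId <- info.parentIds
      } yield (parentId, info.id)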
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index adfa6bbada256..183649c0a4776 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
 import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
+import org.apache.spark.ui.viz.VisualizationListener
 
 /**
  * Top level user interface for a Spark application.
@@ -38,6 +39,7 @@ private[spark] class SparkUI private (
     val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
     val storageListener: StorageListener,
+    val visualizationListener: VisualizationListener,
     var appName: String,
     val basePath: String)
   extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -142,13 +144,16 @@ private[spark] object SparkUI {
     val storageStatusListener = new StorageStatusListener
     val executorsListener = new ExecutorsListener(storageStatusListener)
     val storageListener = new StorageListener(storageStatusListener)
+    val visualizationListener = new VisualizationListener
 
     listenerBus.addListener(environmentListener)
     listenerBus.addListener(storageStatusListener)
     listenerBus.addListener(executorsListener)
     listenerBus.addListener(storageListener)
+    listenerBus.addListener(visualizationListener)
 
     new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
-      executorsListener, _jobProgressListener, storageListener, appName, basePath)
+      executorsListener, _jobProgressListener, storageListener, visualizationListener,
+      appName, basePath)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala b/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
new file mode 100644
index 0000000000000..22cce643dacca
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.viz
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.RDDInfo
+
+/**
+ * A SparkListener that tracks the RDDs involved in each submitted stage for visualization.
+ */
+private[spark] class VisualizationListener extends SparkListener {
+
+  // A mapping from stage ID to the infos of the RDDs involved in that stage
+  private val stageIdToRddInfos = new mutable.HashMap[Int, Seq[RDDInfo]]
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+    synchronized {
+      stageIdToRddInfos(stageSubmitted.stageInfo.stageId) = stageSubmitted.stageInfo.rddInfos
+    }
+  }
+}
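Note: once registered on the listener bus as above, the listener is driven purely by scheduler events. A sketch of that flow, mirroring the pattern the test suites below use; LiveListenerBus, StageInfo, and the listener are all private[spark], so this assumes code living inside the org.apache.spark package, and the StageInfo arguments are made up:

    import org.apache.spark.scheduler._
    import org.apache.spark.ui.viz.VisualizationListener

    val bus = new LiveListenerBus
    val listener = new VisualizationListener
    bus.addListener(listener)
    // Posting a stage-submitted event invokes listener.onStageSubmitted
    val stageInfo = new StageInfo(0, 0, "stage0", 1, Seq.empty, "details")
    bus.postToAll(SparkListenerStageSubmitted(stageInfo))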
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 474f79fb756f6..5cc625a727592 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,8 +367,11 @@ private[spark] object JsonProtocol {
 
   def rddInfoToJson(rddInfo: RDDInfo): JValue = {
     val storageLevel = storageLevelToJson(rddInfo.storageLevel)
+    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
+    ("Scope" -> Option(rddInfo.scope)) ~
+    ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
@@ -597,7 +600,7 @@
     val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
-    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
+    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
     val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -783,6 +786,10 @@
   def rddInfoFromJson(json: JValue): RDDInfo = {
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
+    val scope = Utils.jsonOption(json \ "Scope").map(_.extract[String]).orNull
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
@@ -790,7 +797,7 @@
     val tachyonSize = (json \ "Tachyon Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
-    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
+    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, scope, parentIds)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
     rddInfo.tachyonSize = tachyonSize
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index ef5c55f91c39a..8f435114072a7 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -287,8 +287,8 @@ class StorageSuite extends FunSuite {
 
   // For testing StorageUtils.updateRddInfo
   private def stockRDDInfos: Seq[RDDInfo] = {
-    val info0 = new RDDInfo(0, "0", 10, memAndDisk)
-    val info1 = new RDDInfo(1, "1", 3, memAndDisk)
+    val info0 = new RDDInfo(0, "0", 10, memAndDisk, "scoop", Seq(3))
+    val info1 = new RDDInfo(1, "1", 3, memAndDisk, "scoop", Seq(4))
     Seq(info0, info1)
   }
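Note: a quick round-trip illustration of the JsonProtocol changes above. JsonProtocol is private[spark], so this sketch assumes it runs from code inside the org.apache.spark package (as the test suites below do); the values are made up:

    import org.apache.spark.storage.{RDDInfo, StorageLevel}
    import org.apache.spark.util.JsonProtocol

    val info = new RDDInfo(5, "example", 4, StorageLevel.MEMORY_ONLY, "someScope", Seq(2, 3))
    val json = JsonProtocol.rddInfoToJson(info)
    // json now carries "Scope": "someScope" and "Parent IDs": [2, 3]
    val parsed = JsonProtocol.rddInfoFromJson(json)
    assert(parsed.scope == "someScope" && parsed.parentIds == Seq(2, 3))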
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 3744e479d2f05..59affc2ce7c35 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -35,10 +35,10 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
   private val none = StorageLevel.NONE
   private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
   private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false)
-  private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
-  private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
-  private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
-  private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
+  private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly, "scoop", Seq(10))
+  private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly, "scoop", Seq(10))
+  private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk, "scoop", Seq(10))
+  private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk, "scoop", Seq(10))
   private val bm1 = BlockManagerId("big", "dog", 1)
 
   before {
@@ -70,7 +70,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.size === 2)
 
     // Submitting RDDInfos with duplicate IDs does nothing
-    val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
+    val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, "scoop", Seq(10))
     rddInfo0Cached.numCachedPartitions = 1
     val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
@@ -166,8 +166,8 @@
 
   test("verify StorageTab contains all cached rdds") {
-    val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly)
-    val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly)
+    val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, "scoop", Seq(4))
+    val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, "scoop", Seq(4))
     val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
     val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
     val taskMetrics0 = new TaskMetrics
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2be724254d7c..30177334bdedc 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -320,6 +320,16 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
   }
 
+  test("RDDInfo backward compatibility") {
+    // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
+    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, "fable", Seq(1, 6, 8))
+    val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
+      .removeField({ _._1 == "Scope" })
+      .removeField({ _._1 == "Parent IDs" })
+    val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, null, Seq.empty)
+    assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -642,7 +652,7 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
-    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK)
+    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, "layer", Seq(1, 4, 7))
     r.numCachedPartitions = c
     r.memSize = d
     r.diskSize = e
@@ -782,6 +792,8 @@ class JsonProtocolSuite extends FunSuite {
       |    {
       |      "RDD ID": 101,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1165,6 +1177,8 @@
       |    {
       |      "RDD ID": 1,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1204,6 +1218,8 @@
       |    {
       |      "RDD ID": 2,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1220,6 +1236,8 @@
       |    {
       |      "RDD ID": 3,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1259,6 +1277,8 @@
       |    {
       |      "RDD ID": 3,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1275,6 +1295,8 @@
       |    {
       |      "RDD ID": 4,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1291,6 +1313,8 @@
       |    {
       |      "RDD ID": 5,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1330,6 +1354,8 @@
       |    {
       |      "RDD ID": 4,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1346,6 +1372,8 @@
       |    {
       |      "RDD ID": 5,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1362,6 +1390,8 @@
       |    {
       |      "RDD ID": 6,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
@@ -1378,6 +1408,8 @@
       |    {
       |      "RDD ID": 7,
       |      "Name": "mayor",
+      |      "Scope": "layer",
+      |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
       |        "Use Memory": true,
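Note: the backward-compatibility path added to rddInfoFromJson can also be seen directly: a pre-1.4.0 event-log payload carries neither "Scope" nor "Parent IDs", so parsing falls back to a null scope and an empty parent list. The JSON below is hand-built for illustration and assumes the storage-level and size field names JsonProtocol uses elsewhere; like the other sketches, it assumes code inside the org.apache.spark package:

    import org.json4s.JsonDSL._
    import org.apache.spark.util.JsonProtocol

    val legacyJson =
      ("RDD ID" -> 1) ~
      ("Name" -> "legacy") ~
      ("Storage Level" ->
        ("Use Disk" -> false) ~ ("Use Memory" -> false) ~ ("Use Tachyon" -> false) ~
        ("Deserialized" -> false) ~ ("Replication" -> 1)) ~
      ("Number of Partitions" -> 8) ~
      ("Number of Cached Partitions" -> 0) ~
      ("Memory Size" -> 0L) ~
      ("Tachyon Size" -> 0L) ~
      ("Disk Size" -> 0L)

    val info = JsonProtocol.rddInfoFromJson(legacyJson)
    assert(info.scope == null && info.parentIds.isEmpty)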