Expose the necessary information in RDDInfo
This includes the scope field that we added in previous commits,
and the parent IDs for tracking the lineage through the listener
API.
Andrew Or committed Apr 17, 2015
1 parent a9ed4f9 commit 5143523
Showing 8 changed files with 95 additions and 16 deletions.
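For orientation, here is a minimal sketch of the kind of listener this commit enables. It is not part of the change: the class name LineageTracker and its bookkeeping are illustrative, and it relies only on the scope and parentIds fields that RDDInfo now exposes through the listener API.

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Hypothetical listener (not in this commit): records the lineage
// information that RDDInfo now carries through the listener API.
class LineageTracker extends SparkListener {
  // RDD ID -> IDs of its direct parents, as reported on stage submission.
  private val parents = new mutable.HashMap[Int, Seq[Int]]
  // RDD ID -> scope, for RDDs that were created inside a scope.
  private val scopes = new mutable.HashMap[Int, String]

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    synchronized {
      stageSubmitted.stageInfo.rddInfos.foreach { info =>
        parents(info.id) = info.parentIds
        Option(info.scope).foreach { s => scopes(info.id) = s } // scope may be null
      }
    }
  }
}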
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1437,7 +1437,7 @@ abstract class RDD[T: ClassTag](
@transient private[spark] val creationSite = sc.getCallSite()

/** The scope in which this RDD is defined. This is null if the scope is not defined. */
@transient val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)
@transient private[spark] val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)

private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")

7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -26,7 +26,9 @@ class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
var storageLevel: StorageLevel)
var storageLevel: StorageLevel,
val scope: String,
val parentIds: Seq[Int])
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
@@ -52,6 +54,7 @@ class RDDInfo(
private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
val parentIds = rdd.dependencies.map(_.rdd.id)
new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, rdd.scope, parentIds)
}
}
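As a quick illustration of what fromRdd now records, a hedged sketch follows. It is not from the commit: the demo object is made up, and it sits in the org.apache.spark package because the RDDInfo companion is private[spark].

package org.apache.spark

import org.apache.spark.storage.RDDInfo

object RDDInfoDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "rddinfo-demo")
    val base = sc.parallelize(1 to 10) // no parent RDDs
    val mapped = base.map(_ * 2)       // one parent: base
    val info = RDDInfo.fromRdd(mapped)
    assert(info.parentIds == Seq(base.id)) // parent IDs come from rdd.dependencies
    println(s"scope = ${info.scope}")      // null unless RDD_SCOPE_KEY was set
    sc.stop()
  }
}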
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
import org.apache.spark.ui.viz.VisualizationListener

/**
* Top level user interface for a Spark application.
@@ -38,6 +39,7 @@ private[spark] class SparkUI private (
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
val visualizationListener: VisualizationListener,
var appName: String,
val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -142,13 +144,16 @@ private[spark] object SparkUI {
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
val visualizationListener = new VisualizationListener

listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(visualizationListener)

new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, appName, basePath)
executorsListener, _jobProgressListener, storageListener, visualizationListener,
appName, basePath)
}
}
32 changes: 32 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui.viz

import org.apache.spark.scheduler._

/**
* A SparkListener that tracks the information needed to visualize RDD scopes
* and lineage in the UI.
*/
private[spark] class VisualizationListener extends SparkListener {

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
synchronized {
// The RDDInfos submitted here now carry scope and parent IDs.
// TODO: record them to drive the visualization.
stageSubmitted.stageInfo.rddInfos
}
}
}
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,8 +367,11 @@ private[spark] object JsonProtocol {

def rddInfoToJson(rddInfo: RDDInfo): JValue = {
val storageLevel = storageLevelToJson(rddInfo.storageLevel)
val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
("RDD ID" -> rddInfo.id) ~
("Name" -> rddInfo.name) ~
("Scope" -> Option(rddInfo.scope)) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
@@ -597,7 +600,7 @@ private[spark] object JsonProtocol {
val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
val details = (json \ "Details").extractOpt[String].getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -783,14 +786,18 @@ private[spark] object JsonProtocol {
def rddInfoFromJson(json: JValue): RDDInfo = {
val rddId = (json \ "RDD ID").extract[Int]
val name = (json \ "Name").extract[String]
val scope = Utils.jsonOption(json \ "Scope").map(_.extract[String]).orNull
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val numPartitions = (json \ "Number of Partitions").extract[Int]
val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
val memSize = (json \ "Memory Size").extract[Long]
val tachyonSize = (json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]

val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, scope, parentIds)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.tachyonSize = tachyonSize
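Taken together, the JsonProtocol changes above let the new fields survive a JSON round trip. A sketch under the same caveats (rddInfoToJson and rddInfoFromJson are the methods changed above; the harness around them is illustrative and sits in-package because JsonProtocol is private[spark]):

package org.apache.spark

import org.apache.spark.storage.{RDDInfo, StorageLevel}
import org.apache.spark.util.JsonProtocol

object RoundTripDemo {
  def main(args: Array[String]): Unit = {
    val info = new RDDInfo(42, "answers", 8, StorageLevel.MEMORY_ONLY, "my-scope", Seq(40, 41))
    val restored = JsonProtocol.rddInfoFromJson(JsonProtocol.rddInfoToJson(info))
    assert(restored.scope == "my-scope")      // "Scope" is serialized as an optional string
    assert(restored.parentIds == Seq(40, 41)) // "Parent IDs" is serialized as a JSON array
  }
}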
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -287,8 +287,8 @@ class StorageSuite extends FunSuite {

// For testing StorageUtils.updateRddInfo
private def stockRDDInfos: Seq[RDDInfo] = {
val info0 = new RDDInfo(0, "0", 10, memAndDisk)
val info1 = new RDDInfo(1, "1", 3, memAndDisk)
val info0 = new RDDInfo(0, "0", 10, memAndDisk, "scoop", Seq(3))
val info1 = new RDDInfo(1, "1", 3, memAndDisk, "scoop", Seq(4))
Seq(info0, info1)
}

14 changes: 7 additions & 7 deletions core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -35,10 +35,10 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
private val none = StorageLevel.NONE
private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false)
private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly, "scoop", Seq(10))
private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly, "scoop", Seq(10))
private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk, "scoop", Seq(10))
private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk, "scoop", Seq(10))
private val bm1 = BlockManagerId("big", "dog", 1)

before {
@@ -70,7 +70,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
assert(storageListener.rddInfoList.size === 2)

// Submitting RDDInfos with duplicate IDs does nothing
val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, "scoop", Seq(10))
rddInfo0Cached.numCachedPartitions = 1
val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
@@ -166,8 +166,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {

test("verify StorageTab contains all cached rdds") {

val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly)
val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly)
val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, "scoop", Seq(4))
val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, "scoop", Seq(4))
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
val taskMetrics0 = new TaskMetrics
34 changes: 33 additions & 1 deletion core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -320,6 +320,16 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
}

test("RDDInfo backward compatibility") {
// Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, "fable", Seq(1, 6, 8))
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
.removeField({ _._1 == "Scope" })
.removeField({ _._1 == "Parent IDs" })
val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, null, Seq.empty)
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
}

/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -642,7 +652,7 @@ class JsonProtocolSuite extends FunSuite {
}

private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK)
val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, "layer", Seq(1, 4, 7))
r.numCachedPartitions = c
r.memSize = d
r.diskSize = e
@@ -782,6 +792,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 101,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1165,6 +1177,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 1,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1204,6 +1218,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 2,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1220,6 +1236,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 3,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1259,6 +1277,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 3,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1275,6 +1295,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 4,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1291,6 +1313,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 5,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1330,6 +1354,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 4,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1346,6 +1372,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 5,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1362,6 +1390,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 6,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
@@ -1378,6 +1408,8 @@ class JsonProtocolSuite extends FunSuite {
| {
| "RDD ID": 7,
| "Name": "mayor",
| "Scope": "layer",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
