Skip to content

Commit

Permalink
Rename references to viz scopes to viz clusters
Browse files Browse the repository at this point in the history
The term 'scope' is really overloaded. A scope should be logically
agnostic to how it is represented in the UI. We rename all such
references to 'cluster', which represents either an operator scope,
a stage, or a job.
  • Loading branch information
Andrew Or committed May 4, 2015
1 parent ee33d52 commit 5a7faf4
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 45 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ private[spark] object UIUtils extends Logging {
<div id="dag-viz-metadata">
{
graphs.map { g =>
<div class="stage-metadata" stageId={g.rootScope.id} style="display:none">
<div class="stage-metadata" stageId={g.rootCluster.id} style="display:none">
<div class="dot-file">{VizGraph.makeDotFile(g, forJob)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI

/**
* A SparkListener that constructs RDD DAG visualization for the UI.
* A SparkListener that constructs the RDD DAG visualization for the UI.
*/
private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
Expand Down
103 changes: 60 additions & 43 deletions core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,86 +25,97 @@ import org.apache.spark.rdd.RDDScope
import org.apache.spark.scheduler.StageInfo

/**
* A representation of a generic scoped graph used for storing visualization information.
* A representation of a generic cluster graph used for storing visualization information.
*
* Each graph is defined with a set of edges and a root scope, which may contain children
* nodes and children scopes. Additionally, a graph may also have edges that enter or exit
* Each graph is defined with a set of edges and a root cluster, which may contain children
* nodes and children clusters. Additionally, a graph may also have edges that enter or exit
* the graph from nodes that belong to adjacent graphs.
*/
private[ui] case class VizGraph(
edges: Seq[VizEdge],
outgoingEdges: Seq[VizEdge],
incomingEdges: Seq[VizEdge],
rootScope: VizScope)
rootCluster: VizCluster)

/** A node in a VizGraph. This represents an RDD. */
private[ui] case class VizNode(id: Int, name: String)

/** A directed edge connecting two nodes in a VizGraph. This represents an RDD dependency. */
private[ui] case class VizEdge(fromId: Int, toId: Int)

/** A cluster in the graph that represents a level in the scope hierarchy. */
private[ui] class VizScope(val id: String, val name: String) {
/**
* A cluster that groups nodes together in a VizGraph.
*
* This represents any grouping of RDDs, including operator scopes (e.g. textFile, flatMap),
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
*/
private[ui] class VizCluster(val id: String, val name: String) {
private val _childrenNodes = new ListBuffer[VizNode]
private val _childrenScopes = new ListBuffer[VizScope]
private val _childrenClusters = new ListBuffer[VizCluster]

def this(id: String) { this(id, id.split(RDDScope.SCOPE_NAME_DELIMITER).head) }

def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq
def childrenScopes: Seq[VizScope] = _childrenScopes.iterator.toSeq
def childrenClusters: Seq[VizCluster] = _childrenClusters.iterator.toSeq
def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode }
def attachChildScope(childScope: VizScope): Unit = { _childrenScopes += childScope }
def attachChildCluster(childCluster: VizCluster): Unit = { _childrenClusters += childCluster }
}

private[ui] object VizGraph extends Logging {

/**
* Construct a VizGraph for a given stage.
*
* The root scope represents the stage, and all children scopes represent individual
* levels of RDD scopes. Each node represents an RDD, and each edge represents a dependency
* between two RDDs from the parent to the child.
* The root cluster represents the stage, and all children clusters represent RDD operations.
* Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
* from the parent to the child.
*
* This does not currently merge common scopes across stages. This may be worth supporting in
* the future when we decide to group certain stages within the same job under a common scope
* (e.g. part of a SQL query).
*/
def makeVizGraph(stage: StageInfo): VizGraph = {
val edges = new ListBuffer[VizEdge]
val nodes = new mutable.HashMap[Int, VizNode]
val scopes = new mutable.HashMap[String, VizScope] // scope ID -> viz scope
val clusters = new mutable.HashMap[String, VizCluster] // cluster ID -> VizCluster

// Root scope is the stage scope
val stageScopeId = s"stage_${stage.stageId}"
val stageScopeName = s"Stage ${stage.stageId}" +
// Root cluster is the stage cluster
val stageClusterId = s"stage_${stage.stageId}"
val stageClusterName = s"Stage ${stage.stageId}" +
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootScope = new VizScope(stageScopeId, stageScopeName)
val rootCluster = new VizCluster(stageClusterId, stageClusterName)

// Find nodes, edges, and children scopes that belong to this stage. Each node is an RDD
// that lives either directly in the root scope or in one of the children scopes. Each
// children scope represents a level of the RDD scope and must contain at least one RDD.
// Children scopes can be nested if one RDD operation calls another.
// Find nodes, edges, and operator scopes that belong to this stage
stage.rddInfos.foreach { rdd =>
edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) }
val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name))

if (rdd.scope == null) {
// This RDD has no encompassing scope, so we put it directly in the root scope
// This RDD has no encompassing scope, so we put it directly in the root cluster
// This should happen only if an RDD is instantiated outside of a public RDD API
rootScope.attachChildNode(node)
rootCluster.attachChildNode(node)
} else {
// Otherwise, this RDD belongs to an inner scope
val rddScopes = rdd.scope
// Otherwise, this RDD belongs to an inner cluster,
// which may be nested inside of other clusters
val rddClusters = rdd.scope
.split(RDDScope.SCOPE_NESTING_DELIMITER)
.map { scopeId => scopes.getOrElseUpdate(scopeId, new VizScope(scopeId)) }
// Build the scope hierarchy for this RDD
rddScopes.sliding(2).foreach { pc =>
.map { scopeId => clusters.getOrElseUpdate(scopeId, new VizCluster(scopeId)) }
// Build the cluster hierarchy for this RDD
rddClusters.sliding(2).foreach { pc =>
if (pc.size == 2) {
val parentScope = pc(0)
val childScope = pc(1)
parentScope.attachChildScope(childScope)
val parentCluster = pc(0)
val childCluster = pc(1)
parentCluster.attachChildCluster(childCluster)
}
}
// Attach the outermost scope to the root scope, and the RDD to the innermost scope
rddScopes.headOption.foreach { scope => rootScope.attachChildScope(scope) }
rddScopes.lastOption.foreach { scope => scope.attachChildNode(node) }
// Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
}
}

// Classify each edge as internal, outgoing or incoming
// This information is needed to reason about how stages relate to each other
val internalEdges = new ListBuffer[VizEdge]
val outgoingEdges = new ListBuffer[VizEdge]
val incomingEdges = new ListBuffer[VizEdge]
Expand All @@ -120,21 +131,23 @@ private[ui] object VizGraph extends Logging {
}
}

VizGraph(internalEdges, outgoingEdges, incomingEdges, rootScope)
VizGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
}

/**
* Generate the content of a dot file that describes the specified graph.
*
* Note that this only uses a minimal subset of features available to the DOT specification.
* More style is added in the visualization later through post-processing in JavaScript.
* Part of the styling must be done here because the rendering library must take certain
* attributes into account when arranging the graph elements. More style is added in the
* visualization later through post-processing in JavaScript.
*
* For the complete specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
* For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
*/
def makeDotFile(graph: VizGraph, forJob: Boolean): String = {
val dotFile = new StringBuilder
dotFile.append("digraph G {\n")
dotFile.append(makeDotSubgraph(graph.rootScope, forJob, indent = " "))
dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = " "))
graph.edges.foreach { edge =>
dotFile.append(s""" ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""")
}
Expand All @@ -144,25 +157,29 @@ private[ui] object VizGraph extends Logging {
result
}

/** Return the dot representation of a node. */
/**
* Return the dot representation of a node.
*
* On the job page, is displayed as a small circle without labels.
* On the stage page, it is displayed as a box with an embedded label.
*/
private def makeDotNode(node: VizNode, forJob: Boolean): String = {
if (forJob) {
// On the job page, we display RDDs as dots without labels
s"""${node.id} [label=" " shape="circle" padding="5" labelStyle="font-size: 0"]"""
} else {
s"""${node.id} [label="${node.name} (${node.id})"]"""
}
}

/** Return the dot representation of a subgraph recursively. */
private def makeDotSubgraph(scope: VizScope, forJob: Boolean, indent: String): String = {
/** Return the dot representation of a subgraph. */
private def makeDotSubgraph(scope: VizCluster, forJob: Boolean, indent: String): String = {
val subgraph = new StringBuilder
subgraph.append(indent + s"subgraph cluster${scope.id} {\n")
subgraph.append(indent + s""" label="${scope.name}";\n""")
scope.childrenNodes.foreach { node =>
subgraph.append(indent + s" ${makeDotNode(node, forJob)};\n")
}
scope.childrenScopes.foreach { cscope =>
scope.childrenClusters.foreach { cscope =>
subgraph.append(makeDotSubgraph(cscope, forJob, indent + " "))
}
subgraph.append(indent + "}\n")
Expand Down

0 comments on commit 5a7faf4

Please sign in to comment.