Extract visualization logic from listener

Andrew Or committed May 4, 2015
1 parent 83f9c58 commit 31aae06
Showing 8 changed files with 78 additions and 70 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala

@@ -25,7 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
-import org.apache.spark.ui.viz.VisualizationListener
+import org.apache.spark.ui.scope.OperationGraphListener

/**
* Top level user interface for a Spark application.
@@ -39,7 +39,7 @@ private[spark] class SparkUI private (
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
-val visualizationListener: VisualizationListener,
+val operationGraphListener: OperationGraphListener,
var appName: String,
val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -149,16 +149,16 @@ private[spark] object SparkUI {
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
-val visualizationListener = new VisualizationListener(conf)
+val operationGraphListener = new OperationGraphListener(conf)

listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
-listenerBus.addListener(visualizationListener)
+listenerBus.addListener(operationGraphListener)

new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
-executorsListener, _jobProgressListener, storageListener, visualizationListener,
+executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath)
}
}
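For orientation, the wiring above follows Spark's standard listener pattern: construct the listener once, register it on the bus, and let UI pages query it at render time. A minimal sketch of that flow (ignoring the private[spark] visibility of these classes; listenerBus and jobId are illustrative assumptions, not part of this diff):

import org.apache.spark.SparkConf
import org.apache.spark.ui.scope.{OperationGraphListener, RDDOperationGraph}

val operationGraphListener = new OperationGraphListener(new SparkConf())
listenerBus.addListener(operationGraphListener) // bus delivers SparkListenerJobStart events (listenerBus assumed)
// Later, at render time, a page pulls the graphs it needs:
val graphs: Seq[RDDOperationGraph] = operationGraphListener.getOperationGraphForJob(jobId) // jobId assumed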
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala

@@ -23,7 +23,7 @@ import java.util.{Locale, Date}
import scala.xml.{Node, Text}

import org.apache.spark.Logging
-import org.apache.spark.ui.viz.VizGraph
+import org.apache.spark.ui.scope.RDDOperationGraph

/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils extends Logging {
@@ -332,12 +332,12 @@ private[spark] object UIUtils extends Logging {
}

/** Return a "DAG visualization" DOM element that expands into a visualization for a stage. */
-def showDagVizForStage(stageId: Int, graph: Option[VizGraph]): Seq[Node] = {
+def showDagVizForStage(stageId: Int, graph: Option[RDDOperationGraph]): Seq[Node] = {
showDagViz(graph.toSeq, forJob = false)
}

/** Return a "DAG visualization" DOM element that expands into a visualization for a job. */
-def showDagVizForJob(jobId: Int, graphs: Seq[VizGraph]): Seq[Node] = {
+def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] = {
showDagViz(graphs, forJob = true)
}

@@ -348,7 +348,7 @@
* a format that is expected by spark-dag-viz.js. Any changes in the format here must be
* reflected there.
*/
-private def showDagViz(graphs: Seq[VizGraph], forJob: Boolean): Seq[Node] = {
+private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): Seq[Node] = {
<div>
<span class="expand-dag-viz" onclick={s"toggleDagViz($forJob);"}>
<span class="expand-dag-viz-arrow arrow-closed"></span>
@@ -359,7 +359,7 @@
{
graphs.map { g =>
<div class="stage-metadata" stageId={g.rootCluster.id} style="display:none">
-<div class="dot-file">{VizGraph.makeDotFile(g, forJob)}</div>
+<div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
</div>
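Two details in showDagViz are worth calling out: each graph becomes one hidden stage-metadata div carrying its DOT source plus one div per incoming and outgoing edge, and spark-dag-viz.js parses exactly that structure, so the two files must change in lockstep. Note also that showDagVizForStage lifts its Option into a Seq via graph.toSeq, so a stage with no recorded graph renders an empty container rather than failing. A hedged usage sketch (stageId and operationGraphListener are assumed to be in scope, as they are on StagePage below):

// Sketch only; mirrors the StagePage change later in this commit.
val dagViz: Seq[Node] = UIUtils.showDagVizForStage(
  stageId, operationGraphListener.getOperationGraphForStage(stageId))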
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

@@ -304,12 +304,13 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
var content = summary
val appStartTime = listener.startTime
val executorListener = parent.executorListener
-val vizListener = parent.vizListener
+val operationGraphListener = parent.operationGraphListener

content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
executorListener.executorIdToData, appStartTime)

-content ++= UIUtils.showDagVizForJob(jobId, vizListener.getVizGraphsForJob(jobId))
+content ++= UIUtils.showDagVizForJob(
+jobId, operationGraphListener.getOperationGraphForJob(jobId))

if (shouldShowActiveStages) {
content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala

@@ -26,7 +26,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
val killEnabled = parent.killEnabled
val jobProgresslistener = parent.jobProgressListener
val executorListener = parent.executorsListener
-val vizListener = parent.visualizationListener
+val operationGraphListener = parent.operationGraphListener

def isFairScheduler: Boolean =
jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

@@ -27,14 +27,14 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
-import org.apache.spark.ui.viz.VizGraph
+import org.apache.spark.ui.scope.RDDOperationGraph
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}

/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
private val progressListener = parent.progressListener
-private val vizListener = parent.vizListener
+private val operationGraphListener = parent.operationGraphListener

def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
@@ -171,7 +171,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</div>
</div>

-val dagViz = UIUtils.showDagVizForStage(stageId, vizListener.getVizGraphForStage(stageId))
+val dagViz = UIUtils.showDagVizForStage(
+stageId, operationGraphListener.getOperationGraphForStage(stageId))

val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
def accumulableRow(acc: AccumulableInfo): Elem =
core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala

@@ -27,7 +27,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
val sc = parent.sc
val killEnabled = parent.killEnabled
val progressListener = parent.jobProgressListener
-val vizListener = parent.visualizationListener
+val operationGraphListener = parent.operationGraphListener

attachPage(new AllStagesPage(this))
attachPage(new StagePage(this))
core/src/main/scala/org/apache/spark/ui/{viz/VisualizationListener.scala → scope/OperationGraphListener.scala}

@@ -15,47 +15,46 @@
* limitations under the License.
*/

-package org.apache.spark.ui.viz
+package org.apache.spark.ui.scope

import scala.collection.mutable
import scala.xml.{Node, Unparsed}

import org.apache.spark.SparkConf
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI

/**
-* A SparkListener that constructs the RDD DAG visualization for the UI.
+* A SparkListener that constructs a DAG of RDD operations.
*/
-private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
+private[ui] class OperationGraphListener(conf: SparkConf) extends SparkListener {
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
-private val stageIdToGraph = new mutable.HashMap[Int, VizGraph]
+private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
private val stageIds = new mutable.ArrayBuffer[Int]

// How many jobs or stages to retain graph metadata for
private val retainedStages =
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)

/** Return the graph metadata for the given job, or an empty sequence if no such information exists. */
-def getVizGraphsForJob(jobId: Int): Seq[VizGraph] = {
+def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
jobIdToStageIds.get(jobId)
.map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } }
.getOrElse { Seq.empty }
}

/** Return the graph metadata for the given stage, or None if no such information exists. */
-def getVizGraphForStage(stageId: Int): Option[VizGraph] = {
+def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
stageIdToGraph.get(stageId)
}

-/** On job start, construct a VizGraph for each stage in the job for display later. */
+/** On job start, construct an RDDOperationGraph for each stage in the job for display later. */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobId = jobStart.jobId
val stageInfos = jobStart.stageInfos

stageInfos.foreach { stageInfo =>
stageIds += stageInfo.stageId
-stageIdToGraph(stageInfo.stageId) = VizGraph.makeVizGraph(stageInfo)
+stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
}
jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
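As a rough query-side sketch of the renamed API, assuming a bus has already delivered SparkListenerJobStart events (only methods and fields visible in this diff are used; the literal job and stage IDs are illustrative):

val listener = new OperationGraphListener(new SparkConf())
// ... register on a listener bus and run jobs; onJobStart populates the maps ...

// Per job: one graph per stage, in sorted stage order
listener.getOperationGraphForJob(0).foreach { g =>
  println(s"${g.rootCluster.id}: ${g.edges.size} internal, " +
    s"${g.incomingEdges.size} incoming, ${g.outgoingEdges.size} outgoing edges")
}

// Per stage: None if the stage is unknown or no longer retained
// (at most spark.ui.retainedStages stages are kept)
val maybeGraph: Option[RDDOperationGraph] = listener.getOperationGraphForStage(0)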
core/src/main/scala/org/apache/spark/ui/{viz/VizGraph.scala → scope/RDDOperationGraph.scala}

@@ -15,78 +15,82 @@
* limitations under the License.
*/

-package org.apache.spark.ui.viz
+package org.apache.spark.ui.scope

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.Logging
import org.apache.spark.rdd.OperatorScope
import org.apache.spark.scheduler.StageInfo

/**
-* A representation of a generic cluster graph used for storing visualization information.
+* A representation of a generic cluster graph used for storing information on RDD operations.
*
* Each graph is defined with a set of edges and a root cluster, which may contain children
* nodes and children clusters. Additionally, a graph may also have edges that enter or exit
* the graph from nodes that belong to adjacent graphs.
*/
-private[ui] case class VizGraph(
-edges: Seq[VizEdge],
-outgoingEdges: Seq[VizEdge],
-incomingEdges: Seq[VizEdge],
-rootCluster: VizCluster)
+private[ui] case class RDDOperationGraph(
+edges: Seq[RDDOperationEdge],
+outgoingEdges: Seq[RDDOperationEdge],
+incomingEdges: Seq[RDDOperationEdge],
+rootCluster: RDDOperationCluster)

-/** A node in a VizGraph. This represents an RDD. */
-private[ui] case class VizNode(id: Int, name: String)
+/** A node in an RDDOperationGraph. This represents an RDD. */
+private[ui] case class RDDOperationNode(id: Int, name: String)

-/** A directed edge connecting two nodes in a VizGraph. This represents an RDD dependency. */
-private[ui] case class VizEdge(fromId: Int, toId: Int)
+/**
+* A directed edge connecting two nodes in an RDDOperationGraph.
+* This represents an RDD dependency.
+*/
+private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)

/**
-* A cluster that groups nodes together in a VizGraph.
+* A cluster that groups nodes together in an RDDOperationGraph.
*
* This represents any grouping of RDDs, including operator scopes (e.g. textFile, flatMap),
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
*/
-private[ui] class VizCluster(val id: String, val name: String) {
-private val _childrenNodes = new ListBuffer[VizNode]
-private val _childrenClusters = new ListBuffer[VizCluster]
-
-def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq
-def childrenClusters: Seq[VizCluster] = _childrenClusters.iterator.toSeq
-def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode }
-def attachChildCluster(childCluster: VizCluster): Unit = { _childrenClusters += childCluster }
+private[ui] class RDDOperationCluster(val id: String, val name: String) {
+private val _childrenNodes = new ListBuffer[RDDOperationNode]
+private val _childrenClusters = new ListBuffer[RDDOperationCluster]
+
+def childrenNodes: Seq[RDDOperationNode] = _childrenNodes.iterator.toSeq
+def childrenClusters: Seq[RDDOperationCluster] = _childrenClusters.iterator.toSeq
+def attachChildNode(childNode: RDDOperationNode): Unit = { _childrenNodes += childNode }
+def attachChildCluster(childCluster: RDDOperationCluster): Unit = {
+_childrenClusters += childCluster
+}
}

-private[ui] object VizGraph extends Logging {
+private[ui] object RDDOperationGraph extends Logging {

/**
-* Construct a VizGraph for a given stage.
+* Construct an RDDOperationGraph for a given stage.
*
* The root cluster represents the stage, and all children clusters represent RDD operations.
* Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
* from the parent to the child.
*
-* This does not currently merge common scopes across stages. This may be worth supporting in
-* the future when we decide to group certain stages within the same job under a common scope
-* (e.g. part of a SQL query).
+* This does not currently merge common operator scopes across stages. This may be worth
+* supporting in the future if we decide to group certain stages within the same job under
+* a common scope (e.g. part of a SQL query).
*/
-def makeVizGraph(stage: StageInfo): VizGraph = {
-val edges = new ListBuffer[VizEdge]
-val nodes = new mutable.HashMap[Int, VizNode]
-val clusters = new mutable.HashMap[String, VizCluster] // cluster ID -> VizCluster
+def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+val edges = new ListBuffer[RDDOperationEdge]
+val nodes = new mutable.HashMap[Int, RDDOperationNode]
+val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID

// Root cluster is the stage cluster
val stageClusterId = s"stage_${stage.stageId}"
val stageClusterName = s"Stage ${stage.stageId}" +
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
-val rootCluster = new VizCluster(stageClusterId, stageClusterName)
+val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)

// Find nodes, edges, and operator scopes that belong to this stage
stage.rddInfos.foreach { rdd =>
-edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) }
-val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name))
+edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name))

if (rdd.scope == null) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -99,7 +103,7 @@ private[ui] object VizGraph extends Logging {
val rddClusters = rddScopes.map { scope =>
val clusterId = scope.name + "_" + scope.id
val clusterName = scope.name
-clusters.getOrElseUpdate(clusterId, new VizCluster(clusterId, clusterName))
+clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
}
// Build the cluster hierarchy for this RDD
rddClusters.sliding(2).foreach { pc =>
@@ -117,10 +121,10 @@

// Classify each edge as internal, outgoing or incoming
// This information is needed to reason about how stages relate to each other
-val internalEdges = new ListBuffer[VizEdge]
-val outgoingEdges = new ListBuffer[VizEdge]
-val incomingEdges = new ListBuffer[VizEdge]
-edges.foreach { case e: VizEdge =>
+val internalEdges = new ListBuffer[RDDOperationEdge]
+val outgoingEdges = new ListBuffer[RDDOperationEdge]
+val incomingEdges = new ListBuffer[RDDOperationEdge]
+edges.foreach { case e: RDDOperationEdge =>
val fromThisGraph = nodes.contains(e.fromId)
val toThisGraph = nodes.contains(e.toId)
(fromThisGraph, toThisGraph) match {
@@ -132,7 +136,7 @@
}
}

-VizGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
+RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
}

/**
@@ -145,7 +149,7 @@
*
* For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
*/
-def makeDotFile(graph: VizGraph, forJob: Boolean): String = {
+def makeDotFile(graph: RDDOperationGraph, forJob: Boolean): String = {
val dotFile = new StringBuilder
dotFile.append("digraph G {\n")
dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = " "))
@@ -159,21 +163,24 @@
}

/**
-* Return the dot representation of a node.
+* Return the dot representation of a node in an RDDOperationGraph.
*
* On the job page, it is displayed as a small circle without labels.
* On the stage page, it is displayed as a box with an embedded label.
*/
-private def makeDotNode(node: VizNode, forJob: Boolean): String = {
+private def makeDotNode(node: RDDOperationNode, forJob: Boolean): String = {
if (forJob) {
s"""${node.id} [label=" " shape="circle" padding="5" labelStyle="font-size: 0"]"""
} else {
s"""${node.id} [label="${node.name} (${node.id})"]"""
}
}

-/** Return the dot representation of a subgraph. */
-private def makeDotSubgraph(scope: VizCluster, forJob: Boolean, indent: String): String = {
+/** Return the dot representation of a subgraph in an RDDOperationGraph. */
+private def makeDotSubgraph(
+scope: RDDOperationCluster,
+forJob: Boolean,
+indent: String): String = {
val subgraph = new StringBuilder
subgraph.append(indent + s"subgraph cluster${scope.id} {\n")
subgraph.append(indent + s""" label="${scope.name}";\n""")
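Finally, a hedged end-to-end sketch of the new object (stageInfo is assumed to come from a SparkListenerJobStart event, and the DOT sample in the comments is approximate, since parts of makeDotFile are collapsed in this diff):

// Build the graph for one stage (stageInfo is an assumed input).
val graph: RDDOperationGraph = RDDOperationGraph.makeOperationGraph(stageInfo)

// Walk the hierarchy: the root cluster is the stage, child clusters are
// operator scopes (e.g. textFile, flatMap), and nodes are RDDs.
def describe(cluster: RDDOperationCluster, depth: Int): Unit = {
  println(("  " * depth) + s"${cluster.name} [${cluster.id}]")
  cluster.childrenNodes.foreach { n =>
    println(("  " * (depth + 1)) + s"RDD ${n.id}: ${n.name}")
  }
  cluster.childrenClusters.foreach { c => describe(c, depth + 1) }
}
describe(graph.rootCluster, 0)

// Produce the DOT source that spark-dag-viz.js lays out client-side; roughly:
//   digraph G {
//     subgraph clusterstage_0 {
//       label="Stage 0";
//       ...
//     }
//   }
val dot: String = RDDOperationGraph.makeDotFile(graph, forJob = false)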
