diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index dabfb5900ea0d..c4d6a2ba34bcc 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab} import org.apache.spark.ui.storage.{StorageListener, StorageTab} -import org.apache.spark.ui.viz.VisualizationListener +import org.apache.spark.ui.scope.OperationGraphListener /** * Top level user interface for a Spark application. @@ -39,7 +39,7 @@ private[spark] class SparkUI private ( val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, - val visualizationListener: VisualizationListener, + val operationGraphListener: OperationGraphListener, var appName: String, val basePath: String) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") @@ -149,16 +149,16 @@ private[spark] object SparkUI { val storageStatusListener = new StorageStatusListener val executorsListener = new ExecutorsListener(storageStatusListener) val storageListener = new StorageListener(storageStatusListener) - val visualizationListener = new VisualizationListener(conf) + val operationGraphListener = new OperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) - listenerBus.addListener(visualizationListener) + listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, visualizationListener, + executorsListener, _jobProgressListener, storageListener, operationGraphListener, appName, basePath) } } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 0f1ea20413db6..08917caa30849 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -23,7 +23,7 @@ import java.util.{Locale, Date} import scala.xml.{Node, Text} import org.apache.spark.Logging -import org.apache.spark.ui.viz.VizGraph +import org.apache.spark.ui.scope.RDDOperationGraph /** Utility functions for generating XML pages with spark content. */ private[spark] object UIUtils extends Logging { @@ -332,12 +332,12 @@ private[spark] object UIUtils extends Logging { } /** Return a "DAG visualization" DOM element that expands into a visualization for a stage. */ - def showDagVizForStage(stageId: Int, graph: Option[VizGraph]): Seq[Node] = { + def showDagVizForStage(stageId: Int, graph: Option[RDDOperationGraph]): Seq[Node] = { showDagViz(graph.toSeq, forJob = false) } /** Return a "DAG visualization" DOM element that expands into a visualization for a job. */ - def showDagVizForJob(jobId: Int, graphs: Seq[VizGraph]): Seq[Node] = { + def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] = { showDagViz(graphs, forJob = true) } @@ -348,7 +348,7 @@ private[spark] object UIUtils extends Logging { * a format that is expected by spark-dag-viz.js. Any changes in the format here must be * reflected there. */ - private def showDagViz(graphs: Seq[VizGraph], forJob: Boolean): Seq[Node] = { + private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): Seq[Node] = {
@@ -359,7 +359,7 @@ private[spark] object UIUtils extends Logging { { graphs.map { g => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 3c18a5d875079..aa7156dd83657 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -304,12 +304,13 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { var content = summary val appStartTime = listener.startTime val executorListener = parent.executorListener - val vizListener = parent.vizListener + val operationGraphListener = parent.operationGraphListener content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, executorListener.executorIdToData, appStartTime) - content ++= UIUtils.showDagVizForJob(jobId, vizListener.getVizGraphsForJob(jobId)) + content ++= UIUtils.showDagVizForJob( + jobId, operationGraphListener.getOperationGraphForJob(jobId)) if (shouldShowActiveStages) { content ++=

Active Stages ({activeStages.size})

++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 4288a7bad55c8..77ca60b000a9b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -26,7 +26,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { val killEnabled = parent.killEnabled val jobProgresslistener = parent.jobProgressListener val executorListener = parent.executorsListener - val vizListener = parent.visualizationListener + val operationGraphListener = parent.operationGraphListener def isFairScheduler: Boolean = jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 12c1a88512d43..579310070c76c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -27,14 +27,14 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} import org.apache.spark.ui.jobs.UIData._ -import org.apache.spark.ui.viz.VizGraph +import org.apache.spark.ui.scope.RDDOperationGraph import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val progressListener = parent.progressListener - private val vizListener = parent.vizListener + private val operationGraphListener = parent.operationGraphListener def render(request: HttpServletRequest): Seq[Node] = { progressListener.synchronized { @@ -171,7 +171,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
- val dagViz = UIUtils.showDagVizForStage(stageId, vizListener.getVizGraphForStage(stageId)) + val dagViz = UIUtils.showDagVizForStage( + stageId, operationGraphListener.getOperationGraphForStage(stageId)) val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value") def accumulableRow(acc: AccumulableInfo): Elem = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 84c21258a8c9f..55169956d8304 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -27,7 +27,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val sc = parent.sc val killEnabled = parent.killEnabled val progressListener = parent.jobProgressListener - val vizListener = parent.visualizationListener + val operationGraphListener = parent.operationGraphListener attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala similarity index 78% rename from core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala rename to core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala index 95a6cb729e1d1..ffe0cb814956e 100644 --- a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala @@ -15,21 +15,20 @@ * limitations under the License. */ -package org.apache.spark.ui.viz +package org.apache.spark.ui.scope import scala.collection.mutable -import scala.xml.{Node, Unparsed} import org.apache.spark.SparkConf import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI /** - * A SparkListener that constructs the RDD DAG visualization for the UI. + * A SparkListener that constructs a DAG of RDD operations. */ -private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener { +private[ui] class OperationGraphListener(conf: SparkConf) extends SparkListener { private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]] - private val stageIdToGraph = new mutable.HashMap[Int, VizGraph] + private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph] private val stageIds = new mutable.ArrayBuffer[Int] // How many jobs or stages to retain graph metadata for @@ -37,25 +36,25 @@ private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener { conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) /** Return the graph metadata for the given stage, or None if no such information exists. */ - def getVizGraphsForJob(jobId: Int): Seq[VizGraph] = { + def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = { jobIdToStageIds.get(jobId) .map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } } .getOrElse { Seq.empty } } /** Return the graph metadata for the given stage, or None if no such information exists. */ - def getVizGraphForStage(stageId: Int): Option[VizGraph] = { + def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = { stageIdToGraph.get(stageId) } - /** On job start, construct a VizGraph for each stage in the job for display later. */ + /** On job start, construct a RDDOperationGraph for each stage in the job for display later. */ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { val jobId = jobStart.jobId val stageInfos = jobStart.stageInfos stageInfos.foreach { stageInfo => stageIds += stageInfo.stageId - stageIdToGraph(stageInfo.stageId) = VizGraph.makeVizGraph(stageInfo) + stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) } jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala similarity index 66% rename from core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala rename to core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index fc562c97fe7d7..65941349cdc9c 100644 --- a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -15,78 +15,82 @@ * limitations under the License. */ -package org.apache.spark.ui.viz +package org.apache.spark.ui.scope import scala.collection.mutable import scala.collection.mutable.ListBuffer import org.apache.spark.Logging -import org.apache.spark.rdd.OperatorScope import org.apache.spark.scheduler.StageInfo /** - * A representation of a generic cluster graph used for storing visualization information. + * A representation of a generic cluster graph used for storing information on RDD operations. * * Each graph is defined with a set of edges and a root cluster, which may contain children * nodes and children clusters. Additionally, a graph may also have edges that enter or exit * the graph from nodes that belong to adjacent graphs. */ -private[ui] case class VizGraph( - edges: Seq[VizEdge], - outgoingEdges: Seq[VizEdge], - incomingEdges: Seq[VizEdge], - rootCluster: VizCluster) +private[ui] case class RDDOperationGraph( + edges: Seq[RDDOperationEdge], + outgoingEdges: Seq[RDDOperationEdge], + incomingEdges: Seq[RDDOperationEdge], + rootCluster: RDDOperationCluster) -/** A node in a VizGraph. This represents an RDD. */ -private[ui] case class VizNode(id: Int, name: String) +/** A node in an RDDOperationGraph. This represents an RDD. */ +private[ui] case class RDDOperationNode(id: Int, name: String) -/** A directed edge connecting two nodes in a VizGraph. This represents an RDD dependency. */ -private[ui] case class VizEdge(fromId: Int, toId: Int) +/** + * A directed edge connecting two nodes in an RDDOperationGraph. + * This represents an RDD dependency. + */ +private[ui] case class RDDOperationEdge(fromId: Int, toId: Int) /** - * A cluster that groups nodes together in a VizGraph. + * A cluster that groups nodes together in an RDDOperationGraph. * * This represents any grouping of RDDs, including operator scopes (e.g. textFile, flatMap), * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters. */ -private[ui] class VizCluster(val id: String, val name: String) { - private val _childrenNodes = new ListBuffer[VizNode] - private val _childrenClusters = new ListBuffer[VizCluster] - - def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq - def childrenClusters: Seq[VizCluster] = _childrenClusters.iterator.toSeq - def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode } - def attachChildCluster(childCluster: VizCluster): Unit = { _childrenClusters += childCluster } +private[ui] class RDDOperationCluster(val id: String, val name: String) { + private val _childrenNodes = new ListBuffer[RDDOperationNode] + private val _childrenClusters = new ListBuffer[RDDOperationCluster] + + def childrenNodes: Seq[RDDOperationNode] = _childrenNodes.iterator.toSeq + def childrenClusters: Seq[RDDOperationCluster] = _childrenClusters.iterator.toSeq + def attachChildNode(childNode: RDDOperationNode): Unit = { _childrenNodes += childNode } + def attachChildCluster(childCluster: RDDOperationCluster): Unit = { + _childrenClusters += childCluster + } } -private[ui] object VizGraph extends Logging { +private[ui] object RDDOperationGraph extends Logging { /** - * Construct a VizGraph for a given stage. + * Construct a RDDOperationGraph for a given stage. * * The root cluster represents the stage, and all children clusters represent RDD operations. * Each node represents an RDD, and each edge represents a dependency between two RDDs pointing * from the parent to the child. * - * This does not currently merge common scopes across stages. This may be worth supporting in - * the future when we decide to group certain stages within the same job under a common scope - * (e.g. part of a SQL query). + * This does not currently merge common operator scopes across stages. This may be worth + * supporting in the future if we decide to group certain stages within the same job under + * a common scope (e.g. part of a SQL query). */ - def makeVizGraph(stage: StageInfo): VizGraph = { - val edges = new ListBuffer[VizEdge] - val nodes = new mutable.HashMap[Int, VizNode] - val clusters = new mutable.HashMap[String, VizCluster] // cluster ID -> VizCluster + def makeOperationGraph(stage: StageInfo): RDDOperationGraph = { + val edges = new ListBuffer[RDDOperationEdge] + val nodes = new mutable.HashMap[Int, RDDOperationNode] + val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID // Root cluster is the stage cluster val stageClusterId = s"stage_${stage.stageId}" val stageClusterName = s"Stage ${stage.stageId}" + { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" } - val rootCluster = new VizCluster(stageClusterId, stageClusterName) + val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName) // Find nodes, edges, and operator scopes that belong to this stage stage.rddInfos.foreach { rdd => - edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) } - val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name)) + edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) } + val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name)) if (rdd.scope == null) { // This RDD has no encompassing scope, so we put it directly in the root cluster @@ -99,7 +103,7 @@ private[ui] object VizGraph extends Logging { val rddClusters = rddScopes.map { scope => val clusterId = scope.name + "_" + scope.id val clusterName = scope.name - clusters.getOrElseUpdate(clusterId, new VizCluster(clusterId, clusterName)) + clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName)) } // Build the cluster hierarchy for this RDD rddClusters.sliding(2).foreach { pc => @@ -117,10 +121,10 @@ private[ui] object VizGraph extends Logging { // Classify each edge as internal, outgoing or incoming // This information is needed to reason about how stages relate to each other - val internalEdges = new ListBuffer[VizEdge] - val outgoingEdges = new ListBuffer[VizEdge] - val incomingEdges = new ListBuffer[VizEdge] - edges.foreach { case e: VizEdge => + val internalEdges = new ListBuffer[RDDOperationEdge] + val outgoingEdges = new ListBuffer[RDDOperationEdge] + val incomingEdges = new ListBuffer[RDDOperationEdge] + edges.foreach { case e: RDDOperationEdge => val fromThisGraph = nodes.contains(e.fromId) val toThisGraph = nodes.contains(e.toId) (fromThisGraph, toThisGraph) match { @@ -132,7 +136,7 @@ private[ui] object VizGraph extends Logging { } } - VizGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster) + RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster) } /** @@ -145,7 +149,7 @@ private[ui] object VizGraph extends Logging { * * For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf. */ - def makeDotFile(graph: VizGraph, forJob: Boolean): String = { + def makeDotFile(graph: RDDOperationGraph, forJob: Boolean): String = { val dotFile = new StringBuilder dotFile.append("digraph G {\n") dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = " ")) @@ -159,12 +163,12 @@ private[ui] object VizGraph extends Logging { } /** - * Return the dot representation of a node. + * Return the dot representation of a node in an RDDOperationGraph. * * On the job page, is displayed as a small circle without labels. * On the stage page, it is displayed as a box with an embedded label. */ - private def makeDotNode(node: VizNode, forJob: Boolean): String = { + private def makeDotNode(node: RDDOperationNode, forJob: Boolean): String = { if (forJob) { s"""${node.id} [label=" " shape="circle" padding="5" labelStyle="font-size: 0"]""" } else { @@ -172,8 +176,11 @@ private[ui] object VizGraph extends Logging { } } - /** Return the dot representation of a subgraph. */ - private def makeDotSubgraph(scope: VizCluster, forJob: Boolean, indent: String): String = { + /** Return the dot representation of a subgraph in an RDDOperationGraph. */ + private def makeDotSubgraph( + scope: RDDOperationCluster, + forJob: Boolean, + indent: String): String = { val subgraph = new StringBuilder subgraph.append(indent + s"subgraph cluster${scope.id} {\n") subgraph.append(indent + s""" label="${scope.name}";\n""")