Skip to content

Commit

Permalink
Display cached RDDs on the viz
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 4, 2015
1 parent b1f0fd1 commit 429e9e1
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@

var VizConstants = {
rddColor: "#444444",
rddCachedColor: "#7DDD00",
rddOperationColor: "#AADFFF",
stageColor: "#FFDDEE",
operationScopeColor: "#AADFFF",
clusterLabelColor: "#888888",
edgeColor: "#444444",
edgeWidth: "1.5px",
Expand Down Expand Up @@ -125,6 +126,12 @@ function renderDagViz(forJob) {
renderDagVizForStage(svg);
}

// Find cached RDDs
metadataContainer().selectAll(".cached-rdd").each(function(v) {
var nodeId = VizConstants.nodePrefix + d3.select(this).text();
graphContainer().selectAll("#" + nodeId).classed("cached", true);
});

// Set the appropriate SVG dimensions to ensure that all elements are displayed
var boundingBox = svg.node().getBBox();
svg.style("width", (boundingBox.width + VizConstants.svgMarginX) + "px");
Expand Down Expand Up @@ -240,7 +247,7 @@ function renderDot(dot, container) {
function styleDagViz(forJob) {
graphContainer().selectAll("svg g.cluster rect")
.style("fill", "white")
.style("stroke", VizConstants.operationScopeColor)
.style("stroke", VizConstants.rddOperationColor)
.style("stroke-width", "4px")
.style("stroke-opacity", "0.5");
graphContainer().selectAll("svg g.cluster text")
Expand Down Expand Up @@ -279,6 +286,9 @@ function styleDagViz(forJob) {
function styleDagVizForJob() {
graphContainer().selectAll("svg g.node circle")
.style("fill", VizConstants.rddColor);
// TODO: add a legend to explain what a highlighted dot means
graphContainer().selectAll("svg g.cached circle")
.style("fill", VizConstants.rddCachedColor);
graphContainer().selectAll("svg g#cross-stage-edges path")
.style("fill", "none");
}
Expand All @@ -289,6 +299,9 @@ function styleDagVizForStage() {
.style("fill", "none")
.style("stroke", VizConstants.rddColor)
.style("stroke-width", "2px");
// TODO: add a legend to explain what a highlighted RDD means
graphContainer().selectAll("svg g.cached rect")
.style("stroke", VizConstants.rddCachedColor);
graphContainer().selectAll("svg g.node g.label text tspan")
.style("fill", VizConstants.rddColor);
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ private[spark] object UIUtils extends Logging {
<div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
{
g.rootCluster.getAllNodes.filter(_.cached).map { n =>
<div class="cached-rdd">{n.id}</div>
}
}
</div>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {

<span class="expand-application-timeline">
<span class="expand-application-timeline-arrow arrow-closed"></span>
<strong>Event Timeline</strong>
<strong>Event timeline</strong>
</span> ++
<div id="application-timeline" class="collapsed">
<div class="control-panel">
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {

<span class="expand-job-timeline">
<span class="expand-job-timeline-arrow arrow-closed"></span>
<strong>Event Timeline</strong>
<strong>Event timeline</strong>
</span> ++
<div id="job-timeline" class="collapsed">
<div class="control-panel">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer

import org.apache.spark.Logging
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.StorageLevel

/**
* A representation of a generic cluster graph used for storing information on RDD operations.
Expand All @@ -37,7 +38,7 @@ private[ui] case class RDDOperationGraph(
rootCluster: RDDOperationCluster)

/** A node in an RDDOperationGraph. This represents an RDD. */
private[ui] case class RDDOperationNode(id: Int, name: String)
private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)

/**
* A directed edge connecting two nodes in an RDDOperationGraph.
Expand All @@ -61,6 +62,11 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) {
def attachChildCluster(childCluster: RDDOperationCluster): Unit = {
_childrenClusters += childCluster
}

/** Return all the nodes container in this cluster, including ones nested in other clusters. */
def getAllNodes: Seq[RDDOperationNode] = {
_childrenNodes ++ _childrenClusters.flatMap(_.childrenNodes)
}
}

private[ui] object RDDOperationGraph extends Logging {
Expand Down Expand Up @@ -90,7 +96,10 @@ private[ui] object RDDOperationGraph extends Logging {
// Find nodes, edges, and operation scopes that belong to this stage
stage.rddInfos.foreach { rdd =>
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name))

// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
val node = nodes.getOrElseUpdate(
rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))

if (rdd.scope == null) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
Expand Down

0 comments on commit 429e9e1

Please sign in to comment.