[SPARK-12902] [SQL] visualization for generated operators
This PR brings back visualization for generated operators; they look like:

![sql](https://cloud.githubusercontent.com/assets/40902/12460920/0dc7956a-bf6b-11e5-9c3f-8389f452526e.png)

![stage](https://cloud.githubusercontent.com/assets/40902/12460923/11806ac4-bf6b-11e5-9c72-e84a62c5ea93.png)

Note: SQL metrics are not supported right now because they are very slow; they will be supported once we have batch mode.
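
For reference, a minimal query that triggers whole-stage codegen (a sketch mirroring the test added in this patch; run it, then open the query in the web UI's SQL tab to see the new cluster box):

import sqlContext.implicits._

// Range and Filter are compiled into one generated stage, so the executed
// plan is WholeStageCodegen(Range -> Filter) and the SQL tab now draws a
// labeled box around both operators.
val df = sqlContext.range(10).filter('id < 5)
df.collect()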

Author: Davies Liu <davies@databricks.com>

Closes #10828 from davies/viz_codegen.
Davies Liu authored and davies committed Jan 25, 2016
1 parent c037d25 commit 7d877c3
Showing 9 changed files with 104 additions and 32 deletions.
@@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) {
   renderer(container, g);
 
   // Find the stage cluster and mark it for styling and post-processing
-  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
+  container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true);
 }
 
 /* -------------------- *
@@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging {
         }
       }
       // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
-      rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
+      rddClusters.headOption.foreach { cluster =>
+        if (!rootCluster.childClusters.contains(cluster)) {
+          rootCluster.attachChildCluster(cluster)
+        }
+      }
       rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
     }
   }
@@ -20,6 +20,12 @@
   text-shadow: none;
 }
 
+#plan-viz-graph svg g.cluster rect {
+  fill: #A0DFFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
+}
+
 #plan-viz-graph svg g.node rect {
   fill: #C3EBFF;
   stroke: #3EC0FF;
@@ -36,12 +36,17 @@ class SparkPlanInfo(
 private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+    val children = plan match {
+      case WholeStageCodegen(child, _) => child :: Nil
+      case InputAdapter(child) => child :: Nil
+      case plan => plan.children
+    }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
     }
-    val children = plan.children.map(fromSparkPlan)
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      plan.metadata, metrics)
   }
 }
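
A sketch of what the new match buys us, assuming the example query above: the codegen wrappers report their wrapped plan as the child, so the plan-info tree descends into the generated stage instead of stopping at it.

val info = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
// info.nodeName == "WholeStageCodegen", its child is the Filter node, and
// the Filter's child is Range; InputAdapter nodes are skipped the same way.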
@@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
   }
 
   private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = {
-    val metadata = graph.nodes.flatMap { node =>
+    val metadata = graph.allNodes.flatMap { node =>
       val nodeId = s"plan-meta-data-${node.id}"
       <div id={nodeId}>{node.desc}</div>
     }
@@ -110,7 +110,7 @@
       <div class="dot-file">
         {graph.makeDotFile(metrics)}
       </div>
-      <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+      <div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
       {metadata}
     </div>
     {planVisualizationResources}
@@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
     case SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) =>
       val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
-      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+      val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
         node.metrics.map(metric => metric.accumulatorId -> metric)
       }
       val executionUIData = new SQLExecutionUIData(
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong

 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph(
     dotFile.append("}")
     dotFile.toString()
   }
+
+  /**
+   * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
+   */
+  val allNodes: Seq[SparkPlanGraphNode] = {
+    nodes.flatMap {
+      case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
+      case node => Seq(node)
+    }
+  }
 }

 private[sql] object SparkPlanGraph {
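
A usage sketch (same assumed example query): nodes keeps the top-level shape, where the whole codegen stage counts as a single cluster, while allNodes also reaches inside it, which is what the metrics and metadata lookups elsewhere in this patch rely on.

val graph = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
graph.nodes.size    // 1 for this toy plan: the WholeStageCodegen cluster
graph.allNodes.size // 3: the cluster plus the Filter and Range nodes inside it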
@@ -52,30 +62,48 @@ private[sql] object SparkPlanGraph {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null)
     new SparkPlanGraph(nodes, edges)
   }
 
   private def buildSparkPlanGraphNode(
       planInfo: SparkPlanInfo,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-      edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = planInfo.metrics.map { metric =>
-      SQLPlanMetric(metric.name, metric.accumulatorId,
-        SQLMetrics.getMetricParam(metric.metricParam))
+      edges: mutable.ArrayBuffer[SparkPlanGraphEdge],
+      parent: SparkPlanGraphNode,
+      subgraph: SparkPlanGraphCluster): Unit = {
+    if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) {
+      val cluster = new SparkPlanGraphCluster(
+        nodeIdGenerator.getAndIncrement(),
+        planInfo.nodeName,
+        planInfo.simpleString,
+        mutable.ArrayBuffer[SparkPlanGraphNode]())
+      nodes += cluster
+      buildSparkPlanGraphNode(
+        planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster)
+    } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) {
+      buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null)
+    } else {
+      val metrics = planInfo.metrics.map { metric =>
+        SQLPlanMetric(metric.name, metric.accumulatorId,
+          SQLMetrics.getMetricParam(metric.metricParam))
+      }
+      val node = new SparkPlanGraphNode(
+        nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+        planInfo.simpleString, planInfo.metadata, metrics)
+      if (subgraph == null) {
+        nodes += node
+      } else {
+        subgraph.nodes += node
+      }
+
+      if (parent != null) {
+        edges += SparkPlanGraphEdge(node.id, parent.id)
+      }
+      planInfo.children.foreach(
+        buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph))
     }
-    val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-      planInfo.simpleString, planInfo.metadata, metrics)
-
-    nodes += node
-    val childrenNodes = planInfo.children.map(
-      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
-    for (child <- childrenNodes) {
-      edges += SparkPlanGraphEdge(child.id, node.id)
-    }
-    node
   }
 }
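
Tracing the builder over the assumed example plan matches the node ids in the test added below:

// buildSparkPlanGraphNode(WholeStageCodegen(Filter(Range)), ...) gives:
//   cluster id 0 - the WholeStageCodegen box, appended to `nodes`
//   Filter id 1  - ordinary node, lands in the cluster via `subgraph.nodes`
//   Range id 2   - likewise, plus SparkPlanGraphEdge(2, 1): an edge from
//                  the Range node to the Filter node it feeds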

@@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph {
  * @param name the name of this SparkPlan node
  * @param metrics metrics that this SparkPlan node will track
  */
-private[ui] case class SparkPlanGraphNode(
-    id: Long,
-    name: String,
-    desc: String,
-    metadata: Map[String, String],
-    metrics: Seq[SQLPlanMetric]) {
+private[ui] class SparkPlanGraphNode(
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val metadata: Map[String, String],
+    val metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
     val builder = new mutable.StringBuilder(name)
@@ -117,6 +145,27 @@ private[ui] case class SparkPlanGraphNode(
     }
   }
 
+/**
+ * Represent a tree of SparkPlan for WholeStageCodegen.
+ */
+private[ui] class SparkPlanGraphCluster(
+    id: Long,
+    name: String,
+    desc: String,
+    val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
+  extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
+
+  override def makeDotNode(metricsValue: Map[Long, String]): String = {
+    s"""
+      |  subgraph cluster${id} {
+      |    label=${name};
+      |    ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
+      |  }
+    """.stripMargin
+  }
+}
+

 /**
  * Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the
  * parent node id.
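
Finally, a self-contained sketch of how these classes compose (toy ids and description strings; this assumes access to the private[ui] classes, e.g. from a test in the same package):

import scala.collection.mutable

val inner = mutable.ArrayBuffer[SparkPlanGraphNode]()
val cluster = new SparkPlanGraphCluster(0, "WholeStageCodegen", "WholeStageCodegen", inner)
inner += new SparkPlanGraphNode(1, "Filter", "Filter (id < 5)", Map.empty, Nil)
inner += new SparkPlanGraphNode(2, "Range", "Range 0, 10", Map.empty, Nil)
// Emits a DOT block of the form "subgraph cluster0 { label=WholeStageCodegen; ... }",
// which dagre-d3 renders as a box labeled WholeStageCodegen around both nodes.
println(cluster.makeDotNode(Map.empty))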
@@ -86,7 +86,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // If we can track all jobs, check the metric values
     val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
     val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
-      df.queryExecution.executedPlan)).nodes.filter { node =>
+      df.queryExecution.executedPlan)).allNodes.filter { node =>
       expectedMetrics.contains(node.id)
     }.map { node =>
       val nodeMetrics = node.metrics.map { metric =>
@@ -134,6 +134,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     )
   }
 
+  test("WholeStageCodegen metrics") {
+    // Assume the execution plan is
+    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+    // TODO: update metrics in generated operators
+    val df = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(df, 1, Map.empty)
+  }
+
   test("TungstenAggregate metrics") {
     // Assume the execution plan is
     // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
@@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val df = createTestDataFrame
     val accumulatorIds =
       SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
-        .nodes.flatMap(_.metrics.map(_.accumulatorId))
+        .allNodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
     val accumulatorUpdates = accumulatorIds.map { id =>