[SPARK-12902] [SQL] visualization for generated operators #10828

Status: Closed · wants to merge 8 commits · showing changes from 2 commits
File 1 of 8: SQL plan visualization CSS

@@ -20,6 +20,12 @@
   text-shadow: none;
 }
 
+#plan-viz-graph svg g.cluster rect {
+  fill: #E5F2ff;
Member commented:

Why use a different style? I think we should make the UI stuff consistent.

Contributor (author) replied:

Which one would you suggest? This is the outer box around the operators, so it should be lighter than the operator color. I chose this one because it sits between white and the color used for the operator nodes.

Member replied:


I meant using the following style:

#plan-viz-graph svg g.cluster rect {
  fill: #A0DFFF;
  stroke: #3EC0FF;
  stroke-width: 1px;
}

[screenshot: 2016-01-20, 10:22 AM]

This looks more consistent with the stage page:
[screenshot: 2016-01-20, 10:24 AM]

By the way, I saw a bug in the stage page:
[screenshot: 2016-01-20, 10:24 AM]
Could you check it?

+  stroke: #3EC0FF;
+  stroke-width: 1px;
+}
+
 #plan-viz-graph svg g.node rect {
   fill: #C3EBFF;
   stroke: #3EC0FF;
File 2 of 8: SparkPlanInfo

@@ -36,12 +36,17 @@ class SparkPlanInfo(
 private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+    val children = plan match {
+      case WholeStageCodegen(child, _) => child :: Nil
+      case InputAdapter(child) => child :: Nil
+      case plan => plan.children
+    }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
     }
-    val children = plan.children.map(fromSparkPlan)
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      plan.metadata, metrics)
   }
 }
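The match exists because the codegen wrappers' generic children do not expose the operator tree that was compiled (otherwise the default case would suffice). A minimal, self-contained sketch of the idea with hypothetical stand-in classes, not Spark's real SparkPlan API; InputAdapter is handled the same way as the Codegen stand-in here:

sealed trait Plan { def name: String; def kids: Seq[Plan] }

// Hypothetical stand-ins, for illustration only; not Spark's real classes.
case class Op(name: String, kids: Plan*) extends Plan
case class Codegen(inner: Plan, inputs: Seq[Plan]) extends Plan {
  def name = "WholeStageCodegen"
  def kids = inputs  // the compiled tree is deliberately not among the children
}

case class Info(name: String, children: Seq[Info])

def fromPlan(plan: Plan): Info = {
  val children = plan match {
    case Codegen(inner, _) => inner :: Nil  // descend into the compiled tree
    case p => p.kids
  }
  Info(plan.name, children.map(fromPlan))
}

// fromPlan(Codegen(Op("Filter", Op("Range")), inputs = Nil)) returns
// Info("WholeStageCodegen", List(Info("Filter", List(Info("Range", Nil)))))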
File 3 of 8: whole-stage codegen (Scala imports)

@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, LeafExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.rules.Rule

File 4 of 8: ExecutionPage

@@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
   }
 
   private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = {
-    val metadata = graph.nodes.flatMap { node =>
+    val metadata = graph.allNodes.flatMap { node =>
       val nodeId = s"plan-meta-data-${node.id}"
       <div id={nodeId}>{node.desc}</div>
     }
@@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
         <div class="dot-file">
           {graph.makeDotFile(metrics)}
         </div>
-        <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+        <div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
         {metadata}
       </div>
       {planVisualizationResources}
File 5 of 8: SQLListener

@@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging
     case SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) =>
       val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
-      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+      val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
         node.metrics.map(metric => metric.accumulatorId -> metric)
       }
       val executionUIData = new SQLExecutionUIData(
File 6 of 8: SparkPlanGraph

@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong

 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen}
 import org.apache.spark.sql.execution.metric.SQLMetrics

/**
@@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph(
     dotFile.append("}")
     dotFile.toString()
   }
+
+  /**
+   * All the SparkPlanGraphNodes, including those nested inside a WholeStageCodegen cluster.
+   */
+  val allNodes: Seq[SparkPlanGraphNode] = {
+    nodes.flatMap {
+      case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
+      case node => Seq(node)
+    }
+  }
 }
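Because clusters now nest their member nodes, any consumer that used to walk graph.nodes has to switch to allNodes to see operators inside a codegen stage; the ExecutionPage, SQLListener, and test changes in this PR are exactly that. A small self-contained sketch of the flattening, using stand-in classes rather than the real ones:

import scala.collection.mutable

class GNode(val id: Long)  // stand-in for SparkPlanGraphNode
class GCluster(id: Long, val nodes: mutable.ArrayBuffer[GNode]) extends GNode(id)

def allNodes(top: Seq[GNode]): Seq[GNode] = top.flatMap {
  case c: GCluster => c.nodes :+ c  // member nodes first, then the cluster itself
  case n => Seq(n)
}

val cluster = new GCluster(0, mutable.ArrayBuffer(new GNode(1), new GNode(2)))
assert(allNodes(Seq(cluster)).map(_.id) == Seq(1L, 2L, 0L))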

private[sql] object SparkPlanGraph {
@@ -52,30 +62,48 @@ private[sql] object SparkPlanGraph {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null)
     new SparkPlanGraph(nodes, edges)
   }

   private def buildSparkPlanGraphNode(
       planInfo: SparkPlanInfo,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-      edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = planInfo.metrics.map { metric =>
-      SQLPlanMetric(metric.name, metric.accumulatorId,
-        SQLMetrics.getMetricParam(metric.metricParam))
-    }
-    val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-      planInfo.simpleString, planInfo.metadata, metrics)
-
-    nodes += node
-    val childrenNodes = planInfo.children.map(
-      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
-    for (child <- childrenNodes) {
-      edges += SparkPlanGraphEdge(child.id, node.id)
-    }
-    node
+      edges: mutable.ArrayBuffer[SparkPlanGraphEdge],
+      parent: SparkPlanGraphNode,
+      subgraph: SparkPlanGraphCluster): Unit = {
+    if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) {
+      val cluster = new SparkPlanGraphCluster(
+        nodeIdGenerator.getAndIncrement(),
+        planInfo.nodeName,
+        planInfo.simpleString,
+        mutable.ArrayBuffer[SparkPlanGraphNode]())
+      nodes += cluster
+      buildSparkPlanGraphNode(
+        planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster)
+    } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) {
+      buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null)
+    } else {
+      val metrics = planInfo.metrics.map { metric =>
+        SQLPlanMetric(metric.name, metric.accumulatorId,
+          SQLMetrics.getMetricParam(metric.metricParam))
+      }
+      val node = new SparkPlanGraphNode(
+        nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+        planInfo.simpleString, planInfo.metadata, metrics)
+      if (subgraph == null) {
+        nodes += node
+      } else {
+        subgraph.nodes += node
+      }
+
+      if (parent != null) {
+        edges += SparkPlanGraphEdge(node.id, parent.id)
+      }
+      planInfo.children.foreach(
+        buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph))
+    }
   }
}
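Since the recursion now threads two extra parameters, a hand trace helps. This follows the code above on the plan shape the new test assumes (WholeStageCodegen over Filter over Range); it is a walkthrough, not captured output:

// Hand trace of buildSparkPlanGraphNode(planInfo, gen, nodes, edges, null, null)
// for WholeStageCodegen(Filter(Range)):
//
// 1. "WholeStageCodegen": cluster 0 is created and appended to the top-level
//    nodes buffer; recurse into Filter with subgraph = cluster0 and parent
//    passed through unchanged (still null at the root).
// 2. "Filter": plain node 1; subgraph != null, so it lands in cluster0.nodes
//    rather than the top-level buffer; parent == null, so no edge yet;
//    recurse into Range with parent = node1 and the same subgraph.
// 3. "Range": node 2, also inside cluster0; parent != null, so edge 2 -> 1 is
//    added. The ids match the nodeId comments in the new test below.
//
// An InputAdapter child is skipped entirely and resets subgraph to null, so
// operators feeding the generated stage are drawn outside the cluster box.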

@@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph {
* @param name the name of this SparkPlan node
* @param metrics metrics that this SparkPlan node will track
*/
-private[ui] case class SparkPlanGraphNode(
-    id: Long,
-    name: String,
-    desc: String,
-    metadata: Map[String, String],
-    metrics: Seq[SQLPlanMetric]) {
+private[ui] class SparkPlanGraphNode(
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val metadata: Map[String, String],
+    val metrics: Seq[SQLPlanMetric]) {

   def makeDotNode(metricsValue: Map[Long, String]): String = {
     val builder = new mutable.StringBuilder(name)
@@ -117,6 +145,27 @@ private[ui] case class SparkPlanGraphNode(
   }
 }

+/**
+ * Represents the tree of SparkPlan nodes wrapped by a WholeStageCodegen.
+ */
+private[ui] class SparkPlanGraphCluster(
+    id: Long,
+    name: String,
+    desc: String,
+    val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
+  extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
+
+  override def makeDotNode(metricsValue: Map[Long, String]): String = {
+    s"""
+       |  subgraph cluster${id} {
+       |    label=${name};
+       |    ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
+       |  }
+     """.stripMargin
+  }
+}
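For a concrete picture of the output, here is roughly the DOT fragment makeDotNode would emit for the cluster traced earlier. This is a sketch from the template above; the per-node lines are placeholders, since each inner node renders through its own makeDotNode:

// Illustrative shape only, not captured output:
//
//   subgraph cluster0 {
//     label=WholeStageCodegen;
//     1 [label="Filter ..."];
//     2 [label="Range ..."];
//   }
//
// The renderer then draws a labeled box around nodes 1 and 2, which the new
// "g.cluster rect" CSS rule in this PR styles. Note label=${name} works
// unquoted here only because WholeStageCodegen is a single DOT identifier.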


/**
 * Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
 * node id.
File 7 of 8: SQLMetricsSuite

@@ -84,7 +84,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       // If we can track all jobs, check the metric values
       val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
       val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
-        df.queryExecution.executedPlan)).nodes.filter { node =>
+        df.queryExecution.executedPlan)).allNodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>
@@ -132,6 +132,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     )
   }

test("WholeStageCodegen metrics") {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
// TODO: update metrics in generated operators
Member commented:

Is there a JIRA for this one? If not, could you create one?


+    val df = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(df, 1, Map.empty)
+  }

test("TungstenAggregate metrics") {
// Assume the execution plan is
// ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
File 8 of 8: SQLListenerSuite

@@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val df = createTestDataFrame
     val accumulatorIds =
       SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
-        .nodes.flatMap(_.metrics.map(_.accumulatorId))
+        .allNodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
     val accumulatorUpdates = accumulatorIds.map { id =>