From f0f3da6821ce03059ac472c3ab58bd6b183b4b7e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 18 Jan 2016 23:27:44 -0800 Subject: [PATCH 1/5] visualize the plans inside whole stage codegen --- .../sql/execution/ui/static/spark-sql-viz.css | 6 ++ .../spark/sql/execution/SparkPlanInfo.scala | 9 +- .../sql/execution/WholeStageCodegen.scala | 15 ++- .../spark/sql/execution/basicOperators.scala | 10 +- .../sql/execution/ui/ExecutionPage.scala | 4 +- .../spark/sql/execution/ui/SQLListener.scala | 2 +- .../sql/execution/ui/SparkPlanGraph.scala | 95 ++++++++++++++----- .../execution/metric/SQLMetricsSuite.scala | 14 ++- .../sql/execution/ui/SQLListenerSuite.scala | 2 +- 9 files changed, 125 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css index ddd3a91dd8ef8..8b5a928685dfc 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css @@ -20,6 +20,12 @@ text-shadow: none; } +#plan-viz-graph svg g.cluster rect { + fill: #E5F2ff; + stroke: #3EC0FF; + stroke-width: 1px; +} + #plan-viz-graph svg g.node rect { fill: #C3EBFF; stroke: #3EC0FF; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 4f750ad13ab84..4dd9928244197 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -36,12 +36,17 @@ class SparkPlanInfo( private[sql] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { + val children = plan match { + case WholeStageCodegen(child, _) => child :: Nil + case InputAdapter(child) => child :: Nil + case plan => plan.children + } val metrics = plan.metrics.toSeq.map { case (key, metric) => new SQLMetricInfo(metric.name.getOrElse(key), metric.id, Utils.getFormattedClassName(metric.param)) } - val children = plan.children.map(fromSparkPlan) - new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics) + new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), + plan.metadata, metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index c15fabab805a7..08677d79f75e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.metric.LongSQLMetric /** * An interface for those physical operators that support codegen. @@ -90,8 +91,20 @@ trait CodegenSupport extends SparkPlan { * } */ def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String -} + /** + * Return a term for a LongSQLMetric specified by given name. 
+ */ + protected def termForLongMetric(ctx: CodegenContext, name: String): String = { + val metric = longMetric(name) + val idx = ctx.references.length + ctx.references += metric + val term = ctx.freshName(name) + val clsName = classOf[LongSQLMetric].getName + ctx.addMutableState(clsName, term, s"$term = ($clsName) references[$idx];") + term + } +} /** * InputAdapter is used to hide a SparkPlan from a subtree that support codegen. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 9e2e0357c65f0..44ec3a90de2f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics} import org.apache.spark.sql.types.LongType import org.apache.spark.util.MutablePair import org.apache.spark.util.random.PoissonSampler @@ -42,11 +42,14 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) } override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = { + val numRows = termForLongMetric(ctx, "numRows") val exprs = projectList.map(x => ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) ctx.currentVars = input val output = exprs.map(_.gen(ctx)) s""" + | // Projection + | $numRows.add(1); | ${output.map(_.code).mkString("\n")} | | ${consume(ctx, output)} @@ -81,13 +84,18 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit } override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = { + val numInputTerm = termForLongMetric(ctx, "numInputRows") + val numOutputTerm = termForLongMetric(ctx, "numOutputRows") val expr = ExpressionCanonicalizer.execute( BindReferences.bindReference(condition, child.output)) ctx.currentVars = input val eval = expr.gen(ctx) s""" + | // Filter + | $numInputTerm.add(1); | ${eval.code} | if (!${eval.isNull} && ${eval.value}) { + | $numOutputTerm.add(1); | ${consume(ctx, ctx.currentVars)} | } """.stripMargin diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index c74ad40406992..49915adf6cd29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") } private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = { - val metadata = graph.nodes.flatMap { node => + val metadata = graph.allNodes.flatMap { node => val nodeId = s"plan-meta-data-${node.id}"
      <div id={nodeId}>{node.desc}</div>
    }
@@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
        {graph.makeDotFile(metrics)}
      </div>

-      <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+      <div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
{metadata} {planVisualizationResources} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index cd56136927088..83c64f755f90f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi case SparkListenerSQLExecutionStart(executionId, description, details, physicalPlanDescription, sparkPlanInfo, time) => val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo) - val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node => + val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node => node.metrics.map(metric => metric.accumulatorId -> metric) } val executionUIData = new SQLExecutionUIData( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 3a6eff9399825..4eb248569b281 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph( dotFile.append("}") dotFile.toString() } + + /** + * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. 
+ */ + val allNodes: Seq[SparkPlanGraphNode] = { + nodes.flatMap { + case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster + case node => Seq(node) + } + } } private[sql] object SparkPlanGraph { @@ -52,7 +62,7 @@ private[sql] object SparkPlanGraph { val nodeIdGenerator = new AtomicLong(0) val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]() val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]() - buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges) + buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null) new SparkPlanGraph(nodes, edges) } @@ -60,22 +70,40 @@ private[sql] object SparkPlanGraph { planInfo: SparkPlanInfo, nodeIdGenerator: AtomicLong, nodes: mutable.ArrayBuffer[SparkPlanGraphNode], - edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = { - val metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, - SQLMetrics.getMetricParam(metric.metricParam)) + edges: mutable.ArrayBuffer[SparkPlanGraphEdge], + parent: SparkPlanGraphNode, + subgraph: SparkPlanGraphCluster): Unit = { + if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) { + val cluster = new SparkPlanGraphCluster( + nodeIdGenerator.getAndIncrement(), + planInfo.nodeName, + planInfo.simpleString, + mutable.ArrayBuffer[SparkPlanGraphNode]()) + nodes += cluster + buildSparkPlanGraphNode( + planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster) + } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) { + buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null) + } else { + val metrics = planInfo.metrics.map { metric => + SQLPlanMetric(metric.name, metric.accumulatorId, + SQLMetrics.getMetricParam(metric.metricParam)) + } + val node = new SparkPlanGraphNode( + nodeIdGenerator.getAndIncrement(), planInfo.nodeName, + planInfo.simpleString, planInfo.metadata, metrics) + if (subgraph == null) { + nodes += node + } else { + subgraph.nodes += node + } + + if (parent != null) { + edges += SparkPlanGraphEdge(node.id, parent.id) + } + planInfo.children.foreach( + buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph)) } - val node = SparkPlanGraphNode( - nodeIdGenerator.getAndIncrement(), planInfo.nodeName, - planInfo.simpleString, planInfo.metadata, metrics) - - nodes += node - val childrenNodes = planInfo.children.map( - child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges)) - for (child <- childrenNodes) { - edges += SparkPlanGraphEdge(child.id, node.id) - } - node } } @@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph { * @param name the name of this SparkPlan node * @param metrics metrics that this SparkPlan node will track */ -private[ui] case class SparkPlanGraphNode( - id: Long, - name: String, - desc: String, - metadata: Map[String, String], - metrics: Seq[SQLPlanMetric]) { +private[ui] class SparkPlanGraphNode( + val id: Long, + val name: String, + val desc: String, + val metadata: Map[String, String], + val metrics: Seq[SQLPlanMetric]) { def makeDotNode(metricsValue: Map[Long, String]): String = { val builder = new mutable.StringBuilder(name) @@ -117,6 +145,27 @@ private[ui] case class SparkPlanGraphNode( } } +/** + * Represent a tree of SparkPlan for WholeStageCodegen. 
+ */ +private[ui] class SparkPlanGraphCluster( + id: Long, + name: String, + desc: String, + val nodes: mutable.ArrayBuffer[SparkPlanGraphNode]) + extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) { + + override def makeDotNode(metricsValue: Map[Long, String]): String = { + s""" + | subgraph cluster${id} { + | label=${name}; + | ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")} + | } + """.stripMargin + } +} + + /** * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, and `toId` is the child * node id. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 4339f7260dcb9..00a9346f2e1ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -84,7 +84,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { // If we can track all jobs, check the metric values val metricValues = sqlContext.listener.getExecutionMetrics(executionId) val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( - df.queryExecution.executedPlan)).nodes.filter { node => + df.queryExecution.executedPlan)).allNodes.filter { node => expectedMetrics.contains(node.id) }.map { node => val nodeMetrics = node.metrics.map { metric => @@ -132,6 +132,18 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { ) } + test("WholeStageCodegen metrics") { + // Assume the execution plan is + // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1)) + val df = sqlContext.range(10).filter('id < 5) + testSparkPlanMetrics(df, 1, Map( + 1L -> ("Filter", Map( + "number of input rows" -> 10L, + "number of output rows" -> 5L + ))) + ) + } + test("TungstenAggregate metrics") { // Assume the execution plan is // ... 
-> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index eef3c1f3e34d9..81a159d542c67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val df = createTestDataFrame val accumulatorIds = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)) - .nodes.flatMap(_.metrics.map(_.accumulatorId)) + .allNodes.flatMap(_.metrics.map(_.accumulatorId)) // Assume all accumulators are long var accumulatorValue = 0L val accumulatorUpdates = accumulatorIds.map { id => From 331b46d7d3e0f0a5da62fb4cd9af3bb9b6cd54ec Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 19 Jan 2016 14:12:39 -0800 Subject: [PATCH 2/5] remove metrics, because they are very slow --- .../spark/sql/execution/WholeStageCodegen.scala | 17 ++--------------- .../spark/sql/execution/basicOperators.scala | 10 +--------- .../sql/execution/metric/SQLMetricsSuite.scala | 8 ++------ 3 files changed, 5 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 08677d79f75e6..c7c02576719a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -22,10 +22,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, LeafExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.metric.LongSQLMetric /** * An interface for those physical operators that support codegen. @@ -91,21 +90,9 @@ trait CodegenSupport extends SparkPlan { * } */ def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String - - /** - * Return a term for a LongSQLMetric specified by given name. - */ - protected def termForLongMetric(ctx: CodegenContext, name: String): String = { - val metric = longMetric(name) - val idx = ctx.references.length - ctx.references += metric - val term = ctx.freshName(name) - val clsName = classOf[LongSQLMetric].getName - ctx.addMutableState(clsName, term, s"$term = ($clsName) references[$idx];") - term - } } + /** * InputAdapter is used to hide a SparkPlan from a subtree that support codegen. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 44ec3a90de2f4..9e2e0357c65f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.LongType import org.apache.spark.util.MutablePair import org.apache.spark.util.random.PoissonSampler @@ -42,14 +42,11 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) } override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = { - val numRows = termForLongMetric(ctx, "numRows") val exprs = projectList.map(x => ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) ctx.currentVars = input val output = exprs.map(_.gen(ctx)) s""" - | // Projection - | $numRows.add(1); | ${output.map(_.code).mkString("\n")} | | ${consume(ctx, output)} @@ -84,18 +81,13 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit } override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = { - val numInputTerm = termForLongMetric(ctx, "numInputRows") - val numOutputTerm = termForLongMetric(ctx, "numOutputRows") val expr = ExpressionCanonicalizer.execute( BindReferences.bindReference(condition, child.output)) ctx.currentVars = input val eval = expr.gen(ctx) s""" - | // Filter - | $numInputTerm.add(1); | ${eval.code} | if (!${eval.isNull} && ${eval.value}) { - | $numOutputTerm.add(1); | ${consume(ctx, ctx.currentVars)} | } """.stripMargin diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 00a9346f2e1ed..a41147d26b128 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -135,13 +135,9 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { test("WholeStageCodegen metrics") { // Assume the execution plan is // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1)) + // TODO: update metrics in generated operators val df = sqlContext.range(10).filter('id < 5) - testSparkPlanMetrics(df, 1, Map( - 1L -> ("Filter", Map( - "number of input rows" -> 10L, - "number of output rows" -> 5L - ))) - ) + testSparkPlanMetrics(df, 1, Map.empty) } test("TungstenAggregate metrics") { From a179e8da478f44763204bfb7167f7c9fb9e2ce4a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 20 Jan 2016 11:40:10 -0800 Subject: [PATCH 3/5] fix style --- .../resources/org/apache/spark/ui/static/spark-dag-viz.js | 2 +- .../scala/org/apache/spark/ui/scope/RDDOperationGraph.scala | 6 +++++- .../apache/spark/sql/execution/ui/static/spark-sql-viz.css | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js 
b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 83dbea40b63f3..3af25fc73c78e 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) { renderer(container, g); // Find the stage cluster and mark it for styling and post-processing - container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true); + container.selectAll("g.cluster[name*=\"Stage \"]").classed("stage", true); } /* -------------------- * diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 06da74f1b6b5f..003c218aada9c 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging { } } // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster - rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) } + rddClusters.headOption.foreach { cluster => + if (!rootCluster.childClusters.contains(cluster)) { + rootCluster.attachChildCluster(cluster) + } + } rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) } } } diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css index 8b5a928685dfc..303f8ebb8814c 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css @@ -21,7 +21,7 @@ } #plan-viz-graph svg g.cluster rect { - fill: #E5F2ff; + fill: #A0DFFF; stroke: #3EC0FF; stroke-width: 1px; } From ccac15248ff2372204381a8f43bd66604a2886a3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 20 Jan 2016 11:59:19 -0800 Subject: [PATCH 4/5] Update spark-dag-viz.js --- .../main/resources/org/apache/spark/ui/static/spark-dag-viz.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 3af25fc73c78e..4337c42087e79 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) { renderer(container, g); // Find the stage cluster and mark it for styling and post-processing - container.selectAll("g.cluster[name*=\"Stage \"]").classed("stage", true); + container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true); } /* -------------------- * From 5a431263dd39f80c204b8f29083f312acc353021 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 20 Jan 2016 17:03:26 -0800 Subject: [PATCH 5/5] fix conflict --- .../org/apache/spark/sql/execution/WholeStageCodegen.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index ebf6909b37713..57f4945de9804 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, LeafExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.rules.Rule
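
Note on the codegen metric plumbing that patch 1 adds and patch 2 backs out: the code emitted by WholeStageCodegen is compiled Java and cannot capture JVM objects lexically, so termForLongMetric registers the LongSQLMetric in ctx.references, remembers its slot, and emits a mutable field that the generated class casts back out of the references array at construction time. Below is a minimal, standalone sketch of that pattern; MiniCodegenContext, termForMetric, and every name in it are illustrative stand-ins, not Spark's CodegenContext API.

import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for the handful of CodegenContext members the
// pattern relies on; this is NOT Spark's CodegenContext.
class MiniCodegenContext {
  val references = ArrayBuffer[Any]()  // objects handed to the generated class
  private val mutableStates = ArrayBuffer[(String, String, String)]()
  private var freshId = 0

  def freshName(prefix: String): String = { freshId += 1; s"$prefix$freshId" }

  def addMutableState(javaType: String, name: String, init: String): Unit =
    mutableStates += ((javaType, name, init))

  // What the real context would splice into the generated class body
  // and its constructor, respectively.
  def declarations: String =
    mutableStates.map { case (t, n, _) => s"private $t $n;" }.mkString("\n")
  def initializations: String =
    mutableStates.map(_._3).mkString("\n")
}

object MetricTermDemo extends App {
  // Mirrors the shape of patch 1's termForLongMetric: stash the metric
  // object, record its slot, and emit the Java that loads it into a field.
  def termForMetric(ctx: MiniCodegenContext, metric: AnyRef, name: String): String = {
    val idx = ctx.references.length
    ctx.references += metric
    val term = ctx.freshName(name)
    ctx.addMutableState("LongSQLMetric", term, s"$term = (LongSQLMetric) references[$idx];")
    term
  }

  val ctx = new MiniCodegenContext
  // new Object stands in for a LongSQLMetric instance.
  val term = termForMetric(ctx, new Object, "numOutputRows")
  println(ctx.declarations)     // private LongSQLMetric numOutputRows1;
  println(ctx.initializations)  // numOutputRows1 = (LongSQLMetric) references[0];
  println(s"$term.add(1);")     // the per-row update doConsume splices into the fused loop
}

Patch 2 removes this plumbing and the per-operator assertions in the "WholeStageCodegen metrics" test because, per its commit message, the per-row add(1) calls inside the fused loop were very slow; the TODO left in the test marks restoring metrics as future work. The two spark-dag-viz.js tweaks in patches 3 and 4 appear related to the same feature: tightening the selector from [name*="Stage"] to the prefix match [name^="Stage "] plausibly keeps the new "WholeStageCodegen" clusters, whose names contain the substring "Stage", from picking up the job page's stage styling.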