[SPARK-13415][SQL] Visualize subquery in SQL web UI
## What changes were proposed in this pull request?

This PR adds visualization of subqueries in the SQL web UI, and also improves the explain output for subqueries, especially when they are used together with whole-stage codegen.

For example:
```python
>>> sqlContext.range(100).registerTempTable("range")
>>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias(('id / subquery#9), None)]
:  +- 'SubqueryAlias subquery#9
:     +- 'Project [unresolvedalias('sum('id), None)]
:        +- 'UnresolvedRelation `range`, None
+- 'Filter ('id > subquery#8)
   :  +- 'SubqueryAlias subquery#8
   :     +- 'GlobalLimit 1
   :        +- 'LocalLimit 1
   :           +- 'Project [unresolvedalias('id, None)]
   :              +- 'UnresolvedRelation `range`, None
   +- 'UnresolvedRelation `range`, None

== Analyzed Logical Plan ==
(id / scalarsubquery()): double
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- SubqueryAlias range
:           +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- SubqueryAlias range
   :                 +- Range 0, 100, 1, 4, [id#0L]
   +- SubqueryAlias range
      +- Range 0, 100, 1, 4, [id#0L]

== Optimized Logical Plan ==
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- Range 0, 100, 1, 4, [id#0L]
   +- Range 0, 100, 1, 4, [id#0L]

== Physical Plan ==
WholeStageCodegen
:  +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:     :  +- Subquery subquery#9
:     :     +- WholeStageCodegen
:     :        :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L])
:     :        :     +- INPUT
:     :        +- Exchange SinglePartition, None
:     :           +- WholeStageCodegen
:     :              :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L])
:     :              :     +- Range 0, 1, 4, 100, [id#0L]
:     +- Filter (id#0L > subquery#8)
:        :  +- Subquery subquery#8
:        :     +- CollectLimit 1
:        :        +- WholeStageCodegen
:        :           :  +- Project [id#0L]
:        :           :     +- Range 0, 1, 4, 100, [id#0L]
:        +- Range 0, 1, 4, 100, [id#0L]
```

The web UI looks like:

![subquery](https://cloud.githubusercontent.com/assets/40902/13377963/932bcbae-dda7-11e5-82f7-03c9be85d77c.png)

This PR also changes the tree structure of WholeStageCodegen to make it consistent with other operators. Before this change, both WholeStageCodegen and InputAdapter held references to the same plans; either copy could be updated without notifying the other, causing problems. This was discovered by #11403.
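
To make the ownership issue concrete, here is a self-contained sketch (toy `Plan`/`OldStage`/`NewStage` types invented for illustration, not Spark's actual classes) of the difference between the two shapes:

```scala
// Toy plan ADT -- names are made up for this sketch only.
sealed trait Plan { def children: Seq[Plan] }
case class Leaf(name: String) extends Plan { def children: Seq[Plan] = Nil }
case class InputAdapter(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

// Old shape: the codegened subtree is reachable through *two* fields
// (`plan`, and again through the InputAdapters in `inputs`), so a rewrite
// applied through one field can leave the other pointing at a stale copy.
case class OldStage(plan: Plan, inputs: Seq[Plan]) extends Plan {
  def children: Seq[Plan] = inputs
}

// New shape: a single `child` owns the subtree; both the inner (codegened)
// part and the INPUT boundaries are derived from it on demand, so the two
// views can never diverge.
case class NewStage(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def innerChildren: Seq[Plan] = Seq(child)
  def treeChildren: Seq[Plan] = collectInputs(child)
  private def collectInputs(p: Plan): Seq[Plan] = p match {
    case InputAdapter(c) => Seq(c)
    case other => other.children.flatMap(collectInputs)
  }
}
```

With the single-`child` shape, tree rewrites go through one field only, which is exactly what the real change does by turning `WholeStageCodegen` and `InputAdapter` into `UnaryNode`s.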

## How was this patch tested?

Existing tests, plus manual tests with the example query, checking the explain output and the web UI.

Author: Davies Liu <davies@databricks.com>

Closes #11417 from davies/viz_subquery.
Davies Liu authored and Arun Allamsetty committed Apr 21, 2016
1 parent 7ecf144 commit 7004382
Showing 9 changed files with 166 additions and 127 deletions.
```diff
@@ -229,8 +229,12 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
 
   override def simpleString: String = statePrefix + super.simpleString
 
-  override def treeChildren: Seq[PlanType] = {
-    val subqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e})
-    children ++ subqueries.map(e => e.plan.asInstanceOf[PlanType])
+  /**
+   * All the subqueries of current plan.
+   */
+  def subqueries: Seq[PlanType] = {
+    expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
   }
+
+  override def innerChildren: Seq[PlanType] = subqueries
 }
```
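
The pattern above relies on Catalyst's `TreeNode.collect`; a minimal stand-alone imitation (toy `Expr` classes, with the subquery plan reduced to a `String`) shows how subquery plans are harvested from an operator's expressions:

```scala
// Toy expression tree -- invented types; only the collect pattern mirrors
// `expressions.flatMap(_.collect { case e: SubqueryExpression => ... })`.
sealed trait Expr {
  def children: Seq[Expr] = Nil
  // Pre-order traversal keeping every node the partial function accepts,
  // in the spirit of Catalyst's TreeNode.collect.
  def collect[T](pf: PartialFunction[Expr, T]): Seq[T] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
}
case class Literal(value: Long) extends Expr
case class Divide(left: Expr, right: Expr) extends Expr {
  override def children: Seq[Expr] = Seq(left, right)
}
case class ScalarSubquery(plan: String) extends Expr // plan kept as a String here

object SubqueryHarvest extends App {
  val expressions: Seq[Expr] =
    Seq(Divide(Literal(1L), ScalarSubquery("Aggregate [sum(id)]")))
  val subqueries = expressions.flatMap(_.collect { case s: ScalarSubquery => s.plan })
  println(subqueries) // List(Aggregate [sum(id)]) -- these become innerChildren
}
```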
```diff
@@ -447,9 +447,52 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 
   /**
    * All the nodes that will be used to generate tree string.
+   *
+   * For example:
+   *
+   *   WholeStageCodegen
+   *   +-- SortMergeJoin
+   *       |-- InputAdapter
+   *       |     +-- Sort
+   *       +-- InputAdapter
+   *             +-- Sort
+   *
+   * the treeChildren of WholeStageCodegen will be Seq(Sort, Sort), it will generate a tree string
+   * like this:
+   *
+   *   WholeStageCodegen
+   *   :  +- SortMergeJoin
+   *   :     :- INPUT
+   *   :     :- INPUT
+   *   :- Sort
+   *   :- Sort
    */
   protected def treeChildren: Seq[BaseType] = children
 
+  /**
+   * All the nodes that are parts of this node.
+   *
+   * For example:
+   *
+   *   WholeStageCodegen
+   *   +- SortMergeJoin
+   *      |-- InputAdapter
+   *      |     +-- Sort
+   *      +-- InputAdapter
+   *            +-- Sort
+   *
+   * the innerChildren of WholeStageCodegen will be Seq(SortMergeJoin), it will generate a tree
+   * string like this:
+   *
+   *   WholeStageCodegen
+   *   :  +- SortMergeJoin
+   *   :     :- INPUT
+   *   :     :- INPUT
+   *   :- Sort
+   *   :- Sort
+   */
+  protected def innerChildren: Seq[BaseType] = Nil
+
   /**
    * Appends the string represent of this node and its children to the given StringBuilder.
    *
@@ -472,6 +515,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     builder.append(simpleString)
     builder.append("\n")
 
+    if (innerChildren.nonEmpty) {
+      innerChildren.init.foreach(_.generateTreeString(
+        depth + 2, lastChildren :+ false :+ false, builder))
+      innerChildren.last.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder)
+    }
+
     if (treeChildren.nonEmpty) {
       treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
       treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
```
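
The depth bookkeeping above is easy to get wrong, so here is a runnable miniature of the convention (invented `Node`/`render` names; only the prefix logic mirrors the change): innerChildren are drawn two levels down behind a `:` continuation bar, while treeChildren descend one level as ordinary children.

```scala
object TreeStringSketch {
  case class Node(name: String, inner: Seq[Node] = Nil, tree: Seq[Node] = Nil)

  def render(n: Node, depth: Int, last: Seq[Boolean], sb: StringBuilder): StringBuilder = {
    if (depth > 0) {
      // One prefix fragment per ancestor: blank under a last child,
      // a ':' bar under one that still has siblings below it.
      last.init.foreach(isLast => sb.append(if (isLast) "   " else ":  "))
      sb.append(if (last.last) "+- " else ":- ")
    }
    sb.append(n.name).append('\n')
    if (n.inner.nonEmpty) {  // parts of this node: two levels down
      n.inner.init.foreach(render(_, depth + 2, last :+ false :+ false, sb))
      render(n.inner.last, depth + 2, last :+ false :+ true, sb)
    }
    if (n.tree.nonEmpty) {   // real children: one level down
      n.tree.init.foreach(render(_, depth + 1, last :+ false, sb))
      render(n.tree.last, depth + 1, last :+ true, sb)
    }
    sb
  }

  def main(args: Array[String]): Unit = {
    val smj = Node("SortMergeJoin", tree = Seq(Node("INPUT"), Node("INPUT")))
    val wsc = Node("WholeStageCodegen", inner = Seq(smj), tree = Seq(Node("Sort"), Node("Sort")))
    print(render(wsc, 0, Nil, new StringBuilder))
    // WholeStageCodegen
    // :  +- SortMergeJoin
    // :     :- INPUT
    // :     +- INPUT
    // :- Sort
    // +- Sort
  }
}
```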
```diff
@@ -36,11 +36,8 @@ class SparkPlanInfo(
 private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
-    val children = plan match {
-      case WholeStageCodegen(child, _) => child :: Nil
-      case InputAdapter(child) => child :: Nil
-      case plan => plan.children
-    }
-
+    val children = plan.children ++ plan.subqueries
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
```
```diff
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
@@ -29,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, BuildLeft, BuildRight, SortMergeJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
 import org.apache.spark.sql.execution.metric.LongSQLMetricValue
 
 /**
@@ -163,16 +161,12 @@ trait CodegenSupport extends SparkPlan {
 * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
 * an RDD iterator of InternalRow.
 */
-case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
+case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def doPrepare(): Unit = {
-    child.prepare()
-  }
-
   override def doExecute(): RDD[InternalRow] = {
     child.execute()
   }
@@ -181,8 +175,6 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
     child.doExecuteBroadcast()
   }
 
-  override def supportCodegen: Boolean = false
-
   override def upstreams(): Seq[RDD[InternalRow]] = {
     child.execute() :: Nil
   }
@@ -210,6 +202,8 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
   }
 
   override def simpleString: String = "INPUT"
+
+  override def treeChildren: Seq[SparkPlan] = Nil
 }
 
 /**
@@ -243,30 +237,23 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
 * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
 * used to generated code for BoundReference.
 */
-case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
-  extends SparkPlan with CodegenSupport {
-
-  override def supportCodegen: Boolean = false
-
-  override def output: Seq[Attribute] = plan.output
-  override def outputPartitioning: Partitioning = plan.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
+case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport {
 
-  override def doPrepare(): Unit = {
-    plan.prepare()
-  }
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def doExecute(): RDD[InternalRow] = {
     val ctx = new CodegenContext
-    val code = plan.produce(ctx, this)
+    val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    val references = ctx.references.toArray
    val source = s"""
      public Object generate(Object[] references) {
        return new GeneratedIterator(references);
      }
 
      /** Codegened pipeline for:
-       * ${toCommentSafeString(plan.treeString.trim)}
+       * ${toCommentSafeString(child.treeString.trim)}
       */
      class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
 
@@ -294,7 +281,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
     // println(s"${CodeFormatter.format(cleanedSource)}")
     CodeGenerator.compile(cleanedSource)
 
-    val rdds = plan.upstreams()
+    val rdds = child.asInstanceOf[CodegenSupport].upstreams()
     assert(rdds.size <= 2, "Up to two upstream RDDs can be supported")
     if (rdds.length == 1) {
       rdds.head.mapPartitions { iter =>
@@ -361,34 +348,17 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
     }
   }
 
-  private[sql] override def resetMetrics(): Unit = {
-    plan.foreach(_.resetMetrics())
+  override def innerChildren: Seq[SparkPlan] = {
+    child :: Nil
   }
 
-  override def generateTreeString(
-      depth: Int,
-      lastChildren: Seq[Boolean],
-      builder: StringBuilder): StringBuilder = {
-    if (depth > 0) {
-      lastChildren.init.foreach { isLast =>
-        val prefixFragment = if (isLast) "   " else ":  "
-        builder.append(prefixFragment)
-      }
-
-      val branch = if (lastChildren.last) "+- " else ":- "
-      builder.append(branch)
-    }
-
-    builder.append(simpleString)
-    builder.append("\n")
-
-    plan.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder)
-    if (children.nonEmpty) {
-      children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
-      children.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
-    }
+  private def collectInputs(plan: SparkPlan): Seq[SparkPlan] = plan match {
+    case InputAdapter(c) => c :: Nil
+    case other => other.children.flatMap(collectInputs)
+  }
 
-    builder
+  override def treeChildren: Seq[SparkPlan] = {
+    collectInputs(child)
   }
 
   override def simpleString: String = "WholeStageCodegen"
@@ -416,27 +386,34 @@ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Ru
     case _ => false
   }
 
+  /**
+   * Inserts a InputAdapter on top of those that do not support codegen.
+   */
+  private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
+    case j @ SortMergeJoin(_, _, _, left, right) =>
+      // The children of SortMergeJoin should do codegen separately.
+      j.copy(left = InputAdapter(insertWholeStageCodegen(left)),
+        right = InputAdapter(insertWholeStageCodegen(right)))
+    case p if !supportCodegen(p) =>
+      // collapse them recursively
+      InputAdapter(insertWholeStageCodegen(p))
+    case p =>
+      p.withNewChildren(p.children.map(insertInputAdapter))
+  }
+
+  /**
+   * Inserts a WholeStageCodegen on top of those that support codegen.
+   */
+  private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
+    case plan: CodegenSupport if supportCodegen(plan) =>
+      WholeStageCodegen(insertInputAdapter(plan))
+    case other =>
+      other.withNewChildren(other.children.map(insertWholeStageCodegen))
+  }
+
   def apply(plan: SparkPlan): SparkPlan = {
     if (sqlContext.conf.wholeStageEnabled) {
-      plan.transform {
-        case plan: CodegenSupport if supportCodegen(plan) =>
-          var inputs = ArrayBuffer[SparkPlan]()
-          val combined = plan.transform {
-            // The build side can't be compiled together
-            case b @ BroadcastHashJoin(_, _, _, BuildLeft, _, left, right) =>
-              b.copy(left = apply(left))
-            case b @ BroadcastHashJoin(_, _, _, BuildRight, _, left, right) =>
-              b.copy(right = apply(right))
-            case j @ SortMergeJoin(_, _, _, left, right) =>
-              // The children of SortMergeJoin should do codegen separately.
-              j.copy(left = apply(left), right = apply(right))
-            case p if !supportCodegen(p) =>
-              val input = apply(p) // collapse them recursively
-              inputs += input
-              InputAdapter(input)
-          }.asInstanceOf[CodegenSupport]
-          WholeStageCodegen(combined, inputs)
-      }
+      insertWholeStageCodegen(plan)
     } else {
       plan
     }
```
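
The two mutually recursive helpers replace the old single `transform`. A compact toy model (all types invented; only the recursion shape follows the rule) of how they alternate to carve a plan into codegen stages separated by INPUT boundaries:

```scala
object CollapseSketch {
  sealed trait Plan {
    def kids: Seq[Plan]
    def withKids(ks: Seq[Plan]): Plan
  }
  case class Op(name: String, kids: Seq[Plan], codegen: Boolean) extends Plan {
    def withKids(ks: Seq[Plan]): Plan = copy(kids = ks)
  }
  case class Stage(child: Plan) extends Plan {          // stands in for WholeStageCodegen
    def kids: Seq[Plan] = Seq(child)
    def withKids(ks: Seq[Plan]): Plan = Stage(ks.head)
  }
  case class Input(child: Plan) extends Plan {          // stands in for InputAdapter
    def kids: Seq[Plan] = Seq(child)
    def withKids(ks: Seq[Plan]): Plan = Input(ks.head)
  }

  private def supported(p: Plan): Boolean = p match {
    case op: Op => op.codegen
    case _ => false
  }

  // Mirrors insertWholeStageCodegen: top each codegen-able subtree with a Stage.
  def insertStages(p: Plan): Plan =
    if (supported(p)) Stage(insertInputs(p))
    else p.withKids(p.kids.map(insertStages))

  // Mirrors insertInputAdapter: inside a stage, wrap unsupported subtrees in an
  // Input boundary and plan them independently.
  private def insertInputs(p: Plan): Plan =
    if (!supported(p)) Input(insertStages(p))
    else p.withKids(p.kids.map(insertInputs))

  def main(args: Array[String]): Unit = {
    val scan = Op("Scan", Nil, codegen = false)
    val plan = Op("Project", Seq(Op("Filter", Seq(scan), codegen = true)), codegen = true)
    println(insertStages(plan))
    // Stage(Op(Project,List(Op(Filter,List(Input(Op(Scan,List(),false))),true)),true))
  }
}
```

The mutual recursion restarts stage insertion underneath every `Input` boundary, which is how nested codegen stages (like the subquery stages in the example above) are formed.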
```diff
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.internal.SQLConf
 
@@ -68,7 +69,7 @@ package object debug {
     }
   }
 
-  private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
+  private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode with CodegenSupport {
     def output: Seq[Attribute] = child.output
 
     implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
@@ -86,10 +87,11 @@ package object debug {
     /**
     * A collection of metrics for each column of output.
     * @param elementTypes the actual runtime types for the output. Useful when there are bugs
-    *        causing the wrong data to be projected.
+    *                     causing the wrong data to be projected.
     */
    case class ColumnMetrics(
-      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+        elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+
     val tupleCount: Accumulator[Int] = sparkContext.accumulator[Int](0)
 
     val numColumns: Int = child.output.size
@@ -98,7 +100,7 @@ package object debug {
     def dumpStats(): Unit = {
       logDebug(s"== ${child.simpleString} ==")
       logDebug(s"Tuples output: ${tupleCount.value}")
-      child.output.zip(columnStats).foreach { case(attr, metric) =>
+      child.output.zip(columnStats).foreach { case (attr, metric) =>
         val actualDataTypes = metric.elementTypes.value.mkString("{", ",", "}")
         logDebug(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
       }
@@ -108,6 +110,7 @@ package object debug {
       child.execute().mapPartitions { iter =>
         new Iterator[InternalRow] {
           def hasNext: Boolean = iter.hasNext
+
           def next(): InternalRow = {
             val currentRow = iter.next()
             tupleCount += 1
@@ -124,5 +127,17 @@ package object debug {
         }
       }
     }
+
+    override def upstreams(): Seq[RDD[InternalRow]] = {
+      child.asInstanceOf[CodegenSupport].upstreams()
+    }
+
+    override def doProduce(ctx: CodegenContext): String = {
+      child.asInstanceOf[CodegenSupport].produce(ctx, this)
+    }
+
+    override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+      consume(ctx, input)
+    }
   }
 }
```
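
For context, `DebugNode` backs the `debug()` helper defined in this package; after this change it simply forwards `produce`/`consume` to its child, so inserting it no longer breaks a whole-stage pipeline. Typical usage (unchanged by this PR; the exact API shown is an assumption based on the debug package of that era):

```scala
import org.apache.spark.sql.execution.debug._

// Runs the query and logs per-operator tuple counts plus the runtime types
// observed in each output column.
sqlContext.sql("SELECT id FROM range WHERE id > 10").debug()
```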