diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 64823a9eaec1d..4632957e7afdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -282,6 +282,10 @@ abstract class Expression extends TreeNode[Expression] { val childrenSQL = children.map(_.sql).mkString(", ") s"$prettyName($childrenSQL)" } + + override def simpleStringWithNodeId(): String = { + throw new UnsupportedOperationException(s"$nodeName does not implement simpleStringWithNodeId") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala index c3b79900d308d..3bb3c602f775b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala @@ -198,6 +198,9 @@ trait Block extends TreeNode[Block] with JavaCode { } override def verboseString(maxFields: Int): String = toString + override def simpleStringWithNodeId(): String = { + throw new UnsupportedOperationException(s"$nodeName does not implement simpleStringWithNodeId") + } } object Block { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index e88405f2b54c1..c5abb6378ff7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} +import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag} +import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} @@ -179,6 +180,20 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT override def verboseString(maxFields: Int): String = simpleString(maxFields) + override def simpleStringWithNodeId(): String = { + val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") + s"$nodeName ($operatorId)".trim + } + + def verboseStringWithOperatorId(): String = { + val codegenIdStr = + getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("") + val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") + s""" + |($operatorId) $nodeName $codegenIdStr + """.stripMargin + } + /** * All the subqueries of current plan. */ @@ -204,7 +219,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT subqueries ++ subqueries.flatMap(_.subqueriesAll) } - override protected def innerChildren: Seq[QueryPlan[_]] = subqueries + override def innerChildren: Seq[QueryPlan[_]] = subqueries /** * A private mutable variable to indicate whether this plan is the result of canonicalization. 
@@ -289,6 +304,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } object QueryPlan extends PredicateHelper { + val OP_ID_TAG = TreeNodeTag[Int]("operatorId") + val CODEGEN_ID_TAG = new TreeNodeTag[Int]("wholeStageCodegenId") + /** * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference` * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we @@ -335,9 +353,10 @@ object QueryPlan extends PredicateHelper { append: String => Unit, verbose: Boolean, addSuffix: Boolean, - maxFields: Int = SQLConf.get.maxToStringFields): Unit = { + maxFields: Int = SQLConf.get.maxToStringFields, + printOperatorId: Boolean = false): Unit = { try { - plan.treeString(append, verbose, addSuffix, maxFields) + plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId) } catch { case e: AnalysisException => append(e.toString) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 9a28ae201222a..00fb81e361fdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString @@ -530,9 +531,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { * @param maxFields Maximum number of fields that will be converted to strings. * Any elements beyond the limit will be dropped. */ - def simpleString(maxFields: Int): String = { - s"$nodeName ${argString(maxFields)}".trim - } + def simpleString(maxFields: Int): String = s"$nodeName ${argString(maxFields)}".trim + + /** + * ONE line description of this node containing the node identifier. 
+ * @return + */ + def simpleStringWithNodeId(): String /** ONE line description of this node with more information */ def verboseString(maxFields: Int): String @@ -548,10 +553,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { final def treeString( verbose: Boolean, addSuffix: Boolean = false, - maxFields: Int = SQLConf.get.maxToStringFields): String = { + maxFields: Int = SQLConf.get.maxToStringFields, + printOperatorId: Boolean = false): String = { val concat = new PlanStringConcat() - - treeString(concat.append, verbose, addSuffix, maxFields) + treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId) concat.toString } @@ -559,8 +564,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { append: String => Unit, verbose: Boolean, addSuffix: Boolean, - maxFields: Int): Unit = { - generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields) + maxFields: Int, + printOperatorId: Boolean): Unit = { + generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields, printOperatorId) } /** @@ -609,7 +615,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { * All the nodes that should be shown as a inner nested tree of this node. * For example, this can be used to show sub-queries. */ - protected def innerChildren: Seq[TreeNode[_]] = Seq.empty + def innerChildren: Seq[TreeNode[_]] = Seq.empty /** * Appends the string representation of this node and its children to the given Writer. @@ -627,7 +633,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int, + printNodeId: Boolean): Unit = { if (depth > 0) { lastChildren.init.foreach { isLast => @@ -639,7 +646,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { val str = if (verbose) { if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields) } else { - simpleString(maxFields) + if (printNodeId) { + simpleStringWithNodeId() + } else { + simpleString(maxFields) + } } append(prefix) append(str) @@ -648,17 +659,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose, - addSuffix = addSuffix, maxFields = maxFields)) + addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId)) innerChildren.last.generateTreeString( depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose, - addSuffix = addSuffix, maxFields = maxFields) + addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId) } if (children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields)) + depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, + maxFields, printNodeId = printNodeId) + ) children.last.generateTreeString( - depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields) + depth + 1, lastChildren :+ true, append, verbose, prefix, + addSuffix, maxFields, printNodeId = printNodeId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala new file mode 100644 index 0000000000000..18a7f9822dcbc --- /dev/null +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.trees.TreeNodeTag + +object ExplainUtils { + /** + * Given an input physical plan, performs the following tasks. + * 1. Computes the operator id for the current operator and records it in the operator + * by setting a tag. + * 2. Computes the whole stage codegen id for the current operator and records it in the + * operator by setting a tag. + * 3. Generates the two-part explain output for this plan. + * 1. The first part explains the operator tree, with each operator tagged with a unique + * identifier. + * 2. The second part explains each operator in a verbose manner. + * + * Note: This function skips over subqueries. They are handled by the caller. + * + * @param plan Input query plan to process + * @param append function used to append the explain output + * @param startOperatorID The start value of the operator id. Subsequent operators will + * be assigned higher values. + * + * @return The last generated operator id for this input plan. This is to ensure we + * always assign incrementing unique ids to each operator. + * + */ + private def processPlanSkippingSubqueries[T <: QueryPlan[T]]( + plan: => QueryPlan[T], + append: String => Unit, + startOperatorID: Int): Int = { + + val operationIDs = new mutable.ArrayBuffer[(Int, QueryPlan[_])]() + var currentOperatorID = startOperatorID + try { + currentOperatorID = generateOperatorIDs(plan, currentOperatorID, operationIDs) + generateWholeStageCodegenIds(plan) + + QueryPlan.append( + plan, + append, + verbose = false, + addSuffix = false, + printOperatorId = true) + + append("\n") + var i: Integer = 0 + for ((opId, curPlan) <- operationIDs) { + append(curPlan.verboseStringWithOperatorId()) + } + } catch { + case e: AnalysisException => append(e.toString) + } + currentOperatorID + } + + /** + * Given an input physical plan, performs the following tasks. + * 1. Generates the explain output for the input plan excluding the subquery plans. + * 2. Generates the explain output for each subquery referenced in the plan.
+ */ + def processPlan[T <: QueryPlan[T]]( + plan: => QueryPlan[T], + append: String => Unit): Unit = { + try { + val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)] + var currentOperatorID = 0 + currentOperatorID = processPlanSkippingSubqueries(plan, append, currentOperatorID) + getSubqueries(plan, subqueries) + var i = 0 + + for (sub <- subqueries) { + if (i == 0) { + append("\n===== Subqueries =====\n\n") + } + i = i + 1 + append(s"Subquery:$i Hosting operator id = " + + s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n") + + // For each subquery expression in the parent plan, process its child plan to compute + // the explain output. In case of subquery reuse, we don't print the subquery plan more + // than once. So we skip [[ReusedSubqueryExec]] here. + if (!sub._3.isInstanceOf[ReusedSubqueryExec]) { + currentOperatorID = processPlanSkippingSubqueries( + sub._3.child, + append, + currentOperatorID) + } + append("\n") + } + } finally { + removeTags(plan) + } + } + + /** + * Traverses the supplied input plan in a bottom-up fashion and does the following: + * 1. Produces a map: operator identifier -> operator + * 2. Records the operator id by setting a tag in the operator. + * Note: + * 1. Operators such as WholeStageCodegenExec and InputAdapter are skipped as they don't + * appear in the explain output. + * 2. Operator identifiers start at startOperatorID + 1 + * @param plan Input query plan to process + * @param startOperatorID The start value of the operator id. Subsequent operators will + * be assigned higher values. + * @param operatorIDs An output parameter that contains a map of operator id and query plan. This + * is used by the caller to print the detailed portion of the plan. + * @return The last generated operator id for this input plan. This is to ensure we + * always assign incrementing unique ids to each operator. + */ + private def generateOperatorIDs( + plan: QueryPlan[_], + startOperatorID: Int, + operatorIDs: mutable.ArrayBuffer[(Int, QueryPlan[_])]): Int = { + var currentOperationID = startOperatorID + // Skip the subqueries as they are not printed as part of the main query block. + if (plan.isInstanceOf[BaseSubqueryExec]) { + return currentOperationID + } + plan.foreachUp { + case p: WholeStageCodegenExec => + case p: InputAdapter => + case other: QueryPlan[_] => + if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) { + currentOperationID += 1 + other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID) + operatorIDs += ((currentOperationID, other)) + } + other.innerChildren.foreach { plan => + currentOperationID = generateOperatorIDs(plan, + currentOperationID, + operatorIDs) + } + } + currentOperationID + } + + /** + * Traverses the supplied input plan in a top-down fashion and records the + * whole stage codegen id in the plan via setting a tag. + */ + private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = { + // Skip the subqueries as they are not printed as part of the main query block. + if (plan.isInstanceOf[BaseSubqueryExec]) { + return + } + var currentCodegenId = -1 + plan.foreach { + case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId + case p: InputAdapter => currentCodegenId = -1 + case other: QueryPlan[_] => + if (currentCodegenId != -1) { + other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) + } + other.innerChildren.foreach { plan => + generateWholeStageCodegenIds(plan) + } + } + } + + /** + * Given an input plan, returns an array of tuples comprising: + * 1. Hosting operator id. + * 2.
Hosting expression + * 3. Subquery plan + */ + private def getSubqueries( + plan: => QueryPlan[_], + subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = { + plan.foreach { + case p: SparkPlan => + p.expressions.flatMap(_.collect { + case e: PlanExpression[_] => + e.plan match { + case s: BaseSubqueryExec => + subqueries += ((p, e, s)) + getSubqueries(s, subqueries) + } + case other => + }) + } + } + + /** + * Returns the operator identifier for the supplied plan by retrieving the + * `operatorId` tag value. + */ + def getOpId(plan: QueryPlan[_]): String = { + plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown") + } + + /** + * Returns the whole stage codegen identifier for the supplied plan by retrieving the + * `wholeStageCodegenId` tag value. + */ + def getCodegenId(plan: QueryPlan[_]): String = { + plan.getTagValue(QueryPlan.CODEGEN_ID_TAG).map(v => s"[codegen id : $v]").getOrElse("") + } + + def removeTags(plan: QueryPlan[_]): Unit = { + plan foreach { + case plan: QueryPlan[_] => + plan.unsetTagValue(QueryPlan.OP_ID_TAG) + plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG) + plan.innerChildren.foreach { p => + removeTags(p) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 5f67b17f8e6fe..68e5b534e9215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule @@ -127,10 +128,16 @@ class QueryExecution( ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) - def simpleString: String = withRedaction { + def simpleString: String = simpleString(false) + + def simpleString(formatted: Boolean): String = withRedaction { val concat = new PlanStringConcat() concat.append("== Physical Plan ==\n") - QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false) + if (formatted) { + ExplainUtils.processPlan(executedPlan, concat.append) + } else { + QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false) + } concat.append("\n") concat.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ba89ba75767cf..b4cdf9e16b7e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionContext import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.InternalCompilerException @@ -511,6 +512,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ trait LeafExecNode extends SparkPlan {
override final def children: Seq[SparkPlan] = Nil override def producedAttributes: AttributeSet = outputSet + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Output: ${producedAttributes.mkString("[", ", ", "]")} + """.stripMargin + } } object UnaryExecNode { @@ -524,6 +531,12 @@ trait UnaryExecNode extends SparkPlan { def child: SparkPlan override final def children: Seq[SparkPlan] = child :: Nil + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Input: ${child.output.mkString("[", ", ", "]")} + """.stripMargin + } } trait BinaryExecNode extends SparkPlan { @@ -531,4 +544,11 @@ trait BinaryExecNode extends SparkPlan { def right: SparkPlan override final def children: Seq[SparkPlan] = Seq(left, right) + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Left output: ${left.output.mkString("[", ", ", "]")} + |Right output: ${right.output.mkString("[", ", ", "]")} + """.stripMargin + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8fc4afbb0db3f..83cdc7a978a9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -289,13 +289,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * Create an [[ExplainCommand]] logical plan. * The syntax of using this command in SQL is: * {{{ - * EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ... + * EXPLAIN (EXTENDED | CODEGEN | COST | FORMATTED) SELECT * FROM ... 
* }}} */ override def visitExplain(ctx: ExplainContext): LogicalPlan = withOrigin(ctx) { - if (ctx.FORMATTED != null) { - operationNotAllowed("EXPLAIN FORMATTED", ctx) - } if (ctx.LOGICAL != null) { operationNotAllowed("EXPLAIN LOGICAL", ctx) } @@ -308,7 +305,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { logicalPlan = statement, extended = ctx.EXTENDED != null, codegen = ctx.CODEGEN != null, - cost = ctx.COST != null) + cost = ctx.COST != null, + formatted = ctx.FORMATTED != null) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5fda272ce21a3..ce9a6ea319d5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -535,7 +536,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int, + printNodeId: Boolean): Unit = { child.generateTreeString( depth, lastChildren, @@ -543,7 +545,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod verbose, prefix = "", addSuffix = false, - maxFields) + maxFields, + printNodeId) } override def needCopyResult: Boolean = false @@ -776,15 +779,17 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int, + printNodeId: Boolean): Unit = { child.generateTreeString( depth, lastChildren, append, verbose, - s"*($codegenStageId) ", + if (printNodeId) "* " else s"*($codegenStageId) ", false, - maxFields) + maxFields, + printNodeId) } override def needStopCheck: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 1f269d23c5d68..5d92ddad887bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreeNodeTag @@ -226,10 +227,25 @@ case class AdaptiveSparkPlanExec( verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { - super.generateTreeString(depth, lastChildren, append, verbose, prefix, addSuffix, maxFields) + maxFields: Int, + 
printNodeId: Boolean): Unit = { + super.generateTreeString(depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId) currentPhysicalPlan.generateTreeString( - depth + 1, lastChildren :+ true, append, verbose, "", addSuffix = false, maxFields) + depth + 1, + lastChildren :+ true, + append, + verbose, + "", + addSuffix = false, + maxFields, + printNodeId) } override def hashCode(): Int = initialPlan.hashCode() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index c803ca3638b4a..231fffce3360b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.adaptive +import scala.collection.mutable import scala.concurrent.Future import org.apache.spark.{FutureAction, MapOutputStatistics} @@ -24,6 +25,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution._ @@ -107,10 +109,18 @@ abstract class QueryStageExec extends LeafExecNode { verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { - super.generateTreeString(depth, lastChildren, append, verbose, prefix, addSuffix, maxFields) + maxFields: Int, + printNodeId: Boolean): Unit = { + super.generateTreeString(depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId) plan.generateTreeString( - depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields) + depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index b74dd95d1d8f9..b072a7f5d914c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.{LongType, StructType} @@ -80,6 +81,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning + + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Output : ${projectList.mkString("[", ", ", "]")} + |Input : ${child.output.mkString("[", ", ", "]")} + """.stripMargin + } } @@ -226,6 +235,14 @@ case class FilterExec(condition: Expression, 
child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning + + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Input : ${child.output.mkString("[", ", ", "]")} + |Condition : ${condition} + """.stripMargin + } } /** @@ -682,6 +699,34 @@ abstract class BaseSubqueryExec extends SparkPlan { override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int, + printNodeId: Boolean): Unit = { + /** + * In the new explain mode `EXPLAIN FORMATTED`, the subqueries are not shown in the + * main plan. They are printed separately, along with correlation information that + * ties them to their parent plans. The condition below makes sure that subquery plans + * are excluded from the main plan. + */ + if (!printNodeId) { + super.generateTreeString( + depth, + lastChildren, + append, + verbose, + "", + false, + maxFields, + printNodeId) + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 7a881ad878c18..c8fa07941af87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -179,7 +179,7 @@ case class InMemoryRelation( @volatile var statsOfPlanToCache: Statistics = null - override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan) + override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan) override def doCanonicalize(): logical.LogicalPlan = copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index f14b5d7965219..8d13cfb93d270 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -49,7 +49,7 @@ case class InMemoryTableScanExec( } } - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren + override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren override def doCanonicalize(): SparkPlan = copy(attributes = attributes.map(QueryPlan.normalizeExpressions(_, relation.output)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 1dc24b3d221cf..08d31fdda2dc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -44,7 +44,7 @@ case class InsertIntoDataSourceDirCommand( query: LogicalPlan, overwrite: Boolean) extends RunnableCommand { - override protected def innerChildren: Seq[LogicalPlan] = query ::
Nil + override def innerChildren: Seq[LogicalPlan] = query :: Nil override def run(sparkSession: SparkSession): Seq[Row] = { assert(storage.locationUri.nonEmpty, "Directory path is required") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 7b00769308a41..ef6b0bba1628e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -35,7 +35,7 @@ case class CacheTableCommand( require(plan.isEmpty || tableIdent.database.isEmpty, "Database name is not allowed in CACHE TABLE AS SELECT") - override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq + override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq override def run(sparkSession: SparkSession): Seq[Row] = { plan.foreach { logicalPlan => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index bb8a982f0d317..f7d4fa4c4ffcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -70,7 +70,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) } - override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil + override def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil override def output: Seq[Attribute] = cmd.output @@ -143,7 +143,8 @@ case class ExplainCommand( logicalPlan: LogicalPlan, extended: Boolean = false, codegen: Boolean = false, - cost: Boolean = false) + cost: Boolean = false, + formatted: Boolean = false) extends RunnableCommand { override val output: Seq[Attribute] = @@ -160,6 +161,8 @@ case class ExplainCommand( queryExecution.toString } else if (cost) { queryExecution.stringWithStats + } else if (formatted) { + queryExecution.simpleString(formatted = true) } else { queryExecution.simpleString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 4d3eb11250c3f..2617add6ffded 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -98,7 +98,7 @@ case class CreateViewCommand( import ViewHelper._ - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) + override def innerChildren: Seq[QueryPlan[_]] = Seq(child) if (viewType == PersistedView) { require(originalText.isDefined, "'originalText' must be provided to create permanent view") @@ -271,7 +271,7 @@ case class AlterViewAsCommand( import ViewHelper._ - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(session: SparkSession): Seq[Row] = { // If the plan cannot be analyzed, throw an exception and don't proceed. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index 80d7608a22891..bd9cc0e44fca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -33,7 +33,7 @@ case class InsertIntoDataSourceCommand( overwrite: Boolean) extends RunnableCommand { - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index a1de287b93f9d..49e77f618f2a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -39,7 +39,7 @@ case class SaveIntoDataSourceCommand( options: Map[String, String], mode: SaveMode) extends RunnableCommand { - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { dataSource.createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index 153645decdcf8..3315ae7dabef1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SparkPlan, UnaryExecNode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -86,6 +86,15 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan override def outputOrdering: Seq[SortOrder] = { child.outputOrdering.map(updateAttr(_).asInstanceOf[SortOrder]) } + + override def verboseStringWithOperatorId(): String = { + val cdgen = ExplainUtils.getCodegenId(this) + val reuse_op_str = ExplainUtils.getOpId(child) + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${cdgen} [Reuses operator id: $reuse_op_str] + |Output : ${output} + """.stripMargin + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 4d261dd422bc5..88d98530991c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -22,7 +22,8 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner -import org.apache.spark.sql.execution.{BinaryExecNode, ExternalAppendOnlyUnsafeRowArray, SparkPlan} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils, ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.CompletionIterator @@ -66,6 +67,17 @@ case class CartesianProductExec( override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Join condition: ${joinCondStr} + """.stripMargin + } + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 5ee4c7ffb1911..3788418f8aafe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{RowIterator, SparkPlan} +import org.apache.spark.sql.execution.{ExplainUtils, RowIterator, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{IntegralType, LongType} @@ -37,6 +37,24 @@ trait HashJoin { val left: SparkPlan val right: SparkPlan + override def simpleStringWithNodeId(): String = { + val opId = ExplainUtils.getOpId(this) + s"$nodeName $joinType ${buildSide} ($opId)".trim + } + + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Left keys: ${leftKeys} + |Right keys: ${rightKeys} + |Join condition: ${joinCondStr} + """.stripMargin + } + override def output: Seq[Attribute] = { joinType match { case _: InnerLike => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index f829f07e80720..189727a9bc88d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -45,6 +45,23 @@ case class SortMergeJoinExec( override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def simpleStringWithNodeId(): String = { + val opId = ExplainUtils.getOpId(this) + s"$nodeName 
$joinType ($opId)".trim + } + + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Left keys : ${leftKeys} + |Right keys: ${rightKeys} + |Join condition : ${joinCondStr} + """.stripMargin + } + override def output: Seq[Attribute] = { joinType match { case _: InnerLike => diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql new file mode 100644 index 0000000000000..773c123992f71 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql @@ -0,0 +1,96 @@ +-- Test tables +CREATE table explain_temp1 (key int, val int) USING PARQUET; +CREATE table explain_temp2 (key int, val int) USING PARQUET; +CREATE table explain_temp3 (key int, val int) USING PARQUET; + +SET spark.sql.codegen.wholeStage = true; + +-- single table +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + ORDER BY key; + +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + HAVING max(val) > 0; + +-- simple union +EXPLAIN FORMATTED + SELECT key, val FROM explain_temp1 WHERE key > 0 + UNION + SELECT key, val FROM explain_temp1 WHERE key > 0; + +-- Join +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a, + explain_temp2 b + WHERE a.key = b.key; + +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a + LEFT OUTER JOIN explain_temp2 b + ON a.key = b.key; + +-- Subqueries nested. +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) + AND val = 2) + AND val > 3; + +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE val > 0) + OR + key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0); + +-- Reuse subquery +EXPLAIN FORMATTED + SELECT (SELECT Avg(key) FROM explain_temp1) + (SELECT Avg(key) FROM explain_temp1) + FROM explain_temp1; + +-- CTE + ReuseExchange +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT * + FROM explain_temp1 + WHERE key > 10 + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key; + +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 10 + GROUP BY key + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key; + +-- A spark plan which has innerChildren other than subquery +EXPLAIN FORMATTED + CREATE VIEW explain_view AS + SELECT key, val FROM explain_temp1; + +-- cleanup +DROP TABLE explain_temp1; +DROP TABLE explain_temp2; +DROP TABLE explain_temp3; diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out new file mode 100644 index 0000000000000..4a08cfada292d --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -0,0 +1,738 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 18 + + +-- !query 0 +CREATE table explain_temp1 (key int, val int) USING PARQUET +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE table explain_temp2 (key int, val int) USING PARQUET +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +CREATE table explain_temp3 (key int, val int) USING PARQUET +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +SET spark.sql.codegen.wholeStage = true 
+-- !query 3 schema +struct +-- !query 3 output +spark.sql.codegen.wholeStage true + + +-- !query 4 +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + ORDER BY key +-- !query 4 schema +struct +-- !query 4 output +== Physical Plan == +* Sort (9) ++- Exchange (8) + +- * HashAggregate (7) + +- Exchange (6) + +- * HashAggregate (5) + +- * Project (4) + +- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) HashAggregate [codegen id : 1] +Input: [key#x, val#x] + +(6) Exchange +Input: [key#x, max#x] + +(7) HashAggregate [codegen id : 2] +Input: [key#x, max#x] + +(8) Exchange +Input: [key#x, max(val)#x] + +(9) Sort [codegen id : 3] +Input: [key#x, max(val)#x] + + +-- !query 5 +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + HAVING max(val) > 0 +-- !query 5 schema +struct +-- !query 5 output +== Physical Plan == +* Project (9) ++- * Filter (8) + +- * HashAggregate (7) + +- Exchange (6) + +- * HashAggregate (5) + +- * Project (4) + +- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) HashAggregate [codegen id : 1] +Input: [key#x, val#x] + +(6) Exchange +Input: [key#x, max#x] + +(7) HashAggregate [codegen id : 2] +Input: [key#x, max#x] + +(8) Filter [codegen id : 2] +Input : [key#x, max(val)#x, max(val#x)#x] +Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) + +(9) Project [codegen id : 2] +Output : [key#x, max(val)#x] +Input : [key#x, max(val)#x, max(val#x)#x] + + +-- !query 6 +EXPLAIN FORMATTED + SELECT key, val FROM explain_temp1 WHERE key > 0 + UNION + SELECT key, val FROM explain_temp1 WHERE key > 0 +-- !query 6 schema +struct +-- !query 6 output +== Physical Plan == +* HashAggregate (12) ++- Exchange (11) + +- * HashAggregate (10) + +- Union (9) + :- * Project (4) + : +- * Filter (3) + : +- * ColumnarToRow (2) + : +- Scan parquet default.explain_temp1 (1) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp1 (5) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(7) Filter [codegen id : 2] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(8) Project [codegen id : 2] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(9) Union + +(10) HashAggregate [codegen id : 3] +Input: [key#x, val#x] + +(11) Exchange +Input: [key#x, val#x] + +(12) HashAggregate [codegen id : 4] +Input: [key#x, val#x] + + +-- !query 7 +EXPLAIN FORMATTED + SELECT * + FROM 
explain_temp1 a, + explain_temp2 b + WHERE a.key = b.key +-- !query 7 schema +struct +-- !query 7 output +== Physical Plan == +* BroadcastHashJoin Inner BuildRight (10) +:- * Project (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (9) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp2 (5) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(3) Filter [codegen id : 2] +Input : [key#x, val#x] +Condition : isnotnull(key#x) + +(4) Project [codegen id : 2] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(7) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : isnotnull(key#x) + +(8) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(9) BroadcastExchange +Input: [key#x, val#x] + +(10) BroadcastHashJoin [codegen id : 2] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 8 +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a + LEFT OUTER JOIN explain_temp2 b + ON a.key = b.key +-- !query 8 schema +struct +-- !query 8 output +== Physical Plan == +* BroadcastHashJoin LeftOuter BuildRight (8) +:- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (7) + +- * Project (6) + +- * Filter (5) + +- * ColumnarToRow (4) + +- Scan parquet default.explain_temp2 (3) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(3) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(4) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(5) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : isnotnull(key#x) + +(6) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(7) BroadcastExchange +Input: [key#x, val#x] + +(8) BroadcastHashJoin [codegen id : 2] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 9 +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) + AND val = 2) + AND val > 3 +-- !query 9 schema +struct +-- !query 9 output +== Physical Plan == +* Project (4) ++- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +===== Subqueries ===== + +Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (11) ++- Exchange (10) + +- * HashAggregate (9) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp2 (5) + + +(5) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(7) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 
2)) + +(8) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(9) HashAggregate [codegen id : 1] +Input: [key#x] + +(10) Exchange +Input: [max#x] + +(11) HashAggregate [codegen id : 2] +Input: [max#x] + +Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (18) ++- Exchange (17) + +- * HashAggregate (16) + +- * Project (15) + +- * Filter (14) + +- * ColumnarToRow (13) + +- Scan parquet default.explain_temp3 (12) + + +(12) Scan parquet default.explain_temp3 +Output: [key#x, val#x] + +(13) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(14) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(15) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(16) HashAggregate [codegen id : 1] +Input: [key#x] + +(17) Exchange +Input: [max#x] + +(18) HashAggregate [codegen id : 2] +Input: [max#x] + + +-- !query 10 +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE val > 0) + OR + key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) +-- !query 10 schema +struct +-- !query 10 output +== Physical Plan == +* Filter (3) ++- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (key#x = Subquery scalar-subquery#x, [id=#x])) + +===== Subqueries ===== + +Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (10) ++- Exchange (9) + +- * HashAggregate (8) + +- * Project (7) + +- * Filter (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp2 (4) + + +(4) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(5) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(6) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(7) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(8) HashAggregate [codegen id : 1] +Input: [key#x] + +(9) Exchange +Input: [max#x] + +(10) HashAggregate [codegen id : 2] +Input: [max#x] + +Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (17) ++- Exchange (16) + +- * HashAggregate (15) + +- * Project (14) + +- * Filter (13) + +- * ColumnarToRow (12) + +- Scan parquet default.explain_temp3 (11) + + +(11) Scan parquet default.explain_temp3 +Output: [key#x, val#x] + +(12) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(13) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(14) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(15) HashAggregate [codegen id : 1] +Input: [key#x] + +(16) Exchange +Input: [max#x] + +(17) HashAggregate [codegen id : 2] +Input: [max#x] + + +-- !query 11 +EXPLAIN FORMATTED + SELECT (SELECT Avg(key) FROM explain_temp1) + (SELECT Avg(key) FROM explain_temp1) + FROM explain_temp1 +-- !query 11 schema +struct +-- !query 11 output +== Physical Plan == +* Project (3) ++- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [] + +(2) ColumnarToRow [codegen id : 1] +Input: [] + +(3) Project [codegen id : 1] +Output : [(Subquery scalar-subquery#x, [id=#x] + 
ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] +Input : [] + +===== Subqueries ===== + +Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (8) ++- Exchange (7) + +- * HashAggregate (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp1 (4) + + +(4) Scan parquet default.explain_temp1 +Output: [key#x] + +(5) ColumnarToRow [codegen id : 1] +Input: [key#x] + +(6) HashAggregate [codegen id : 1] +Input: [key#x] + +(7) Exchange +Input: [sum#x, count#xL] + +(8) HashAggregate [codegen id : 2] +Input: [sum#x, count#xL] + +Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] + + +-- !query 12 +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT * + FROM explain_temp1 + WHERE key > 10 + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key +-- !query 12 schema +struct +-- !query 12 output +== Physical Plan == +* BroadcastHashJoin Inner BuildRight (10) +:- * Project (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (9) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp1 (5) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(3) Filter [codegen id : 2] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(4) Project [codegen id : 2] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(7) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(8) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(9) BroadcastExchange +Input: [key#x, val#x] + +(10) BroadcastHashJoin [codegen id : 2] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 13 +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 10 + GROUP BY key + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key +-- !query 13 schema +struct +-- !query 13 output +== Physical Plan == +* BroadcastHashJoin Inner BuildRight (11) +:- * HashAggregate (7) +: +- Exchange (6) +: +- * HashAggregate (5) +: +- * Project (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (10) + +- * HashAggregate (9) + +- ReusedExchange (8) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) HashAggregate [codegen id : 1] +Input: [key#x, val#x] + +(6) Exchange +Input: [key#x, max#x] + +(7) HashAggregate [codegen id : 4] +Input: [key#x, max#x] + +(8) ReusedExchange [Reuses operator id: 6] +Output : ArrayBuffer(key#x, max#x) + +(9) HashAggregate [codegen id : 3] +Input: [key#x, max#x] + +(10) BroadcastExchange +Input: [key#x, max(val)#x] + +(11) BroadcastHashJoin [codegen id : 4] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 14 +EXPLAIN FORMATTED + CREATE VIEW explain_view AS + SELECT key, val FROM explain_temp1 +-- !query 14 schema +struct +-- !query 14 output +== Physical 
Plan == +Execute CreateViewCommand (1) + +- CreateViewCommand (2) + +- Project (4) + +- UnresolvedRelation (3) + + +(1) Execute CreateViewCommand +Output: [] + +(2) CreateViewCommand + +(3) UnresolvedRelation + +(4) Project + + +-- !query 15 +DROP TABLE explain_temp1 +-- !query 15 schema +struct<> +-- !query 15 output + + + +-- !query 16 +DROP TABLE explain_temp2 +-- !query 16 schema +struct<> +-- !query 16 output + + + +-- !query 17 +DROP TABLE explain_temp3 +-- !query 17 schema +struct<> +-- !query 17 output +
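
Usage sketch (illustrative, not part of the patch): the new mode can be exercised either through the SQL syntax `EXPLAIN FORMATTED` enabled in SparkSqlAstBuilder above, or programmatically via QueryExecution.simpleString(formatted = true). The session setup and the table name demo_t below are assumptions made for this example only.

import org.apache.spark.sql.SparkSession

object ExplainFormattedDemo {
  def main(args: Array[String]): Unit = {
    // Assumed local session; any SparkSession built on a Spark version containing this patch would do.
    val spark = SparkSession.builder().master("local[*]").appName("explain-formatted-demo").getOrCreate()
    spark.range(100).selectExpr("id AS key", "id % 10 AS val").createOrReplaceTempView("demo_t")

    // SQL surface: FORMATTED is now accepted by visitExplain and maps to ExplainCommand(formatted = true).
    spark.sql("EXPLAIN FORMATTED SELECT key, max(val) FROM demo_t WHERE key > 0 GROUP BY key")
      .show(truncate = false)

    // Programmatic surface: simpleString(formatted = true) routes the plan through ExplainUtils.processPlan.
    val qe = spark.sql("SELECT key, max(val) FROM demo_t WHERE key > 0 GROUP BY key").queryExecution
    println(qe.simpleString(formatted = true))

    spark.stop()
  }
}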