From c61270fd74a89a39e5fbbfa9402e801ec57ce34c Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 26 Aug 2019 20:37:13 +0800 Subject: [PATCH] [SPARK-27395][SQL] Improve EXPLAIN command ## What changes were proposed in this pull request? This PR aims at improving the way physical plans are explained in spark. Currently, the explain output for physical plan may look very cluttered and each operator's string representation can be very wide and wraps around in the display making it little hard to follow. This especially happens when explaining a query 1) Operating on wide tables 2) Has complex expressions etc. This PR attempts to split the output into two sections. In the header section, we display the basic operator tree with a number associated with each operator. In this section, we strictly control what we output for each operator. In the footer section, each operator is verbosely displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be correlated by the originating expression id from its parent plan. To illustrate, here is a simple plan displayed in old vs new way. Example query1 : ``` EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0 ``` Old : ``` *(2) Project [key#2, max(val)#15] +- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0)) +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18]) +- Exchange hashpartitioning(key#2, 200) +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21]) +- *(1) Project [key#2, val#3] +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0)) +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct ``` New : ``` Project (8) +- Filter (7) +- HashAggregate (6) +- Exchange (5) +- HashAggregate (4) +- Project (3) +- Filter (2) +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 [codegen id : 1] Output: [key#2, val#3] (2) Filter [codegen id : 1] Input : [key#2, val#3] Condition : (isnotnull(key#2) AND (key#2 > 0)) (3) Project [codegen id : 1] Output : [key#2, val#3] Input : [key#2, val#3] (4) HashAggregate [codegen id : 1] Input: [key#2, val#3] (5) Exchange Input: [key#2, max#11] (6) HashAggregate [codegen id : 2] Input: [key#2, max#11] (7) Filter [codegen id : 2] Input : [key#2, max(val)#5, max(val#3)#8] Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0)) (8) Project [codegen id : 2] Output : [key#2, max(val)#5] Input : [key#2, max(val)#5, max(val#3)#8] ``` Example Query2 (subquery): ``` SELECT * FROM explain_temp1 WHERE KEY = (SELECT Max(KEY) FROM explain_temp2 WHERE KEY = (SELECT Max(KEY) FROM explain_temp3 WHERE val > 0) AND val = 2) AND val > 3 ``` Old: ``` *(1) Project [key#2, val#3] +- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3)) : +- Subquery scalar-subquery#39 : +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)#45]) : +- Exchange SinglePartition : +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47]) : +- *(1) Project [key#26] : +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = 
Subquery scalar-subquery#38)) AND (val#27 = 2)) : : +- Subquery scalar-subquery#38 : : +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)#43]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49]) : : +- *(1) Project [key#28] : : +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0)) : : +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct : +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct ``` New: ``` Project (3) +- Filter (2) +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 [codegen id : 1] Output: [key#2, val#3] (2) Filter [codegen id : 1] Input : [key#2, val#3] Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3)) (3) Project [codegen id : 1] Output : [key#2, val#3] Input : [key#2, val#3] ===== Subqueries ===== Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23 HashAggregate (9) +- Exchange (8) +- HashAggregate (7) +- Project (6) +- Filter (5) +- Scan parquet default.explain_temp2 (4) (4) Scan parquet default.explain_temp2 [codegen id : 1] Output: [key#26, val#27] (5) Filter [codegen id : 1] Input : [key#26, val#27] Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2)) (6) Project [codegen id : 1] Output : [key#26] Input : [key#26, val#27] (7) HashAggregate [codegen id : 1] Input: [key#26] (8) Exchange Input: [max#35] (9) HashAggregate [codegen id : 2] Input: [max#35] Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22 HashAggregate (15) +- Exchange (14) +- HashAggregate (13) +- Project (12) +- Filter (11) +- Scan parquet default.explain_temp3 (10) (10) Scan parquet default.explain_temp3 [codegen id : 1] Output: [key#28, val#29] (11) Filter [codegen id : 1] Input : [key#28, val#29] Condition : (isnotnull(val#29) AND (val#29 > 0)) (12) Project [codegen id : 1] Output : [key#28] Input : [key#28, val#29] (13) HashAggregate [codegen id : 1] Input: [key#28] (14) Exchange Input: [max#37] (15) HashAggregate [codegen id : 2] Input: [max#37] ``` Note: I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow would not be able to immediately incorporate the feedback. I will start to work on them as soon as i can. Also, currently this PR provides a basic infrastructure for explain enhancement. The details about individual operators will be implemented in follow-up prs ## How was this patch tested? Added a new test `explain.sql` that tests basic scenarios. Need to add more tests. 
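For quick reference, the new syntax can be tried directly once the feature is in; this mirrors one of the queries added in the new `explain.sql` test (table names are the test tables created there):
```
EXPLAIN FORMATTED
  SELECT key, max(val)
  FROM explain_temp1
  WHERE key > 0
  GROUP BY key
  ORDER BY key;
```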
Closes #24759 from dilipbiswal/explain_feature. Authored-by: Dilip Biswal Signed-off-by: Wenchen Fan --- .../sql/catalyst/expressions/Expression.scala | 4 + .../expressions/codegen/javaCode.scala | 3 + .../spark/sql/catalyst/plans/QueryPlan.scala | 27 +- .../spark/sql/catalyst/trees/TreeNode.scala | 44 +- .../spark/sql/execution/ExplainUtils.scala | 234 ++++++ .../spark/sql/execution/QueryExecution.scala | 11 +- .../spark/sql/execution/SparkPlan.scala | 20 + .../spark/sql/execution/SparkSqlParser.scala | 8 +- .../sql/execution/WholeStageCodegenExec.scala | 15 +- .../adaptive/AdaptiveSparkPlanExec.scala | 22 +- .../execution/adaptive/QueryStageExec.scala | 16 +- .../execution/basicPhysicalOperators.scala | 45 ++ .../execution/columnar/InMemoryRelation.scala | 2 +- .../columnar/InMemoryTableScanExec.scala | 2 +- .../InsertIntoDataSourceDirCommand.scala | 2 +- .../spark/sql/execution/command/cache.scala | 2 +- .../sql/execution/command/commands.scala | 7 +- .../spark/sql/execution/command/views.scala | 4 +- .../InsertIntoDataSourceCommand.scala | 2 +- .../SaveIntoDataSourceCommand.scala | 2 +- .../sql/execution/exchange/Exchange.scala | 11 +- .../joins/CartesianProductExec.scala | 14 +- .../spark/sql/execution/joins/HashJoin.scala | 20 +- .../execution/joins/SortMergeJoinExec.scala | 17 + .../resources/sql-tests/inputs/explain.sql | 96 +++ .../sql-tests/results/explain.sql.out | 738 ++++++++++++++++++ 26 files changed, 1318 insertions(+), 50 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala create mode 100644 sql/core/src/test/resources/sql-tests/inputs/explain.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/explain.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 64823a9eaec1d..4632957e7afdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -282,6 +282,10 @@ abstract class Expression extends TreeNode[Expression] { val childrenSQL = children.map(_.sql).mkString(", ") s"$prettyName($childrenSQL)" } + + override def simpleStringWithNodeId(): String = { + throw new UnsupportedOperationException(s"$nodeName does not implement simpleStringWithNodeId") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala index c3b79900d308d..3bb3c602f775b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala @@ -198,6 +198,9 @@ trait Block extends TreeNode[Block] with JavaCode { } override def verboseString(maxFields: Int): String = toString + override def simpleStringWithNodeId(): String = { + throw new UnsupportedOperationException(s"$nodeName does not implement simpleStringWithNodeId") + } } object Block { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index e88405f2b54c1..c5abb6378ff7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} +import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag} +import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} @@ -179,6 +180,20 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT override def verboseString(maxFields: Int): String = simpleString(maxFields) + override def simpleStringWithNodeId(): String = { + val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") + s"$nodeName ($operatorId)".trim + } + + def verboseStringWithOperatorId(): String = { + val codegenIdStr = + getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("") + val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") + s""" + |($operatorId) $nodeName $codegenIdStr + """.stripMargin + } + /** * All the subqueries of current plan. */ @@ -204,7 +219,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT subqueries ++ subqueries.flatMap(_.subqueriesAll) } - override protected def innerChildren: Seq[QueryPlan[_]] = subqueries + override def innerChildren: Seq[QueryPlan[_]] = subqueries /** * A private mutable variable to indicate whether this plan is the result of canonicalization. @@ -289,6 +304,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } object QueryPlan extends PredicateHelper { + val OP_ID_TAG = TreeNodeTag[Int]("operatorId") + val CODEGEN_ID_TAG = new TreeNodeTag[Int]("wholeStageCodegenId") + /** * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference` * with its referenced ordinal from input attributes. 
It's similar to `BindReferences` but we @@ -335,9 +353,10 @@ object QueryPlan extends PredicateHelper { append: String => Unit, verbose: Boolean, addSuffix: Boolean, - maxFields: Int = SQLConf.get.maxToStringFields): Unit = { + maxFields: Int = SQLConf.get.maxToStringFields, + printOperatorId: Boolean = false): Unit = { try { - plan.treeString(append, verbose, addSuffix, maxFields) + plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId) } catch { case e: AnalysisException => append(e.toString) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 9a28ae201222a..00fb81e361fdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString @@ -530,9 +531,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { * @param maxFields Maximum number of fields that will be converted to strings. * Any elements beyond the limit will be dropped. */ - def simpleString(maxFields: Int): String = { - s"$nodeName ${argString(maxFields)}".trim - } + def simpleString(maxFields: Int): String = s"$nodeName ${argString(maxFields)}".trim + + /** + * ONE line description of this node containing the node identifier. + * @return + */ + def simpleStringWithNodeId(): String /** ONE line description of this node with more information */ def verboseString(maxFields: Int): String @@ -548,10 +553,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { final def treeString( verbose: Boolean, addSuffix: Boolean = false, - maxFields: Int = SQLConf.get.maxToStringFields): String = { + maxFields: Int = SQLConf.get.maxToStringFields, + printOperatorId: Boolean = false): String = { val concat = new PlanStringConcat() - - treeString(concat.append, verbose, addSuffix, maxFields) + treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId) concat.toString } @@ -559,8 +564,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { append: String => Unit, verbose: Boolean, addSuffix: Boolean, - maxFields: Int): Unit = { - generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields) + maxFields: Int, + printOperatorId: Boolean): Unit = { + generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields, printOperatorId) } /** @@ -609,7 +615,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { * All the nodes that should be shown as a inner nested tree of this node. * For example, this can be used to show sub-queries. */ - protected def innerChildren: Seq[TreeNode[_]] = Seq.empty + def innerChildren: Seq[TreeNode[_]] = Seq.empty /** * Appends the string representation of this node and its children to the given Writer. 
@@ -627,7 +633,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int, + printNodeId: Boolean): Unit = { if (depth > 0) { lastChildren.init.foreach { isLast => @@ -639,7 +646,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { val str = if (verbose) { if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields) } else { - simpleString(maxFields) + if (printNodeId) { + simpleStringWithNodeId() + } else { + simpleString(maxFields) + } } append(prefix) append(str) @@ -648,17 +659,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose, - addSuffix = addSuffix, maxFields = maxFields)) + addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId)) innerChildren.last.generateTreeString( depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose, - addSuffix = addSuffix, maxFields = maxFields) + addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId) } if (children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields)) + depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, + maxFields, printNodeId = printNodeId) + ) children.last.generateTreeString( - depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields) + depth + 1, lastChildren :+ true, append, verbose, prefix, + addSuffix, maxFields, printNodeId = printNodeId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala new file mode 100644 index 0000000000000..18a7f9822dcbc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.trees.TreeNodeTag + +object ExplainUtils { + /** + * Given a input physical plan, performs the following tasks. + * 1. Computes the operator id for current operator and records it in the operaror + * by setting a tag. + * 2. 
Computes the whole stage codegen id for current operator and records it in the + * operator by setting a tag. + * 3. Generate the two part explain output for this plan. + * 1. First part explains the operator tree with each operator tagged with an unique + * identifier. + * 2. Second part explans each operator in a verbose manner. + * + * Note : This function skips over subqueries. They are handled by its caller. + * + * @param plan Input query plan to process + * @param append function used to append the explain output + * @param startOperationID The start value of operation id. The subsequent operations will + * be assigned higher value. + * + * @return The last generated operation id for this input plan. This is to ensure we + * always assign incrementing unique id to each operator. + * + */ + private def processPlanSkippingSubqueries[T <: QueryPlan[T]]( + plan: => QueryPlan[T], + append: String => Unit, + startOperatorID: Int): Int = { + + val operationIDs = new mutable.ArrayBuffer[(Int, QueryPlan[_])]() + var currentOperatorID = startOperatorID + try { + currentOperatorID = generateOperatorIDs(plan, currentOperatorID, operationIDs) + generateWholeStageCodegenIds(plan) + + QueryPlan.append( + plan, + append, + verbose = false, + addSuffix = false, + printOperatorId = true) + + append("\n") + var i: Integer = 0 + for ((opId, curPlan) <- operationIDs) { + append(curPlan.verboseStringWithOperatorId()) + } + } catch { + case e: AnalysisException => append(e.toString) + } + currentOperatorID + } + + /** + * Given a input physical plan, performs the following tasks. + * 1. Generates the explain output for the input plan excluding the subquery plans. + * 2. Generates the explain output for each subquery referenced in the plan. + */ + def processPlan[T <: QueryPlan[T]]( + plan: => QueryPlan[T], + append: String => Unit): Unit = { + try { + val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)] + var currentOperatorID = 0 + currentOperatorID = processPlanSkippingSubqueries(plan, append, currentOperatorID) + getSubqueries(plan, subqueries) + var i = 0 + + for (sub <- subqueries) { + if (i == 0) { + append("\n===== Subqueries =====\n\n") + } + i = i + 1 + append(s"Subquery:$i Hosting operator id = " + + s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n") + + // For each subquery expression in the parent plan, process its child plan to compute + // the explain output. In case of subquery reuse, we don't print subquery plan more + // than once. So we skip [[ReusedSubqueryExec]] here. + if (!sub._3.isInstanceOf[ReusedSubqueryExec]) { + currentOperatorID = processPlanSkippingSubqueries( + sub._3.child, + append, + currentOperatorID) + } + append("\n") + } + } finally { + removeTags(plan) + } + } + + /** + * Traverses the supplied input plan in a bottem-up fashion does the following : + * 1. produces a map : operator identifier -> operator + * 2. Records the operator id via setting a tag in the operator. + * Note : + * 1. Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't + * appear in the explain output. + * 2. operator identifier starts at startOperatorID + 1 + * @param plan Input query plan to process + * @param startOperationID The start value of operation id. The subsequent operations will + * be assigned higher value. + * @param operatorIDs A output parameter that contains a map of operator id and query plan. This + * is used by caller to print the detail portion of the plan. 
+ * @return The last generated operation id for this input plan. This is to ensure we + * always assign incrementing unique id to each operator. + */ + private def generateOperatorIDs( + plan: QueryPlan[_], + startOperatorID: Int, + operatorIDs: mutable.ArrayBuffer[(Int, QueryPlan[_])]): Int = { + var currentOperationID = startOperatorID + // Skip the subqueries as they are not printed as part of main query block. + if (plan.isInstanceOf[BaseSubqueryExec]) { + return currentOperationID + } + plan.foreachUp { + case p: WholeStageCodegenExec => + case p: InputAdapter => + case other: QueryPlan[_] => + if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) { + currentOperationID += 1 + other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID) + operatorIDs += ((currentOperationID, other)) + } + other.innerChildren.foreach { plan => + currentOperationID = generateOperatorIDs(plan, + currentOperationID, + operatorIDs) + } + } + currentOperationID + } + + /** + * Traverses the supplied input plan in a top-down fashion and records the + * whole stage code gen id in the plan via setting a tag. + */ + private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = { + // Skip the subqueries as they are not printed as part of main query block. + if (plan.isInstanceOf[BaseSubqueryExec]) { + return + } + var currentCodegenId = -1 + plan.foreach { + case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId + case p: InputAdapter => currentCodegenId = -1 + case other: QueryPlan[_] => + if (currentCodegenId != -1) { + other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) + } + other.innerChildren.foreach { plan => + generateWholeStageCodegenIds(plan) + } + } + } + + /** + * Given a input plan, returns an array of tuples comprising of : + * 1. Hosting opeator id. + * 2. Hosting expression + * 3. 
Subquery plan + */ + private def getSubqueries( + plan: => QueryPlan[_], + subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = { + plan.foreach { + case p: SparkPlan => + p.expressions.flatMap(_.collect { + case e: PlanExpression[_] => + e.plan match { + case s: BaseSubqueryExec => + subqueries += ((p, e, s)) + getSubqueries(s, subqueries) + } + case other => + }) + } + } + + /** + * Returns the operator identifier for the supplied plan by retrieving the + * `operationId` tag value.` + */ + def getOpId(plan: QueryPlan[_]): String = { + plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown") + } + + /** + * Returns the operator identifier for the supplied plan by retrieving the + * `codegenId` tag value.` + */ + def getCodegenId(plan: QueryPlan[_]): String = { + plan.getTagValue(QueryPlan.CODEGEN_ID_TAG).map(v => s"[codegen id : $v]").getOrElse("") + } + + def removeTags(plan: QueryPlan[_]): Unit = { + plan foreach { + case plan: QueryPlan[_] => + plan.unsetTagValue(QueryPlan.OP_ID_TAG) + plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG) + plan.innerChildren.foreach { p => + removeTags(p) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 5f67b17f8e6fe..68e5b534e9215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule @@ -127,10 +128,16 @@ class QueryExecution( ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) - def simpleString: String = withRedaction { + def simpleString: String = simpleString(false) + + def simpleString(formatted: Boolean): String = withRedaction { val concat = new PlanStringConcat() concat.append("== Physical Plan ==\n") - QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false) + if (formatted) { + ExplainUtils.processPlan(executedPlan, concat.append) + } else { + QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false) + } concat.append("\n") concat.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ba89ba75767cf..b4cdf9e16b7e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionContext import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.InternalCompilerException @@ -511,6 +512,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ trait LeafExecNode extends SparkPlan { override final def 
children: Seq[SparkPlan] = Nil override def producedAttributes: AttributeSet = outputSet + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Output: ${producedAttributes.mkString("[", ", ", "]")} + """.stripMargin + } } object UnaryExecNode { @@ -524,6 +531,12 @@ trait UnaryExecNode extends SparkPlan { def child: SparkPlan override final def children: Seq[SparkPlan] = child :: Nil + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Input: ${child.output.mkString("[", ", ", "]")} + """.stripMargin + } } trait BinaryExecNode extends SparkPlan { @@ -531,4 +544,11 @@ trait BinaryExecNode extends SparkPlan { def right: SparkPlan override final def children: Seq[SparkPlan] = Seq(left, right) + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Left output: ${left.output.mkString("[", ", ", "]")} + |Right output: ${right.output.mkString("[", ", ", "]")} + """.stripMargin + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8fc4afbb0db3f..83cdc7a978a9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -289,13 +289,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * Create an [[ExplainCommand]] logical plan. * The syntax of using this command in SQL is: * {{{ - * EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ... + * EXPLAIN (EXTENDED | CODEGEN | COST | FORMATTED) SELECT * FROM ... 
* }}} */ override def visitExplain(ctx: ExplainContext): LogicalPlan = withOrigin(ctx) { - if (ctx.FORMATTED != null) { - operationNotAllowed("EXPLAIN FORMATTED", ctx) - } if (ctx.LOGICAL != null) { operationNotAllowed("EXPLAIN LOGICAL", ctx) } @@ -308,7 +305,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { logicalPlan = statement, extended = ctx.EXTENDED != null, codegen = ctx.CODEGEN != null, - cost = ctx.COST != null) + cost = ctx.COST != null, + formatted = ctx.FORMATTED != null) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5fda272ce21a3..ce9a6ea319d5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -535,7 +536,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int, + printNodeId: Boolean): Unit = { child.generateTreeString( depth, lastChildren, @@ -543,7 +545,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod verbose, prefix = "", addSuffix = false, - maxFields) + maxFields, + printNodeId) } override def needCopyResult: Boolean = false @@ -776,15 +779,17 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int, + printNodeId: Boolean): Unit = { child.generateTreeString( depth, lastChildren, append, verbose, - s"*($codegenStageId) ", + if (printNodeId) "* " else s"*($codegenStageId) ", false, - maxFields) + maxFields, + printNodeId) } override def needStopCheck: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 1f269d23c5d68..5d92ddad887bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreeNodeTag @@ -226,10 +227,25 @@ case class AdaptiveSparkPlanExec( verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { - super.generateTreeString(depth, lastChildren, append, verbose, prefix, addSuffix, maxFields) + maxFields: Int, + 
printNodeId: Boolean): Unit = { + super.generateTreeString(depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId) currentPhysicalPlan.generateTreeString( - depth + 1, lastChildren :+ true, append, verbose, "", addSuffix = false, maxFields) + depth + 1, + lastChildren :+ true, + append, + verbose, + "", + addSuffix = false, + maxFields, + printNodeId) } override def hashCode(): Int = initialPlan.hashCode() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index c803ca3638b4a..231fffce3360b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.adaptive +import scala.collection.mutable import scala.concurrent.Future import org.apache.spark.{FutureAction, MapOutputStatistics} @@ -24,6 +25,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution._ @@ -107,10 +109,18 @@ abstract class QueryStageExec extends LeafExecNode { verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { - super.generateTreeString(depth, lastChildren, append, verbose, prefix, addSuffix, maxFields) + maxFields: Int, + printNodeId: Boolean): Unit = { + super.generateTreeString(depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId) plan.generateTreeString( - depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields) + depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index b74dd95d1d8f9..b072a7f5d914c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.{LongType, StructType} @@ -80,6 +81,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning + + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Output : ${projectList.mkString("[", ", ", "]")} + |Input : ${child.output.mkString("[", ", ", "]")} + """.stripMargin + } } @@ -226,6 +235,14 @@ case class FilterExec(condition: Expression, 
child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning + + override def verboseStringWithOperatorId(): String = { + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Input : ${child.output.mkString("[", ", ", "]")} + |Condition : ${condition} + """.stripMargin + } } /** @@ -682,6 +699,34 @@ abstract class BaseSubqueryExec extends SparkPlan { override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int, + printNodeId: Boolean): Unit = { + /** + * In the new explain mode `EXPLAIN FORMATTED`, the subqueries are not shown in the + * main plan and are printed separately along with correlation information with + * its parent plan. The condition below makes sure that subquery plans are + * excluded from the main plan. + */ + if (!printNodeId) { + super.generateTreeString( + depth, + lastChildren, + append, + verbose, + "", + false, + maxFields, + printNodeId) + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 7a881ad878c18..c8fa07941af87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -179,7 +179,7 @@ case class InMemoryRelation( @volatile var statsOfPlanToCache: Statistics = null - override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan) + override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan) override def doCanonicalize(): logical.LogicalPlan = copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index f14b5d7965219..8d13cfb93d270 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -49,7 +49,7 @@ case class InMemoryTableScanExec( } } - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren + override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren override def doCanonicalize(): SparkPlan = copy(attributes = attributes.map(QueryPlan.normalizeExpressions(_, relation.output)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 1dc24b3d221cf..08d31fdda2dc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -44,7 +44,7 @@ case class InsertIntoDataSourceDirCommand( query: LogicalPlan, overwrite: Boolean) extends RunnableCommand { - override protected def innerChildren: Seq[LogicalPlan] = query :: 
Nil + override def innerChildren: Seq[LogicalPlan] = query :: Nil override def run(sparkSession: SparkSession): Seq[Row] = { assert(storage.locationUri.nonEmpty, "Directory path is required") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 7b00769308a41..ef6b0bba1628e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -35,7 +35,7 @@ case class CacheTableCommand( require(plan.isEmpty || tableIdent.database.isEmpty, "Database name is not allowed in CACHE TABLE AS SELECT") - override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq + override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq override def run(sparkSession: SparkSession): Seq[Row] = { plan.foreach { logicalPlan => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index bb8a982f0d317..f7d4fa4c4ffcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -70,7 +70,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) } - override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil + override def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil override def output: Seq[Attribute] = cmd.output @@ -143,7 +143,8 @@ case class ExplainCommand( logicalPlan: LogicalPlan, extended: Boolean = false, codegen: Boolean = false, - cost: Boolean = false) + cost: Boolean = false, + formatted: Boolean = false) extends RunnableCommand { override val output: Seq[Attribute] = @@ -160,6 +161,8 @@ case class ExplainCommand( queryExecution.toString } else if (cost) { queryExecution.stringWithStats + } else if (formatted) { + queryExecution.simpleString(formatted = true) } else { queryExecution.simpleString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 4d3eb11250c3f..2617add6ffded 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -98,7 +98,7 @@ case class CreateViewCommand( import ViewHelper._ - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) + override def innerChildren: Seq[QueryPlan[_]] = Seq(child) if (viewType == PersistedView) { require(originalText.isDefined, "'originalText' must be provided to create permanent view") @@ -271,7 +271,7 @@ case class AlterViewAsCommand( import ViewHelper._ - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(session: SparkSession): Seq[Row] = { // If the plan cannot be analyzed, throw an exception and don't proceed. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index 80d7608a22891..bd9cc0e44fca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -33,7 +33,7 @@ case class InsertIntoDataSourceCommand( overwrite: Boolean) extends RunnableCommand { - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index a1de287b93f9d..49e77f618f2a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -39,7 +39,7 @@ case class SaveIntoDataSourceCommand( options: Map[String, String], mode: SaveMode) extends RunnableCommand { - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { dataSource.createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index 153645decdcf8..3315ae7dabef1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SparkPlan, UnaryExecNode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -86,6 +86,15 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan override def outputOrdering: Seq[SortOrder] = { child.outputOrdering.map(updateAttr(_).asInstanceOf[SortOrder]) } + + override def verboseStringWithOperatorId(): String = { + val cdgen = ExplainUtils.getCodegenId(this) + val reuse_op_str = ExplainUtils.getOpId(child) + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${cdgen} [Reuses operator id: $reuse_op_str] + |Output : ${output} + """.stripMargin + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 4d261dd422bc5..88d98530991c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -22,7 +22,8 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner -import org.apache.spark.sql.execution.{BinaryExecNode, ExternalAppendOnlyUnsafeRowArray, SparkPlan} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils, ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.util.CompletionIterator @@ -66,6 +67,17 @@ case class CartesianProductExec( override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Join condition: ${joinCondStr} + """.stripMargin + } + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 5ee4c7ffb1911..3788418f8aafe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{RowIterator, SparkPlan} +import org.apache.spark.sql.execution.{ExplainUtils, RowIterator, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{IntegralType, LongType} @@ -37,6 +37,24 @@ trait HashJoin { val left: SparkPlan val right: SparkPlan + override def simpleStringWithNodeId(): String = { + val opId = ExplainUtils.getOpId(this) + s"$nodeName $joinType ${buildSide} ($opId)".trim + } + + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Left keys: ${leftKeys} + |Right keys: ${rightKeys} + |Join condition: ${joinCondStr} + """.stripMargin + } + override def output: Seq[Attribute] = { joinType match { case _: InnerLike => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index f829f07e80720..189727a9bc88d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -45,6 +45,23 @@ case class SortMergeJoinExec( override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def simpleStringWithNodeId(): String = { + val opId = ExplainUtils.getOpId(this) + s"$nodeName 
$joinType ($opId)".trim + } + + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}" + } else "None" + s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |Left keys : ${leftKeys} + |Right keys: ${rightKeys} + |Join condition : ${joinCondStr} + """.stripMargin + } + override def output: Seq[Attribute] = { joinType match { case _: InnerLike => diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql new file mode 100644 index 0000000000000..773c123992f71 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql @@ -0,0 +1,96 @@ +-- Test tables +CREATE table explain_temp1 (key int, val int) USING PARQUET; +CREATE table explain_temp2 (key int, val int) USING PARQUET; +CREATE table explain_temp3 (key int, val int) USING PARQUET; + +SET spark.sql.codegen.wholeStage = true; + +-- single table +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + ORDER BY key; + +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + HAVING max(val) > 0; + +-- simple union +EXPLAIN FORMATTED + SELECT key, val FROM explain_temp1 WHERE key > 0 + UNION + SELECT key, val FROM explain_temp1 WHERE key > 0; + +-- Join +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a, + explain_temp2 b + WHERE a.key = b.key; + +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a + LEFT OUTER JOIN explain_temp2 b + ON a.key = b.key; + +-- Subqueries nested. +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) + AND val = 2) + AND val > 3; + +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE val > 0) + OR + key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0); + +-- Reuse subquery +EXPLAIN FORMATTED + SELECT (SELECT Avg(key) FROM explain_temp1) + (SELECT Avg(key) FROM explain_temp1) + FROM explain_temp1; + +-- CTE + ReuseExchange +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT * + FROM explain_temp1 + WHERE key > 10 + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key; + +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 10 + GROUP BY key + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key; + +-- A spark plan which has innerChildren other than subquery +EXPLAIN FORMATTED + CREATE VIEW explain_view AS + SELECT key, val FROM explain_temp1; + +-- cleanup +DROP TABLE explain_temp1; +DROP TABLE explain_temp2; +DROP TABLE explain_temp3; diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out new file mode 100644 index 0000000000000..4a08cfada292d --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -0,0 +1,738 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 18 + + +-- !query 0 +CREATE table explain_temp1 (key int, val int) USING PARQUET +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE table explain_temp2 (key int, val int) USING PARQUET +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +CREATE table explain_temp3 (key int, val int) USING PARQUET +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +SET spark.sql.codegen.wholeStage = true 
+-- !query 3 schema +struct +-- !query 3 output +spark.sql.codegen.wholeStage true + + +-- !query 4 +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + ORDER BY key +-- !query 4 schema +struct +-- !query 4 output +== Physical Plan == +* Sort (9) ++- Exchange (8) + +- * HashAggregate (7) + +- Exchange (6) + +- * HashAggregate (5) + +- * Project (4) + +- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) HashAggregate [codegen id : 1] +Input: [key#x, val#x] + +(6) Exchange +Input: [key#x, max#x] + +(7) HashAggregate [codegen id : 2] +Input: [key#x, max#x] + +(8) Exchange +Input: [key#x, max(val)#x] + +(9) Sort [codegen id : 3] +Input: [key#x, max(val)#x] + + +-- !query 5 +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + HAVING max(val) > 0 +-- !query 5 schema +struct +-- !query 5 output +== Physical Plan == +* Project (9) ++- * Filter (8) + +- * HashAggregate (7) + +- Exchange (6) + +- * HashAggregate (5) + +- * Project (4) + +- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) HashAggregate [codegen id : 1] +Input: [key#x, val#x] + +(6) Exchange +Input: [key#x, max#x] + +(7) HashAggregate [codegen id : 2] +Input: [key#x, max#x] + +(8) Filter [codegen id : 2] +Input : [key#x, max(val)#x, max(val#x)#x] +Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) + +(9) Project [codegen id : 2] +Output : [key#x, max(val)#x] +Input : [key#x, max(val)#x, max(val#x)#x] + + +-- !query 6 +EXPLAIN FORMATTED + SELECT key, val FROM explain_temp1 WHERE key > 0 + UNION + SELECT key, val FROM explain_temp1 WHERE key > 0 +-- !query 6 schema +struct +-- !query 6 output +== Physical Plan == +* HashAggregate (12) ++- Exchange (11) + +- * HashAggregate (10) + +- Union (9) + :- * Project (4) + : +- * Filter (3) + : +- * ColumnarToRow (2) + : +- Scan parquet default.explain_temp1 (1) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp1 (5) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(7) Filter [codegen id : 2] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(8) Project [codegen id : 2] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(9) Union + +(10) HashAggregate [codegen id : 3] +Input: [key#x, val#x] + +(11) Exchange +Input: [key#x, val#x] + +(12) HashAggregate [codegen id : 4] +Input: [key#x, val#x] + + +-- !query 7 +EXPLAIN FORMATTED + SELECT * + FROM 
explain_temp1 a, + explain_temp2 b + WHERE a.key = b.key +-- !query 7 schema +struct +-- !query 7 output +== Physical Plan == +* BroadcastHashJoin Inner BuildRight (10) +:- * Project (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (9) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp2 (5) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(3) Filter [codegen id : 2] +Input : [key#x, val#x] +Condition : isnotnull(key#x) + +(4) Project [codegen id : 2] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(7) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : isnotnull(key#x) + +(8) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(9) BroadcastExchange +Input: [key#x, val#x] + +(10) BroadcastHashJoin [codegen id : 2] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 8 +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a + LEFT OUTER JOIN explain_temp2 b + ON a.key = b.key +-- !query 8 schema +struct +-- !query 8 output +== Physical Plan == +* BroadcastHashJoin LeftOuter BuildRight (8) +:- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (7) + +- * Project (6) + +- * Filter (5) + +- * ColumnarToRow (4) + +- Scan parquet default.explain_temp2 (3) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(3) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(4) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(5) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : isnotnull(key#x) + +(6) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(7) BroadcastExchange +Input: [key#x, val#x] + +(8) BroadcastHashJoin [codegen id : 2] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 9 +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) + AND val = 2) + AND val > 3 +-- !query 9 schema +struct +-- !query 9 output +== Physical Plan == +* Project (4) ++- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +===== Subqueries ===== + +Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (11) ++- Exchange (10) + +- * HashAggregate (9) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp2 (5) + + +(5) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(7) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 
2)) + +(8) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(9) HashAggregate [codegen id : 1] +Input: [key#x] + +(10) Exchange +Input: [max#x] + +(11) HashAggregate [codegen id : 2] +Input: [max#x] + +Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (18) ++- Exchange (17) + +- * HashAggregate (16) + +- * Project (15) + +- * Filter (14) + +- * ColumnarToRow (13) + +- Scan parquet default.explain_temp3 (12) + + +(12) Scan parquet default.explain_temp3 +Output: [key#x, val#x] + +(13) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(14) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(15) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(16) HashAggregate [codegen id : 1] +Input: [key#x] + +(17) Exchange +Input: [max#x] + +(18) HashAggregate [codegen id : 2] +Input: [max#x] + + +-- !query 10 +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE val > 0) + OR + key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) +-- !query 10 schema +struct +-- !query 10 output +== Physical Plan == +* Filter (3) ++- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (key#x = Subquery scalar-subquery#x, [id=#x])) + +===== Subqueries ===== + +Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (10) ++- Exchange (9) + +- * HashAggregate (8) + +- * Project (7) + +- * Filter (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp2 (4) + + +(4) Scan parquet default.explain_temp2 +Output: [key#x, val#x] + +(5) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(6) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(7) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(8) HashAggregate [codegen id : 1] +Input: [key#x] + +(9) Exchange +Input: [max#x] + +(10) HashAggregate [codegen id : 2] +Input: [max#x] + +Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (17) ++- Exchange (16) + +- * HashAggregate (15) + +- * Project (14) + +- * Filter (13) + +- * ColumnarToRow (12) + +- Scan parquet default.explain_temp3 (11) + + +(11) Scan parquet default.explain_temp3 +Output: [key#x, val#x] + +(12) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(13) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(14) Project [codegen id : 1] +Output : [key#x] +Input : [key#x, val#x] + +(15) HashAggregate [codegen id : 1] +Input: [key#x] + +(16) Exchange +Input: [max#x] + +(17) HashAggregate [codegen id : 2] +Input: [max#x] + + +-- !query 11 +EXPLAIN FORMATTED + SELECT (SELECT Avg(key) FROM explain_temp1) + (SELECT Avg(key) FROM explain_temp1) + FROM explain_temp1 +-- !query 11 schema +struct +-- !query 11 output +== Physical Plan == +* Project (3) ++- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [] + +(2) ColumnarToRow [codegen id : 1] +Input: [] + +(3) Project [codegen id : 1] +Output : [(Subquery scalar-subquery#x, [id=#x] + 
ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] +Input : [] + +===== Subqueries ===== + +Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (8) ++- Exchange (7) + +- * HashAggregate (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp1 (4) + + +(4) Scan parquet default.explain_temp1 +Output: [key#x] + +(5) ColumnarToRow [codegen id : 1] +Input: [key#x] + +(6) HashAggregate [codegen id : 1] +Input: [key#x] + +(7) Exchange +Input: [sum#x, count#xL] + +(8) HashAggregate [codegen id : 2] +Input: [sum#x, count#xL] + +Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] + + +-- !query 12 +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT * + FROM explain_temp1 + WHERE key > 10 + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key +-- !query 12 schema +struct +-- !query 12 output +== Physical Plan == +* BroadcastHashJoin Inner BuildRight (10) +:- * Project (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (9) + +- * Project (8) + +- * Filter (7) + +- * ColumnarToRow (6) + +- Scan parquet default.explain_temp1 (5) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 2] +Input: [key#x, val#x] + +(3) Filter [codegen id : 2] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(4) Project [codegen id : 2] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(6) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(7) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(8) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(9) BroadcastExchange +Input: [key#x, val#x] + +(10) BroadcastHashJoin [codegen id : 2] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 13 +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 10 + GROUP BY key + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key +-- !query 13 schema +struct +-- !query 13 output +== Physical Plan == +* BroadcastHashJoin Inner BuildRight (11) +:- * HashAggregate (7) +: +- Exchange (6) +: +- * HashAggregate (5) +: +- * Project (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (10) + +- * HashAggregate (9) + +- ReusedExchange (8) + + +(1) Scan parquet default.explain_temp1 +Output: [key#x, val#x] + +(2) ColumnarToRow [codegen id : 1] +Input: [key#x, val#x] + +(3) Filter [codegen id : 1] +Input : [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(4) Project [codegen id : 1] +Output : [key#x, val#x] +Input : [key#x, val#x] + +(5) HashAggregate [codegen id : 1] +Input: [key#x, val#x] + +(6) Exchange +Input: [key#x, max#x] + +(7) HashAggregate [codegen id : 4] +Input: [key#x, max#x] + +(8) ReusedExchange [Reuses operator id: 6] +Output : ArrayBuffer(key#x, max#x) + +(9) HashAggregate [codegen id : 3] +Input: [key#x, max#x] + +(10) BroadcastExchange +Input: [key#x, max(val)#x] + +(11) BroadcastHashJoin [codegen id : 4] +Left keys: List(key#x) +Right keys: List(key#x) +Join condition: None + + +-- !query 14 +EXPLAIN FORMATTED + CREATE VIEW explain_view AS + SELECT key, val FROM explain_temp1 +-- !query 14 schema +struct +-- !query 14 output +== Physical 
Plan == +Execute CreateViewCommand (1) + +- CreateViewCommand (2) + +- Project (4) + +- UnresolvedRelation (3) + + +(1) Execute CreateViewCommand +Output: [] + +(2) CreateViewCommand + +(3) UnresolvedRelation + +(4) Project + + +-- !query 15 +DROP TABLE explain_temp1 +-- !query 15 schema +struct<> +-- !query 15 output + + + +-- !query 16 +DROP TABLE explain_temp2 +-- !query 16 schema +struct<> +-- !query 16 output + + + +-- !query 17 +DROP TABLE explain_temp3 +-- !query 17 schema +struct<> +-- !query 17 output +