[SPARK-33850][SQL] EXPLAIN FORMATTED doesn't show the plan for subqueries if AQE is enabled

### What changes were proposed in this pull request?

This PR fixes an issue where `EXPLAIN FORMATTED` doesn't show the plan for subqueries when AQE is enabled: `ExplainUtils.getSubqueries` doesn't recurse into `AdaptiveSparkPlanExec` nodes, so subquery plans under the adaptive wrapper are never collected.

```scala
val df = spark.range(1, 100)
df.createTempView("df")
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("FORMATTED")
```

```
== Physical Plan ==
AdaptiveSparkPlan (3)
+- Project (2)
   +- Scan OneRowRelation (1)

(1) Scan OneRowRelation
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project
Output [1]: [Subquery subquery#3, [id=#20] AS scalarsubquery()#5L]
Input: []

(3) AdaptiveSparkPlan
Output [1]: [scalarsubquery()#5L]
Arguments: isFinalPlan=false
```

After this change, the plan for the subquery is shown.
```
== Physical Plan ==
* Project (2)
+- * Scan OneRowRelation (1)

(1) Scan OneRowRelation [codegen id : 1]
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [1]: [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L]
Input: []

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#3, [id=#24]
* HashAggregate (6)
+- Exchange (5)
   +- * HashAggregate (4)
      +- * Range (3)

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (1, 100, step=1, splits=Some(12))

(4) HashAggregate [codegen id : 1]
Input [1]: [id#0L]
Keys: []
Functions [1]: [partial_min(id#0L)]
Aggregate Attributes [1]: [min#7L]
Results [1]: [min#8L]

(5) Exchange
Input [1]: [min#8L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#20]

(6) HashAggregate [codegen id : 2]
Input [1]: [min#8L]
Keys: []
Functions [1]: [min(id#0L)]
Aggregate Attributes [1]: [min(id#0L)#4L]
Results [1]: [min(id#0L)#4L AS v#2L]
```
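
For reference, the repro above assumes AQE is on; a quick way to toggle it in a `spark-shell` session (via the standard `spark.sql.adaptive.enabled` conf) and compare the two code paths:

```scala
// Toggle AQE before re-running the repro above (the "df" view is already
// registered). After the fix, the "===== Subqueries =====" section appears
// in both cases; before the fix, it was missing whenever AQE was on.
spark.conf.set("spark.sql.adaptive.enabled", "true")   // AQE on: previously broken path
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("FORMATTED")
spark.conf.set("spark.sql.adaptive.enabled", "false")  // AQE off: unaffected path
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("FORMATTED")
```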

### Why are the changes needed?

For better debuggability.

### Does this PR introduce _any_ user-facing change?

Yes. Users can now see the formatted plan for subqueries even when AQE is enabled.

### How was this patch tested?

New test.

Closes apache#30855 from sarutak/fix-aqe-explain.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 70da86a)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
sarutak authored and dongjoon-hyun committed Dec 19, 2020
1 parent 1ce8000 commit c0ac578
Showing 3 changed files with 287 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -218,6 +218,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
```diff
       plan: => QueryPlan[_],
       subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
     plan.foreach {
+      case a: AdaptiveSparkPlanExec =>
+        getSubqueries(a.executedPlan, subqueries)
       case p: SparkPlan =>
         p.expressions.foreach (_.collect {
           case e: PlanExpression[_] =>
```
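The hunk above is truncated by the diff view. For readers without the full file at hand, here is a hedged sketch of the complete method after the fix; the continuation past the `PlanExpression` case is an assumption inferred from the `(SparkPlan, Expression, BaseSubqueryExec)` tuple the buffer collects, not a verbatim copy of `ExplainUtils`:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.{BaseSubqueryExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch: collects (host operator, hosting expression, subquery plan) triples.
// AdaptiveSparkPlanExec wraps the actual physical plan in `executedPlan`, so
// without the new first case the traversal never reaches subqueries under it.
private def getSubqueries(
    plan: => QueryPlan[_],
    subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
  plan.foreach {
    case a: AdaptiveSparkPlanExec =>
      // The fix: unwrap the adaptive plan and keep collecting.
      getSubqueries(a.executedPlan, subqueries)
    case p: SparkPlan =>
      p.expressions.foreach(_.collect {
        case e: PlanExpression[_] =>
          e.plan match {
            case s: BaseSubqueryExec =>
              subqueries += ((p, e, s))
              getSubqueries(s, subqueries) // subqueries can nest
            case _ =>
          }
      })
  }
}
```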
263 changes: 263 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -407,6 +407,101 @@ Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subq
Output [2]: [key#x, val#x]
Arguments: isFinalPlan=false

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
AdaptiveSparkPlan (10)
+- HashAggregate (9)
+- Exchange (8)
+- HashAggregate (7)
+- Project (6)
+- Filter (5)
+- Scan parquet default.explain_temp2 (4)


(4) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)]
ReadSchema: struct<key:int,val:int>

(5) Filter
Input [2]: [key#x, val#x]
Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x = 2))

(6) Project
Output [1]: [key#x]
Input [2]: [key#x, val#x]

(7) HashAggregate
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_max(key#x)]
Aggregate Attributes [1]: [max#x]
Results [1]: [max#x]

(8) Exchange
Input [1]: [max#x]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]

(9) HashAggregate
Input [1]: [max#x]
Keys: []
Functions [1]: [max(key#x)]
Aggregate Attributes [1]: [max(key#x)#x]
Results [1]: [max(key#x)#x AS max(key)#x]

(10) AdaptiveSparkPlan
Output [1]: [max(key)#x]
Arguments: isFinalPlan=false

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery subquery#x, [id=#x]
AdaptiveSparkPlan (17)
+- HashAggregate (16)
+- Exchange (15)
+- HashAggregate (14)
+- Project (13)
+- Filter (12)
+- Scan parquet default.explain_temp3 (11)


(11) Scan parquet default.explain_temp3
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp3]
PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
ReadSchema: struct<key:int,val:int>

(12) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(val#x) AND (val#x > 0))

(13) Project
Output [1]: [key#x]
Input [2]: [key#x, val#x]

(14) HashAggregate
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_max(key#x)]
Aggregate Attributes [1]: [max#x]
Results [1]: [max#x]

(15) Exchange
Input [1]: [max#x]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]

(16) HashAggregate
Input [1]: [max#x]
Keys: []
Functions [1]: [max(key#x)]
Aggregate Attributes [1]: [max(key#x)#x]
Results [1]: [max(key#x)#x AS max(key)#x]

(17) AdaptiveSparkPlan
Output [1]: [max(key)#x]
Arguments: isFinalPlan=false

-- !query
EXPLAIN FORMATTED
@@ -442,6 +537,101 @@ Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) =
Output [2]: [key#x, val#x]
Arguments: isFinalPlan=false

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
AdaptiveSparkPlan (10)
+- HashAggregate (9)
+- Exchange (8)
+- HashAggregate (7)
+- Project (6)
+- Filter (5)
+- Scan parquet default.explain_temp2 (4)


(4) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
ReadSchema: struct<key:int,val:int>

(5) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(val#x) AND (val#x > 0))

(6) Project
Output [1]: [key#x]
Input [2]: [key#x, val#x]

(7) HashAggregate
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_max(key#x)]
Aggregate Attributes [1]: [max#x]
Results [1]: [max#x]

(8) Exchange
Input [1]: [max#x]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]

(9) HashAggregate
Input [1]: [max#x]
Keys: []
Functions [1]: [max(key#x)]
Aggregate Attributes [1]: [max(key#x)#x]
Results [1]: [max(key#x)#x AS max(key)#x]

(10) AdaptiveSparkPlan
Output [1]: [max(key)#x]
Arguments: isFinalPlan=false

Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
AdaptiveSparkPlan (17)
+- HashAggregate (16)
+- Exchange (15)
+- HashAggregate (14)
+- Project (13)
+- Filter (12)
+- Scan parquet default.explain_temp3 (11)


(11) Scan parquet default.explain_temp3
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp3]
PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
ReadSchema: struct<key:int,val:int>

(12) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(val#x) AND (val#x > 0))

(13) Project
Output [1]: [key#x]
Input [2]: [key#x, val#x]

(14) HashAggregate
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_avg(cast(key#x as bigint))]
Aggregate Attributes [2]: [sum#x, count#xL]
Results [2]: [sum#x, count#xL]

(15) Exchange
Input [2]: [sum#x, count#xL]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]

(16) HashAggregate
Input [2]: [sum#x, count#xL]
Keys: []
Functions [1]: [avg(cast(key#x as bigint))]
Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]

(17) AdaptiveSparkPlan
Output [1]: [avg(key)#x]
Arguments: isFinalPlan=false

-- !query
EXPLAIN FORMATTED
@@ -470,6 +660,79 @@ Input: []
Output [1]: [(scalarsubquery() + scalarsubquery())#x]
Arguments: isFinalPlan=false

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
AdaptiveSparkPlan (8)
+- HashAggregate (7)
+- Exchange (6)
+- HashAggregate (5)
+- Scan parquet default.explain_temp1 (4)


(4) Scan parquet default.explain_temp1
Output [1]: [key#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int>

(5) HashAggregate
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_avg(cast(key#x as bigint))]
Aggregate Attributes [2]: [sum#x, count#xL]
Results [2]: [sum#x, count#xL]

(6) Exchange
Input [2]: [sum#x, count#xL]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]

(7) HashAggregate
Input [2]: [sum#x, count#xL]
Keys: []
Functions [1]: [avg(cast(key#x as bigint))]
Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]

(8) AdaptiveSparkPlan
Output [1]: [avg(key)#x]
Arguments: isFinalPlan=false

Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
AdaptiveSparkPlan (13)
+- HashAggregate (12)
+- Exchange (11)
+- HashAggregate (10)
+- Scan parquet default.explain_temp1 (9)


(9) Scan parquet default.explain_temp1
Output [1]: [key#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int>

(10) HashAggregate
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_avg(cast(key#x as bigint))]
Aggregate Attributes [2]: [sum#x, count#xL]
Results [2]: [sum#x, count#xL]

(11) Exchange
Input [2]: [sum#x, count#xL]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]

(12) HashAggregate
Input [2]: [sum#x, count#xL]
Keys: []
Functions [1]: [avg(cast(key#x as bigint))]
Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]

(13) AdaptiveSparkPlan
Output [1]: [avg(key)#x]
Arguments: isFinalPlan=false

-- !query
EXPLAIN FORMATTED
22 changes: 22 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -277,6 +277,28 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
```diff
     }
   }
 
+  test("SPARK-33850: explain formatted - check presence of subquery in case of AQE") {
+    withTable("df1") {
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+        withTable("df1") {
+          spark.range(1, 100)
+            .write
+            .format("parquet")
+            .mode("overwrite")
+            .saveAsTable("df1")
+
+          val sqlText = "EXPLAIN FORMATTED SELECT (SELECT min(id) FROM df1) as v"
+          val expected_pattern1 =
+            "Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x"
+
+          withNormalizedExplain(sqlText) { normalizedOutput =>
+            assert(expected_pattern1.r.findAllMatchIn(normalizedOutput).length == 1)
+          }
+        }
+      }
+    }
+  }
+
   test("Support ExplainMode in Dataset.explain") {
     val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
     val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
```
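Note that the golden pattern uses `subquery#x` rather than a concrete expression ID. Presumably `withNormalizedExplain` (from `ExplainSuiteHelper`) rewrites run-dependent IDs before matching; a minimal sketch of that kind of normalization, stated as an assumption about the helper rather than its actual source:

```scala
// Hypothetical sketch of expression-ID normalization: IDs such as
// `subquery#3` or `[id=#24]` differ between runs, so rewrite them to `#x`
// before applying the golden pattern.
def normalizeIds(explainOutput: String): String =
  explainOutput.replaceAll("#\\d+", "#x")

// normalizeIds("Subquery subquery#3, [id=#24]")
//   == "Subquery subquery#x, [id=#x]"
```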
