[SPARK-36795][SQL] Explain Formatted has Duplicate Node IDs
### What changes were proposed in this pull request?

Fixed explain formatted mode so that it no longer assigns duplicate node IDs when an `InMemoryRelation` is present in the query plan.
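
The root cause (visible in the one-line diff below) is in `ExplainUtils.generateOperatorIDs`: the operator-ID counter returned by folding over a node's `innerChildren` was discarded instead of being stored back into `currentOperationID`, so IDs already handed out inside the cached plan were reused for later operators. A minimal, self-contained sketch of the pitfall — illustrative only, not the Spark source:

```scala
object FoldCounterSketch extends App {
  var nextId = 1
  val innerChildren = Seq("InMemoryRelation", "ColumnarToRow", "Scan parquet")

  // Numbers each node and returns the next free ID.
  def number(nodes: Seq[String], startId: Int): Int =
    nodes.foldLeft(startId) { (id, node) =>
      println(s"($id) $node")
      id + 1
    }

  // Buggy pattern: foldLeft returns the advanced counter, but the result
  // is dropped, so `nextId` still points at IDs that are already taken.
  number(innerChildren, nextId)
  println(s"nextId is still $nextId") // 1 -> later operators collide

  // The one-line fix: keep the folded result.
  nextId = number(innerChildren, nextId)
  println(s"nextId is now $nextId") // 4 -> later operators get fresh IDs
}
```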

### Why are the changes needed?

Duplicate node IDs make the plan confusing to read: the same ID can refer to two different operators, so the numbered operator details printed below the plan tree become ambiguous.

### Does this PR introduce _any_ user-facing change?

Yes, the explain formatted string changes.
Notice how, before the change, both `ColumnarToRow` and `InMemoryRelation` carry node ID (2); IDs (3) and (4) are likewise reused by unrelated operators.
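For reference, the output below can be reproduced with something like the following (adapted from the new test; assumes a spark-shell-style session where `spark`, `sql`, and the implicits for `toDF` are in scope):

```scala
// Reproduction sketch, adapted from the test added in this PR.
Seq(1).toDF("k").write.saveAsTable("t1")
Seq(1).toDF("key").write.saveAsTable("t2")
spark.sql("SELECT * FROM t1").persist()

// Before the fix, the formatted plan labels both ColumnarToRow
// and InMemoryRelation with node ID (2).
sql("SELECT * FROM (SELECT * FROM t1) JOIN t2 ON k = t2.key").explain("formatted")
```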
Before changes =>
```
== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (9)
   :- BroadcastQueryStage (5)
   :  +- BroadcastExchange (4)
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (8)
      +- * ColumnarToRow (7)
         +- Scan parquet default.t2 (6)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (13)
   :- BroadcastExchange (11)
   :  +- Filter (10)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (12)
      +- Scan parquet default.t2 (6)

(1) InMemoryTableScan
Output [1]: [k#x]
Arguments: [k#x], [isnotnull(k#x)]

(2) InMemoryRelation
Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer401788d5,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow
+- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int>
,None)

(3) Scan parquet default.t1
Output [1]: [k#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1]
ReadSchema: struct<k:int>

(4) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(5) BroadcastQueryStage
Output [1]: [k#x]
Arguments: 0

(6) Scan parquet default.t2
Output [1]: [key#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int>

(7) ColumnarToRow
Input [1]: [key#x]

(8) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(9) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(10) Filter
Input [1]: [k#x]
Condition : isnotnull(k#x)

(11) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(12) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(13) BroadcastHashJoin
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(14) AdaptiveSparkPlan
Output [2]: [k#x, key#x]
Arguments: isFinalPlan=true
```

After changes =>
```
== Physical Plan ==
AdaptiveSparkPlan (17)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (12)
   :- BroadcastQueryStage (8)
   :  +- BroadcastExchange (7)
   :     +- * Filter (6)
   :        +- * ColumnarToRow (5)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (11)
      +- * ColumnarToRow (10)
         +- Scan parquet default.t2 (9)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (16)
   :- BroadcastExchange (14)
   :  +- Filter (13)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (15)
      +- Scan parquet default.t2 (9)

(1) InMemoryTableScan
Output [1]: [k#x]
Arguments: [k#x], [isnotnull(k#x)]

(2) InMemoryRelation
Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer3ccb12d,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow
+- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int>
,None)

(3) Scan parquet default.t1
Output [1]: [k#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1]
ReadSchema: struct<k:int>

(4) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(5) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(6) Filter [codegen id : 1]
Input [1]: [k#x]
Condition : isnotnull(k#x)

(7) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(8) BroadcastQueryStage
Output [1]: [k#x]
Arguments: 0

(9) Scan parquet default.t2
Output [1]: [key#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int>

(10) ColumnarToRow
Input [1]: [key#x]

(11) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(12) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(13) Filter
Input [1]: [k#x]
Condition : isnotnull(k#x)

(14) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(15) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(16) BroadcastHashJoin
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(17) AdaptiveSparkPlan
Output [2]: [k#x, key#x]
Arguments: isFinalPlan=true
```

### How was this patch tested?

Added a unit test to `ExplainSuiteAE` that asserts `InMemoryRelation` and `ColumnarToRow` receive distinct node IDs in the formatted explain output.

Closes #34036 from ChenMichael/SPARK-36795-Duplicate-node-id-with-inMemoryRelation.

Authored-by: Michael Chen <mike.chen@workday.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Michael Chen authored and HyukjinKwon committed Sep 23, 2021
1 parent 0076eba commit 6d7ab7b
Showing 2 changed files with 20 additions and 1 deletion.
`sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala` (1 addition, 1 deletion):

```diff
@@ -148,7 +148,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper
         setOpId(p)
       case other: QueryPlan[_] =>
         setOpId(other)
-        other.innerChildren.foldLeft(currentOperationID) {
+        currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
           (curId, plan) => generateOperatorIDs(plan, curId)
         }
     }
```
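
The counter returned by `foldLeft` is the next free operator ID after numbering the `innerChildren` (here, the cached plan under `InMemoryRelation`); assigning it back to `currentOperationID` makes operators visited afterwards continue from that value instead of reusing IDs the cached plan already consumed.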
`sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala` (19 additions, 0 deletions):

```diff
@@ -704,6 +704,25 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite
         "Bucketed: false (bucket column(s) not read)")
     }
   }
+
+  test("SPARK-36795: Node IDs should not be duplicated when InMemoryRelation present") {
+    withTempView("t1", "t2") {
+      Seq(1).toDF("k").write.saveAsTable("t1")
+      Seq(1).toDF("key").write.saveAsTable("t2")
+      spark.sql("SELECT * FROM t1").persist()
+      val query = "SELECT * FROM (SELECT * FROM t1) join t2 " +
+        "ON k = t2.key"
+      val df = sql(query).toDF()
+
+      val inMemoryRelationRegex = """InMemoryRelation \(([0-9]+)\)""".r
+      val columnarToRowRegex = """ColumnarToRow \(([0-9]+)\)""".r
+      val explainString = getNormalizedExplain(df, FormattedMode)
+      val inMemoryRelationNodeId = inMemoryRelationRegex.findAllIn(explainString).group(1)
+      val columnarToRowNodeId = columnarToRowRegex.findAllIn(explainString).group(1)
+
+      assert(inMemoryRelationNodeId != columnarToRowNodeId)
+    }
+  }
 }

 case class ExplainSingleData(id: Int)
```
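
The suite can then be run in the usual way, e.g. `build/sbt "sql/testOnly *ExplainSuiteAE"` (invocation shown for illustration).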
