
[SPARK-27395][SQL] Improve EXPLAIN command #24759

Closed
wants to merge 23 commits into from

Conversation

dilipbiswal
Contributor

@dilipbiswal dilipbiswal commented May 31, 2019

What changes were proposed in this pull request?

This PR aims at improving the way physical plans are explained in Spark.

Currently, the explain output for a physical plan can look very cluttered: each operator's
string representation can be very wide and wrap around in the display, making it a little
hard to follow. This happens especially when explaining a query that 1) operates on wide tables
or 2) has complex expressions.

This PR splits the output into two sections. In the header section, we display
the basic operator tree with a number associated with each operator; in this section, we strictly
control what is output for each operator. In the footer section, each operator is displayed
verbosely. Based on feedback from Maryann, uncorrelated subqueries (SubqueryExecs) are not included
in the main plan; they are printed separately after the main plan and can be
correlated via the originating expression id in their parent plan.
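
To make the numbering concrete, here is a minimal, self-contained sketch of the two-section idea. This is not the actual Spark implementation; `SimpleNode`, `name`, and `details` are hypothetical stand-ins for the real physical operators:

```scala
import scala.collection.mutable

// Operator ids are assigned in a post-order walk, so leaves get the smallest ids.
// The header prints only the operator name plus its id; the footer prints the
// verbose details keyed by the same id. (Keyed on structural equality, which is
// fine for a sketch.)
case class SimpleNode(name: String, details: Seq[String], children: Seq[SimpleNode] = Nil)

object TwoSectionExplain {
  def explain(root: SimpleNode): String = {
    val ids = mutable.LinkedHashMap[SimpleNode, Int]()
    def number(n: SimpleNode): Unit = {
      n.children.foreach(number)
      ids(n) = ids.size + 1
    }
    number(root)

    // Header: one line per operator, indented roughly like the tree, e.g. "+- Filter (7)".
    def header(n: SimpleNode, depth: Int): Seq[String] = {
      val prefix = if (depth == 0) "" else "   " * (depth - 1) + "+- "
      (prefix + s"${n.name} (${ids(n)})") +: n.children.flatMap(header(_, depth + 1))
    }

    // Footer: each operator printed verbosely under its id, e.g. "(2) Filter".
    val footer = ids.toSeq.sortBy(_._2).map { case (n, id) =>
      (s"($id) ${n.name}" +: n.details).mkString("\n")
    }

    (header(root, 0).mkString("\n") +: footer).mkString("\n\n")
  }
}
```

Subqueries would simply get their own header/footer pair appended after the main plan, keyed back to the hosting expression id, as in the second example below.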

To illustrate, here is a simple plan displayed the old way vs. the new way.

Example query 1:

EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0

Old:

*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>

New:

Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)


(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)#5]
Input     : [key#2, max(val)#5, max(val#3)#8]

Example query 2 (subquery):

SELECT * FROM   explain_temp1 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp2 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp3 WHERE  val > 0) AND val = 2) AND val > 3

Old:

*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>

New:

Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)


(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)


(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)


(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]

Note:
This PR currently provides the basic infrastructure for the explain enhancement. The details
for individual operators will be implemented in follow-up PRs.

How was this patch tested?

Added a new test file, explain.sql, that covers basic scenarios. More tests still need to be added.

@ekoifman
Contributor

This looks easier to read, but doesn't the new formatting lose some info? From the 1st example,
Old:
+- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>

New:

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

Also, it may be useful to support "EXPLAIN FORMATTED" or something like it for the new style, and "EXPLAIN" for the old one; it may break tests otherwise.

@dilipbiswal
Contributor Author

dilipbiswal commented May 31, 2019

@ekoifman Thanks for your comment. Currently, verboseString has not been implemented for all the operators; what we have is the very basic information from a default implementation. We will need to implement verboseString for each operator so that it prints the necessary information.

it may be useful to support "EXPLAIN FORMATTED" or something like it for the new style, and "EXPLAIN" for the old one; it may break tests otherwise.

Yeah, it's definitely an option. Let's get some more feedback and then we can decide on it.
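
For illustration only, here is a rough sketch of what a per-operator verbose description could look like once it is filled in; the trait and class names are hypothetical, not the actual SparkPlan/verboseString API. A Filter-like operator would print its input attributes and its condition, matching the "(2) Filter" block in the example output above:

```scala
// Hypothetical names for illustration; the real hook in Spark is the operator's
// verboseString, which each physical operator would override in follow-up work.
trait VerboseDescription {
  def verboseDescription: String
}

case class FilterDescription(input: Seq[String], condition: String) extends VerboseDescription {
  override def verboseDescription: String =
    s"""Input     : [${input.mkString(", ")}]
       |Condition : $condition""".stripMargin
}

// Reproduces the two detail lines shown under "(2) Filter [codegen id : 1]":
// FilterDescription(Seq("key#2", "val#3"), "(isnotnull(key#2) AND (key#2 > 0))").verboseDescription
```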

@SparkQA

SparkQA commented May 31, 2019

Test build #106024 has finished for PR 24759 at commit ac2d172.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 31, 2019

Test build #106029 has finished for PR 24759 at commit 8ad5f3e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 31, 2019

Test build #106030 has finished for PR 24759 at commit 925144b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2019

Test build #106034 has finished for PR 24759 at commit 9a17839.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Contributor Author

cc @gatorsmile @maryannxue

@gatorsmile
Member

cc @cloud-fan

@SparkQA

SparkQA commented Jul 16, 2019

Test build #107722 has finished for PR 24759 at commit 01c6655.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 16, 2019

Test build #107723 has finished for PR 24759 at commit 01c6655.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2019

Test build #107726 has finished for PR 24759 at commit 01c6655.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jul 21, 2019

Hi @dilipbiswal, this activity looks nice to me and I like it. I'm not sure yet that the current approach is the best one for Spark, so I think studying the EXPLAIN output of other DBMS-like systems would be beneficial here. Also, is there any reference that the current approach is based on?

Anyway, I checked the EXPLAIN output of PostgreSQL, MySQL, and Presto using TPC-DS q1:
https://gist.github.com/maropu/487dcb0837f4ae6ae412db549617897c

@dilipbiswal
Contributor Author

dilipbiswal commented Jul 21, 2019

@maropu Thanks for your input. The current approach is based on DB2. I had some preliminary discussion with @gatorsmile before starting this work. What are your concerns with the current approach?

@maropu
Member

maropu commented Jul 23, 2019

Oh I see; since I've never used DB2, I didn't notice that (this output is based on the DB2 one). Thanks!
The current format is basically OK to me. My suggestion is just that we could make the format even better
by referring to the EXPLAIN output of the other systems. For example, in the current format in this PR, only
the plan structure is written at the top and all the plan contents are written at the bottom. As another option, we might be able to keep some basic contents (e.g., filter conditions?) at the top, as follows:

...
         +- Filter (isnotnull(val#29) AND (val#29 > 0)) (11)
            +- ...

The PostgreSQL format does so (only basic plan contents are printed in explain).
There is room to discuss which content counts as basic, though.

@dilipbiswal
Contributor Author

@maropu Thank you!! Yeah, we could add more stuff to the header portion of the plan. I had discussed this a bit with Sean, and there are plans to make statistics info available during physical planning. So the plan was to print these stats in the top section of the plan alongside each operator (like row count, num bytes, etc.). Once we do that, I don't think there would be enough space to print other info without cluttering the output, but we could always discuss it :-). Another thing to note is that we will still support the old plan format, so users would have the flexibility to choose between the two ways.

@maropu
Member

maropu commented Jul 23, 2019

Once we do that, I don't think there would be enough space to print other info without cluttering the output.

Oh yeah, plan stats are also candidates for that ;) But, as you said, the space is limited.

So users would have flexibility to choose between the two ways.

Is this expected to be enabled by default? How do we switch the format? It seems some systems accept options in an explain command, as follows:

EXPLAIN (format xxx) SELECT ...

@dilipbiswal
Contributor Author

Is this expected to be enabled by default? How do we switch the format? It seems some systems accept options in an explain command, as follows:

EXPLAIN (format xxx) SELECT ...

@maropu Yeah, my proposal would be to introduce a new format for explain. Currently it's controlled by a config. Let's see what Wenchen and Sean think about this.
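
For reference, a minimal usage sketch, assuming the new style is eventually surfaced as the `FORMATTED` explain mode mentioned in the commit message further down; the config key below is a placeholder, not a real Spark setting:

```scala
// From spark-shell (where `spark` is the active SparkSession): ask for the new
// two-section output via SQL EXPLAIN, assuming a FORMATTED keyword is added.
spark.sql("EXPLAIN FORMATTED SELECT key, max(val) FROM explain_temp1 GROUP BY key")
  .show(truncate = false)

// Hypothetical config-style switch (placeholder key, for illustration only):
// spark.conf.set("spark.sql.explain.format.placeholder", "formatted")
```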

@dilipbiswal
Contributor Author

cc @cloud-fan Could you please take a look at this one when you have time?

@SparkQA

SparkQA commented Jul 26, 2019

Test build #108207 has finished for PR 24759 at commit 42730b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 29, 2019

Test build #108302 has finished for PR 24759 at commit cb4bb2f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 29, 2019

Test build #108312 has finished for PR 24759 at commit cb4bb2f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

This is a big improvement! I have several high-level comments:

  1. The *(1) prefix is pretty short. Shall we keep it in the header portion? It gives you a general sense of how your query is being whole-stage-codegened. If you worry that the codegen id may conflict with the operator id, we can omit it and only use * as the prefix.
  2. It's a good idea to separate uncorrelated subqueries from the main query, but what about correlated subqueries? Can you give an example?
  3. Shall we always include the statistics?
  4. What's the general rule for what each operator displays? I think the input always needs to be included, except for leaf nodes. What about others?

/** ONE line description of this node with more information */
def verboseString(maxFields: Int): String

/** ONE line description of this node with some suffix information */
def verboseStringWithSuffix(maxFields: Int): String = verboseString(maxFields)

def verboseString(
planToOperatorID: mutable.LinkedHashMap[TreeNode[_], Int],
Contributor

if this is specific to query plan, we can put it in QueryPlan instead of TreeNode.

Contributor Author

@cloud-fan Sounds reasonable. It's been a while :-) Let me try to look at the code later today.

@SparkQA

SparkQA commented Aug 24, 2019

Test build #109655 has finished for PR 24759 at commit 5f4e8b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 26, 2019

Test build #109725 has finished for PR 24759 at commit f401175.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in c61270f Aug 26, 2019
@dilipbiswal
Contributor Author

Thanks a LOT @cloud-fan @gatorsmile @maryannxue @maropu @ekoifman

@gatorsmile
Member

gatorsmile commented Sep 1, 2019

@dilipbiswal The Leaf Scan Node is one of the nodes that contain the most important info for perf investigation. Could you create another ticket and submit a follow-up PR on it?

The new node is too simple compared with the old one.

New:

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

Old:

*(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>

@dilipbiswal
Contributor Author

@gatorsmile Sure, I will address it.

@maropu
Member

maropu commented Nov 29, 2019

Any plan to support this format in Dataset.explain for 3.0? Users' feedback on this feature in the 3.0-preview was pretty good at a Meetup I joined last week. Probably the comment above blocks the support in Dataset? @cloud-fan @gatorsmile @dilipbiswal

@cloud-fan
Contributor

Maybe we can add an overload, def explain(mode: ExplainMode), to match the behavior of the SQL side.
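
A small sketch of the kind of overload being suggested, simplified and hypothetical (the real `ExplainMode` and `Dataset.explain` signatures live in Spark and may differ), with the modes named in the commit message below:

```scala
// Simplified stand-ins for the proposed API; not the actual Spark definitions.
sealed trait ExplainMode
object ExplainMode {
  case object Simple    extends ExplainMode
  case object Extended  extends ExplainMode
  case object Codegen   extends ExplainMode
  case object Cost      extends ExplainMode
  case object Formatted extends ExplainMode
}

// The Dataset-side overload would just dispatch on the mode, mirroring SQL EXPLAIN:
// def explain(mode: ExplainMode): Unit = mode match {
//   case ExplainMode.Formatted => // print the new header/footer format from this PR
//   case ExplainMode.Extended  => // logical + physical plans
//   case _                     => // existing behaviors
// }
```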

@maropu
Member

maropu commented Dec 2, 2019

OK, I'll try to make a PR later using that approach. Thanks!

dongjoon-hyun pushed a commit that referenced this pull request Dec 10, 2019
### What changes were proposed in this pull request?

This PR intends to add `ExplainMode` for explaining `Dataset/DataFrame` with a given format mode (`ExplainMode`). `ExplainMode` has five types, matching the SQL EXPLAIN command: `Simple`, `Extended`, `Codegen`, `Cost`, and `Formatted`.

For example, this PR enables users to explain a DataFrame/Dataset with the `FORMATTED` format implemented in #24759:
```
scala> spark.range(10).groupBy("id").count().explain(ExplainMode.Formatted)
== Physical Plan ==
* HashAggregate (3)
+- * HashAggregate (2)
   +- * Range (1)

(1) Range [codegen id : 1]
Output: [id#0L]

(2) HashAggregate [codegen id : 1]
Input: [id#0L]

(3) HashAggregate [codegen id : 1]
Input: [id#0L, count#8L]
```

This comes from [the cloud-fan suggestion](#24759 (comment)).

### Why are the changes needed?

To follow the SQL EXPLAIN command.

### Does this PR introduce any user-facing change?

No, this is just for a new API in Dataset.

### How was this patch tested?

Add tests in `ExplainSuite`.

Closes #26829 from maropu/DatasetExplain.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>