
[SPARK-30864][SQL][DOC] add the user guide for Adaptive Query Execution #27616

Closed
wants to merge 8 commits into from

Conversation

JkSelf
Contributor

@JkSelf JkSelf commented Feb 18, 2020

What changes were proposed in this pull request?

This PR will add the user guide for AQE and the detailed configurations about the three mainly features in AQE.

Why are the changes needed?

Add the detailed configurations.

Does this PR introduce any user-facing change?

No

How was this patch tested?

only add doc no need ut.

@JkSelf
Contributor Author

@JkSelf JkSelf commented Feb 18, 2020

cc @cloud-fan

@@ -186,3 +186,75 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that make use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. There are three mainly feature in AQE, including coalescing post partition number, optimizing local shuffle reader and optimizing skewed join.
Contributor
@cloud-fan cloud-fan Feb 18, 2020

makes use

Contributor
@cloud-fan cloud-fan Feb 18, 2020

As of Spark 3.0, there are three major features ...

Contributor
@cloud-fan cloud-fan Feb 18, 2020

post-shuffle partition number

Contributor Author
@JkSelf JkSelf Feb 18, 2020

updated.


## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that make use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. There are three mainly feature in AQE, including coalescing post partition number, optimizing local shuffle reader and optimizing skewed join.
### Coalescing Post Shuffle Partition Num
Contributor
@cloud-fan cloud-fan Feb 18, 2020

Number

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that make use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. There are three mainly feature in AQE, including coalescing post partition number, optimizing local shuffle reader and optimizing skewed join.
### Coalescing Post Shuffle Partition Num
This feature coalesces the post shuffle partitions based on the map output statistics when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled` configuration properties are both enabled. There are four following sub-configurations in this optimization rule.
Contributor
@cloud-fan cloud-fan Feb 18, 2020

shall we introduce the benefits of this feature?

</table>

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.shuffle.localShuffleReader.enabled` configuration properties are both enabled.
Contributor
@cloud-fan cloud-fan Feb 18, 2020

ditto, users care more about the benefit

Contributor Author
@JkSelf JkSelf Feb 18, 2020

added the performance data for all three features.

@SparkQA

@SparkQA SparkQA commented Feb 18, 2020

Test build #118613 has finished for PR 27616 at commit 19a381b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

@SparkQA SparkQA commented Feb 18, 2020

Test build #118618 has finished for PR 27616 at commit 727f57f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -186,3 +186,75 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partition number, optimizing local shuffle reader and optimizing skewed join.
Contributor
@cloud-fan cloud-fan Feb 19, 2020

nit: coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partition number, optimizing local shuffle reader and optimizing skewed join.
### Coalescing Post Shuffle Partition Number
This feature coalesces the post shuffle partitions based on the map output statistics when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled` configuration properties are both enabled. There are four following sub-configurations in this optimization rule. And this feature can bring about 1.28x performance gain with query 38 in 3TB TPC-DS.
Contributor
@cloud-fan cloud-fan Feb 19, 2020

let's update to the latest config names.

Contributor
@cloud-fan cloud-fan Feb 19, 2020

And this feature can bring about 1.28x performance gain with query 38 in 3TB TPC-DS. This is not useful... how about something like

This feature simplifies the tuning of shuffle partitions number when running queries. You don't need to
set a proper shuffle partition number that just fits your data. You just need to set a large enough number and
Spark can pick the proper shuffle partition number at runtime.
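The tuning simplification suggested above can be made concrete with a small model of the coalescing rule: start from a large initial partition count, then merge adjacent post-shuffle partitions until each group reaches the advisory target size (64 MB by default). This is a hypothetical sketch for illustration only; the function name and structure are invented here and do not match Spark's internal implementation.

```python
# Simplified model of AQE post-shuffle partition coalescing (not Spark's real code).
# Adjacent partitions are merged until the running total reaches the advisory size.

def coalesce_partitions(partition_bytes, advisory_target=64 * 1024 * 1024):
    """Group adjacent post-shuffle partitions into coalesced partitions."""
    coalesced, current, current_size = [], [], 0
    for i, size in enumerate(partition_bytes):
        current.append(i)
        current_size += size
        if current_size >= advisory_target:
            coalesced.append(current)
            current, current_size = [], 0
    if current:
        coalesced.append(current)  # trailing partitions smaller than the target
    return coalesced

# 1000 tiny 1 MB map outputs collapse into 16 reduce partitions of ~64 MB each,
# instead of launching 1000 small reduce tasks.
groups = coalesce_partitions([1 * 1024 * 1024] * 1000)
```

This is why a "large enough" initial number is safe: oversized partition counts are merged back down at runtime, while an undersized count cannot be split.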

</table>

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.shuffle.localShuffleReader.enabled` configuration properties are both enabled. This feature and coalescing post shuffle partition number feature can bring about 1.76x performance gain with query 77 in 3TB TPC-DS.
Contributor
@cloud-fan cloud-fan Feb 19, 2020

ditto, don't put perf number in a user guide. Just briefly explain how it affects user queries. E.g. save network traffic

@SparkQA

@SparkQA SparkQA commented Feb 27, 2020

Test build #119003 has finished for PR 27616 at commit 8ff8b71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

@SparkQA SparkQA commented Feb 27, 2020

Test build #119002 has finished for PR 27616 at commit 52fcdf3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

@SparkQA SparkQA commented Mar 6, 2020

Test build #119455 has finished for PR 27616 at commit 4fdbe10.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Mar 10, 2020

cc @maryannxue


## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.
### Coalescing Post Shuffle Partition Number
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

There's a leading space here which should be removed. Also newline should be inserted prior to '###'.

</tr>
</table>

### Optimize Local Shuffle Reader
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

There's another leading space here.


### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are both enabled. This feature can improve the performance by saving the network overhead of shuffle process.
### Optimize Skewed Join
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

ditto

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are both enabled. This feature can improve the performance by saving the network overhead of shuffle process.
### Optimize Skewed Join
This feature choose the skewed partition and creates multi tasks to handle the skewed partition when both enable `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled`. There are two following sub-configurations in this optimization rule. Data skew can severely downgrade performance of join queries. And this feature can split the skewed partition into multi parallel tasks instead of original 1 task to reduce the overhead of skewed join.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

choose -> chooses

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are both enabled. This feature can improve the performance by saving the network overhead of shuffle process.
### Optimize Skewed Join
This feature choose the skewed partition and creates multi tasks to handle the skewed partition when both enable `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled`. There are two following sub-configurations in this optimization rule. Data skew can severely downgrade performance of join queries. And this feature can split the skewed partition into multi parallel tasks instead of original 1 task to reduce the overhead of skewed join.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

multi -> multiple

</table>

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are both enabled. This feature can improve the performance by saving the network overhead of shuffle process.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

no additional shuffle introduced -> no additional shuffle is introduced

</table>

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are both enabled. This feature can improve the performance by saving the network overhead of shuffle process.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

when when spark.sql.adaptive.enabled and spark.sql.adaptive.localShuffleReader.enabled configuration properties are both enabled. ->

when both spark.sql.adaptive.enabled and spark.sql.adaptive.localShuffleReader.enabled configuration are enabled.

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.
### Coalescing Post Shuffle Partition Number
This feature coalesces the post shuffle partitions based on the map output statistics when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configuration properties are both enabled. There are four following sub-configurations in this optimization rule. This feature simplifies the tuning of shuffle partitions number when running queries. You don't need to set a proper shuffle partition number to fit your dataset. You just need to set a large enough number and Spark can pick the proper shuffle partition number at runtime.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

when spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled configuration properties are both enabled. ->

when both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled configurations are enabled.

Contributor
@cloud-fan cloud-fan Mar 12, 2020

You just need to set a large enough number -> You just need to set a large enough initial number of shuffle partitions via xxx config, and ...

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.
### Coalescing Post Shuffle Partition Number
This feature coalesces the post shuffle partitions based on the map output statistics when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configuration properties are both enabled. There are four following sub-configurations in this optimization rule. This feature simplifies the tuning of shuffle partitions number when running queries. You don't need to set a proper shuffle partition number to fit your dataset. You just need to set a large enough number and Spark can pick the proper shuffle partition number at runtime.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

Please avoid abbreviation in the public documentation. don't -> do not

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.
### Coalescing Post Shuffle Partition Number
This feature coalesces the post shuffle partitions based on the map output statistics when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configuration properties are both enabled. There are four following sub-configurations in this optimization rule. This feature simplifies the tuning of shuffle partitions number when running queries. You don't need to set a proper shuffle partition number to fit your dataset. You just need to set a large enough number and Spark can pick the proper shuffle partition number at runtime.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

shuffle partitions number -> the shuffle partition number

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.
### Coalescing Post Shuffle Partition Number
This feature coalesces the post shuffle partitions based on the map output statistics when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configuration properties are both enabled. There are four following sub-configurations in this optimization rule. This feature simplifies the tuning of shuffle partitions number when running queries. You don't need to set a proper shuffle partition number to fit your dataset. You just need to set a large enough number and Spark can pick the proper shuffle partition number at runtime.
Member
@HyukjinKwon HyukjinKwon Mar 10, 2020

You don't need to set a proper shuffle partition number to fit your dataset. You just need to set a large enough number and Spark can pick the proper shuffle partition number at runtime.

It sounds like a marketing phrase. I would write like Spark picks a proper shuffle number at runtime once you set the number large enough.

<td><code>spark.sql.adaptive.coalescePartitions.initialPartitionNum</code></td>
<td>200</td>
<td>
The advisory number of post-shuffle partitions used in adaptive execution. This is used as the initial number of pre-shuffle partitions. By default it equals to <code>spark.sql.shuffle.partitions</code>.
Contributor
@cloud-fan cloud-fan Mar 12, 2020

can we copy the doc from SQLConf?

</tr>
<tr>
<td><code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code></td>
<td>67108864 (64 MB)</td>
Contributor
@cloud-fan cloud-fan Mar 12, 2020

We can just write 64 MB

<td><code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code></td>
<td>67108864 (64 MB)</td>
<td>
The target post-shuffle input size in bytes of a task when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
Contributor
@cloud-fan cloud-fan Mar 12, 2020

ditto, let's make sure all the config doc are the same with SQLConf

### Optimize Local Shuffle Reader
This feature optimize the shuffle reader to local shuffle reader when converting the sort merge join to broadcast hash join in runtime and no additional shuffle introduced. It takes effect when `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are both enabled. This feature can improve the performance by saving the network overhead of shuffle process.
### Optimize Skewed Join
This feature choose the skewed partition and creates multi tasks to handle the skewed partition when both enable `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled`. There are two following sub-configurations in this optimization rule. Data skew can severely downgrade performance of join queries. And this feature can split the skewed partition into multi parallel tasks instead of original 1 task to reduce the overhead of skewed join.
Contributor
@cloud-fan cloud-fan Mar 12, 2020

Let's make it simple:

Data skew can severely downgrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both abc and xyz configurations are enabled.
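The skew handling in the suggested wording can be modeled roughly as follows: a partition counts as skewed when it exceeds some multiple of the median partition size, and is then split into roughly target-sized chunks, each handled by its own task. This is a hypothetical simplification; the threshold logic, names, and defaults here are invented for illustration and do not match Spark's internals exactly.

```python
# Rough model of AQE skew-join splitting (illustrative only, not Spark's code).
import statistics

def split_skewed(partition_bytes, skew_factor=5, target=64 * 1024 * 1024):
    """Return a task list: skewed partitions become several roughly target-sized tasks."""
    median = statistics.median(partition_bytes)
    tasks = []
    for pid, size in enumerate(partition_bytes):
        if size > skew_factor * median and size > target:
            n_splits = -(-size // target)  # ceiling division
            tasks.extend((pid, chunk) for chunk in range(n_splits))
        else:
            tasks.append((pid, 0))  # normal partition: a single task
    return tasks

# Nine 32 MB partitions plus one 1 GB skewed partition: the skewed one is split
# into 16 parallel tasks instead of a single straggler task.
sizes = [32 * 1024 * 1024] * 9 + [1024 * 1024 * 1024]
tasks = split_skewed(sizes)
```

The point of the split (and replicate, for the other join side) is that the job finishes roughly when the largest chunk finishes, rather than waiting on one oversized task.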

@cloud-fan
Contributor

@cloud-fan cloud-fan commented Mar 12, 2020

A high-level comment: "Optimize Local Shuffle Reader" should be a sub-feature of converting SMJ to BHJ. We should mention that instead.

@JkSelf
Contributor Author

@JkSelf JkSelf commented Mar 13, 2020

@HyukjinKwon @cloud-fan Thanks for your review. Updated.

@SparkQA

@SparkQA SparkQA commented Mar 13, 2020

Test build #119733 has finished for PR 27616 at commit 9edc644.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -186,3 +186,63 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing post-shuffle partitions, local shuffle reader optimization and skewed join optimization.
Contributor
@cloud-fan cloud-fan Mar 13, 2020

including coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skewed join optimization.

Contributor
@cloud-fan cloud-fan Mar 13, 2020

local shuffle reader optimization is just one improvement of converting sort-merge join to broadcast join

@SparkQA

@SparkQA SparkQA commented Mar 16, 2020

Test build #119832 has finished for PR 27616 at commit 13aa51b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JkSelf
Contributor Author

@JkSelf JkSelf commented Mar 16, 2020

retest this please

@@ -186,3 +186,63 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skewed join optimization.
Contributor
@cloud-fan cloud-fan Mar 16, 2020

typo: coalescing coalescing

Contributor
@cloud-fan cloud-fan Mar 16, 2020

skew join is actually more commonly used.

## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skewed join optimization.

### Coalescing Post Shuffle Partition Number
Contributor
@cloud-fan cloud-fan Mar 16, 2020

Coalescing Post Shuffle Partitios

Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL can use the umbrella configuration of `spark.sql.adaptive.enabled` to control whether turn it on/off. As of Spark 3.0, there are three major features in AQE, including coalescing coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skewed join optimization.

### Coalescing Post Shuffle Partition Number
This feature coalesces the post shuffle partitions based on the map output statistics when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configuration properties are enabled. There are four following sub-configurations in this optimization rule. This feature simplifies the tuning of shuffle partition number when running queries. You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via `spark.sql.adaptive.coalescePartitions.initialPartitionNum` configuration.
Contributor
@cloud-fan cloud-fan Mar 16, 2020

when both xxx and yyy configuration properties are enabled reads a little weird. How about when both xxx and yyy configurations are true?

Contributor
@cloud-fan cloud-fan Mar 16, 2020

There are four following sub-configurations in this optimization rule. Can we remove this sentence? This looks not useful as users can see all the configs in the following table.

</tr>
</table>

### Optimize Local Shuffle Reader
Contributor
@cloud-fan cloud-fan Mar 16, 2020

Converting sort-merge join to broadcast hash join

</table>

### Optimize Local Shuffle Reader
AQE converts the sort merge join to broad cast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This feature can optimize the shuffle reader to local shuffle reader after converting the sort merge join to broadcast hash join at runtime and if no additional shuffle is introduced. It takes effect when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are enabled. This feature can improve the performance by saving the network overhead of shuffle process.
Contributor
@cloud-fan cloud-fan Mar 16, 2020

broad cast -> broadcast

Contributor
@cloud-fan cloud-fan Mar 16, 2020

AQE converts sort-merge join to broadcast hash join when the runtime statistics of
any join side is smaller than the broadcast hash join threshold. This is not as efficient as
planning a broadcast hash join in the first place, but it's better than keep doing the
sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network
traffic(if `spark.sql.adaptive.localShuffleReader.enabled` is true)
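The runtime re-planning described in this comment can be sketched as a toy decision function: once actual shuffle statistics are known, a join whose smaller side falls under the broadcast threshold is replaced with a broadcast hash join, and the local shuffle reader (when enabled) lets tasks read shuffle files locally to save network traffic. The function below is an invented illustration under those assumptions, not Spark's planner; only the 10 MB default of `spark.sql.autoBroadcastJoinThreshold` is taken from Spark.

```python
# Toy model of AQE's runtime join re-planning (illustrative only, not Spark's code).
# After the shuffle, actual sizes are known, so the plan can be revised.

BROADCAST_THRESHOLD = 10 * 1024 * 1024  # Spark's default autoBroadcastJoinThreshold (10 MB)

def replan_join(left_bytes, right_bytes, local_reader_enabled=True):
    """Pick a join strategy from runtime sizes of the two join sides."""
    if min(left_bytes, right_bytes) <= BROADCAST_THRESHOLD:
        reader = "local shuffle reader" if local_reader_enabled else "remote shuffle reader"
        return ("broadcast hash join", reader)
    return ("sort-merge join", "remote shuffle reader")

# A 2 GB fact side joined with a side that turned out to be only 4 MB at runtime:
# the sort-merge join is replaced and shuffle files are read locally.
plan = replan_join(left_bytes=2 * 1024**3, right_bytes=4 * 1024 * 1024)
```

As the comment notes, this is not as good as planning the broadcast join up front (the shuffle already happened), but it still skips the sort of both sides and avoids remote shuffle reads.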

### Optimize Local Shuffle Reader
AQE converts the sort merge join to broad cast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This feature can optimize the shuffle reader to local shuffle reader after converting the sort merge join to broadcast hash join at runtime and if no additional shuffle is introduced. It takes effect when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.localShuffleReader.enabled` configuration properties are enabled. This feature can improve the performance by saving the network overhead of shuffle process.

### Optimize Skewed Join
Contributor
@cloud-fan cloud-fan Mar 16, 2020

Optimizing Skew Join

@SparkQA

@SparkQA SparkQA commented Mar 16, 2020

Test build #119843 has finished for PR 27616 at commit 13aa51b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan changed the title [SPARK-30864] [SQL]add the user guide for Adaptive Query Execution [SPARK-30864][SQL][DOC] add the user guide for Adaptive Query Execution Mar 16, 2020
@cloud-fan
Contributor

@cloud-fan cloud-fan commented Mar 16, 2020

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in 21c02ee Mar 16, 2020
cloud-fan pushed a commit that referenced this issue Mar 16, 2020
### What changes were proposed in this pull request?
This PR will add the user guide for AQE and the detailed configurations about the three mainly features in AQE.

### Why are the changes needed?
Add the detailed configurations.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
only add doc no need ut.

Closes #27616 from JkSelf/aqeuserguide.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 21c02ee)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@SparkQA

@SparkQA SparkQA commented Mar 16, 2020

Test build #119868 has finished for PR 27616 at commit 8289200.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sjincho pushed a commit to sjincho/spark that referenced this issue Apr 15, 2020