
[SPARK-35725][SQL] Support optimize skewed partitions in RebalancePartitions #32883

Closed. Wants to merge 20 commits into master from ulysses-you/expand-partition.

Conversation

@ulysses-you (Contributor) commented Jun 11, 2021

What changes were proposed in this pull request?

  • Add a new rule OptimizeSkewInRebalancePartitions to the AQE queryStageOptimizerRules
  • Add a new config spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled to decide whether to enable the new rule

The new rule OptimizeSkewInRebalancePartitions only handles two shuffle origins, REBALANCE_PARTITIONS_BY_NONE and REBALANCE_PARTITIONS_BY_COL, for the data skew issue, and reuses the existing config ADVISORY_PARTITION_SIZE_IN_BYTES to decide the target partition size.
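The split itself mirrors what OptimizeSkewedJoin does: a reduce partition larger than the advisory size is replaced by several partial readers, each covering a contiguous range of map outputs. A minimal Python sketch of that grouping idea (illustrative only, not the actual Scala code in ShufflePartitionsUtil):

```python
def split_skewed_partition(map_sizes, target_size):
    """Group the map outputs feeding one skewed reduce partition into
    contiguous chunks of roughly target_size bytes. Each chunk would
    become one partial reader over a range of map outputs."""
    specs = []                      # (start_map_index, end_map_index_exclusive)
    start, current = 0, 0
    for i, size in enumerate(map_sizes):
        # close the current chunk when this map output would overshoot it
        if current + size > target_size and current > 0:
            specs.append((start, i))
            start, current = i, 0
        current += size
    specs.append((start, len(map_sizes)))
    return specs

# One reduce partition fed by five 100-byte map outputs, target 250 bytes:
print(split_skewed_partition([100] * 5, 250))   # → [(0, 2), (2, 4), (4, 5)]
```

The greedy pass never splits a single map output, so a chunk can exceed the target when one mapper alone is oversized, matching the granularity limit of map-output-range readers.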

Why are the changes needed?

Currently, AQE does not support expanding (splitting) partitions dynamically, which is unfriendly to jobs with data skew.

Say we have a simple query:

SELECT /*+ REBALANCE(col) */ * FROM table

If the column col is skewed, some shuffle partitions will handle much more data than others.

Because no extra shuffle needs to be introduced, we can optimize this case by expanding partitions in AQE.
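To see why rebalancing by a skewed column yields uneven partitions, here is a small self-contained Python illustration (zlib.crc32 stands in for Spark's hash partitioner; the data and partition count are made up):

```python
import zlib
from collections import Counter

# 97% of the rows share one hot key, so hash partitioning routes
# almost all of the data to a single reduce partition.
rows = ["hot"] * 97 + ["a", "b", "c"]
num_partitions = 4
sizes = Counter(zlib.crc32(key.encode()) % num_partitions for key in rows)
print(sorted(sizes.values(), reverse=True))   # one partition dominates
```

Splitting that dominant partition into advisory-sized pieces in AQE evens out the work without adding another shuffle.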

Does this PR introduce any user-facing change?

Yes, a new config

How was this patch tested?

Add test

@github-actions bot added the SQL label Jun 11, 2021
None
}
}

Contributor Author

This code moved to ShufflePartitionsUtil so that the new rule ExpandShufflePartitions can use it.

@ulysses-you (Contributor Author)

cc @maropu @cloud-fan @yaooqinn @wangyum @JkSelf, thank you for reviewing.


SparkQA commented Jun 11, 2021

Test build #139702 has started for PR 32883 at commit 1eef9fb.

@ulysses-you (Contributor Author)

retest this please


SparkQA commented Jun 12, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44251/


SparkQA commented Jun 12, 2021

Test build #139726 has finished for PR 32883 at commit 1eef9fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
object ExpandShufflePartitions extends CustomShuffleReaderRule {
override def supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(REPARTITION_BY_COL, REPARTITION_BY_NONE)
@cloud-fan (Contributor) commented Jun 16, 2021

Why can we accept REPARTITION_BY_COL? If people do df.repartition($"a"), we should make sure the output is hash-partitioned by column a, shouldn't we? Even if it's the last operator, like df.repartition($"a").collect.

Contributor

I think it only makes sense if the repartition($"a") is added by the framework to optimize table insertion, not added by users.

Contributor

This makes me think that we may need a new operator to do the repartition for partitioned table insertion (a non-partitioned table can use the existing operator, thanks to ce16369), and assign it a new shuffle origin.

Contributor Author

If people do df.repartition($"a"), we should make sure the output is hash partitioned by column a, isn't it

Yes, we should guarantee this.

I think it only makes sense if the repartition($"a") is added by the framework to optimize table insertion, not added by users.

Yes, we can use a new operator and shuffle origin to distinguish whether it was added by the user or by the framework, and then only optimize the operator added by the framework.

The original idea of this PR was to add a config letting the user decide whether repartition($"a") may expand partitions (which breaks the semantics), so it could be used easily in SQL queries. After some thought, maybe it's better to add a new hint that supports expanding partitions?

Contributor

We can start with the new operator first, and think of the user-facing API later. Maybe we don't need a user-facing API and the new operator can only be used by the table insertion optimizer rule.

Contributor Author

@cloud-fan created #32932. To show the usage, that PR adds a new hint.

Member

Do we try to improve this case?

CREATE TABLE t1 USING parquet PARTITIONED BY (p2)
AS
(SELECT id, id % 1000 AS p1, if(id < 3000000, 1, id % 100) AS p2 FROM range(3100000) distribute by p2)

Contributor Author

Yes, there are two approaches to optimize the case you gave:

  • this PR can optimize it directly using a new config, but a potential issue is that the distribute by semantics of the output partitioning would be changed.
  • #32932 is going to use a new operator which does not guarantee the output partitioning, so we can optimize the new operator safely.

@HyukjinKwon (Member)

cc @maryannxue can you review this please?


SparkQA commented Jun 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44785/


SparkQA commented Jun 24, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44785/


SparkQA commented Jun 24, 2021

Test build #140256 has finished for PR 32883 at commit 844393e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -642,6 +642,15 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0L)

val ADAPTIVE_EXPAND_PARTITIONS_ENABLED =
Contributor

We don't need a config, as people can exclude any optimizer rule by name.

Contributor Author

We can't do that; currently only the AQE optimizer supports excluding rules.

Contributor

OK, then we need to think about the config name.

How about spark.sql.adaptive.optimizeSkewsInRebalancePartitions?

Contributor Author

updated

@@ -99,6 +99,7 @@ case class AdaptiveSparkPlanExec(
// Skew join does not handle `CustomShuffleReader` so needs to be applied first.
OptimizeSkewedJoin,
CoalesceShufflePartitions(context.session),
ExpandShufflePartitions,
Contributor

Shall we do it before CoalesceShufflePartitions, to follow OptimizeSkewedJoin?

Contributor Author

yea, seems better

* / \
* r0:[m0-b0, m1-b0, m2-b0], r1:[m0-b1], r2:[m1-b1], r3:[m2-b1], r4[m0-b2, m1-b2, m2-b2]
*/
object ExpandShufflePartitions extends CustomShuffleReaderRule {
Contributor

I'd name it OptimizeSkewedRebalance

Contributor Author

OptimizeSkewedPartitions ?

updateShuffleReaders(plan)
}

override def apply(plan: SparkPlan): SparkPlan = {
Contributor

I think the case we want to support is pretty simple: the root rebalance node.

override def apply(plan: SparkPlan): SparkPlan = plan match {
  case s: ShuffleQueryStageExec ...

  case _ => plan
}

Contributor Author

Why do we need this limitation? People may use rebalance in some other places. An extreme case:

Generate
  Rebalance
    Aggregate(skewed)

Contributor

If you want to expand the use cases of Rebalance, please mention it explicitly and discuss the relevant changes we need to make. Otherwise, let's assume Rebalance is only used to optimize file writing and not introduce complexity for non-existing use cases.

Contributor Author

makes sense

* CoalescedPartition for normal partition.
* - has `ShufflePartitionSpec` before expand that should be CoalescedPartition
* We use some PartialReducerPartitionSpecs to replace CoalescedPartition for every
* large partition, and use origin ShufflePartitionSpec for normal partition.
Contributor

The logic looks overly complicated. Is it because we run the new rule after CoalesceShufflePartitions?

Contributor Author

I think so, will simplify it


SparkQA commented Jun 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44794/


SparkQA commented Jun 24, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44794/


SparkQA commented Jun 24, 2021

Test build #140264 has finished for PR 32883 at commit 2f55a1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 30, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44942/


SparkQA commented Jun 30, 2021

Test build #140414 has finished for PR 32883 at commit fd44620.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -98,6 +98,7 @@ case class AdaptiveSparkPlanExec(
ReuseAdaptiveSubquery(context.subqueryCache),
// Skew join does not handle `CustomShuffleReader` so needs to be applied first.
OptimizeSkewedJoin,
OptimizeSkewedPartitions,
Contributor

I'd make it more clear: OptimizeSkewInRebalancePartitions

val ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will optimize the " +
"skewed shuffle partition to some small partitions according to the target size " +
@cloud-fan (Contributor) commented Jun 30, 2021

skewed shuffle partitions in RebalancePartitions and split them to smaller ones ..., then we can remove the Note that, ... part.

import org.apache.spark.sql.internal.SQLConf

/**
* A rule to optimize the skewed shuffle partitions based on the map output statistics, which can
Contributor

skewed shuffle partitions in RebalancePartitions

* and the reduce side looks like:
* (without this rule) r1[m0-b1, m1-b1, m2-b1]
* / \
* r0:[m0-b0, m1-b0, m2-b0], r1:[m0-b1], r2:[m1-b1], r3:[m2-b1], r4[m0-b2, m1-b2, m2-b2]
Contributor

I'd name them r0, r1-0, r1-1, r1-2, r2
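The suggested naming can be generated mechanically: each skewed reduce partition r&lt;i&gt; that is read from all mappers becomes one partial spec per mapper, r&lt;i&gt;-&lt;j&gt;, while normal partitions keep their r&lt;i&gt; name. A hypothetical Python illustration of the scheme (not Spark code):

```python
def split_reader_specs(num_reducers, skewed, num_mappers):
    """Name the reader partitions after splitting: a skewed reduce
    partition r<i> yields one partial spec per mapper, r<i>-<j>."""
    specs = []
    for r in range(num_reducers):
        if r in skewed:
            specs += [f"r{r}-{m}" for m in range(num_mappers)]
        else:
            specs.append(f"r{r}")
    return specs

# Three reducers, partition 1 skewed, three mappers (as in the diagram):
print(split_reader_specs(3, {1}, 3))   # → ['r0', 'r1-0', 'r1-1', 'r1-2', 'r2']
```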

* after split. Create a list of `PartialMapperPartitionSpec` for skewed partition and
* create `CoalescedPartition` for normal partition.
*/
def optimizeSkewedPartitions(
Contributor

If this is only called in the new rule, shall we move it to the new rule?

@@ -1806,7 +1806,8 @@ class AdaptiveQueryExecSuite
}

test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") {
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2") {
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2",
Contributor

can we just disable partition coalesce in this test, so that we don't need to tune ADVISORY_PARTITION_SIZE_IN_BYTES which affects skew handling?

withTempView("v") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
Contributor

can we turn it on to make sure the new optimization works well with it?

Comment on lines 67 to 73
plan match {
case shuffle: ShuffleQueryStageExec
if supportedShuffleOrigins.contains(shuffle.shuffle.shuffleOrigin) =>
optimizeSkewedPartitions(shuffle)
case _ => plan
}
}
Member

Could we make it work only with DataWritingCommandExec and InsertIntoDataSourceExec? For example:

    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }

    plan match {
      case d @ DataWritingCommandExec(_, child) if supportOptimization(d) =>
        handleSkewed(d, child)
      case i @ InsertIntoDataSourceExec(child, _, _, _, _) if supportOptimization(i) =>
        handleSkewed(i, child)
      case _ => plan
    }
  }


SparkQA commented Jun 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44953/


SparkQA commented Jun 30, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44953/

@@ -1806,7 +1806,8 @@ class AdaptiveQueryExecSuite
}

test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") {
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "false") {
Contributor

do we need to disable this feature in this test?

Contributor Author

No need; the default value of the target size is big enough.

}

withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100") {
// partition size [0,258,72,72,72]
@cloud-fan (Contributor) commented Jun 30, 2021

can we tune the size a little more, so that coalesce also applies?

Contributor Author

yea, tuned it up to 150
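The interplay being tested (partition sizes [0, 258, 72, 72, 72] with an advisory size at which both splitting and coalescing trigger) can be sketched as one pass that splits oversized partitions and coalesces small adjacent ones. A simplified Python model of the combined behavior, not the actual Spark logic:

```python
def rebalance(sizes, target):
    """Split partitions larger than `target` into near-equal pieces and
    coalesce runs of small adjacent partitions up to `target` (sketch)."""
    out, acc = [], 0
    for s in sizes:
        if s > target:
            if acc:                      # flush any pending coalesced group
                out.append(acc)
                acc = 0
            n = -(-s // target)          # ceil(s / target) pieces
            base, rem = divmod(s, n)
            out += [base + (1 if i < rem else 0) for i in range(n)]
        else:
            if acc + s > target and acc:
                out.append(acc)
                acc = 0
            acc += s                     # zero-size partitions merge away
    if acc:
        out.append(acc)
    return out

print(rebalance([0, 258, 72, 72, 72], 100))   # → [86, 86, 86, 72, 72, 72]
print(rebalance([0, 258, 72, 72, 72], 150))   # → [129, 129, 144, 72]
```

At a target of 100 only the 258-byte partition is split; raising the target to 150 also lets coalescing combine two of the 72-byte partitions, which is what the reviewer asked the test to exercise.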


SparkQA commented Jun 30, 2021

Test build #140445 has finished for PR 32883 at commit 8e706ae.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 30, 2021

Test build #140439 has finished for PR 32883 at commit 7666145.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44959/


SparkQA commented Jun 30, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44959/

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan cloud-fan closed this in d46c1e3 Jun 30, 2021
@ulysses-you (Contributor Author)

thank you all !

@ulysses-you ulysses-you deleted the expand-partition branch July 1, 2021 01:34
domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
[SPARK-35725][SQL] Support optimize skewed partitions in RebalancePartitions


Closes apache#32883 from ulysses-you/expand-partition.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>