[SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.partitions

### What changes were proposed in this pull request?

To avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set it much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution. As a result, some bucketed map joins lose their benefit, because an extra `ShuffleExchange` is added on the bucketed side.

How to reproduce:
```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// Since 3.0. We enable adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```
vs
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=#92]
            +- Range (0, 8, step=1, splits=16)
```

This PR makes reads of bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, so that no extra `ShuffleExchange` is added.
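The partition-count choice that this change adds to `EnsureRequirements` can be sketched without any Spark dependency. The snippet below is only an illustration under stated assumptions: `Child` and `TargetPartitions` are hypothetical names that do not exist in Spark, each child is reduced to its partition count plus a flag saying whether it is a shuffle, and the `requiredNumPartitions` override from the real rule is left out.

```scala
// Hypothetical stand-in for a physical plan child (not a Spark class):
// only the partition count and "is this a shuffle?" matter for the rule.
final case class Child(numPartitions: Int, isShuffle: Boolean)

object TargetPartitions {
  // Before the change: take the largest partition count among all children.
  def before(children: Seq[Child]): Int =
    children.map(_.numPartitions).max

  // After the change: if any child is not a shuffle (for example a bucketed
  // scan), take the largest non-shuffle partition count, but never go below
  // the configured spark.sql.shuffle.partitions value.
  def after(children: Seq[Child], numShufflePartitions: Int): Int = {
    val nonShuffle = children.filterNot(_.isShuffle).map(_.numPartitions)
    if (nonShuffle.nonEmpty) math.max(nonShuffle.max, numShufflePartitions)
    else children.map(_.numPartitions).max
  }

  def main(args: Array[String]): Unit = {
    // Numbers from the reproduction above: a 500-bucket table joined with a
    // side that was shuffled into maxNumPostShufflePartitions = 1000.
    val children = Seq(Child(500, isShuffle = false), Child(1000, isShuffle = true))
    println(before(children))     // 1000: the bucketed side would be reshuffled
    println(after(children, 500)) // 500: the bucketed side keeps its layout
  }
}
```

With a target of 500, only the side that is already a shuffle has to be repartitioned, which matches the first plan shown above.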

### Why are the changes needed?
To avoid degrading performance after enabling adaptive execution.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #26409 from wangyum/SPARK-29655.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
wangyum authored and cloud-fan committed Nov 15, 2019
1 parent 0c68578 commit 4f10e54
Showing 3 changed files with 49 additions and 4 deletions.
@@ -83,7 +83,24 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   numPartitionsSet.headOption
 }

-val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
+// If there are non-shuffle children that satisfy the required distribution, we have
+// some tradeoffs when picking the expected number of shuffle partitions:
+// 1. We should avoid shuffling these children.
+// 2. We should have a reasonable parallelism.
+val nonShuffleChildrenNumPartitions =
+  childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
+    .map(_.outputPartitioning.numPartitions)
+val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
+  // Here we pick the max number of partitions among these non-shuffle children as the
+  // expected number of shuffle partitions. However, if it's smaller than
+  // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
+  // expected number of shuffle partitions.
+  math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+} else {
+  childrenNumPartitions.max
+}
+
+val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)

 children = children.zip(requiredChildDistributions).zipWithIndex.map {
   case ((child, distribution), index) if childrenIndexes.contains(index) =>
@@ -274,6 +274,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
 .setMaster("local[*]")
 .setAppName("test")
 .set(UI_ENABLED, false)
+.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
 .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
 .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
 .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
@@ -507,7 +508,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
   join,
   expectedAnswer.collect())

-// Then, let's make sure we do not reduce number of ppst shuffle partitions.
+// Then, let's make sure we do not reduce number of post shuffle partitions.
 val finalPlan = join.queryExecution.executedPlan
   .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
 val shuffleReaders = finalPlan.collect {
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -382,8 +383,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   joined.sort("bucketed_table1.k", "bucketed_table2.k"),
   df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))

-assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
-val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+  val executedPlan =
+    joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+  assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+  executedPlan.asInstanceOf[SortMergeJoinExec]
+} else {
+  val executedPlan = joined.queryExecution.executedPlan
+  assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+  executedPlan.asInstanceOf[SortMergeJoinExec]
+}

 // check existence of shuffle
 assert(
@@ -795,4 +804,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }

+  test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {
+      val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
+      Seq(false, true).foreach { enableAdaptive =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
+          val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+          val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
+          testBucketing(
+            bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+            bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+            joinCondition = joinCondition(Seq("i", "j"))
+          )
+        }
+      }
+    }
+  }
 }
