Skip to content

Commit

Permalink
[SPARK-40588] FileFormatWriter materializes AQE plan before accessing…
Browse files Browse the repository at this point in the history
… outputOrdering

### What changes were proposed in this pull request?
The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3. Spark 3.4 does not need this because `FileFormatWriter` gets the final plan.

### Why are the changes needed?
`FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering (Spark 3.0 to 3.3), in which case `FileFormatWriter` enforces the ordering (e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588).

### Does this PR introduce _any_ user-facing change?
This fixes SPARK-40588, which was introduced in 3.0. This restores behaviour from Spark 2.4.

### How was this patch tested?
The final plan that is written to files cannot be extracted from `FileFormatWriter`. The bug explained in [SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be asserted on the result files when spilling occurs. This is very hard to control in an unit test scenario.

Therefore, this was tested manually. The [example to reproduce this issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032) given in SPARK-40588 now produces sorted files.

The actual plan written into the files changed from

```
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [id=#28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 10000000, step=1, splits=2)
```

where `FileFormatWriter` enforces order with `Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0`, to

```
*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [id=#68]
         +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange IdentityBroadcastMode, [id=#42]
            :     +- *(1) Project [id#0L AS day#2L]
            :        +- *(1) Range (0, 2, step=1, splits=2)
            +- *(2) Range (0, 1000000, step=1, splits=2)
```

where the sort given by the user is the outermost sort now.

Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
EnricoMi authored and cloud-fan committed Nov 9, 2022
1 parent ef74381 commit f0cad7a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ case class AdaptiveSparkPlanExec(
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
}

def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)

private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
if (isFinalPlan) return currentPhysicalPlan

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -187,8 +188,17 @@ object FileFormatWriter extends Logging {
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering =
partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns

// SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
// its final plan's ordering, so we have to materialize that plan first
def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
}
val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)

// the sort order doesn't matter
val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
val actualOrdering = materializedPlan.outputOrdering.map(_.child)
val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
false
} else {
Expand All @@ -210,7 +220,7 @@ object FileFormatWriter extends Logging {

try {
val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
(empty2NullPlan.execute(), None)
(materializedPlan.execute(), None)
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
Expand All @@ -220,12 +230,12 @@ object FileFormatWriter extends Logging {
val sortPlan = SortExec(
orderingExpr,
global = false,
child = empty2NullPlan)
child = materializedPlan)

val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
if (concurrentWritersEnabled) {
(empty2NullPlan.execute(),
(materializedPlan.execute(),
Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
} else {
(sortPlan.execute(), None)
Expand Down

0 comments on commit f0cad7a

Please sign in to comment.