[SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions#55036
Conversation
dongjoon-hyun
left a comment
There was a problem hiding this comment.
It's a nice improvement. I expected many generated query plan changes in the test case, but there is no change from the existing generated plan. Is there any reason, @peter-toth ?
We don't have any prodiction ready DSv2 filesources in Spark so the generated test plans / expected outputs doesn't cover this feature either. |
|
Got it~ |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM. Thank you, @peter-toth .
|
cc @cloud-fan , @szehon-ho , @aokolnychyi , @gengliangwang , too |
|
Iceberg can benefit from the change. |
7946dce to
4260f53
Compare
|
Marked as draft for now. Let me doublecheck a few edgecases as changing the reported ordering without the concept of constant order, which would be safe to prepend to any ordering, can be problematic. |
…xpressions
### What changes were proposed in this pull request?
Within a `KeyedPartitioning` partition, all rows share the same key value, so
the key expressions are trivially sorted (ascending) within each partition.
This PR makes two plan nodes expose that structural guarantee via
`outputOrdering`:
- **`DataSourceV2ScanExecBase`**: when `outputPartitioning` is a
`KeyedPartitioning` and the source reports no ordering via
`SupportsReportOrdering`, derive one ascending `SortOrder` per key
expression. When the source does report ordering, it is returned as-is.
- **`GroupPartitionsExec`**:
- *Non-coalescing* (every group has ≤ 1 input partition): pass through
`child.outputOrdering` unchanged.
- *Coalescing without reducers*: re-derive ordering from the output
`KeyedPartitioning` key expressions; a join may embed multiple
`KeyedPartitioning`s with different expressions — expose equivalences
via `sameOrderExpressions`.
- *Coalescing with reducers*: fall back to `super.outputOrdering` (empty),
because merged partitions share only the reduced key.
### Why are the changes needed?
Before this change, `outputOrdering` on both nodes returned an empty sequence
(unless `SupportsReportOrdering` was implemented), even though the within-
partition ordering was structurally guaranteed by the partitioning itself.
As a result, `EnsureRequirements` would insert a redundant `SortExec` before
`SortMergeJoin` inputs that are already in key order.
### Does this PR introduce _any_ user-facing change?
Yes. Queries involving storage-partitioned joins (v2 bucketing) no longer add
a redundant `SortExec` before `SortMergeJoin` when the join keys match the
partition keys, reducing CPU and memory overhead.
### How was this patch tested?
- New unit test class `GroupPartitionsExecSuite` covering all four
`outputOrdering` branches (non-coalescing, coalescing without reducers with
single and multi-key, join `sameOrderExpressions`, coalescing with reducers).
- New SQL integration tests in `KeyGroupedPartitioningSuite` (SPARK-56241):
- Scan with `KeyedPartitioning` reports key-derived `outputOrdering`.
- Non-coalescing `GroupPartitionsExec` (non-identical key sets) passes
through child ordering — no pre-join `SortExec`.
- Coalescing `GroupPartitionsExec` derives ordering from key expressions —
no pre-join `SortExec`.
- Updated expected output in `DataSourceV2Suite` for the case where a source
is partitioned by a key with no reported ordering — groupBy on the partition
key no longer requires a sort.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
4260f53 to
f28c056
Compare
|
This PR now follows a safer approach and doesn't alter the reported ordering. The failures doesn't seem related, but looks like the same we hit here as well: #55048 (comment) |
| */ | ||
| override def outputOrdering: Seq[SortOrder] = ordering.getOrElse(super.outputOrdering) | ||
| override def outputOrdering: Seq[SortOrder] = { | ||
| val reportedOrdering = ordering.getOrElse(Seq.empty) |
There was a problem hiding this comment.
minor suggestion:
override def outputOrdering: Seq[SortOrder] = {
ordering match {
case Some(reported) if reported.nonEmpty => reported
case _ =>
outputPartitioning match {
case k: KeyedPartitioning => k.expressions.map(SortOrder(_, Ascending))
case _ => Seq.empty
}
}
}
| if (reportedOrdering.nonEmpty) { | ||
| reportedOrdering | ||
| } else { | ||
| outputPartitioning match { |
There was a problem hiding this comment.
is there any way to make not calculate every time? (is it an issue?)
| super.outputOrdering | ||
| // Coalescing: multiple input partitions are merged into one output partition. The child's | ||
| // within-partition ordering is lost due to concatenation, so we rederive ordering purely from | ||
| // the key expressions. A join may embed multiple `KeyedPartitioning`s (one per join side) |
There was a problem hiding this comment.
nit: re-derive?
also, maybe a short example would help
| // Without reducers all merged partitions share the same original key value, so the key | ||
| // expressions remain constant within the output partition. | ||
| val keyedPartitionings = p.collect { case k: KeyedPartitioning => k } | ||
| keyedPartitionings.map(_.expressions).transpose.map { exprs => |
There was a problem hiding this comment.
just trying to wrap my head around it, is it because its the same partition value? thats why its ordered?
|
I also feel we should gate this behind a flag, as its a new feature with certain risk |
What changes were proposed in this pull request?
Within a
KeyedPartitioningpartition, all rows share the same key value, so the key expressions are trivially sorted within each partition.This PR makes two plan nodes expose that structural guarantee via
outputOrdering:DataSourceV2ScanExecBase: whenoutputPartitioningis aKeyedPartitioningand the source reports no ordering viaSupportsReportOrdering, derive one ascendingSortOrderper key expression. When the source does report ordering, it is returned as-is.GroupPartitionsExec:child.outputOrderingunchanged.KeyedPartitioningkey expressions; a join may embed multipleKeyedPartitionings with different expressions — expose equivalences viasameOrderExpressions.super.outputOrdering(empty), because merged partitions share only the reduced key.Why are the changes needed?
Before this change,
outputOrderingon both nodes returned an empty sequence (unlessSupportsReportOrderingwas implemented), even though the within-partition ordering was structurally guaranteed by the partitioning itself.As a result,
EnsureRequirementswould insert a redundantSortExecbeforeSortMergeJoininputs that are already in key order.Does this PR introduce any user-facing change?
Yes. Queries involving storage-partitioned joins (v2 bucketing) no longer add a redundant
SortExecbeforeSortMergeJoinwhen the join keys match the partition keys, reducing CPU and memory overhead.How was this patch tested?
GroupPartitionsExecSuitecovering all fouroutputOrderingbranches (non-coalescing, coalescing without reducers with single and multi-key, joinsameOrderExpressions, coalescing with reducers).KeyGroupedPartitioningSuite:KeyedPartitioningreports key-derivedoutputOrdering.GroupPartitionsExec(non-identical key sets) passes through child ordering — no pre-joinSortExec.GroupPartitionsExecderives ordering from key expressions — no pre-joinSortExec.DataSourceV2Suitefor the case where a source is partitioned by a key with no reported ordering — groupBy on the partition key no longer requires a sort.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6