
[SPARK-55535][SQL][FOLLOW-UP] Fix OrderedDistribution handling and minor improvements to EnsureRequirements#54727

Closed
peter-toth wants to merge 5 commits into apache:master from peter-toth:SPARK-55535-refactor-kgp-and-spj-follow-up

Conversation


@peter-toth peter-toth commented Mar 10, 2026

What changes were proposed in this pull request?

This is a follow-up PR to #54330 to fix a correctness bug in the OrderedDistribution handling of EnsureRequirements. The PR also contains minor improvements to EnsureRequirements and configuration doc updates.

Why are the changes needed?

To fix a correctness bug introduced with the refactor.

Does this PR introduce any user-facing change?

Yes, but the refactor (#54330) hasn't been released.

How was this patch tested?

Added new UT.

Was this patch authored or co-authored using generative AI tooling?

No.

…other improvements to `EnsureRequirements`
.map(_.asInstanceOf[Attribute])
val keyRowOrdering = RowOrdering.create(o.ordering, attrs)
val keyOrdering = keyRowOrdering.on((t: InternalRowComparableWrapper) => t.row)
val sorted = satisfyingKeyedPartitioning.partitionKeys.sorted(keyOrdering)
@peter-toth (Contributor Author) commented:

The bug is that sorted should be distinct as well (as it was before the refactor), but after the refactor we can do better:

  • We can avoid adding a grouping operator entirely when the non-grouped satisfyingKeyedPartitioning.partitionKeys satisfies the required sort order.
  • Or if it doesn't, then we need to add a GroupPartitionsExec operator, but we can avoid coalescing partitions in the operator by setting applyPartialClustering.
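The sorted-vs-distinct issue can be illustrated without Spark. A minimal plain-Scala sketch (the integer keys stand in for InternalRowComparableWrapper keys; this is not Spark's actual code):

```scala
object SortedDistinctSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical partition keys reported by a key-grouped source; key 2
    // appears twice because two input splits share that key.
    val partitionKeys = Seq(2, 1, 3, 2)

    // Sorting alone keeps the duplicate key, so planning against these keys
    // would scan key 2's splits once per occurrence and duplicate rows.
    assert(partitionKeys.sorted == Seq(1, 2, 2, 3))

    // Pre-refactor behavior: sort AND de-duplicate.
    assert(partitionKeys.distinct.sorted == Seq(1, 2, 3))
    println("ok")
  }
}
```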

val dfWithDuplicate = sql(s"SELECT id FROM testcat.ns.$items i ORDER BY id")

val expectedWithDuplicate = Seq(1, 2, 2, 3).map(Row(_))
checkAnswer(dfWithDuplicate, expectedWithDuplicate)
@peter-toth (Contributor Author) commented Mar 10, 2026:

This is the regression test: it returned Seq(1, 2, 2, 2, 2, 3) before the fix.

df -> Seq.empty,
reverseDf -> Seq(3),
dfWithDuplicate -> Seq.empty,
reverseDfWithDuplicate -> Seq(4)
@peter-toth (Contributor Author) commented Mar 10, 2026:

This is a minor improvement compared to pre-refactor. Although we need to add GroupPartitions to reorder the 4 partitions by their key in descending order, we don't need to coalesce them into 3.
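The reorder-without-coalesce behavior can be sketched in plain Scala (the partition model below is illustrative, not Spark's actual API):

```scala
object ReorderWithoutCoalesceSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical input: 4 partitions with keys 1, 2, 2, 3 (key 2 has two splits).
    val partitions = Seq((1, "splitA"), (2, "splitB"), (2, "splitC"), (3, "splitD"))

    // Pre-refactor behavior: grouping by key coalesces the two key-2 splits,
    // leaving only 3 output partitions.
    val coalesced = partitions
      .groupBy { case (k, _) => k }
      .toSeq
      .sortBy { case (k, _) => -k }
      .map { case (_, group) => group.map { case (_, split) => split } }
    assert(coalesced == Seq(Seq("splitD"), Seq("splitB", "splitC"), Seq("splitA")))

    // With the improvement: reorder by key descending but keep all 4 partitions,
    // so no coalescing is needed (sortBy is stable, preserving split order).
    val reordered = partitions.sortBy { case (k, _) => -k }
    assert(reordered.map { case (k, _) => k } == Seq(3, 2, 2, 1))
    assert(reordered.size == 4)
    println("ok")
  }
}
```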

@peter-toth (Contributor Author) commented:

@cloud-fan, @dongjoon-hyun, @viirya, @szehon-ho, @chirag-s-db this is a follow-up PR to #54330.

@peter-toth peter-toth changed the title [SPARK-55535][SQL][FOLLOW-UP] Fix OrderedDistribution handling and other improvements to EnsureRequirements [SPARK-55535][SQL][FOLLOW-UP] Fix OrderedDistribution handling and minor improvements to EnsureRequirements Mar 10, 2026
// shuffles or group partitions
Seq(Row(null, 3), Row(10.0, 2), Row(15.5, null),
Row(15.5, 3), Row(40.0, 1), Row(41.0, 1)))
Row(15.5, 3), Row(40.0, 1), Row(41.0, 1)), 0)
@peter-toth (Contributor Author) commented Mar 10, 2026:

This is a minor improvement compared to pre-refactor.

@dongjoon-hyun (Member) left a comment:

+1, LGTM (Pending CIs). Thank you, @peter-toth .

@viirya (Member) left a comment:

One Minor Concern

applyPartialClustering=true is semantically overloaded. This flag was designed for the partial clustering join optimization, but here it's being used purely as a way to say "distribute one input split per output partition." The actual partial clustering join logic (deciding which side to replicate based on stats, join type checks, etc.) is completely irrelevant here — the flag just happens to switch alignToExpectedKeys into the "one split per task" branch instead of "all splits into one task."

This is correct but confusing. Someone reading GroupPartitionsExec(..., applyPartialClustering=true) in an ORDER BY context would reasonably wonder what partial clustering has to do with sorting. A cleaner fix might be a dedicated boolean like distributeInputPartitions, but that's a bigger change and the current approach works correctly.

At a minimum, a comment at the call site explaining why applyPartialClustering=true is used here would help, even if renaming the parameter is too big a change.

@viirya (Member) left a comment:

Small, well-targeted fix. The correctness bug was real and the fix is correct. The main review note is the semantic overloading of applyPartialClustering.

@peter-toth (Contributor Author) replied:

> applyPartialClustering=true is semantically overloaded. […] A comment at the call site explaining why applyPartialClustering=true is used here would at minimum help, even if renaming the parameter is too big a change.

Yeah, during the refactor I too was wondering whether keeping the 2 flags (applyPartialClustering and replicatePartitions) in GroupPartitionsExec makes sense, because one flag would be enough to select between the 2 modes of GroupPartitionsExec to align partitions to expectedPartitionKeys.
And now introducing a 3rd one seemed like overkill.
How about changing GroupPartitionsExec to keep only 1 flag, groupPartitions (or maybe distributePartitions), which is generic enough to be used in different contexts?

@viirya (Member) commented Mar 10, 2026:

> […] How about changing GroupPartitionsExec and keeping only 1 flag as groupPartitions (or maybe distributePartitions), which is generic enough to be used in different contexts.

The suggestion of a single distributePartitions flag is cleaner and more honest about what's actually happening:

  • distributePartitions=false → put all splits for a key into every expected output task (group when numSplits=1, replicate when numSplits>1)
  • distributePartitions=true → spread splits one per output task

This also resolves the naming confusion — distributePartitions describes the mechanical behavior of alignToExpectedKeys without implying anything about joins or skew handling. It would read naturally in both the partial clustering context and the OrderedDistribution sorting context.
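The two modes can be sketched in plain Scala, independently of Spark (the align helper, Split type, and splitsByKey map are illustrative only, not Spark's actual API):

```scala
object DistributeModesSketch {
  type Split = String

  // Hypothetical model of alignToExpectedKeys: map each expected output key
  // to the input splits that should feed the corresponding output task(s).
  def align(
      expectedKeys: Seq[Int],
      splitsByKey: Map[Int, Seq[Split]],
      distributePartitions: Boolean): Seq[Seq[Split]] = {
    if (distributePartitions) {
      // Spread splits one per output task.
      expectedKeys.flatMap(k => splitsByKey.getOrElse(k, Seq.empty).map(s => Seq(s)))
    } else {
      // Put all splits for a key into a single output task (replicated if the
      // key occurs multiple times in expectedKeys).
      expectedKeys.map(k => splitsByKey.getOrElse(k, Seq.empty))
    }
  }

  def main(args: Array[String]): Unit = {
    val splits = Map(1 -> Seq("a"), 2 -> Seq("b", "c"))
    assert(align(Seq(1, 2), splits, distributePartitions = false) ==
      Seq(Seq("a"), Seq("b", "c")))
    assert(align(Seq(1, 2), splits, distributePartitions = true) ==
      Seq(Seq("a"), Seq("b"), Seq("c")))
    println("ok")
  }
}
```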

@peter-toth (Contributor Author) replied:

> The suggestion of a single distributePartitions flag is cleaner and more honest about what's actually happening […]

fad74ff does the rename.

I added 2 more commits:

  • afe32cc to remove unnecessary KeyedPartitioning.equals() and .hashCode() because partitionKeys is Seq[InternalRowComparableWrapper] after the refactor and
  • 286574c to rename KeyGroupedShuffleSpec to KeyedShuffleSpec to be in sync with KeyedPartitioning.

@peter-toth (Contributor Author) commented:

Thank you @dongjoon-hyun and @viirya for the review.

Merged to master (4.2.0).

