[SPARK-55848][SQL] Fix incorrect dedup results with SPJ partial clustering #54679
naveenp2708 wants to merge 4 commits into apache:master
@peter-toth @szehon-ho @cloud-fan @chirag-s-db This PR addresses the correctness issue discussed in #54378. Test cases included as requested by @peter-toth.
Can you please revert formatting changes? Now this PR shows |
…ering

When SPJ partial clustering splits a partition across multiple tasks, post-join dedup operators (dropDuplicates, Window row_number) produce incorrect results. KeyGroupedPartitioning.satisfies0() incorrectly reports that it satisfies ClusteredDistribution, because the call to super.satisfies0() short-circuits the isPartiallyClustered guard.

This fix adds an isPartiallyClustered flag to KeyGroupedPartitioning and restructures satisfies0() to check ClusteredDistribution first, returning false when partially clustered. EnsureRequirements then inserts the necessary Exchange. Plain SPJ joins without dedup are unaffected.

Closes apache#54378
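The ordering problem described in this commit message can be illustrated with a toy model. This is only a sketch: `ToyKeyGroupedPartitioning`, `genericSatisfies`, `satisfiesBefore`, and `satisfiesAfter` are hypothetical names invented for illustration, and the types are simplified stand-ins rather than Spark's actual classes or the code in this PR.

```scala
object SatisfiesSketch {
  // Hypothetical stand-ins for Spark's distribution requirements.
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  final case class ClusteredDistribution(clusterKeys: Seq[String]) extends Distribution

  final case class ToyKeyGroupedPartitioning(
      keys: Seq[String],
      isPartiallyClustered: Boolean) {

    // Stand-in for the inherited check: a clustered requirement counts as met
    // when every partitioning key is one of the required clustering keys.
    private def genericSatisfies(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case ClusteredDistribution(clusterKeys) =>
        keys.nonEmpty && keys.forall(clusterKeys.contains)
    }

    // The guard that is supposed to reject partially clustered output.
    private def guardedClusteredCheck(required: Distribution): Boolean = required match {
      case ClusteredDistribution(clusterKeys) =>
        !isPartiallyClustered && keys.forall(clusterKeys.contains)
      case _ => false
    }

    // Problematic ordering: the generic check answers first, so the guard is
    // never consulted once the generic check has returned true.
    def satisfiesBefore(required: Distribution): Boolean =
      genericSatisfies(required) || guardedClusteredCheck(required)

    // Restructured ordering: look at ClusteredDistribution first and refuse it
    // outright when the output is partially clustered.
    def satisfiesAfter(required: Distribution): Boolean = required match {
      case _: ClusteredDistribution if isPartiallyClustered => false
      case other => genericSatisfies(other)
    }
  }

  def main(args: Array[String]): Unit = {
    val partial = ToyKeyGroupedPartitioning(Seq("id"), isPartiallyClustered = true)
    val required = ClusteredDistribution(Seq("id"))
    println(s"before fix: ${partial.satisfiesBefore(required)}")
    println(s"after fix:  ${partial.satisfiesAfter(required)}")
  }
}
```

In this model the partially clustered partitioning claims to satisfy the clustered requirement under the old ordering and rejects it under the new one, which is the condition under which a planner would add a shuffle.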
5368d8f to 4f9752e
@peter-toth Done, rebased and cleaned up the formatting changes. The diff should now show only the actual fix (+192 -35).
Thank you. Let me review this later today or tomorrow morning.
peter-toth left a comment
Can you please add a new test case similar to the existing "SPARK-53322: checkpointed scans avoid shuffles for aggregates"? After this fix, a shuffle should be added when a checkpointed KeyGroupedPartitioning is isPartiallyClustered and a node above the checkpoint requires ClusteredDistribution.
}
}

test("[SPARK-54378] dropDuplicates after SPJ with partial clustering should give correct " +
Can you please start the new test names with "SPARK-55848:"?
Seq(true, false).foreach { partiallyClustered =>
  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString,
Do we need to turn REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION off for this test?
V2_BUCKETING_PUSH_PART_VALUES_ENABLED is enabled by default so we can omit it.
|FROM testcat.ns.$items i
|JOIN testcat.ns.$purchases p ON i.id = p.item_id
|""".stripMargin)
checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
Can you please check that the presence of shuffles and the number of scan partitions are as expected?
}
}

test("[SPARK-54378] Window dedup after SPJ with partial clustering should give correct " +
}
}

test("SPARK-55848: dropDuplicates after SPJ with partial clustering should produce " +
Is this test different to the first one?
// requires all rows with the same key to be co-located in a single task. Without this
// guard, downstream operators such as dropDuplicates or Window functions would skip
// their required shuffle and produce incorrect results.
// See SPARK-54378 / SPARK-55848.
@naveenp2708, I've merged #54330 to |
Thanks for the thorough review @peter-toth! I'll address all feedback and split into two PRs:
PR 1 (targeting master):
PR 2 (targeting branch-4.1):
Closing this PR and opening the new ones shortly.
What changes were proposed in this pull request?
This PR fixes a data correctness bug where SPJ with partial clustering produces incorrect results for post-join dedup operations (dropDuplicates and Window-based row_number dedup).
The root cause: KeyGroupedPartitioning.satisfies0() delegates to super.satisfies0() (from HashPartitioningLike), which also matches ClusteredDistribution and returns true, short-circuiting the isPartiallyClustered guard. This means EnsureRequirements never inserts an Exchange before downstream dedup operators when partial clustering is active.
The fix adds an isPartiallyClustered flag to KeyGroupedPartitioning and restructures satisfies0() to check ClusteredDistribution first, returning false when partially clustered. EnsureRequirements then automatically inserts the necessary Exchange.
Why are the changes needed?
Without this fix, any Spark user running SPJ with partial clustering enabled who applies dropDuplicates() or Window-based dedup on the join output gets silently inflated results. This affects production pipelines using Iceberg, Delta Lake, or any DataSource V2 connector with bucketed tables.
Does this PR introduce any user-facing change?
Yes. Queries using SPJ with partial clustering followed by dedup operations will now return correct results. An additional shuffle may be introduced for these specific query patterns. Queries without post-join dedup are unaffected.
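Why a missing shuffle inflates dedup results can be shown with plain Scala collections. The following is a minimal sketch, assuming each inner `Seq` is the set of join output rows one task sees; `dedupPerTask` and `shuffleByKey` are hypothetical helpers, not Spark APIs, and the row values are made up.

```scala
object PartialClusteringDedupSketch {
  // Hypothetical join output row: (itemId, price).
  type Row = (Int, Double)

  // Dedup on the key inside each task independently, as a dedup operator
  // would when no shuffle precedes it.
  def dedupPerTask(tasks: Seq[Seq[Row]]): Seq[Row] =
    tasks.flatMap(task => task.groupBy(_._1).values.map(_.head))

  // Stand-in for the Exchange: regroup rows so that every key ends up in
  // exactly one task.
  def shuffleByKey(tasks: Seq[Seq[Row]], numTasks: Int): Seq[Seq[Row]] = {
    val grouped = tasks.flatten.groupBy { case (id, _) => Math.floorMod(id, numTasks) }
    (0 until numTasks).map(i => grouped.getOrElse(i, Seq.empty))
  }

  def main(args: Array[String]): Unit = {
    // Key 1 is split across two tasks, as partial clustering may do for a
    // skewed partition.
    val tasks: Seq[Seq[Row]] = Seq(
      Seq((1, 40.0), (1, 42.0)),
      Seq((1, 44.0)),
      Seq((2, 10.0)),
      Seq((3, 15.5)))

    val withoutShuffle = dedupPerTask(tasks).map(_._1).sorted
    val withShuffle = dedupPerTask(shuffleByKey(tasks, 2)).map(_._1).sorted
    println(s"ids without shuffle: $withoutShuffle")
    println(s"ids with shuffle:    $withShuffle")
  }
}
```

Without the regrouping step, key 1 survives once per task that holds it, so it appears twice in the result; after regrouping, each key appears once.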
How was this patch tested?
KeyGroupedPartitioningSuite: SPARK-54378 dropDuplicates, SPARK-54378 Window dedup, SPARK-55848 dropDuplicates
KeyGroupedPartitioningSuite pass
EnsureRequirementsSuite pass
DistributionAndOrderingSuite pass
Was this patch authored or co-authored using generative AI tooling?
No.