From fb56dfecfe23f3c0d3409b33fd751e1f4ae1b37c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 22 Nov 2025 11:00:49 +0100 Subject: [PATCH 1/4] Avoid repartition skew --- .../partition_statistics.rs | 4 +- .../physical-plan/src/repartition/mod.rs | 47 +++++++++++++------ 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 49dc5b845605..7034b71fd500 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -878,9 +878,9 @@ mod test { partition_row_counts.push(total_rows); } assert_eq!(partition_row_counts.len(), 3); - assert_eq!(partition_row_counts[0], 2); + assert_eq!(partition_row_counts[0], 1); assert_eq!(partition_row_counts[1], 2); - assert_eq!(partition_row_counts[2], 0); + assert_eq!(partition_row_counts[2], 1); Ok(()) } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 6249a0e9b856..fa07c5b4c719 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -382,6 +382,9 @@ impl RepartitionExecState { txs, partitioning.clone(), metrics, + // preser_order dep + if preserve_order { 0 } else { i }, + num_input_partitions, )); // In a separate task, wait for each input to be done @@ -428,12 +431,22 @@ impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] /// /// The time spent repartitioning will be recorded to `timer` - pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result { + pub fn try_new( + partitioning: Partitioning, + timer: metrics::Time, + input_partition: usize, + num_input_partitions: usize, + ) -> Result { let state = match partitioning { Partitioning::RoundRobinBatch(num_partitions) => { BatchPartitionerState::RoundRobin { num_partitions, - next_idx: 0, + // Distribute starting index evenly based on input partition, number of input partitions and number of partitions + // to avoid they all start at partition 0 and heavily skew on the lower partitions + next_idx: (input_partition as f64 + * (num_partitions as f64 / num_input_partitions as f64)) + as usize + % num_partitions, } } Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash { @@ -1196,9 +1209,15 @@ impl RepartitionExec { mut output_channels: HashMap, partitioning: Partitioning, metrics: RepartitionMetrics, + input_partition: usize, + num_input_partitions: usize, ) -> Result<()> { - let mut partitioner = - BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; + let mut partitioner = BatchPartitioner::try_new( + partitioning, + metrics.repartition_time.clone(), + input_partition, + num_input_partitions, + )?; // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions(); @@ -1862,16 +1881,16 @@ mod tests { // output stream 1 should *not* error and have one of the input batches let batches = crate::common::collect(output_stream1).await.unwrap(); - assert_snapshot!(batches_to_sort_string(&batches), @r#" - +------------------+ - | my_awesome_field | - +------------------+ - | baz | - | frob | - | gaz | - | grob | - +------------------+ - "#); + assert_snapshot!(batches_to_sort_string(&batches), @r" + +------------------+ + | my_awesome_field | + +------------------+ + | baz | + | frob | + | gar | + | goo | + +------------------+ + "); } #[tokio::test] From 616588713cc496f0847aa67836e41dc699c4ea76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 22 Nov 2025 11:03:24 +0100 Subject: [PATCH 2/4] comment --- datafusion/physical-plan/src/repartition/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index fa07c5b4c719..0886eef3b370 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -382,7 +382,7 @@ impl RepartitionExecState { txs, partitioning.clone(), metrics, - // preser_order dep + // preserve_order depends on input partition to start from 0 if preserve_order { 0 } else { i }, num_input_partitions, )); From 0cc5ba2179168e0fdaa4c8ac0656af843295c892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 22 Nov 2025 11:08:40 +0100 Subject: [PATCH 3/4] Simplify --- datafusion/physical-plan/src/repartition/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 0886eef3b370..5017f607fad2 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -443,9 +443,7 @@ impl BatchPartitioner { num_partitions, // Distribute starting index evenly based on input partition, number of input partitions and number of partitions // to avoid they all start at partition 0 and heavily skew on the lower partitions - next_idx: (input_partition as f64 - * (num_partitions as f64 / num_input_partitions as f64)) - as usize + next_idx: ((input_partition * num_partitions) / num_input_partitions) % num_partitions, } } From 7923e9c97c7e2aa86af4bd4ba06fb57932c5b020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 22 Nov 2025 14:14:41 +0100 Subject: [PATCH 4/4] Simplify --- datafusion/physical-plan/src/repartition/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 5017f607fad2..bb5bd0ab565c 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -382,7 +382,7 @@ impl RepartitionExecState { txs, partitioning.clone(), metrics, - // preserve_order depends on input partition to start from 0 + // preserve_order depends on partition index to start from 0 if preserve_order { 0 } else { i }, num_input_partitions, )); @@ -443,8 +443,7 @@ impl BatchPartitioner { num_partitions, // Distribute starting index evenly based on input partition, number of input partitions and number of partitions // to avoid they all start at partition 0 and heavily skew on the lower partitions - next_idx: ((input_partition * num_partitions) / num_input_partitions) - % num_partitions, + next_idx: ((input_partition * num_partitions) / num_input_partitions), } } Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {