diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 49dc5b845605..7034b71fd500 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -878,9 +878,9 @@ mod test {
             partition_row_counts.push(total_rows);
         }
         assert_eq!(partition_row_counts.len(), 3);
-        assert_eq!(partition_row_counts[0], 2);
+        assert_eq!(partition_row_counts[0], 1);
         assert_eq!(partition_row_counts[1], 2);
-        assert_eq!(partition_row_counts[2], 0);
+        assert_eq!(partition_row_counts[2], 1);
 
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 6249a0e9b856..bb5bd0ab565c 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -382,6 +382,9 @@ impl RepartitionExecState {
                 txs,
                 partitioning.clone(),
                 metrics,
+                // preserve_order depends on partition index to start from 0
+                if preserve_order { 0 } else { i },
+                num_input_partitions,
             ));
 
             // In a separate task, wait for each input to be done
@@ -428,12 +431,19 @@ impl BatchPartitioner {
     /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
     ///
     /// The time spent repartitioning will be recorded to `timer`
-    pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
+    pub fn try_new(
+        partitioning: Partitioning,
+        timer: metrics::Time,
+        input_partition: usize,
+        num_input_partitions: usize,
+    ) -> Result<Self> {
         let state = match partitioning {
             Partitioning::RoundRobinBatch(num_partitions) => {
                 BatchPartitionerState::RoundRobin {
                     num_partitions,
-                    next_idx: 0,
+                    // Distribute starting index evenly based on input partition, number of input partitions and number of partitions
+                    // to avoid they all start at partition 0 and heavily skew on the lower partitions
+                    next_idx: ((input_partition * num_partitions) / num_input_partitions),
                 }
             }
             Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
@@ -1196,9 +1206,15 @@ impl RepartitionExec {
         mut output_channels: HashMap,
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
+        input_partition: usize,
+        num_input_partitions: usize,
     ) -> Result<()> {
-        let mut partitioner =
-            BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
+        let mut partitioner = BatchPartitioner::try_new(
+            partitioning,
+            metrics.repartition_time.clone(),
+            input_partition,
+            num_input_partitions,
+        )?;
 
         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();
@@ -1862,16 +1878,16 @@ mod tests {
 
         // output stream 1 should *not* error and have one of the input batches
         let batches = crate::common::collect(output_stream1).await.unwrap();
-        assert_snapshot!(batches_to_sort_string(&batches), @r#"
-        +------------------+
-        | my_awesome_field |
-        +------------------+
-        | baz              |
-        | frob             |
-        | gaz              |
-        | grob             |
-        +------------------+
-        "#);
+        assert_snapshot!(batches_to_sort_string(&batches), @r"
+        +------------------+
+        | my_awesome_field |
+        +------------------+
+        | baz              |
+        | frob             |
+        | gar              |
+        | goo              |
+        +------------------+
+        ");
     }
 
     #[tokio::test]