Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 60 additions & 42 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::utils::transpose;
use datafusion_common::{
ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err,
internal_datafusion_err, internal_err,
ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err,
};
use datafusion_common::{Result, not_impl_err};
use datafusion_common_runtime::SpawnedTask;
Expand Down Expand Up @@ -681,46 +680,8 @@ impl BatchPartitioner {
// Finished building index-arrays for output partitions
timer.done();

// Borrowing partitioner timer to prevent moving `self` to closure
let partitioner_timer = &self.timer;

let mut partitioned_batches = vec![];
for (partition, p_indices) in indices.iter_mut().enumerate() {
if !p_indices.is_empty() {
let taken_indices = std::mem::take(p_indices);
let indices_array: PrimitiveArray<UInt32Type> =
taken_indices.into();

// Tracking time required for repartitioned batches construction
let _timer = partitioner_timer.timer();

// Produce batches based on indices
let columns =
take_arrays(batch.columns(), &indices_array, None)?;

let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(indices_array.len()));
let batch = RecordBatch::try_new_with_options(
batch.schema(),
columns,
&options,
)
.unwrap();

partitioned_batches.push(Ok((partition, batch)));

// Return the taken vec
let (_, buffer, _) = indices_array.into_parts();
let mut vec =
buffer.into_inner().into_vec::<u32>().map_err(|e| {
internal_datafusion_err!(
"Could not convert buffer to vec: {e:?}"
)
})?;
vec.clear();
*p_indices = vec;
}
}
let partitioned_batches =
Self::partition_grouped_take(&batch, indices, &self.timer)?;

Box::new(partitioned_batches.into_iter())
}
Expand All @@ -736,6 +697,63 @@ impl BatchPartitioner {
BatchPartitionerState::Hash { indices, .. } => indices.len(),
}
}

/// Build repartitioned hash output batches using one `take` per input batch.
///
/// The hash router first fills one index vector per output partition. This method
/// concatenates those index vectors, performs one grouped `take_arrays`, and
/// then returns each output partition as a slice of the reordered batch.
///
/// For example, given partition indices:
///
/// ```text
/// partition 0: [2, 5]
/// partition 1: []
/// partition 2: [0, 3, 4]
/// ```
///
/// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns
/// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`.
///
/// Partitions with no rows are omitted from the output. Every vector in
/// `indices` is left empty on return (its allocation is kept by `Vec::clear`),
/// so the caller can refill the same vectors for the next input batch.
///
/// Only the `take_arrays` call and the construction of the reordered batch are
/// recorded in `timer`; concatenating the indices and slicing are not.
///
/// # Errors
///
/// Returns an error if `take_arrays` fails or if the reordered columns cannot
/// be assembled into a `RecordBatch`.
fn partition_grouped_take(
    batch: &RecordBatch,
    indices: &mut [Vec<u32>],
    timer: &metrics::Time,
) -> Result<Vec<Result<(usize, RecordBatch)>>> {
    // `(partition, start, len)` for every non-empty output partition, in
    // partition order; `start`/`len` address rows of the reordered batch.
    let mut partition_ranges = Vec::with_capacity(indices.len());
    let mut reordered_indices = Vec::with_capacity(batch.num_rows());

    for (partition, p_indices) in indices.iter_mut().enumerate() {
        if p_indices.is_empty() {
            continue;
        }

        let start = reordered_indices.len();
        reordered_indices.extend_from_slice(p_indices);
        partition_ranges.push((partition, start, p_indices.len()));
        // Empty the vector but keep its allocation for the next batch.
        p_indices.clear();
    }

    if reordered_indices.is_empty() {
        return Ok(vec![]);
    }

    let indices_array: PrimitiveArray<UInt32Type> = reordered_indices.into();
    let reordered_batch = {
        // Guard records elapsed time into `timer` when this scope ends.
        let _timer = timer.timer();
        let columns = take_arrays(batch.columns(), &indices_array, None)?;

        // Set the row count explicitly so a batch with zero columns still
        // carries the right number of rows.
        let options =
            RecordBatchOptions::new().with_row_count(Some(indices_array.len()));
        // Propagate construction failures instead of panicking: this function
        // already returns `Result`, and the `take_arrays` error above is
        // propagated the same way.
        RecordBatch::try_new_with_options(batch.schema(), columns, &options)?
    };

    Ok(partition_ranges
        .into_iter()
        .map(|(partition, start, len)| {
            Ok((partition, reordered_batch.slice(start, len)))
        })
        .collect())
}
}

/// Maps `N` input partitions to `M` output partitions based on a
Expand Down
Loading