Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 30 additions & 91 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::vec;

Expand Down Expand Up @@ -155,22 +154,27 @@ type InputPartitionsToCurrentPartitionReceiver = Vec<DistributionReceiver<MaybeB
/// Output channel with its associated memory reservation and spill writer.
///
/// `coalescer` is `None` for preserve-order mode, where downstream
/// [`StreamingMergeBuilder`] performs the batching; otherwise it's a
/// [`SharedCoalescer`] cloned from the per-partition one held by
/// [`PartitionChannels`].
/// [`StreamingMergeBuilder`] performs batching. Otherwise each producer
/// channel owns a local [`LimitedBatchCoalescer`].
struct OutputChannel {
sender: DistributionSender<MaybeBatch>,
reservation: SharedMemoryReservation,
spill_writer: SpillPoolWriter,
shared_coalescer: Option<SharedCoalescer>,
coalescer: Option<LimitedBatchCoalescer>,
}

impl OutputChannel {
fn coalesce(&mut self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
match &self.shared_coalescer {
Some(shared) => Ok(shared.push_and_drain(batch)?),
None => Ok(vec![batch]),
let Some(coalescer) = self.coalescer.as_mut() else {
return Ok(vec![batch]);
};

coalescer.push_batch(batch)?;
let mut output = Vec::new();
while let Some(batch) = coalescer.next_completed_batch() {
output.push(batch);
}
Ok(output)
}

/// Send a single batch through the channel for `partition`, applying
Expand All @@ -181,8 +185,6 @@ impl OutputChannel {
async fn send(&mut self, batch: RecordBatch) -> Result<(), SendError<MaybeBatch>> {
let size = batch.get_array_memory_size();

// Decide the payload outside of any await: never hold a MutexGuard
// across an await point.
let (payload, is_memory_batch) = {
match self.reservation.try_grow(size) {
Ok(_) => (Ok(RepartitionBatch::Memory(batch)), true),
Expand All @@ -201,10 +203,17 @@ impl OutputChannel {
}

async fn finalize(mut self) -> Result<()> {
let Some(shared) = self.shared_coalescer.take() else {
let Some(mut coalescer) = self.coalescer.take() else {
return Ok(());
};
for batch in shared.finalize()? {

let mut batches = Vec::new();
coalescer.finish()?;
while let Some(batch) = coalescer.next_completed_batch() {
batches.push(batch);
}

for batch in batches {
// If this errored, it means that nobody is listening on the other side, which is fine
// and can happen in certain cases, like when a LIMIT drops the stream that listens.
let _ = self.send(batch).await;
Expand All @@ -213,63 +222,6 @@ impl OutputChannel {
}
}

/// A producer-side coalescer shared across all input tasks targeting a
/// single output partition.
///
/// Bundles the [`LimitedBatchCoalescer`] (behind a [`Mutex`]) with the
/// active-sender counter that tracks how many input tasks may still push
/// into it. Every call to [`Self::finalize`] decrements that counter; only
/// the call that takes it from 1 to 0 finishes the coalescer and gets the
/// residual batches back, every other call gets an empty `Vec`.
///
/// Cheap to [`Clone`]: both fields are [`Arc`]s, so all clones observe the
/// same coalescer and the same counter.
#[derive(Clone)]
struct SharedCoalescer {
/// The coalescer that every clone pushes into; locked for each push and
/// for the final flush.
inner: Arc<Mutex<LimitedBatchCoalescer>>,
/// Number of senders that have not yet called [`Self::finalize`].
/// Starts at the `num_senders` value passed to [`Self::new`].
active_senders: Arc<AtomicUsize>,
}

impl SharedCoalescer {
fn new(schema: SchemaRef, target_batch_size: usize, num_senders: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(LimitedBatchCoalescer::new(
schema,
target_batch_size,
None,
))),
active_senders: Arc::new(AtomicUsize::new(num_senders)),
}
}

/// Push `batch` into the coalescer and drain any newly completed
/// batches. The mutex is held only briefly.
fn push_and_drain(&self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
let mut acc = Vec::new();
let mut c = self.inner.lock();
c.push_batch(batch)?;
while let Some(b) = c.next_completed_batch() {
acc.push(b);
}
Ok(acc)
}

/// Decrement the active-senders counter. If this caller was the last
/// sender, finalize the coalescer and return its residual batches; if
/// other senders are still active, return `Ok(None)`.
fn finalize(&self) -> Result<Vec<RecordBatch>> {
let was_last = self.active_senders.fetch_sub(1, Ordering::AcqRel) == 1;
if !was_last {
return Ok(vec![]);
}
let mut acc = Vec::new();
let mut c = self.inner.lock();
c.finish()?;
while let Some(b) = c.next_completed_batch() {
acc.push(b);
}
Ok(acc)
}
}

/// Channels and resources for a single output partition.
///
/// Each output partition has channels to receive data from all input partitions.
Expand Down Expand Up @@ -298,10 +250,6 @@ struct PartitionChannels {
rx: InputPartitionsToCurrentPartitionReceiver,
/// Memory reservation for this output partition
reservation: SharedMemoryReservation,
/// Shared coalescer used by all input tasks targeting this output
/// partition. `None` in preserve-order mode (downstream
/// `StreamingMergeBuilder` handles batching).
shared_coalescer: Option<SharedCoalescer>,
/// Spill writers for writing spilled data.
/// SpillPoolWriter is Clone, so multiple writers can share state in non-preserve-order mode.
spill_writers: Vec<SpillPoolWriter>,
Expand Down Expand Up @@ -471,18 +419,6 @@ impl RepartitionExecState {
.map(|_| spill_pool::channel(max_file_size, Arc::clone(&spill_manager)))
.unzip();

// Coalesce on the producer side, before the channel's gate, so
// the consumer never sees the per-input-task small batches.
// Skip in preserve-order mode: each input has its own dedicated
// channel and `StreamingMergeBuilder` handles batching.
let shared_coalescer = (!preserve_order).then(|| {
SharedCoalescer::new(
input.schema(),
context.session_config().batch_size(),
num_input_partitions,
)
});

channels.insert(
partition,
PartitionChannels {
Expand All @@ -491,7 +427,6 @@ impl RepartitionExecState {
reservation,
spill_readers,
spill_writers,
shared_coalescer,
},
);
}
Expand All @@ -514,7 +449,13 @@ impl RepartitionExecState {
reservation: Arc::clone(&channels.reservation),
spill_writer: channels.spill_writers[spill_writer_idx]
.clone(),
shared_coalescer: channels.shared_coalescer.clone(),
coalescer: (!preserve_order).then(|| {
LimitedBatchCoalescer::new(
input.schema(),
context.session_config().batch_size(),
None,
)
}),
},
)
})
Expand Down Expand Up @@ -971,7 +912,7 @@ impl BatchPartitioner {
/// batches adhere to the configured `datafusion.execution.batch_size` for efficient operations,
/// and for that, it will automatically coalesce batches right after repartitioning.
///
/// For this, one shared [`LimitedBatchCoalescer`] per output partition is used:
/// For this, each producer channel uses a local [`LimitedBatchCoalescer`]:
///
/// ```text
/// ┌───┐ ┌───┐
Expand Down Expand Up @@ -1732,9 +1673,7 @@ impl RepartitionExec {
}

// End of input for this task. For each output partition we still
// have a channel to, decrement the active-senders counter; whoever
// sees the count drop to zero is the last input task and must
// finalize the shared coalescer and ship its residual.
// have a channel to, finalize the local coalescer and ship its residual.
for (_, output_channel) in output_channels.drain() {
output_channel.finalize().await?;
}
Expand Down
Loading