Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,9 +909,23 @@ fn add_hash_on_top(
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
let partitioning = dist.create_partitioning(n_target);
let repartition =
let mut repartition =
RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
.with_preserve_order();

// Skip backpressure when the input is a partial aggregate.
// Partial aggregation already manages its own memory pressure via
// early emission, so the repartition gate would only block the
// partial aggregate from flushing its buffered groups.
if input
.plan
.as_any()
.downcast_ref::<AggregateExec>()
.is_some_and(|agg| agg.mode() == &AggregateMode::Partial)
{
repartition = repartition.with_skip_backpressure();
}

let plan = Arc::new(repartition) as _;

return Ok(DistributionContext::new(plan, true, vec![input]));
Expand Down
33 changes: 32 additions & 1 deletion datafusion/physical-plan/src/repartition/distributor_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,33 @@ use parking_lot::Mutex;
/// Create `n` empty channels.
///
/// Senders on these channels participate in the gate-based backpressure
/// mechanism; see [`channels_without_backpressure`] for the ungated variant.
pub fn channels<T>(
    n: usize,
) -> (Vec<DistributionSender<T>>, Vec<DistributionReceiver<T>>) {
    // Delegate to the shared constructor with backpressure enabled.
    let skip_backpressure = false;
    channels_inner(n, skip_backpressure)
}

/// Create `n` empty channels whose senders never block.
///
/// Unlike [`channels`], the gate never closes, so every send proceeds
/// immediately. This is useful when the input already manages its own
/// memory pressure (e.g. partial aggregation with early emission).
pub fn channels_without_backpressure<T>(
    n: usize,
) -> (Vec<DistributionSender<T>>, Vec<DistributionReceiver<T>>) {
    // Same construction as `channels`, but with the backpressure gate disabled.
    let skip_backpressure = true;
    channels_inner(n, skip_backpressure)
}

fn channels_inner<T>(
n: usize,
skip_backpressure: bool,
) -> (Vec<DistributionSender<T>>, Vec<DistributionReceiver<T>>) {
let channels = (0..n)
.map(|id| Arc::new(Channel::new_with_one_sender(id)))
.collect::<Vec<_>>();
// When skipping backpressure, set empty_channels high enough that the gate
// never closes (it closes when empty_channels reaches 0).
let initial_empty = if skip_backpressure { usize::MAX } else { n };
let gate = Arc::new(Gate {
empty_channels: AtomicUsize::new(n),
empty_channels: AtomicUsize::new(initial_empty),
send_wakers: Mutex::new(None),
});
let senders = channels
Expand Down Expand Up @@ -92,6 +113,16 @@ pub fn partition_aware_channels<T>(
(0..n_in).map(|_| channels(n_out)).unzip()
}

/// Like [`partition_aware_channels`] but without backpressure.
///
/// Builds one ungated channel group per input partition, each fanning out
/// to `n_out` output partitions.
pub fn partition_aware_channels_without_backpressure<T>(
    n_in: usize,
    n_out: usize,
) -> (PartitionAwareSenders<T>, PartitionAwareReceivers<T>) {
    // Accumulate the per-input sender/receiver groups with an explicit loop
    // (rather than `map` + `unzip`), preallocating both sides up front.
    let mut all_senders = Vec::with_capacity(n_in);
    let mut all_receivers = Vec::with_capacity(n_in);
    for _ in 0..n_in {
        let (senders, receivers) = channels_without_backpressure(n_out);
        all_senders.push(senders);
        all_receivers.push(receivers);
    }
    (all_senders, all_receivers)
}

/// Erroring during [send](DistributionSender::send).
///
/// This occurs when the [receiver](DistributionReceiver) is gone.
Expand Down
48 changes: 43 additions & 5 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ use parking_lot::Mutex;

mod distributor_channels;
use distributor_channels::{
DistributionReceiver, DistributionSender, channels, partition_aware_channels,
DistributionReceiver, DistributionSender, channels,
channels_without_backpressure, partition_aware_channels,
partition_aware_channels_without_backpressure,
};

/// A batch in the repartition queue - either in memory or spilled to disk.
Expand Down Expand Up @@ -269,6 +271,7 @@ impl RepartitionExecState {
metrics: &ExecutionPlanMetricsSet,
partitioning: &Partitioning,
preserve_order: bool,
skip_backpressure: bool,
name: &str,
context: &Arc<TaskContext>,
spill_manager: SpillManager,
Expand Down Expand Up @@ -302,15 +305,25 @@ impl RepartitionExecState {
let (txs, rxs) = if preserve_order {
// Create partition-aware channels with one channel per (input, output) pair
// This provides backpressure while maintaining proper ordering
let (txs_all, rxs_all) =
partition_aware_channels(num_input_partitions, num_output_partitions);
let (txs_all, rxs_all) = if skip_backpressure {
partition_aware_channels_without_backpressure(
num_input_partitions,
num_output_partitions,
)
} else {
partition_aware_channels(num_input_partitions, num_output_partitions)
};
// Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition
let txs = transpose(txs_all);
let rxs = transpose(rxs_all);
(txs, rxs)
} else {
// Create one channel per *output* partition with backpressure
let (txs, rxs) = channels(num_output_partitions);
// Create one channel per *output* partition
let (txs, rxs) = if skip_backpressure {
channels_without_backpressure(num_output_partitions)
} else {
channels(num_output_partitions)
};
// Clone sender for each input partitions
let txs = txs
.into_iter()
Expand Down Expand Up @@ -766,6 +779,10 @@ pub struct RepartitionExec {
/// Boolean flag to decide whether to preserve ordering. If true means
/// `SortPreservingRepartitionExec`, false means `RepartitionExec`.
preserve_order: bool,
/// When true, the distribution channels will not apply backpressure.
/// This is useful when the input already manages its own memory pressure
/// (e.g. partial aggregation with early emission).
skip_backpressure: bool,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: Arc<PlanProperties>,
}
Expand Down Expand Up @@ -942,6 +959,9 @@ impl ExecutionPlan for RepartitionExec {
if self.preserve_order {
repartition = repartition.with_preserve_order();
}
if self.skip_backpressure {
repartition = repartition.with_skip_backpressure();
}
Ok(Arc::new(repartition))
}

Expand Down Expand Up @@ -970,6 +990,7 @@ impl ExecutionPlan for RepartitionExec {
let partitioning = self.partitioning().clone();
let metrics = self.metrics.clone();
let preserve_order = self.sort_exprs().is_some();
let skip_backpressure = self.skip_backpressure;
let name = self.name().to_owned();
let schema = self.schema();
let schema_captured = Arc::clone(&schema);
Expand Down Expand Up @@ -1005,6 +1026,7 @@ impl ExecutionPlan for RepartitionExec {
&metrics,
&partitioning,
preserve_order,
skip_backpressure,
&name,
&context,
spill_manager.clone(),
Expand Down Expand Up @@ -1222,6 +1244,9 @@ impl ExecutionPlan for RepartitionExec {
if self.preserve_order {
new_repartition = new_repartition.with_preserve_order();
}
if self.skip_backpressure {
new_repartition = new_repartition.with_skip_backpressure();
}
Ok(Arc::new(new_repartition) as Arc<dyn ExecutionPlan>)
})
}
Expand All @@ -1243,6 +1268,7 @@ impl ExecutionPlan for RepartitionExec {
state: Arc::clone(&self.state),
metrics: self.metrics.clone(),
preserve_order: self.preserve_order,
skip_backpressure: self.skip_backpressure,
cache: new_properties.into(),
})))
}
Expand All @@ -1263,6 +1289,7 @@ impl RepartitionExec {
state: Default::default(),
metrics: ExecutionPlanMetricsSet::new(),
preserve_order,
skip_backpressure: false,
cache: Arc::new(cache),
})
}
Expand Down Expand Up @@ -1328,6 +1355,17 @@ impl RepartitionExec {
self
}

/// Skip backpressure on the distribution channels.
///
/// When the input already manages its own memory pressure (e.g. partial
/// aggregation that emits early under memory pressure), the repartition
/// gate-based backpressure is unnecessary and can actually hurt performance
/// by preventing the input from flushing its buffered groups.
pub fn with_skip_backpressure(mut self) -> Self {
self.skip_backpressure = true;
self
}

/// Return the sort expressions that are used to merge
fn sort_exprs(&self) -> Option<&LexOrdering> {
if self.preserve_order {
Expand Down
Loading