From 3bf05f866d238766674ec0a99980635c48e38c6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= <danielheres@gmail.com> Date: Thu, 16 Apr 2026 16:39:28 +0200 Subject: [PATCH] Remove redundant Mutex from SharedMemoryReservation `MemoryReservation` is already internally thread-safe: its `size` field uses `AtomicUsize`, and the underlying `MemoryPool` implementations (`GreedyMemoryPool`, `FairSpillPool`, `UnboundedMemoryPool`) handle their own locking. The outer `Mutex` wrapper in `SharedMemoryReservation` was therefore redundant and caused unnecessary lock contention. In the repartition operator, this lock was acquired on every batch by every input partition (sender) plus the output partition (receiver), making it a serialization bottleneck that reduced CPU utilization during parallel query execution. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/common.rs | 7 +++++-- .../physical-plan/src/joins/symmetric_hash_join.rs | 9 ++++----- datafusion/physical-plan/src/repartition/mod.rs | 9 ++++----- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 590f6f09e8b9e..56315340ec98a 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -33,10 +33,13 @@ use datafusion_common::{Result, plan_err}; use datafusion_execution::memory_pool::MemoryReservation; use futures::{StreamExt, TryStreamExt}; -use parking_lot::Mutex; /// [`MemoryReservation`] used across query execution streams -pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>; +/// +/// `MemoryReservation` is internally thread-safe (uses `AtomicUsize` for size +/// tracking, and the underlying `MemoryPool` implementations handle their own +/// locking), so no external `Mutex` is needed. 
+pub(crate) type SharedMemoryReservation = Arc<MemoryReservation>; /// Create a vector of record batches from a stream pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index dbfdf94426782..11e036434ee97 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -82,7 +82,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; use datafusion_common::hash_utils::RandomState; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::{Stream, StreamExt, ready}; -use parking_lot::Mutex; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; @@ -549,12 +548,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { let enforce_batch_size_in_joins = context.session_config().enforce_batch_size_in_joins(); - let reservation = Arc::new(Mutex::new( + let reservation = Arc::new( MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]")) .register(context.memory_pool()), - )); + ); if let Some(g) = graph.as_ref() { - reservation.lock().try_grow(g.size())?; + reservation.try_grow(g.size())?; } if enforce_batch_size_in_joins { @@ -1745,7 +1744,7 @@ impl SymmetricHashJoinStream { let result = combine_two_batches(&self.schema, equal_result, anti_result)?; let capacity = self.size(); self.metrics.stream_memory_usage.set(capacity); - self.reservation.lock().try_resize(capacity)?; + self.reservation.try_resize(capacity)?; Ok(result) } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d4406360504f9..cb58ec5dd1cd3 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -322,11 +322,11 @@ impl RepartitionExecState { let mut channels = HashMap::with_capacity(txs.len()); for (partition, (tx, rx)) in 
txs.into_iter().zip(rxs).enumerate() { - let reservation = Arc::new(Mutex::new( + let reservation = Arc::new( MemoryConsumer::new(format!("{name}[{partition}]")) .with_can_spill(true) .register(context.memory_pool()), - )); + ); // Create spill channels based on mode: // - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering @@ -1401,7 +1401,7 @@ impl RepartitionExec { // if there is still a receiver, send to it if let Some(channel) = output_channels.get_mut(&partition) { let (batch_to_send, is_memory_batch) = - match channel.reservation.lock().try_grow(size) { + match channel.reservation.try_grow(size) { Ok(_) => { // Memory available - send in-memory batch (RepartitionBatch::Memory(batch), true) @@ -1419,7 +1419,7 @@ impl RepartitionExec { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) // Only shrink memory if it was a memory batch if is_memory_batch { - channel.reservation.lock().shrink(size); + channel.reservation.shrink(size); } output_channels.remove(&partition); } @@ -1639,7 +1639,6 @@ impl PerPartitionStream { Ok(RepartitionBatch::Memory(batch)) => { // Release memory and return batch self.reservation - .lock() .shrink(batch.get_array_memory_size()); return Poll::Ready(Some(Ok(batch))); }