diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 590f6f09e8b9e..56315340ec98a 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -33,10 +33,13 @@ use datafusion_common::{Result, plan_err}; use datafusion_execution::memory_pool::MemoryReservation; use futures::{StreamExt, TryStreamExt}; -use parking_lot::Mutex; /// [`MemoryReservation`] used across query execution streams -pub(crate) type SharedMemoryReservation = Arc>; +/// +/// `MemoryReservation` is internally thread-safe (uses `AtomicUsize` for size +/// tracking, and the underlying `MemoryPool` implementations handle their own +/// locking), so no external `Mutex` is needed. +pub(crate) type SharedMemoryReservation = Arc; /// Create a vector of record batches from a stream pub async fn collect(stream: SendableRecordBatchStream) -> Result> { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index dbfdf94426782..11e036434ee97 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -82,7 +82,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequiremen use datafusion_common::hash_utils::RandomState; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::{Stream, StreamExt, ready}; -use parking_lot::Mutex; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; @@ -549,12 +548,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { let enforce_batch_size_in_joins = context.session_config().enforce_batch_size_in_joins(); - let reservation = Arc::new(Mutex::new( + let reservation = Arc::new( MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]")) .register(context.memory_pool()), - )); + ); if let Some(g) = graph.as_ref() { - reservation.lock().try_grow(g.size())?; + reservation.try_grow(g.size())?; } if enforce_batch_size_in_joins { @@ -1745,7 +1744,7 @@ impl SymmetricHashJoinStream { let result = combine_two_batches(&self.schema, equal_result, anti_result)?; let capacity = self.size(); self.metrics.stream_memory_usage.set(capacity); - self.reservation.lock().try_resize(capacity)?; + self.reservation.try_resize(capacity)?; Ok(result) } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d4406360504f9..cb58ec5dd1cd3 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -322,11 +322,11 @@ impl RepartitionExecState { let mut channels = HashMap::with_capacity(txs.len()); for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { - let reservation = Arc::new(Mutex::new( + let reservation = Arc::new( MemoryConsumer::new(format!("{name}[{partition}]")) .with_can_spill(true) .register(context.memory_pool()), - )); + ); // Create spill channels based on mode: // - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering @@ -1401,7 +1401,7 @@ impl RepartitionExec { // if there is still a receiver, send to it if let Some(channel) = output_channels.get_mut(&partition) { let (batch_to_send, is_memory_batch) = - match channel.reservation.lock().try_grow(size) { + match channel.reservation.try_grow(size) { Ok(_) => { // Memory available - send in-memory batch (RepartitionBatch::Memory(batch), true) @@ -1419,7 +1419,7 @@ impl RepartitionExec { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) // Only shrink memory if it was a memory batch if is_memory_batch { - channel.reservation.lock().shrink(size); + channel.reservation.shrink(size); } output_channels.remove(&partition); } @@ -1639,7 +1639,6 @@ impl PerPartitionStream { Ok(RepartitionBatch::Memory(batch)) => { // Release memory and return batch self.reservation - .lock() .shrink(batch.get_array_memory_size()); return Poll::Ready(Some(Ok(batch))); }