Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ use datafusion_common::{Result, plan_err};
use datafusion_execution::memory_pool::MemoryReservation;

use futures::{StreamExt, TryStreamExt};
use parking_lot::Mutex;

/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
///
/// `MemoryReservation` is internally thread-safe (uses `AtomicUsize` for size
/// tracking, and the underlying `MemoryPool` implementations handle their own
/// locking), so no external `Mutex` is needed.
pub(crate) type SharedMemoryReservation = Arc<MemoryReservation>;

/// Create a vector of record batches from a stream
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
Expand Down
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequiremen
use datafusion_common::hash_utils::RandomState;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::{Stream, StreamExt, ready};
use parking_lot::Mutex;

const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;

Expand Down Expand Up @@ -549,12 +548,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
let enforce_batch_size_in_joins =
context.session_config().enforce_batch_size_in_joins();

let reservation = Arc::new(Mutex::new(
let reservation = Arc::new(
MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
.register(context.memory_pool()),
));
);
if let Some(g) = graph.as_ref() {
reservation.lock().try_grow(g.size())?;
reservation.try_grow(g.size())?;
}

if enforce_batch_size_in_joins {
Expand Down Expand Up @@ -1745,7 +1744,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
let capacity = self.size();
self.metrics.stream_memory_usage.set(capacity);
self.reservation.lock().try_resize(capacity)?;
self.reservation.try_resize(capacity)?;
Ok(result)
}
}
Expand Down
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,11 @@ impl RepartitionExecState {

let mut channels = HashMap::with_capacity(txs.len());
for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
let reservation = Arc::new(Mutex::new(
let reservation = Arc::new(
MemoryConsumer::new(format!("{name}[{partition}]"))
.with_can_spill(true)
.register(context.memory_pool()),
));
);

// Create spill channels based on mode:
// - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering
Expand Down Expand Up @@ -1401,7 +1401,7 @@ impl RepartitionExec {
// if there is still a receiver, send to it
if let Some(channel) = output_channels.get_mut(&partition) {
let (batch_to_send, is_memory_batch) =
match channel.reservation.lock().try_grow(size) {
match channel.reservation.try_grow(size) {
Ok(_) => {
// Memory available - send in-memory batch
(RepartitionBatch::Memory(batch), true)
Expand All @@ -1419,7 +1419,7 @@ impl RepartitionExec {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
// Only shrink memory if it was a memory batch
if is_memory_batch {
channel.reservation.lock().shrink(size);
channel.reservation.shrink(size);
}
output_channels.remove(&partition);
}
Expand Down Expand Up @@ -1639,7 +1639,6 @@ impl PerPartitionStream {
Ok(RepartitionBatch::Memory(batch)) => {
// Release memory and return batch
self.reservation
.lock()
.shrink(batch.get_array_memory_size());
return Poll::Ready(Some(Ok(batch)));
}
Expand Down
Loading