Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! help with allocation accounting.

use datafusion_common::{internal_err, Result};
use log::trace;
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::atomic, sync::Arc};

Expand Down Expand Up @@ -380,6 +381,10 @@ impl MemoryReservation {
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn shrink(&mut self, capacity: usize) {
trace!(
"MemoryReservation[{}].shrink({capacity})",
self.consumer().name
);
let new_size = self.size.checked_sub(capacity).unwrap();
self.registration.pool.shrink(self, capacity);
self.size = new_size
Expand All @@ -393,8 +398,16 @@ impl MemoryReservation {
if let Some(new_size) = self.size.checked_sub(capacity) {
self.registration.pool.shrink(self, capacity);
self.size = new_size;
trace!(
"MemoryReservation[{}].try_shrink({capacity}) returning Ok({new_size})",
self.consumer().name
);
Ok(new_size)
} else {
trace!(
"MemoryReservation[{}].try_shrink({capacity}) returning Err",
self.consumer().name
);
internal_err!(
"Cannot free the capacity {capacity} out of allocated size {}",
self.size
Expand All @@ -404,6 +417,10 @@ impl MemoryReservation {

/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
trace!(
"MemoryReservation[{}].resize({capacity})",
self.consumer().name
);
match capacity.cmp(&self.size) {
Ordering::Greater => self.grow(capacity - self.size),
Ordering::Less => self.shrink(self.size - capacity),
Expand All @@ -413,6 +430,10 @@ impl MemoryReservation {

/// Try to set the size of this reservation to `capacity`
pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
trace!(
"MemoryReservation[{}].try_resize({capacity})",
self.consumer().name
);
match capacity.cmp(&self.size) {
Ordering::Greater => self.try_grow(capacity - self.size)?,
Ordering::Less => self.shrink(self.size - capacity),
Expand All @@ -423,6 +444,10 @@ impl MemoryReservation {

/// Increase the size of this reservation by `capacity` bytes
pub fn grow(&mut self, capacity: usize) {
trace!(
"MemoryReservation[{}].grow({capacity})",
self.consumer().name
);
self.registration.pool.grow(self, capacity);
self.size += capacity;
}
Expand All @@ -431,6 +456,10 @@ impl MemoryReservation {
/// bytes, returning error if there is insufficient capacity left
/// in the pool.
pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
trace!(
"MemoryReservation[{}].try_grow({capacity})",
self.consumer().name
);
self.registration.pool.try_grow(self, capacity)?;
self.size += capacity;
Ok(())
Expand All @@ -447,6 +476,10 @@ impl MemoryReservation {
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn split(&mut self, capacity: usize) -> MemoryReservation {
trace!(
"MemoryReservation[{}].split({capacity})",
self.consumer().name
);
self.size = self.size.checked_sub(capacity).unwrap();
Self {
size: capacity,
Expand All @@ -456,6 +489,7 @@ impl MemoryReservation {

/// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn new_empty(&self) -> Self {
trace!("MemoryReservation[{}].new_empty()", self.consumer().name);
Self {
size: 0,
registration: Arc::clone(&self.registration),
Expand Down