Skip to content

Commit

Permalink
Remove MemoryManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 16, 2022
1 parent e07add8 commit 718a94b
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 134 deletions.
31 changes: 18 additions & 13 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry, MemoryManager};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
Expand All @@ -99,6 +99,7 @@ use url::Url;

use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_manager::MemoryPool;
use uuid::Uuid;

use super::options::{
Expand Down Expand Up @@ -1961,9 +1962,9 @@ impl TaskContext {
self.task_id.clone()
}

/// Return the [`MemoryManager`] associated with this [TaskContext]
pub fn memory_manager(&self) -> &Arc<MemoryManager> {
&self.runtime.memory_manager
/// Return the [`MemoryPool`] associated with this [TaskContext]
pub fn memory_pool(&self) -> &Arc<dyn MemoryPool> {
&self.runtime.memory_pool
}

/// Return the [RuntimeEnv] associated with this [TaskContext]
Expand Down Expand Up @@ -2031,6 +2032,7 @@ mod tests {
use super::*;
use crate::assert_batches_eq;
use crate::execution::context::QueryPlanner;
use crate::execution::memory_manager::TrackedAllocation;
use crate::execution::runtime_env::RuntimeConfig;
use crate::physical_plan::expressions::AvgAccumulator;
use crate::test;
Expand All @@ -2056,20 +2058,23 @@ mod tests {
let ctx1 = SessionContext::new();

// configure with same memory / disk manager
let memory_manager = ctx1.runtime_env().memory_manager.clone();
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut allocation = TrackedAllocation::new(&memory_pool, "test".to_string());
allocation.grow(100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();

let ctx2 =
SessionContext::with_config_rt(SessionConfig::new(), ctx1.runtime_env());

assert!(std::ptr::eq(
Arc::as_ptr(&memory_manager),
Arc::as_ptr(&ctx1.runtime_env().memory_manager)
));
assert!(std::ptr::eq(
Arc::as_ptr(&memory_manager),
Arc::as_ptr(&ctx2.runtime_env().memory_manager)
));
assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 100);
assert_eq!(ctx2.runtime_env().memory_pool.allocated(), 100);

drop(allocation);

assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 0);
assert_eq!(ctx2.runtime_env().memory_pool.allocated(), 0);

assert!(std::ptr::eq(
Arc::as_ptr(&disk_manager),
Expand Down
77 changes: 23 additions & 54 deletions datafusion/core/src/execution/memory_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,45 +50,6 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
fn allocated(&self) -> usize;
}

/// A cooperative MemoryManager which tracks memory in a cooperative fashion.
/// `ExecutionPlan` nodes such as `SortExec`, which require large amounts of memory
/// register their memory requests with the MemoryManager which then tracks the total
/// memory that has been allocated across all such nodes.
///
/// The associated [`MemoryPool`] determines how to respond to memory allocation
/// requests, and any associated fairness control
#[derive(Debug)]
pub struct MemoryManager {
pool: Arc<dyn MemoryPool>,
}

impl MemoryManager {
/// Create new memory manager based on the configuration
pub fn new(pool: Arc<dyn MemoryPool>) -> Self {
Self { pool }
}

/// Returns the number of allocated bytes
///
/// Note: this can exceed the pool size as a result of [`MemoryManager::allocate`]
pub fn allocated(&self) -> usize {
self.pool.allocated()
}

/// Returns a new empty allocation identified by `name`
pub fn new_allocation(&self, name: String) -> TrackedAllocation {
self.new_allocation_with_options(AllocationOptions::new(name))
}

/// Returns a new empty allocation with the provided [`AllocationOptions`]
pub fn new_allocation_with_options(
&self,
options: AllocationOptions,
) -> TrackedAllocation {
TrackedAllocation::new_empty(options, Arc::clone(&self.pool))
}
}

/// Options associated with a [`TrackedAllocation`]
#[derive(Debug)]
pub struct AllocationOptions {
Expand Down Expand Up @@ -131,12 +92,21 @@ pub struct TrackedAllocation {
}

impl TrackedAllocation {
fn new_empty(options: AllocationOptions, policy: Arc<dyn MemoryPool>) -> Self {
policy.allocate(&options);
/// Create a new [`TrackedAllocation`] in the provided [`MemoryPool`]
pub fn new(pool: &Arc<dyn MemoryPool>, name: String) -> Self {
Self::new_with_options(pool, AllocationOptions::new(name))
}

/// Create a new [`TrackedAllocation`] in the provided [`MemoryPool`]
pub fn new_with_options(
pool: &Arc<dyn MemoryPool>,
options: AllocationOptions,
) -> Self {
pool.allocate(&options);
Self {
options,
size: 0,
policy,
policy: Arc::clone(pool),
}
}

Expand Down Expand Up @@ -231,31 +201,30 @@ mod tests {

#[test]
fn test_memory_manager_underflow() {
let policy = Arc::new(GreedyMemoryPool::new(50));
let manager = MemoryManager::new(policy);
let mut a1 = manager.new_allocation("a1".to_string());
assert_eq!(manager.allocated(), 0);
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut a1 = TrackedAllocation::new(&pool, "a1".to_string());
assert_eq!(pool.allocated(), 0);

a1.grow(100);
assert_eq!(manager.allocated(), 100);
assert_eq!(pool.allocated(), 100);

assert_eq!(a1.free(), 100);
assert_eq!(manager.allocated(), 0);
assert_eq!(pool.allocated(), 0);

a1.try_grow(100).unwrap_err();
assert_eq!(manager.allocated(), 0);
assert_eq!(pool.allocated(), 0);

a1.try_grow(30).unwrap();
assert_eq!(manager.allocated(), 30);
assert_eq!(pool.allocated(), 30);

let mut a2 = manager.new_allocation("a2".to_string());
let mut a2 = TrackedAllocation::new(&pool, "a2".to_string());
a2.try_grow(25).unwrap_err();
assert_eq!(manager.allocated(), 30);
assert_eq!(pool.allocated(), 30);

drop(a1);
assert_eq!(manager.allocated(), 0);
assert_eq!(pool.allocated(), 0);

a2.try_grow(25).unwrap();
assert_eq!(manager.allocated(), 25);
assert_eq!(pool.allocated(), 25);
}
}
28 changes: 14 additions & 14 deletions datafusion/core/src/execution/memory_manager/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::memory_manager::{AllocationOptions, MemoryPool};
use crate::execution::TrackedAllocation;
use crate::execution::memory_manager::{
AllocationOptions, MemoryPool, TrackedAllocation,
};
use datafusion_common::{DataFusionError, Result};
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -220,24 +221,23 @@ fn insufficient_capacity_err(
mod tests {
use super::*;
use crate::execution::memory_manager::AllocationOptions;
use crate::execution::MemoryManager;
use std::sync::Arc;

#[test]
fn test_fair() {
let manager = MemoryManager::new(Arc::new(FairSpillPool::new(100)));
let pool = Arc::new(FairSpillPool::new(100)) as _;

let mut a1 = manager.new_allocation("unspillable".to_string());
let mut a1 = TrackedAllocation::new(&pool, "unspillable".to_string());
// Can grow beyond capacity of pool
a1.grow(2000);
assert_eq!(manager.allocated(), 2000);
assert_eq!(pool.allocated(), 2000);

let options = AllocationOptions::new("s1".to_string()).with_can_spill(true);
let mut a2 = manager.new_allocation_with_options(options);
let mut a2 = TrackedAllocation::new_with_options(&pool, options);
// Can grow beyond capacity of pool
a2.grow(2000);

assert_eq!(manager.allocated(), 4000);
assert_eq!(pool.allocated(), 4000);

let err = a2.try_grow(1).unwrap_err().to_string();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for s1 with 2000 bytes already allocated - maximum available is 0");
Expand All @@ -248,23 +248,23 @@ mod tests {
a1.shrink(1990);
a2.shrink(2000);

assert_eq!(manager.allocated(), 10);
assert_eq!(pool.allocated(), 10);

a1.try_grow(10).unwrap();
assert_eq!(manager.allocated(), 20);
assert_eq!(pool.allocated(), 20);

// Can grow a2 to 80 as only spilling consumer
a2.try_grow(80).unwrap();
assert_eq!(manager.allocated(), 100);
assert_eq!(pool.allocated(), 100);

a2.shrink(70);

assert_eq!(a1.size(), 20);
assert_eq!(a2.size(), 10);
assert_eq!(manager.allocated(), 30);
assert_eq!(pool.allocated(), 30);

let options = AllocationOptions::new("s2".to_string()).with_can_spill(true);
let mut a3 = manager.new_allocation_with_options(options);
let mut a3 = TrackedAllocation::new_with_options(&pool, options);

let err = a3.try_grow(70).unwrap_err().to_string();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for s2 with 0 bytes already allocated - maximum available is 40");
Expand All @@ -276,7 +276,7 @@ mod tests {

// But dropping a2 does
drop(a2);
assert_eq!(manager.allocated(), 20);
assert_eq!(pool.allocated(), 20);
a3.try_grow(80).unwrap();
}
}
1 change: 0 additions & 1 deletion datafusion/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ pub mod registry;
pub mod runtime_env;

pub use disk_manager::DiskManager;
pub use memory_manager::{human_readable_size, MemoryManager, TrackedAllocation};
pub use registry::FunctionRegistry;
7 changes: 3 additions & 4 deletions datafusion/core/src/execution/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::datasource::object_store::ObjectStoreRegistry;
use crate::execution::memory_manager::{
GreedyMemoryPool, MemoryPool, UnboundedMemoryPool,
};
use crate::execution::MemoryManager;
use datafusion_common::DataFusionError;
use object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
Expand All @@ -42,7 +41,7 @@ use url::Url;
/// Execution runtime environment.
pub struct RuntimeEnv {
/// Runtime memory management
pub memory_manager: Arc<MemoryManager>,
pub memory_pool: Arc<dyn MemoryPool>,
/// Manage temporary files during query execution
pub disk_manager: Arc<DiskManager>,
/// Object Store Registry
Expand All @@ -67,11 +66,11 @@ impl RuntimeEnv {
table_factories,
} = config;

let pool =
let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(Self {
memory_manager: Arc::new(MemoryManager::new(pool)),
memory_pool,
disk_manager: DiskManager::try_new(disk_manager)?,
object_store_registry,
table_factories,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ impl GroupedHashAggregateStream {

timer.done();

let allocation = context
.memory_manager()
.new_allocation(format!("GroupedHashAggregateStream[{}]", partition));
let allocation = TrackedAllocation::new(
context.memory_pool(),
format!("GroupedHashAggregateStream[{}]", partition),
);

let inner = GroupedHashAggregateStreamInner {
schema: Arc::clone(&schema),
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ impl AggregateStream {
let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
let accumulators = create_accumulators(&aggr_expr)?;

let allocation = context
.memory_manager()
.new_allocation(format!("AggregateStream[{}]", partition));
let allocation = TrackedAllocation::new(
context.memory_pool(),
format!("AggregateStream[{}]", partition),
);

let inner = AggregateStreamInner {
schema: Arc::clone(&schema),
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ impl GroupedHashAggregateStreamV2 {
let aggr_schema = aggr_state_schema(&aggr_expr)?;

let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
let allocation = context
.memory_manager()
.new_allocation(format!("GroupedHashAggregateStreamV2[{}]", partition));
let allocation = TrackedAllocation::new(
context.memory_pool(),
format!("GroupedHashAggregateStreamV2[{}]", partition),
);

let aggr_state = AggregationState {
allocation,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl ExecutionPlan for ExplainExec {

let metrics = ExecutionPlanMetricsSet::new();
let tracking_metrics =
MemTrackingMetrics::new(&metrics, context.memory_manager(), partition);
MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);

debug!(
"Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_plan/metrics/composite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Metrics common for complex operators with multiple steps.

use crate::execution::MemoryManager;
use crate::execution::memory_manager::MemoryPool;
use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
Expand Down Expand Up @@ -69,18 +69,18 @@ impl CompositeMetricsSet {
pub fn new_intermediate_tracking(
&self,
partition: usize,
memory_manager: &MemoryManager,
pool: &Arc<dyn MemoryPool>,
) -> MemTrackingMetrics {
MemTrackingMetrics::new(&self.mid, memory_manager, partition)
MemTrackingMetrics::new(&self.mid, pool, partition)
}

/// create a new final memory tracking metrics
pub fn new_final_tracking(
&self,
partition: usize,
memory_manager: &MemoryManager,
pool: &Arc<dyn MemoryPool>,
) -> MemTrackingMetrics {
MemTrackingMetrics::new(&self.final_, memory_manager, partition)
MemTrackingMetrics::new(&self.final_, pool, partition)
}

fn merge_compute_time(&self, dest: &Time) {
Expand Down
Loading

0 comments on commit 718a94b

Please sign in to comment.