
Simplify MemoryManager #4522

Merged (12 commits merged into apache:master, Dec 19, 2022)
Conversation

@tustvold (Contributor, author) commented Dec 5, 2022

Which issue does this PR close?

Closes #2829.

Rationale for this change

The existing memory manager interface is quite tricky to use, and has resulted in the proliferation of "utility" wrappers to make it easier to work with. It also mixes the concept of limiting memory with "memory scheduling", where it tries to distribute memory "fairly". In practice I'm not sure this would have worked, and it would likely have blocked tokio worker threads (#4325).

This PR proposes a drastic simplification of MemoryManager

What changes are included in this PR?

There are a couple of major changes worth highlighting (a sketch of the resulting design follows the list):

  • MemoryManager will no longer block waiting for capacity
  • MemoryManager "fairness" can be configured based on the provided MemoryPool
  • ExternalSorter will refuse to sort batches that are larger than it can allocate from the memory pool
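To make the new shape concrete, here is a minimal, self-contained sketch of the design these bullets describe: a pluggable pool that only tracks and limits memory, and errors rather than blocks. The trait shape, the GreedyPool name, and the method signatures below are illustrative assumptions, not DataFusion's actual API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Tracks memory use and enforces a limit; implementations decide the policy.
trait MemoryPool: Send + Sync {
    /// Infallibly grow the tracked usage (consumers that can spill use this).
    fn grow(&self, bytes: usize);
    /// Try to grow, returning an error instead of blocking on capacity.
    fn try_grow(&self, bytes: usize) -> Result<(), String>;
    fn shrink(&self, bytes: usize);
    fn allocated(&self) -> usize;
}

/// A pool with a fixed budget that errors (never blocks) when exhausted.
struct GreedyPool {
    limit: usize,
    used: AtomicUsize,
}

impl MemoryPool for GreedyPool {
    fn grow(&self, bytes: usize) {
        self.used.fetch_add(bytes, Ordering::Relaxed);
    }

    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let new_used = self.used.fetch_add(bytes, Ordering::Relaxed) + bytes;
        if new_used > self.limit {
            // Roll back the optimistic addition before reporting failure.
            self.used.fetch_sub(bytes, Ordering::Relaxed);
            return Err(format!("cannot allocate {bytes} bytes: pool exhausted"));
        }
        Ok(())
    }

    fn shrink(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::Relaxed);
    }

    fn allocated(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}

fn main() {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyPool {
        limit: 1024,
        used: AtomicUsize::new(0),
    });
    assert!(pool.try_grow(512).is_ok());
    assert!(pool.try_grow(1024).is_err()); // errors immediately; never blocks
    pool.shrink(512);
    assert_eq!(pool.allocated(), 0);
}
```

The key behavioural change is visible in try_grow: a consumer that cannot free memory gets an immediate error instead of parking a tokio worker thread while waiting for capacity.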

Are these changes tested?

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) label Dec 5, 2022
let batch =
RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
stagger_batch_with_seed(batch, 42)
let max_batch = 1024;
@tustvold (Contributor, author):

Again, slicing batches causes the memory limiter to fail as it doesn't consider array slicing

Contributor:

I believe stagger batches also adds zero length record batches, which was an important edge case to cover as I recall
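To illustrate the slicing point above (my example, not from the PR): slicing an Arrow array shares the parent's buffers, so accounting that sums per-array sizes can massively overcount the memory actually held. Uses arrow-rs directly; exact sizes depend on the arrow version.

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array};

fn main() {
    let array: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1_000_000));
    let slice = array.slice(0, 10); // 10 rows, but shares the ~8 MB buffer

    // Both report (roughly) the size of the full backing buffer, so summing
    // per-slice sizes vastly overstates the memory actually in use:
    println!("full:  {} bytes", array.get_array_memory_size());
    println!("slice: {} bytes", slice.get_array_memory_size());
}
```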

@@ -91,6 +98,10 @@ impl BatchBuilder {

let num_entries = rng.gen_range(1024..8192);
for i in 0..num_entries {
if self.is_finished() {
@tustvold (Contributor, author):

Early return to make tests run faster

schema: SchemaRef,
in_mem_batches: Mutex<Vec<BatchWithSortArray>>,
@tustvold (Contributor, author):

These Mutexes aren't actually needed, so I took the opportunity to just remove them


impl Drop for MemTrackingMetrics {
fn drop(&mut self) {
self.metrics.try_done();
@tustvold (Contributor, author):

This is redundant as the Drop of BaselineMetrics already calls this

impl Drop for MemTrackingMetrics {
fn drop(&mut self) {
self.metrics.try_done();
if self.mem_used() != 0 {
@tustvold (Contributor, author):

This is no longer needed as TrackedAllocation does it automatically
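For context, the RAII behaviour being referred to might look like the following stand-in, reusing the MemoryPool trait from the sketch near the top of this page. This is my illustration, not DataFusion's actual TrackedAllocation.

```rust
use std::sync::Arc;

// (MemoryPool is the trait from the earlier sketch.)
struct TrackedAllocation {
    pool: Arc<dyn MemoryPool>,
    size: usize,
}

impl TrackedAllocation {
    fn grow(&mut self, bytes: usize) {
        self.pool.grow(bytes);
        self.size += bytes;
    }
}

impl Drop for TrackedAllocation {
    fn drop(&mut self) {
        // Return everything this allocation still holds to the pool,
        // which is why a manual `mem_used() != 0` check becomes redundant.
        self.pool.shrink(self.size);
    }
}
```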

/// allocations while processing data.
///
/// This consumer will NEVER spill.
pub struct MemoryConsumerProxy {
@tustvold (Contributor, author):

This is entirely replaced by TrackedAllocation

@tustvold requested a review from alamb December 5, 2022 22:02
@crepererum (Contributor) left a comment:

I agree that any form of fairness or distribution shall be built either into the consumer itself (if it is capable of doing this) or into the scheduler (which is some future work). Simplification looks good.

@alamb (Contributor) commented Dec 6, 2022

cc @yjshen and @richox (Blaze-rs team) who I think contributed some/all of the original implementation; Also cc @milenkovicm who I think has expressed some interest in memory management before

@alamb (Contributor) left a comment:

Thank you @tustvold -- I agree the existing manager's allocation algorithm was a bit of a mismatch for DataFusion's needs, and this rework matches them better. I would very much like to hear @yjshen's opinion of this change prior to merging.

I think the major downside to this new allocation approach is that some operators that don't benefit from more memory after a point (e.g. Sort, because they will spill anyway) will now consume all of the available memory in the plan at the expense of upstream operators. This may cause a plan with multiple such operators to fail, as the downstream operator will not be able to allocate any memory once it has all been given to the upstream operator. I think there are different ways around this problem, such as adjusting the spilling operator's strategy so that once it spills it gives back some of the memory it requested.

This situation doesn't really affect the "error if memory budget is exceeded" usecase, but I do think it affects what happens when operators begin to spill.

If we can't reach consensus on the particular strategy, perhaps we can follow the traditional DataFusion approach and provide an extension point -- namely make MemoryManager a trait and provide a basic implementation that can be customized by other systems if need be.

I wonder if this closes either #4328 or #4325

Also possibly related is @richox's report that the memory manager divvies up memory, like #2829, which may or may not be relevant after this PR

datafusion/core/src/execution/memory_manager/mod.rs (5 resolved review comments)
datafusion/core/tests/memory_limit.rs (1 review comment)
@alamb (Contributor) commented Dec 7, 2022

In the interests of moving this PR along, I have posted to the mailing list https://lists.apache.org/thread/plrq40ldy4y9l6bj3157bk54dc029b3w and slack trying to solicit comments

@alamb (Contributor) commented Dec 11, 2022

I propose that if we don't get any comments in another 2 days, we merge this PR as-is. We can address any comments / other features as follow-ons.

Please comment here if you need additional time to review or have other commentary

@yjshen (Member) left a comment:

Thanks @tustvold for working on this new version of the memory manager!

Yes, you are correct that the current manager combines memory usage accounting with memory scheduling (fair scheduling, in its current state). Its complexity is twofold:

  1. how to grant memory to a set of memory consumers fairly (with a consumer id identifying the different consumers)
  2. how to distinguish consumers that keep requiring more memory during execution (the available memory is divided evenly among them all) from those that take a non-negligible amount of memory but return it later.

And yes, fair scheduling was chosen for simplicity of implementation, to avoid starvation of partitions or pipelines executed in parallel; there is plenty of room for more sophisticated scheduling strategies.

Currently, the existing memory manager works well under the Blaze workload, where we use DataFusion to run several Spark tasks in parallel inside each Spark Executor, with the memory manager controlling both Sort and Shuffle. We have not witnessed #4325. Perhaps @richox could correct me or provide more details on developing Blaze and operating it in production at Kuaishou since I left in May.

I would suggest we start designing the memory scheduling and pooling mechanisms, think about how they interact with the new manager, and produce a complete plan for the new memory management. I am concerned that once we consider scheduling, we will have to add back code with similar functionality.

I'm also fine with merging the PR as-is if scheduling or pooling is planned and will land shortly. A short period of the functionality regression @alamb pointed out doesn't hurt much.

@tustvold (Contributor, author) commented Dec 13, 2022

I'll add a policy component to this prior to merge in that case, thank you for taking the time to review

@alamb marked this pull request as draft December 15, 2022 20:12
@alamb (Contributor) commented Dec 15, 2022

Marking as a draft until the policy is added, so this PR is removed from my review queue

@tustvold marked this pull request as ready for review December 16, 2022 18:07
@tustvold (Contributor, author):

I've added a MemoryPool abstraction allowing customisation of the allocation strategy. PTAL

@@ -142,8 +126,10 @@ impl Default for RuntimeEnv {
pub struct RuntimeConfig {
@tustvold (Contributor, author):

Not directly related to this PR, but the proliferation of *Config structs is a pet peeve of mine that I am working to fix under #4617 and #4349


#[derive(Debug, Clone)]
/// Configuration information for memory management
pub enum MemoryManagerConfig {
@tustvold (Contributor, author):

This was a bit of a perverse API imo. I'm looking into properly addressing the *Config proliferation in #4349, but I figured I might as well clean this up whilst I was here.

If people want to reuse the same pool, they can just use the same Arc<dyn MemoryPool>
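A sketch of what sharing one pool between two runtime configs might look like. UnboundedMemoryPool and its Default derive appear in this PR's diff, but the module paths and the with_memory_pool builder method are my assumptions and may not match the merged code exactly.

```rust
use std::sync::Arc;
use datafusion::execution::memory_pool::{MemoryPool, UnboundedMemoryPool};
use datafusion::execution::runtime_env::RuntimeConfig;

fn shared_pool_configs() -> (RuntimeConfig, RuntimeConfig) {
    // One pool, shared by cloning the Arc; both runtimes draw from it.
    let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let a = RuntimeConfig::new().with_memory_pool(Arc::clone(&pool));
    let b = RuntimeConfig::new().with_memory_pool(Arc::clone(&pool));
    (a, b)
}
```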

}
/// Options associated with a [`TrackedAllocation`]
#[derive(Debug)]
pub struct AllocationOptions {
@tustvold (Contributor, author):

This is designed to be an extension point, so we can potentially add more allocation options and correspondingly more sophisticated MemoryPool implementations.

/// … (remainder of the pool-layout ASCII diagram, truncated in this excerpt)
///
/// Unspillable memory is allocated in a first-come, first-serve fashion
@tustvold (Contributor, author):

This is equivalent to the old behaviour

Arc::as_ptr(&memory_manager),
Arc::as_ptr(&ctx2.runtime_env().memory_manager)
));
assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 100);
@tustvold (Contributor, author):

This test had to be rewritten, as pointer equality of fat pointers such as &dyn is funky
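To expand on the "funky" part (my illustration, not from the PR): a pointer to a trait object is a fat pointer carrying both a data pointer and a vtable pointer, and the compiler may emit duplicate vtables across codegen units (or unify vtables of different types), so comparing fat pointers directly is unreliable. Casting to a thin pointer first compares only the data halves.

```rust
use std::sync::Arc;

trait Pool {}
struct MyPool;
impl Pool for MyPool {}

fn main() {
    let a: Arc<dyn Pool> = Arc::new(MyPool);
    let b = Arc::clone(&a);

    // A *const dyn Pool is a fat pointer (data + vtable). Casting to
    // *const () discards the vtable half, leaving a reliable comparison
    // of the data pointers alone.
    let same = std::ptr::eq(
        Arc::as_ptr(&a) as *const (),
        Arc::as_ptr(&b) as *const (),
    );
    assert!(same);
}
```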

@alamb (Contributor) left a comment:

Thank you @tustvold -- I really like where this is going. I had a suggestion about the API design (MemoryPool::register()) rather than creating TrackedAllocations directly.

I also think we could merge this PR as is if you don't want to make any more changes -- I could try to update the API as part of a follow on PR.


/// A [`MemoryPool`] that enforces no limit
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
Contributor:

This is very nice

}

impl MemoryPool for FairSpillPool {
fn allocate(&self, options: &AllocationOptions) {
Contributor:

I spent some time convincing myself this would work correctly if there are multiple SortExecs in the plan such that one would completely run before the other (e.g. as happens when there are multiple window functions with different partition / order by clauses)

I believe it will be fine because each ExternalSorter creates the potentially spilling allocation upon construction, though I am still not 100% sure.

What would you think about moving the construction of TrackedAllocation into MemoryPool, something like

/// Create a new tracked allocation with the MemoryManager. Subsequent requests
/// can grow or shrink the memory allocated to this allocation.
///
/// The allocation is automatically deregistered on drop
fn register(self: &Arc<Self>, options: AllocationOptions) -> TrackedAllocation;

That way the intent would be clearer that all ExternalSort instances register themselves with the MemoryManager on creation, and part of registering would increase the num_spill count.

I think having the MemoryPool create TrackedAllocations makes more logical sense as they are so tightly bound and you need a MemoryPool for a tracked allocation anyways

@tustvold (Contributor, author) commented Dec 17, 2022:

I believe it will be fine because each ExternalSorter creates the potentially spilling allocation upon construction, though I am still not 100% sure.

Now that ExecutionPlan is sync I don't think this is an issue, as the streams are all created prior to execution starting. Yes, theoretically you could lazily construct the TrackedAllocation, but I can't think of why you would structure your code in this way. Regardless, this issue existed before the changes in this PR; it is just easier to see now.

That way the intent would be clearer that all ExternalSort instances register themselves with the MemoryManager on creation, and part of registering would increase the num_spill count.

Perhaps I'm being stupid, but I don't see a tangible difference between TrackedAllocation::new(pool, options) and pool.register(options); you're just reordering the parameters? More specifically, in order for register to be implemented you still need a public TrackedAllocation::new, so you're just adding an indirection?

Contributor:

I see -- I agree keeping TrackedAllocation::new makes sense.

I guess I was getting confused by the fact that allocate doesn't actually allocate any memory, but instead really registers a TrackedAllocation with the memory manager. Likewise, free doesn't free any memory.

Could we somehow make the relationship between MemoryManager, AllocationOptions and TrackedAllocation clearer?

What about changing some names:

MemoryManager::allocate --> MemoryManager::register
MemoryManager::free --> MemoryManager::deregister

Contributor:

Also, instead of AllocationOptions, what do you think about the name AllocationId or Allocator?

@tustvold (Contributor, author):

I've renamed TrackedAllocation to MemoryReservation and AllocationOptions to MemoryConsumer. I also tweaked the registration API to fit with this. PTAL

@tustvold (Contributor, author) commented Dec 17, 2022

I had a suggestion about the API design (MemoryPool::register()) rather than creating TrackedAllocations directly.

I think having the MemoryPool create TrackedAllocations makes more logical sense as they are so tightly bound and you need a MemoryPool for a tracked allocation anyways

I originally had this prior to 718a94b; however, it results in passing around a MemoryManager struct that has no state of its own, which seemed a bit pointless. In order for MemoryPool to be able to construct a TrackedAllocation, the construction methods on TrackedAllocation need to be public regardless, so that they can be used downstream; it seemed simpler just to not bother.

@yjshen (Member) left a comment:

Thanks @tustvold! I really like the new design of pools with different allocation policies, and moving spill() out of memory management simplifies the APIs a lot.

datafusion/core/src/physical_plan/sorts/sort.rs (3 resolved review comments)
datafusion/core/src/execution/runtime_env.rs (1 resolved review comment)

/// Return the total amount of memory allocated
fn allocated(&self) -> usize;
}

/// Options associated with a [`TrackedAllocation`]
/// A memory consumer that can be tracked by [`MemoryReservation`] in a [`MemoryPool`]
Contributor:

👍

Comment on lines +85 to +86
/// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
Contributor:

This is a nice interface
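Pieced together from the register signature quoted above, usage would look roughly like the following. The MemoryConsumer::new constructor, the try_grow method on MemoryReservation, and the module paths are my assumptions about the surrounding API and may not match the merged code exactly.

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

fn reserve_sort_memory(pool: &Arc<dyn MemoryPool>) -> Result<MemoryReservation> {
    // Each operator instance describes itself as a consumer...
    let mut reservation = MemoryConsumer::new("ExternalSorter").register(pool);
    // ...then grows (or shrinks) its reservation as it buffers batches.
    reservation.try_grow(1024 * 1024)?;
    Ok(reservation)
}
// The reservation is returned to the pool automatically when dropped.
```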

@alamb merged commit dba34fc into apache:master Dec 19, 2022
@alamb (Contributor) commented Dec 19, 2022

Thanks again @tustvold

@ursabot commented Dec 19, 2022

Benchmark runs are scheduled for baseline = 30de028 and contender = dba34fc. dba34fc is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

false => {
let available = self
.pool_size
.saturating_sub(state.unspillable + state.unspillable);
Contributor:

Why subtract state.unspillable twice here?

Contributor:

good question, @tustvold ?

@tustvold (Contributor, author) commented Feb 2, 2023:

Looks like a mistake, good spot

Edit: fix in #5160
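For context, a sketch of the intended computation, assuming the fix in #5160 simply subtracts unspillable once (I haven't reproduced the exact patch; names follow the snippet quoted above):

```rust
fn available_for_spilling(pool_size: usize, unspillable: usize) -> usize {
    // Memory left for spilling consumers: subtract unspillable once.
    pool_size.saturating_sub(unspillable)
}

fn main() {
    assert_eq!(available_for_spilling(100, 30), 70);
    // The buggy line effectively computed 100 - (30 + 30) = 40:
    assert_eq!(100usize.saturating_sub(30 + 30), 40);
}
```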

Labels: core (Core DataFusion crate)
Projects: None yet
Development: Successfully merging this pull request may close these issues: Memory manager triggers unnecessary spills (#2829)
6 participants