@wiedld wiedld commented Oct 6, 2025

Which issue does this PR close?

Drafted/proposed solution for #16904

Rationale for this change

Various changes to make the OOM error messages more readable.
If we agree with the basic approach, then I'll break up this draft into smaller PRs for code review.

What changes are included in this PR?

General changes, NOT having to do with the OOM error stack:

Changes for the OOM consumer stack:

  • add lineage information to each MemoryConsumer (such that we can later on build traces): 987192c
  • new ReportedConsumer which represents a snapshot: 0bc7630
    • reduce lock holding, such that we can use this snapshot in other ways too (maybe realtime tracking?)
  • new ConsumerStackTrace: ab62fcb

Example usage:

  • use the consumer parent/child relationship in ParquetWriter: 3bc6820
  • see the changes in the ParquetWriter OOM error messages, when we enable for TrackConsumersPool::report_top: c7e869f
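The lineage plus snapshot idea above can be sketched with a minimal mock. The names `ReportedConsumer` and `parent_id` follow this PR's commits, but the types below are simplified stand-ins for illustration, not DataFusion's actual API:

```rust
use std::collections::HashMap;

// Simplified stand-in for the snapshot of a consumer's state.
#[derive(Clone)]
pub struct ReportedConsumer {
    pub parent_id: Option<usize>,
    pub name: String,
    pub consumed: usize,
}

/// Walk parent links to build a stack-trace-like lineage, leaf first.
pub fn consumer_stack_trace(
    leaf: usize,
    snapshot: &HashMap<usize, ReportedConsumer>,
) -> Vec<String> {
    let mut trace = Vec::new();
    let mut next = Some(leaf);
    while let Some(id) = next {
        let c = &snapshot[&id];
        trace.push(format!("{}: {} consumed {} B", trace.len(), c.name, c.consumed));
        next = c.parent_id;
    }
    trace
}

fn main() {
    let mut snapshot = HashMap::new();
    for (id, parent_id, name, consumed) in [
        (0, None, "ParquetSink(ParallelWriter)", 64),
        (1, Some(0), "ParquetSink(ParallelColumnWriters)", 128),
        (2, Some(1), "ParquetSink(ArrowColumnWriter(col=1))", 4096),
    ] {
        snapshot.insert(
            id,
            ReportedConsumer { parent_id, name: name.to_string(), consumed },
        );
    }
    // Prints the leaf consumer's lineage up to the root, like the
    // "stack backtrace" sections in the error message below.
    for line in consumer_stack_trace(2, &snapshot) {
        println!("{line}");
    }
}
```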

Are these changes tested?

Yes.

Are there any user-facing changes?

Only nicer error messages.

@github-actions github-actions bot added core Core DataFusion crate execution Related to the execution crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Oct 6, 2025
@wiedld wiedld force-pushed the 16904/fine-grain-usage branch from d703964 to cf6f807 Compare October 6, 2025 19:58
@wiedld wiedld changed the title 16904/fine grain usage Add trace of consumers to OOM error messages Oct 6, 2025
@wiedld wiedld force-pushed the 16904/fine-grain-usage branch from cf6f807 to c7e869f Compare October 6, 2025 20:14
Comment on lines +396 to +427
"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as:
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
.
Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool",
@wiedld wiedld Oct 6, 2025


This is an example of using the parent/child relationship to build a trace of consumers.

Currently, this approach is limited by the current way that memory reservations work. Meaning, the parent's bytes (consumed & peak) do NOT include the cumulative totals from all the children. If this is desired, we can make this change at report generation time using the snapshot ReportedConsumer (to not hold the lock).
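Report-time aggregation over the snapshot could look like this sketch. The types are illustrative stand-ins (the real `ReportedConsumer` fields may differ), assuming the snapshot is already cloned out from under the pool lock:

```rust
use std::collections::HashMap;

// Mock snapshot entry; the field names are illustrative stand-ins.
pub struct ReportedConsumer {
    pub parent_id: Option<usize>,
    pub consumed: usize,
}

/// Sum a consumer's own bytes plus all of its descendants, computed
/// over an immutable snapshot so no pool lock is held while reporting.
pub fn cumulative_consumed(
    root: usize,
    snapshot: &HashMap<usize, ReportedConsumer>,
) -> usize {
    snapshot[&root].consumed
        + snapshot
            .iter()
            .filter(|(_, c)| c.parent_id == Some(root))
            .map(|(id, _)| cumulative_consumed(*id, snapshot))
            .sum::<usize>()
}

fn main() {
    let snapshot: HashMap<usize, ReportedConsumer> = HashMap::from([
        (0, ReportedConsumer { parent_id: None, consumed: 10 }),
        (1, ReportedConsumer { parent_id: Some(0), consumed: 20 }),
        (2, ReportedConsumer { parent_id: Some(1), consumed: 30 }),
    ]);
    // The parent's total now includes its children: 10 + 20 + 30.
    assert_eq!(cumulative_consumed(0, &snapshot), 60);
}
```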

@wiedld wiedld marked this pull request as ready for review October 6, 2025 21:19
@2010YOUY01

Thank you for this awesome work!

Making memory consumers hierarchical makes sense to me. There is one consideration: the existing memory pool strategies (e.g. FairSpillingPool) were implemented assuming one operator instance (xxStream in impl) has a one-to-one mapping to one MemoryConsumer, and they split the available memory evenly across the available MemoryConsumers. If we want to make it hierarchical, I think we have to update the memory pools -- for instance in FairSpillingPool, split the memory evenly according to top-level consumers, and ignore the child consumers.

Regarding the error message format, IIUC it now displays the lineage of the consumer that triggered the OOM. For instance, if the OOM happened in OperatorX, the error message will look like:

OperatorX - 100M
ParentOfOperatorX - 200M
(root)GrandParentOfOperatorX - 500M

I think this lineage information might not be the most straightforward for troubleshooting. How about just printing the whole picture instead? It would show the top memory consumers, along with all their child consumers, recursively down to the leaves.
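The suggested "whole picture" report could be sketched as a top-down recursive render. Again, a mock with illustrative stand-in types rather than DataFusion's actual API:

```rust
use std::collections::HashMap;

// Simplified stand-in for a consumer snapshot entry.
pub struct ReportedConsumer {
    pub parent_id: Option<usize>,
    pub name: String,
    pub consumed: usize,
}

/// Append `id` and, recursively, all of its children with indentation.
pub fn render_subtree(
    id: usize,
    depth: usize,
    snapshot: &HashMap<usize, ReportedConsumer>,
    out: &mut String,
) {
    let c = &snapshot[&id];
    out.push_str(&format!("{}{} - {} B\n", "  ".repeat(depth), c.name, c.consumed));
    let mut child_ids: Vec<usize> = snapshot
        .iter()
        .filter(|(_, child)| child.parent_id == Some(id))
        .map(|(cid, _)| *cid)
        .collect();
    child_ids.sort(); // deterministic output order
    for cid in child_ids {
        render_subtree(cid, depth + 1, snapshot, out);
    }
}

fn main() {
    let snapshot: HashMap<usize, ReportedConsumer> = HashMap::from([
        (0, ReportedConsumer { parent_id: None, name: "GrandParent".into(), consumed: 500 }),
        (1, ReportedConsumer { parent_id: Some(0), name: "Parent".into(), consumed: 200 }),
        (2, ReportedConsumer { parent_id: Some(1), name: "OperatorX".into(), consumed: 100 }),
    ]);
    let mut out = String::new();
    render_subtree(0, 0, &snapshot, &mut out);
    print!("{out}");
}
```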

Also, here is a related issue, I think we want to make sure the changes are compatible (otherwise choose one to proceed): #17901


comphead commented Oct 7, 2025

@andygrove FYI


comphead commented Oct 7, 2025

Thanks @wiedld, my question is similar to @2010YOUY01's: how to use the lineage? It sounds very interesting.

@comphead comphead left a comment


Thanks @wiedld

pub fn new_with_parent(name: impl Into<String>, parent_id: usize) -> Self {
Self {
name: name.into(),
can_spill: false,

would it be always false? 🤔

@wiedld wiedld Oct 7, 2025


It's meant to be an alternative to MemoryConsumer::new, and it's expected that the caller would switch from:

MemoryConsumer::new("foo").with_can_spill(true)

To have lineage with:

MemoryConsumer::new_with_parent("foo", parent_id).with_can_spill(true)

Although, this does point out that the added MemoryReservation::new_child_reservation should provide a spill config option. It's currently:

/// Create a new [`MemoryReservation`] with a new [`MemoryConsumer`] that
/// is a child of this reservation's consumer.
///
/// This is useful for creating memory consumers with lineage tracking.
pub fn new_child_reservation(&self, name: impl Into<String>) -> MemoryReservation {
MemoryConsumer::new_with_parent(name, self.consumer().id())
.register(&self.registration.pool)
}

I'll go add that now. Thank you.


Added: a77e316


Rather than add a bunch of new APIs for creating new with parent id, how about just adding a method like

fn with_parent_id(mut self, parent_id: ..) -> Self { 
..
}

That mirrors the other fields?

That might also make the defaults less confusing
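Concretely, the suggested builder could mirror `with_can_spill`. A sketch on a simplified stand-in for `MemoryConsumer` (not the actual struct):

```rust
// Mock of the consumer with the fields discussed in this thread.
pub struct MemoryConsumer {
    pub name: String,
    pub can_spill: bool,
    pub parent_id: Option<usize>,
}

impl MemoryConsumer {
    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into(), can_spill: false, parent_id: None }
    }

    pub fn with_can_spill(mut self, can_spill: bool) -> Self {
        self.can_spill = can_spill;
        self
    }

    /// Proposed builder that mirrors the other optional fields, so the
    /// defaults stay in `new` and lineage is opt-in.
    pub fn with_parent_id(mut self, parent_id: usize) -> Self {
        self.parent_id = Some(parent_id);
        self
    }
}

fn main() {
    let consumer = MemoryConsumer::new("foo")
        .with_can_spill(true)
        .with_parent_id(42);
    assert_eq!(consumer.parent_id, Some(42));
    assert!(consumer.can_spill);
}
```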

@wiedld

wiedld commented Oct 7, 2025

Thanks @wiedld, my question is similar to @2010YOUY01's: how to use the lineage? It sounds very interesting.

The lineage is created by using child reservations. I made an example commit here, using the parallel parquet writing:
3bc6820

That is why in the following commit, when I switched to using the reservation stack trace (based on lineage) for the OOM error reporting (a.k.a. TrackConsumersPool::report_top), we get the expanded OOM error trace for the parallel parquet writing:
c7e869f

If I started using the child reservations for other physical plan nodes, I would also expect to see a similar change in the OOM error messaging -- to include these traces.
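The way child reservations create lineage can be sketched with a mock. `new_child_reservation` echoes the method added in this PR, but the types below are simplified stand-ins (no pool registration), and `root` is a hypothetical constructor for the example:

```rust
// Simplified stand-in for a reservation that records its parent's name
// instead of registering consumers with a pool.
pub struct MemoryReservation {
    pub name: String,
    pub parent: Option<String>,
}

impl MemoryReservation {
    /// Hypothetical root constructor for this sketch.
    pub fn root(name: &str) -> Self {
        Self { name: name.to_string(), parent: None }
    }

    /// Child reservation whose consumer records this one as its parent,
    /// which is what produces the lineage in the OOM traces.
    pub fn new_child_reservation(&self, name: &str) -> Self {
        Self { name: name.to_string(), parent: Some(self.name.clone()) }
    }
}

fn main() {
    let writer = MemoryReservation::root("ParquetSink(ParallelWriter)");
    let cols = writer.new_child_reservation("ParquetSink(ParallelColumnWriters)");
    let col1 = cols.new_child_reservation("ParquetSink(ArrowColumnWriter(col=1))");
    assert_eq!(col1.parent.as_deref(), Some("ParquetSink(ParallelColumnWriters)"));
}
```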

@wiedld

wiedld commented Oct 7, 2025

I should also clarify: the request (in the issue) was to help better debug OOMing, but also to provide ways to see how the different memory consumers are related (at least in this example, the lineage was apparent to me 🤷🏼).

I chose specific abstractions in hopes that we could repurpose the bits. We have a lineage of reservations, and a new snapshot construct (to perhaps grab snapshots of the locked state during a running query). I then created the stack trace and used it for the OOM error message, as one specific example. Although we could also use it for other memory analysis/visualization tooling. 🤔

@wiedld

wiedld commented Oct 7, 2025

I think this lineage information might not be the most straightforward for troubleshooting. How about just printing the whole picture instead? It would be showing top memory consumers, along with all its child consumers recursively down to the leaf.

@2010YOUY01 - Should I make this change to the OOM messages? Or should we use the snapshots & lineage trees with another debug tool which shows all consumers?

There is one consideration: the existing memory pool strategies (e.g. FairSpillingPool)

The TrackConsumersPool wraps around other pools, and keeps track of successful requests to change reservation sizing. As long as the size requested matches the size allotted (on success), then I believe the TrackConsumersPool would reflect the reservation state? I'm not familiar with the FairSpillingPool, but I think the actual allotment is the requested amount (if it passes the fairness test)? Although I may be missing something.


@alamb alamb left a comment


I think this looks like a nice change -- thank you @wiedld

@2010YOUY01 and @rluvaton perhaps you might also be interested in this one

I left a few API comments, but the overall idea of introducing parent ids to memory reservations seems like a great first step towards more fine grained control

let parallel_options_clone = parallel_options.clone();
let pool = Arc::clone(context.memory_pool());
// Create a reservation for the parallel parquet writing
let reservation = MemoryConsumer::new("ParquetSink(ParallelWriter)")

should this be a new child reservation of the reservation above? Or maybe I am misreading the diff and it already is.


Do you mean the MemoryConsumer::new(format!("ParquetSink[path={path}]")) on line 1273? That one is for the non-parallel writing.

Whereas this reservation here is the root of the parallel path writing. Perhaps I should change naming?

name: String,
can_spill: bool,
id: usize,
parent_id: Option<usize>,

it would be nice to expand out the memory consumer documentation to mention how the parent/child relationship was used


pub fn new_child_reservation(
&self,
name: impl Into<String>,
can_spill: bool,

I am not sure about always passing can_spill here -- I think we can use the existing with_can_spill API instead of forcing a bunch of bool parameters

///
/// This is useful for creating memory consumers with lineage tracking,
/// while dealing with multithreaded scenarios.
pub fn cloned_reservation(&self) -> MemoryReservation {

It is strange to me that clone_with_new_id doesn't also register the reservation 🤔

As now users have two APIs to think about and figure out which to use

Could we unify them somehow (e.g. maybe deprecate clone_with_new_id (as a follow on PR)?)

@wiedld wiedld marked this pull request as draft October 9, 2025 11:50
@wiedld

wiedld commented Oct 9, 2025

Converting to draft, since @alamb's comments have me changing the approach a bit. I'll mark it ready-for-review later.
