
Downstream consumers of AggregateExec significantly overcount memory usage #22757

@Samyak2

Description

Describe the bug

We have seen this internally with HashJoinExec and RepartitionExec. The repartition case is harder to reproduce, since it depends on how many batches are being buffered, so I will use HashJoinExec as the example. I suspect any operator that holds a sequence of RecordBatches in memory has this problem.

Consider a tree like this:

HashJoinExec
  AggregateExec: mode=Final
    CoalescePartitionsExec
      AggregateExec: mode=Partial
        ...
  ... (probe side)
  • This AggregateExec is on the build side of the hash join.
  • The hash join buffers all batches from the agg in memory as a Vec<RecordBatch> (ref).
  • Memory is accounted by summing get_record_batch_memory_size over each batch separately.
    • This helper de-duplicates a buffer that is referenced multiple times within a single record batch.
  • Now look at how AggregateExec produces its output (ref):
    • The agg produces one huge record batch, which is then sliced into batches of 8192 rows (the default batch_size) and sent downstream.
    • All of these slices reference the same huge underlying buffer!
    • So even though the join de-duplicates buffers within a record batch, it still counts the same buffer once for every emitted batch.
    • Memory is overcounted by a factor of roughly num_output_rows / batch_size! This can be a huge multiplier for large aggs.
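The accounting described above can be modeled in a few lines of std-only Rust. This is a simplified sketch, not DataFusion or arrow-rs code: `SharedBuffer`, `Batch`, `batch_memory_size`, `slice_aggregate_output` and `per_batch_accounting` are hypothetical names. It assumes, as described above, that every slice shares the parent allocation and that per-batch accounting charges the whole allocation to each batch (it uses `len()` instead of capacity for simplicity).

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow buffer: a shared allocation.
/// Cloning only bumps the Arc refcount, so every clone points at the same bytes.
#[derive(Clone)]
struct SharedBuffer {
    data: Arc<Vec<u8>>,
}

/// Hypothetical stand-in for a RecordBatch: column buffers plus a row count.
struct Batch {
    columns: Vec<SharedBuffer>,
    num_rows: usize,
}

/// Models per-batch accounting: a buffer referenced twice *within* this batch
/// is counted once, but the whole allocation is charged even if the batch
/// only covers a small slice of it. (Uses len() as a stand-in for capacity.)
fn batch_memory_size(batch: &Batch) -> usize {
    let mut seen: HashSet<usize> = HashSet::new();
    batch
        .columns
        .iter()
        // The allocation's address de-duplicates buffers inside this batch only.
        .filter(|buf| seen.insert(Arc::as_ptr(&buf.data) as usize))
        .map(|buf| buf.data.len())
        .sum()
}

/// Models a Final aggregate: one large allocation, emitted as many
/// `batch_size`-row slices that all share it.
fn slice_aggregate_output(total_rows: usize, batch_size: usize, bytes_per_row: usize) -> Vec<Batch> {
    let buffer = SharedBuffer {
        data: Arc::new(vec![0u8; total_rows * bytes_per_row]),
    };
    (0..total_rows)
        .step_by(batch_size)
        .map(|start| Batch {
            columns: vec![buffer.clone()],
            num_rows: batch_size.min(total_rows - start),
        })
        .collect()
}

/// What a consumer that buffers every batch (e.g. a hash join build side)
/// would reserve if it simply sums the per-batch sizes.
fn per_batch_accounting(batches: &[Batch]) -> usize {
    batches.iter().map(batch_memory_size).sum()
}
```

For example, `slice_aggregate_output(81_920, 8192, 8)` yields 10 slices of one 655,360-byte allocation. `batch_memory_size` reports the full 655,360 bytes for each slice, so `per_batch_accounting` returns 6,553,600 bytes: 10x (81,920 / 8,192) the memory actually allocated.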

To Reproduce

I have a reproducer test here: Samyak2#2

In the test, peak_mem_used for the final aggregate is ~104MB, but the hash join fails even with a 3GB memory limit!

The test fails even with the memory limit set to ~30x (!) the aggregate's peak memory. I could understand 2x, since the memory might be counted once in the join and once in the agg, but 30x does not make sense. Ideally, this should be close to 1x, since no additional memory is actually being allocated.

Expected behavior

The query in the above test should pass with a memory limit of at most 2x the aggregate's peak memory.

Additional context

This is not limited to hash join; any operator that can buffer data coming from an agg will show this behavior:

  • HashJoinExec: buffers all batches coming on the build side
  • NestedLoopJoinExec
  • CrossJoinExec
  • SortMergeJoinExec: although it won't buffer many records
  • RepartitionExec: can buffer an unbounded number of batches in some cases. See: RepartitionExec channels grow unboundedly with one slow consumer #22090
  • SortExec: buffers batches in memory
  • TopK: buffers a number of batches up to its heap size
  • SortPreservingMergeExec

There may be more I'm missing.

For the fix, I propose a new stateful helper alongside get_record_batch_memory_size. It tracks the buffer pointers it has already seen (in either a HashSet or a HashMap) and uses that state to de-duplicate across batches. We have been using a version of this successfully internally.
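The proposed stateful helper could look roughly like the following std-only Rust sketch. It uses a simplified buffer/batch model (hypothetical `SharedBuffer` and `Batch` types, defined here so the block stands alone); `BatchMemoryTracker` and `add_batch` are illustrative names, not an existing DataFusion API. The tracker keeps a HashSet of allocation addresses across batches and returns only the bytes a batch adds that have not been charged yet.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow buffer: a shared allocation.
#[derive(Clone)]
struct SharedBuffer {
    data: Arc<Vec<u8>>,
}

/// Hypothetical stand-in for a RecordBatch: just its column buffers.
struct Batch {
    columns: Vec<SharedBuffer>,
}

/// Hypothetical stateful accountant: remembers every allocation it has
/// already charged, across all batches it has seen.
#[derive(Default)]
struct BatchMemoryTracker {
    seen: HashSet<usize>,
    total: usize,
}

impl BatchMemoryTracker {
    /// Returns only the bytes this batch adds that were not charged before,
    /// i.e. how much a memory reservation would need to grow.
    fn add_batch(&mut self, batch: &Batch) -> usize {
        let mut delta = 0;
        for buf in &batch.columns {
            // The Arc's address stands in for the allocation's identity.
            if self.seen.insert(Arc::as_ptr(&buf.data) as usize) {
                delta += buf.data.len();
            }
        }
        self.total += delta;
        delta
    }

    /// Total bytes charged so far, with each allocation counted once.
    fn total(&self) -> usize {
        self.total
    }
}
```

Feeding this tracker 10 slices that share one 655,360-byte allocation charges 655,360 bytes for the first slice and 0 for the rest. A plain HashSet is only safe while every tracked batch stays alive (as on a hash join build side): once an allocation is freed, its address can be reused. Operators that release batches (SortExec, RepartitionExec) may therefore need the HashMap variant, for example keyed by address with a reference count so entries can be dropped when the last batch using them goes away. In a real Arrow-based version, the key must identify the allocation itself rather than each slice's offset pointer, since slices of one buffer start at different addresses.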

Labels: bug

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions