Memory Limited GroupBy (Externalized / Spill) #1570

Closed
3 of 4 tasks
Tracked by #587
alamb opened this issue Jan 15, 2022 · 23 comments · Fixed by #7400

Comments

@alamb
Contributor

alamb commented Jan 15, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Support Grouping "arbitrarily" large inputs (e.g. when the group by hash table doesn't fit in RAM or within some user defined budget)

This typically happens when there are a "large" number of distinct groups. For example, with queries like

-- Find the top ten users by event count
SELECT user_id, count(events) 
FROM users 
GROUP BY user_id 
ORDER BY count(events) DESC
limit 10

when there are a large number of groups (here, distinct user_id values).

This ticket concerns the memory used by the HashAggregateExec operator -- it doesn't cover other potential targets (e.g. externalized sort or join). Those will be covered by other tasks tracked by #587

Describe the solution you'd like

  1. Allow DataFusion users to specify a RAM budget (e.g. via the config introduced in "Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation", #1526) and have their queries run to completion without the group by exceeding the budget allocated to it via the memory manager.

For the HashAggregateExec operator, I think the best behavior would be:

  1. Use an in memory hash table (as is done today in HashAggregateExec ), if the memory budget allows
  2. If all the input does not fit in the RAM budget, the hash table and partial aggregates are sorted by group key and written to temporary disk files
  3. Temporary disk files are read / merged to produce the final results
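
A minimal sketch of the three steps above, not DataFusion code. The names `SpillingCounter` and `max_groups` are hypothetical, the aggregate is assumed to be a plain COUNT over `u64` keys, the budget is counted in groups rather than bytes, and an in-memory `Vec` stands in for each temporary disk file:

```rust
use std::collections::HashMap;

/// One run of (group key, partial count) pairs sorted by key.
/// Assumption: a real engine would write this to a temporary file.
type SpillRun = Vec<(u64, u64)>;

/// Hypothetical COUNT aggregator that keeps at most `max_groups` groups in memory.
struct SpillingCounter {
    max_groups: usize,
    table: HashMap<u64, u64>,
    runs: Vec<SpillRun>,
}

impl SpillingCounter {
    fn new(max_groups: usize) -> Self {
        Self { max_groups, table: HashMap::new(), runs: Vec::new() }
    }

    /// Step 1: aggregate in the in-memory hash table while the budget allows.
    fn push(&mut self, key: u64) {
        if !self.table.contains_key(&key) && self.table.len() >= self.max_groups {
            self.spill();
        }
        *self.table.entry(key).or_insert(0) += 1;
    }

    /// Step 2: sort the partial aggregates by group key and "write" them out.
    fn spill(&mut self) {
        let mut run: SpillRun = self.table.drain().collect();
        run.sort_unstable_by_key(|&(key, _)| key);
        self.runs.push(run);
    }

    /// Step 3: merge the sorted runs, summing partial counts of equal keys.
    fn finish(mut self) -> Vec<(u64, u64)> {
        if !self.table.is_empty() {
            self.spill();
        }
        let mut cursors = vec![0usize; self.runs.len()];
        let mut out: Vec<(u64, u64)> = Vec::new();
        loop {
            // Pick the run whose next unread entry has the smallest key.
            let mut best: Option<(usize, u64)> = None;
            for (i, run) in self.runs.iter().enumerate() {
                if let Some(&(key, _)) = run.get(cursors[i]) {
                    if best.map_or(true, |(_, k)| key < k) {
                        best = Some((i, key));
                    }
                }
            }
            let (i, key) = match best {
                Some(found) => found,
                None => break, // every run is exhausted
            };
            let count = self.runs[i][cursors[i]].1;
            cursors[i] += 1;
            if let Some(last) = out.last_mut() {
                if last.0 == key {
                    last.1 += count;
                    continue;
                }
            }
            out.push((key, count));
        }
        out
    }
}
```

Because every run is sorted, the merge only ever looks at one entry per run, which is what keeps the final step within a small memory footprint. The linear scan over run heads is chosen for brevity; it is not meant to be the efficient N-way merge.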

Hopefully after #1568 is complete we'll have an efficient N-way merge that can be reused.

Some ideas of how to break this task down

Describe alternatives you've considered
TBD

Context
This is follow on work from the great PR from @yjshen in #1526 and part of the story of limiting memory used by DataFusion #587

@jhorstmann
Contributor

For hash aggregation I quite like the implementation described in https://dl.acm.org/doi/pdf/10.1145/2588555.2610507 (4.4). The algorithm there optimizes for cache locality by pre-aggregating into fixed-size thread-local hash tables and then overflowing into main memory partitions, but I think it should be easy to extend with also flushing these partitions to disk if they get too large.
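
A rough, single-threaded sketch of the idea described above: pre-aggregate into a fixed-size table, and overflow into partitions when it fills up. This is not the paper's code; `NUM_PARTITIONS`, `pre_aggregate`, `flush` and `aggregate_partition` are hypothetical names, the aggregate is assumed to be a COUNT over `u64` keys, and partitioning uses a simple modulo where a real engine would use a hash of the key:

```rust
use std::collections::HashMap;

/// Assumption for this sketch: a small, fixed number of overflow partitions.
const NUM_PARTITIONS: usize = 4;

/// Phase 1: pre-aggregate into a table capped at `capacity` groups. When the
/// table is full, its partial counts are flushed into the overflow partitions.
fn pre_aggregate(keys: &[u64], capacity: usize) -> Vec<Vec<(u64, u64)>> {
    let mut partitions: Vec<Vec<(u64, u64)>> = vec![Vec::new(); NUM_PARTITIONS];
    let mut local: HashMap<u64, u64> = HashMap::with_capacity(capacity);
    for &key in keys {
        if !local.contains_key(&key) && local.len() >= capacity {
            flush(&mut local, &mut partitions);
        }
        *local.entry(key).or_insert(0) += 1;
    }
    flush(&mut local, &mut partitions);
    partitions
}

/// Empty the local table, routing each partial result to the partition of its key.
fn flush(local: &mut HashMap<u64, u64>, partitions: &mut [Vec<(u64, u64)>]) {
    for (key, count) in local.drain() {
        partitions[(key % NUM_PARTITIONS as u64) as usize].push((key, count));
    }
}

/// Phase 2: a partition holds every partial result for its keys, so it can be
/// finalized on its own (and could be written to disk first if it grew too large).
fn aggregate_partition(partition: &[(u64, u64)]) -> HashMap<u64, u64> {
    let mut totals = HashMap::new();
    for &(key, count) in partition {
        *totals.entry(key).or_insert(0) += count;
    }
    totals
}
```

Since all partial results for a key land in the same partition, only one partition needs to be resident while it is finalized.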

@milenkovicm
Contributor

An idea to brainstorm for this feature: master...milenkovicm:arrow-datafusion:spill_aggregation_to_mmap has POC code which uses a memory mapped file as the aggregation store, which should spill to disk in case of memory pressure.

This solution is just a prototype, far from perfect, and there is a good chance it is not fit for purpose.

The main change is that Vec<RowGroupState> has been replaced with Vec<u8>, which is then replaced with an mmap file.

Problems with this solution:

  • the assumption is that the aggregation buffer size will not change. This assumption may go away, but working with the underlying state will get way more complicated, as we will need to implement buffer reallocation, eviction ...
  • we don't control when the data is evicted from memory; we can use lock and unlock on the mmap to try to keep it in memory, but I'm not sure that would work

good point(s):

  • this solution chunks aggregation state in multiple batches when downstream producer finishes, thus limiting memory needed to read aggregation state (in theory)

no performance numbers at the moment, and not much experience with datafusion

Any comments welcomed

@alamb
Contributor Author

alamb commented Sep 13, 2022

I will try and review this sometime later today.

@milenkovicm
Contributor

thanks @alamb,

just a follow up with some numbers

running a simple query on 10M distinct keys and 50M records total (each key has 5 records) ( /usr/bin/time -l ./bench_10m)

with this change:

      112.10 real       363.15 user        21.28 sys
          2246549504  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
             3689321  page reclaims
              503785  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
                5924  voluntary context switches
              442244  involuntary context switches
       2212997211010  instructions retired
       1232264629987  cycles elapsed
          1254973440  peak memory footprint

without change:

      109.09 real       355.84 user        23.10 sys
          3891322880  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
             5401700  page reclaims
                   0  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
                  15  voluntary context switches
              414334  involuntary context switches
       2222284866547  instructions retired
       1208398130091  cycles elapsed
          3404832768  peak memory footprint
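
To make the two runs easier to compare, here is a small sketch that computes relative changes from the figures above. Only the numbers come from the output; the constant and function names are made up for illustration:

```rust
// Measurements copied from the two `/usr/bin/time -l` outputs above.
const MAX_RSS_WITH: f64 = 2_246_549_504.0;
const MAX_RSS_WITHOUT: f64 = 3_891_322_880.0;
const PEAK_FOOTPRINT_WITH: f64 = 1_254_973_440.0;
const PEAK_FOOTPRINT_WITHOUT: f64 = 3_404_832_768.0;
const REAL_SECS_WITH: f64 = 112.10;
const REAL_SECS_WITHOUT: f64 = 109.09;

/// Relative change in percent; a negative value means the run with the
/// change used less of the resource than the run without it.
fn percent_change(with_change: f64, without_change: f64) -> f64 {
    (with_change - without_change) / without_change * 100.0
}
```

With these inputs, maximum resident set size is roughly 42% lower and peak memory footprint roughly 63% lower with the change, while wall-clock time is a little under 3% longer.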

@alamb
Contributor Author

alamb commented Sep 13, 2022

Thanks for the info @milenkovicm

In general I think database systems tend to try and shy away from using mmap for spilling operations, for the reasons very clearly articulated in "Are You Sure You Want to Use MMAP in Your Database Management System?" https://db.cs.cmu.edu/mmap-cidr2022/

I think the current state of the art / best practice for dealing with grouping when the hashtable doesn't fit in memory is to:

  1. Sort the data by groups and write that data to a spill file (ideally sharing / reusing the code used for externalized sort)
  2. Create a new hash table, spilling repeatedly if needed
  3. When all the input has been processed, then the spill files are read back and merged (again sharing code for externalized sort) and a merge group by is performed.

Thanks to @yjshen we have much of the spilling / merging code already. We are lacking:

  • merge group by (where the groups are guaranteed to be contiguous in the input stream)
  • The orchestration logic to spill and then re-merge
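
To illustrate the "merge group by" piece, here is a minimal sketch of a streaming group by over input whose groups are already contiguous. It is not DataFusion's API: `MergeGroupBy` and `merge_group_by` are hypothetical names, and the aggregate is assumed to be a SUM over `(u64, i64)` rows:

```rust
use std::iter::Peekable;

/// Streaming ("merge") group by: yields one (key, sum) pair per run of equal
/// keys. Correct only when equal keys are adjacent in the input, e.g. because
/// the input is sorted by key.
struct MergeGroupBy<I: Iterator<Item = (u64, i64)>> {
    input: Peekable<I>,
}

impl<I: Iterator<Item = (u64, i64)>> Iterator for MergeGroupBy<I> {
    type Item = (u64, i64);

    fn next(&mut self) -> Option<(u64, i64)> {
        // Start a new group with the next row, if there is one.
        let (key, mut sum) = self.input.next()?;
        // Keep consuming rows while they belong to the same group.
        while let Some(&(next_key, value)) = self.input.peek() {
            if next_key != key {
                break;
            }
            sum += value;
            self.input.next();
        }
        Some((key, sum))
    }
}

fn merge_group_by<I: Iterator<Item = (u64, i64)>>(input: I) -> MergeGroupBy<I> {
    MergeGroupBy { input: input.peekable() }
}
```

The operator holds state for only one group at a time, so its memory use does not grow with the number of distinct groups.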

Does that make sense? I know it is a high level description

@milenkovicm
Contributor

thanks @alamb,
your comment makes perfect sense, I can't disagree with you. As I said in my comment, the approach is not without its problems and shortcomings. This solution is quite simple, and the question is whether that simplicity buys any speed :)

Would you have any design doc regarding merge group by and re-merge which you can share?

@alamb
Contributor Author

alamb commented Sep 13, 2022

Would you have any design doc regarding merge group by and re-merge which you can share?

I do not have a design document, sadly. I can try and help write one (perhaps I can make a diagram and example) if you are interested in working on it @milenkovicm

@milenkovicm
Contributor

milenkovicm commented Sep 13, 2022

I'd be interested. after a big sleep I think I get your approach, but if you can produce a diagram it would be great.
can't really commit on timeframe, will have to have a look at code first

thanks @alamb

@alamb
Contributor Author

alamb commented Sep 14, 2022

😆 sounds good -- I will try and write up something in the coming days to start a discussion

alamb pushed a commit that referenced this issue Oct 15, 2022
* change how final aggregation row group is created ...

this change prevents cloning of the whole state, which doubled the memory
needed for aggregation.

this PR relates to #1570

* Fix clippy issues

* read batch size from `session_config`

@alamb
Contributor Author

alamb commented Nov 28, 2022

An update on this task: @crepererum has added initial support for tracking memory use in the grouping operation. @tustvold, @crepererum and I (as ticket master) have plans to improve the grouping operation, which I will make a bit more discoverable shortly

@alamb
Contributor Author

alamb commented Jan 31, 2023

I'd be interested. after a big sleep I think I get your approach, but if you can produce a diagram it would be great.

@milenkovicm very belatedly, here is a document / diagrams: https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing

@mingmwang
Contributor

@alamb @yjshen
Can we make the GroupState and the Accumulator states serializable?
With this approach, we do not need to do any sort when spilling data to disk. And when we read the data back, we can reconstruct our raw hash table quickly from the hash values and indexes: because our hashmap is very lightweight, the hash value can be re-calculated from the grouping rows, or we can cache the hash value inside the GroupState to avoid re-calculating it.

@crepererum
Contributor

Can we make the GroupState and the Accumulator states serializable? With this approach, we do not need to do any sort when spilling data to disk. And when we read the data back, we can reconstruct our raw hash table quickly from the hash values and indexes: because our hashmap is very lightweight, the hash value can be re-calculated from the grouping rows, or we can cache the hash value inside the GroupState to avoid re-calculating it.

You would still need disk spilling, no? Or where do you store the serialized state? Also I guess that serialization may become a major bottleneck for some of the accumulators.

@mingmwang
Contributor

Can we make the GroupState and the Accumulator states serializable? With this approach, we do not need to do any sort when spilling data to disk. And when we read the data back, we can reconstruct our raw hash table quickly from the hash values and indexes: because our hashmap is very lightweight, the hash value can be re-calculated from the grouping rows, or we can cache the hash value inside the GroupState to avoid re-calculating it.

You would still need disk spilling, no? Or where do you store the serialized state? Also I guess that serialization may become a major bottleneck for some of the accumulators.

Yes, we still need the disk spilling. It can be managed and tracked by the disk_manager in the RuntimeEnv, but in any case this avoids sorting the entire group data or hash table before spilling.

@milenkovicm
Contributor

I'm not sure I see many benefits of having it serializable, would agree with @crepererum
Now this discussion would make more sense if we would know more about your implementation.

IMHO, aggregation should start with a hash map: we can assume that there is not going to be a spill, and if we're wrong we pay the penalty of having to sort it before spilling.

Once we have spilled to disk, I'd argue it would make more sense to switch from a hash map to a b-tree, as we would need to merge it with the spill; it is slower, but in my experience it is a bit faster than sorting a hash map.
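
A small sketch of why an ordered map helps once spilling has started: its contents come out in key order, so a spill needs no separate sort. This only illustrates the idea using the standard library's `BTreeMap`; `update` and `spill_sorted` are hypothetical helpers and the aggregate is assumed to be a COUNT over string keys:

```rust
use std::collections::BTreeMap;

/// Add one row to the ordered aggregation state.
fn update(state: &mut BTreeMap<String, u64>, key: &str) {
    *state.entry(key.to_string()).or_insert(0) += 1;
}

/// Drain the state into a run that is already sorted by key.
/// `std::mem::take` leaves an empty map behind, and iterating a BTreeMap
/// visits its entries in ascending key order.
fn spill_sorted(state: &mut BTreeMap<String, u64>) -> Vec<(String, u64)> {
    std::mem::take(state).into_iter().collect()
}
```

The trade-off mentioned above still applies: each update on the ordered map is typically slower than on a hash map, which is why it only pays off after the first spill.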

Spilling can be implemented using a two column parquet file (key: blob, value: blob).

An implementation like this works quite well in my experience, especially since in most cases we won't trigger a spill

@alamb
Contributor Author

alamb commented Apr 27, 2023

Can we make the GroupState and the Accumulator states serializable?
With this approach, we do not need to do any sort when spilling data to disk. And when we read the data back, we can reconstruct our raw hash table quickly from the hash values and indexes: because our hashmap is very lightweight, the hash value can be re-calculated from the grouping rows, or we can cache the hash value inside the GroupState to avoid re-calculating it.

@mingmwang when we serialize the groups to disk when the hash table is full, we then need to read them back in again somehow and do the final combination. My assumption is that we don't have the memory to read them all back in at once as we had to spill in the first place

If we sort the spilled data on the group keys, we can stream data from the different spill files in parallel, merge, and then do a streaming group by, which we will have sufficient memory to accomplish.
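
A minimal sketch of that merge step, assuming each spill file has already been read as a run of (key, partial count) pairs sorted by key. `merge_runs` is a hypothetical name, in-memory vectors stand in for the spill files, and the output is collected into a `Vec` only to keep the example short:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge runs that are each sorted by key, summing the partial counts of
/// equal keys as they meet in the merged stream.
fn merge_runs(runs: Vec<Vec<(u64, u64)>>) -> Vec<(u64, u64)> {
    let mut iters: Vec<_> = runs.into_iter().map(|run| run.into_iter()).collect();

    // Min-heap of (key, partial count, run index) with at most one row per run.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some((key, count)) = it.next() {
            heap.push(Reverse((key, count, idx)));
        }
    }

    let mut out: Vec<(u64, u64)> = Vec::new();
    while let Some(Reverse((key, count, idx))) = heap.pop() {
        // Refill the heap from the run that just produced a row.
        if let Some((next_key, next_count)) = iters[idx].next() {
            heap.push(Reverse((next_key, next_count, idx)));
        }
        // Streaming group by: equal keys arrive back to back after the merge.
        if let Some(last) = out.last_mut() {
            if last.0 == key {
                last.1 += count;
                continue;
            }
        }
        out.push((key, count));
    }
    out
}
```

Only one row per spill file is resident in the heap at any time, which is why this step fits in memory even though the spilled data as a whole did not.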

IMHO, aggregation should start with a hash map: we can assume that there is not going to be a spill, and if we're wrong we pay the penalty of having to sort it before spilling.

Yes I agree with @milenkovicm - this approach will work well. It does have a disadvantage of a performance "cliff" when the query goes from in memory --> spilling (it doesn't degrade gracefully) but it is likely the fastest for queries that have sufficient memory

@alamb
Contributor Author

alamb commented Jul 15, 2023

BTW after #6889 I think we'll be in pretty good shape to dump the accumulator state to disk (as Arrow arrays, perhaps) if we want to pursue this more

@milenkovicm
Contributor

Hi @alamb, I have some POC code, a bit stale though, which will dump state to arrow. I've tried to rebase it recently but did not have time to finish it. It's in my branch at https://github.com/milenkovicm/arrow-datafusion/commit/e9d17d2a456707f88497606b3977a8a82199c7d1

Not sure if it still makes sense after all this time.

@alamb
Contributor Author

alamb commented Jul 16, 2023

Hi @alamb, I have some POC code, a bit stale though, which will dump state to arrow. I've tried to rebase it recently but did not have time to finish it. It's in my branch at https://github.com/milenkovicm/arrow-datafusion/commit/e9d17d2a456707f88497606b3977a8a82199c7d1

Thanks @milenkovicm -- that actually looks quite cool. If/when someone picks up this ticket I think it would be a good place to start from. I especially like how you encapsulated the spilling / state code in its own module

@kazuyukitanimura
Contributor

Hello,

We have an internal spilling implementation for partial aggregation. We are currently in the middle of applying back the recent improvements of aggregation (such as #7095). After that is done, we plan to contribute back the work to DataFusion, but I just found this ticket. I am wondering what the current status of this ticket is.
Our implementation is quite similar to what the description of this ticket says, but (obviously) different from @milenkovicm's POC.

I am wondering if the community is willing to review yet another spilling implementation...

@alamb
Contributor Author

alamb commented Jul 27, 2023

After that is done, we plan to contribute back the work to DataFusion, but I just found this ticket. I am wondering what the current status of this ticket is.

That sounds good

I am wondering if the community is willing to review yet another spilling implementation...

I am willing to review another spilling implementation for sure -- it is one of the missing features for datafusion to be a complete analytic solution

To assist review, it would help to explain / document how it works (e.g. is it based on mmap, or are the intermediate groups spilled as arrow, or are they sorted?), ideally in code comments

I made some diagrams here that might be helpful (but are mostly related to streaming groupby)
https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing

@kazuyukitanimura
Contributor

Thank you @alamb
Our implementation is using the intermediate groups spilled as arrow, very similar to ExternalSorter. I will post the PR and doc shortly

@kazuyukitanimura
Contributor

I finally posted a PR for this #7400
In particular, I explained how the implementation works at https://github.com/apache/arrow-datafusion/pull/7400/files#diff-0b72cdc7a5e02b6650430c3eb7bdea4f1d2d32867b927633d92abb4fcf31c81bR135
Thanks! cc @alamb
