
Parallelize FinalAggregation/SingleAggregation #9829

Open
shiyu-bytedance opened this issue May 15, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@shiyu-bytedance
Contributor

Description

Background

Currently, there can be a number of PartialAggregation operators (each run by its own driver), but always exactly one FinalAggregation/SingleAggregation; this means finalAgg/singleAgg always uses a single core. Our goal is to alter FinalAggregation/SingleAggregation so that it uses all cores on a node.

Current HashTable architecture
(diagram: current HashTable architecture)

The TagPointerPair array is a single contiguous memory allocation containing pointers to Row payloads in the RowContainer. The TagPointerPair array is not thread-safe and thus can only be updated by one thread at a time.
Rows are allocated from the Operator's MemoryPool. The Operator's MemoryPool is not thread-safe and acts as a proxy to the global MemoryAllocator; the global MemoryAllocator is thread-safe via an internal mutex.

Proposed HashTable architecture
(diagram: proposed HashTable architecture)

The overall idea is to eliminate thread-unsafe data structures so that finalAgg can run on all available cores of a node. Assuming a machine with 3 cores, we will create three of each:

  • TagPointerPair arrays
  • Operator MemoryPools

RowContainer itself will remain unchanged.

New finalAgg/SingleAgg addInput flow

Upon receiving an input RowVector, finalAgg/singleAgg logic remains unchanged up to the point where hashes of all input rows have been computed. From there, assuming a machine with 3 cores, the remaining work is done by 3 threads, each working on a disjoint partition of the total hash range.
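The two-step flow above can be sketched as follows. This is a hedged illustration, not Velox code: `parallelAddInput` and the use of `std::map` as a per-thread aggregation table are stand-ins; the real implementation would use the per-core TagPointerPair arrays described earlier.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <thread>
#include <vector>

// Hedged sketch of the proposed addInput flow: step 1 hashes the whole
// input batch single-threaded (as today); step 2 spawns one thread per
// partition, and each thread aggregates only the rows whose hash maps to
// its partition. No shared mutable state, hence no locking.
std::vector<std::map<int64_t, int64_t>> parallelAddInput(
    const std::vector<int64_t>& keys,
    int numThreads) {
  // Step 1: compute hashes for all input rows.
  std::vector<uint64_t> hashes(keys.size());
  for (size_t i = 0; i < keys.size(); ++i) {
    hashes[i] = std::hash<int64_t>{}(keys[i]);
  }

  // Step 2: one private table per thread; thread t owns hash range t.
  std::vector<std::map<int64_t, int64_t>> tables(numThreads);
  std::vector<std::thread> workers;
  for (int t = 0; t < numThreads; ++t) {
    workers.emplace_back([&, t] {
      for (size_t i = 0; i < keys.size(); ++i) {
        if (hashes[i] % numThreads == uint64_t(t)) {
          tables[t][keys[i]] += 1;  // e.g. a count(*) accumulator
        }
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
  return tables;
}
```

Each distinct key lands in exactly one thread's table, so the per-thread results can later be emitted without a merge step.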

New finalAgg/singleAgg getOutput flow

getOutput will remain essentially unchanged because it retrieves output rows through the RowContainer::listRows() API.
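With per-partition state, getOutput amounts to listing each partition's rows in turn, analogous to calling RowContainer::listRows() on each container. A hedged sketch with stand-in types (`Row`, `listAllRows` are illustrative, not Velox names):

```cpp
#include <cassert>
#include <vector>

// Illustrative sketch: concatenate each partition's rows sequentially,
// analogous to one RowContainer::listRows() pass per partition.
struct Row {
  long key;
  long value;
};

std::vector<Row> listAllRows(const std::vector<std::vector<Row>>& partitions) {
  std::vector<Row> out;
  for (const auto& rows : partitions) {  // one listRows() per partition
    out.insert(out.end(), rows.begin(), rows.end());
  }
  return out;
}
```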

Complication with spilling
Writing out spills:
Currently, the spiller uses RowContainer::listRows() to read rows to spill; this logic can remain as is.
Reading back spills:
When spills are read back, they are merged into a brand-new mergedRows_ RowContainer. This logic is orthogonal to the changes proposed in this doc.

Result measurements
Local benchmark
A local benchmark demonstrating the gains in the finalAgg.addInput path will be authored and results collected.
Production
Similar to updateSpillFillTime, which collects timing for a particular section of Operator logic, we will add logic to time finalAgg.addInput and compare before/after performance.
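The measurement hook could look like the scoped-timer sketch below. This is an assumption about shape only: the stat name and `AddInputTimer` class are hypothetical, not existing Velox code; the real patch would follow whatever pattern updateSpillFillTime uses.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical scoped timer for the proposed finalAgg.addInput stat.
// On destruction it adds the elapsed wall time to the given sink,
// mirroring how a section of Operator logic might be timed.
struct AddInputTimer {
  uint64_t& totalNanos;
  std::chrono::steady_clock::time_point start;

  explicit AddInputTimer(uint64_t& sink)
      : totalNanos(sink), start(std::chrono::steady_clock::now()) {}

  ~AddInputTimer() {
    totalNanos += uint64_t(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now() - start)
            .count());
  }
};
```

Usage would be to open an `AddInputTimer` scope around the body of addInput, then report the accumulated nanoseconds alongside existing operator stats.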

@mbasmanova @xiaoxmeng @oerl

@shiyu-bytedance shiyu-bytedance added the enhancement New feature or request label May 15, 2024
@mbasmanova
Contributor

there can be a number of drivers partialAgg(each run by a driver), but always 1 finalAgg/singleAgg

This is not the case. Final / Single agg can run multi-threaded as long as inputs are hash-partitioned on group-by keys.

@shiyu-bytedance
Contributor Author

@mbasmanova Thank you for the insight. Could this improvement be applicable for cases where input is not hash-partitioned on group-by keys?

@mbasmanova
Contributor

Could this improvement be applicable for cases where input is not hash-partitioned on group-by keys?

To run final agg concurrently, one needs to partition data on grouping keys. This can be achieved with local or remote shuffle.
