[Aggregate] DISTINCT aggregates *with* GROUP BY are now executed in parallel #5146
Conversation
Commits:
- …s with move constructor
- …ther distinct/non-distinct aggregates
- …ut for Sink, but all aggregate children come out as NULL
- …is data with GetData
- …using the size of the group_expressions instead of the size of the grouping_set of the current grouping when initializing the grouping_set of the distinct aggregate data
- This reverts commit c1a7168.
- …yond the boundaries of the current rowgroup" This reverts commit a5f3a0e.
Thanks for the PR! Looks great. Very nice performance improvement!
Some comments below:
Resolved review threads:
- test/sql/aggregate/distinct/grouped/distinct_and_non_distinct_mixed.test
- src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp (outdated)
Could we also add some large grouping set tests with TPC-H, e.g.:

CALL dbgen(sf=1);

select grouping(l_returnflag, l_linestatus), l_returnflag, l_linestatus,
       count(distinct l_orderkey), count(distinct l_comment)
from lineitem
group by cube(l_returnflag, l_linestatus)
order by all;

SELECT COUNT(DISTINCT l_orderkey), COUNT(DISTINCT l_partkey), COUNT(*),
       MIN(l_orderkey), MAX(l_orderkey), MIN(l_partkey), MAX(l_partkey),
       SUM(distinct_comment), AVG(distinct_comment)
FROM (
    select l_orderkey, l_partkey, count(distinct l_comment) AS distinct_comment
    from lineitem
    group by cube(l_orderkey, l_partkey)
);
Thanks - one more comment and some answers to inline questions, otherwise everything looks good.
Ran all the tests introduced in this PR on master; all pass except for the case mentioned in #5070, as expected.

Thanks! Everything looks good now.
This PR addresses #4275
Previously, GROUP BY aggregates (handled by the HashAggregate operator) were forced to run (essentially) single-threaded whenever any of the aggregates to be computed had the DISTINCT modifier. This was because aggregate states were always computed directly: if that were done for DISTINCT aggregates, two threads could each update their local aggregate state, and at merge time there would be no way to reconcile the duplicate inputs seen by both threads.
What now happens is that for every grouping_set (a regular GROUP BY, regardless of the number of columns, is considered one grouping_set), we create a RadixPartitionedHashTable for every distinct aggregate we have. These hash tables are grouped on the columns of the group plus the children (inputs) of the distinct aggregate. That way each new (distinct) combination of those columns is registered exactly once, and because the tables only record membership, we can fill them in parallel and merge them without loss of information.
We also delay computing the AggregateState for these aggregates until the Finalize step, where we scan the hash tables of the distinct aggregates for a grouping and add their entries to the main hash table for that grouping.

I have added a benchmark:
old timings (master):
new timings (this branch):