
Vectorized hash grouping #6904

Merged
merged 32 commits into from Jul 13, 2023
Conversation

@alamb (Contributor) commented Jul 10, 2023

Which issue does this PR close?

Part of #6889
Closes #6800
Closes #4973
Closes #956
Closes #846
Closes #418

Rationale for this change

Much faster grouping performance and lower memory usage for large numbers of groups

TPCH (SF1)

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ alamb_fast_gby_hash ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  521.08ms │            478.59ms │ +1.09x faster │
│ QQuery 2     │  238.03ms │            148.34ms │ +1.60x faster │
│ QQuery 3     │  171.12ms │            156.71ms │ +1.09x faster │
│ QQuery 4     │  113.90ms │            111.03ms │     no change │
│ QQuery 5     │  366.87ms │            359.57ms │     no change │
│ QQuery 6     │   39.49ms │             37.80ms │     no change │
│ QQuery 7     │  858.86ms │            854.47ms │     no change │
│ QQuery 8     │  234.09ms │            219.26ms │ +1.07x faster │
│ QQuery 9     │  564.17ms │            535.61ms │ +1.05x faster │
│ QQuery 10    │  323.68ms │            323.61ms │     no change │
│ QQuery 11    │  171.33ms │            153.55ms │ +1.12x faster │
│ QQuery 12    │  154.57ms │            156.19ms │     no change │
│ QQuery 13    │  562.33ms │            286.53ms │ +1.96x faster │
│ QQuery 14    │   45.14ms │             47.25ms │     no change │
│ QQuery 15    │   81.27ms │             49.87ms │ +1.63x faster │
│ QQuery 16    │  187.83ms │            154.13ms │ +1.22x faster │
│ QQuery 17    │ 2597.93ms │            893.75ms │ +2.91x faster │
│ QQuery 18    │ 2239.90ms │           1505.12ms │ +1.49x faster │
│ QQuery 19    │  156.73ms │            145.82ms │ +1.07x faster │
│ QQuery 20    │  779.21ms │            309.94ms │ +2.51x faster │
│ QQuery 21    │ 1049.99ms │            997.71ms │     no change │
│ QQuery 22    │   85.38ms │             85.93ms │     no change │
└──────────────┴───────────┴─────────────────────┴───────────────┘

ClickBench results

+-------+----------+-------------------+--------------+----------+-------------------+--------------+
| query | main_min | fast_gby_hash_min | cmp          | main_avg | fast_gby_hash_avg | cmp21        |
+-------+----------+-------------------+--------------+----------+-------------------+--------------+
| 1     | 0.439    | 0.41              | NO CHANGE    | 1.24     | 1.23              | NO CHANGE    |
| 2     | 0.199    | 0.2               | NO CHANGE    | 0.24     | 0.23              | NO CHANGE    |
| 3     | 0.38     | 0.46              | NO CHANGE    | 0.42     | 0.7               | NO CHANGE    |
| 4     | 0.386    | 0.4               | NO CHANGE    | 0.63     | 0.65              | NO CHANGE    |
| 5     | 8.945    | 3.87              | 2.31x faster | 9.06     | 4.12              | 2.2x faster  |
| 6     | 5.782    | 4.0               | NO CHANGE    | 7.59     | 4.11              | 1.85x faster |
| 7     | 0.375    | 0.19              | 1.94x faster | 0.41     | 0.22              | 1.86x faster |
| 8     | 0.401    | 0.21              | 1.92x faster | 0.46     | 0.25              | 1.88x faster |
| 9     | 10.36    | 4.58              | 2.26x faster | 16.85    | 4.75              | 3.55x faster |
| 10    | 8.818    | 8.3               | NO CHANGE    | 9.07     | 8.46              | NO CHANGE    |
| 11    | 1.194    | 1.06              | NO CHANGE    | 1.3      | 1.17              | NO CHANGE    |
| 12    | 1.286    | 1.13              | NO CHANGE    | 1.38     | 1.24              | NO CHANGE    |
| 13    | 5.252    | 3.44              | 1.53x faster | 5.37     | 3.53              | 1.52x faster |
| 14    | 10.012   | 6.52              | 1.54x faster | 10.23    | 6.8               | 1.5x faster  |
| 15    | 5.555    | 3.64              | 1.53x faster | 5.72     | 3.77              | 1.52x faster |
| 16    | 10.95    | 4.19              | 2.61x faster | 11.15    | 4.38              | 2.54x faster |
| 17    | 15.721   | 8.37              | 1.88x faster | 16.27    | 8.72              | 1.87x faster |
| 18    | 14.158   | 8.16              | 1.73x faster | 14.38    | 8.24              | 1.75x faster |
| 19    | 33.824   | 16.62             | 2.04x faster | 37.64    | 17.31             | 2.17x faster |
| 20    | 0.29     | 0.3               | NO CHANGE    | 0.52     | 0.51              | NO CHANGE    |
| 21    | 3.696    | 3.69              | NO CHANGE    | 5.2      | 5.17              | NO CHANGE    |
| 22    | 4.591    | 4.58              | NO CHANGE    | 6.2      | 6.22              | NO CHANGE    |
| 23    | 13.191   | 13.24             | NO CHANGE    | 15.07    | 15.22             | NO CHANGE    |
| 24    | 31.688   | 32.11             | NO CHANGE    | 36.17    | 37.21             | NO CHANGE    |
| 25    | 1.824    | 1.85              | NO CHANGE    | 2.23     | 2.24              | NO CHANGE    |
| 26    | 1.499    | 1.49              | NO CHANGE    | 1.59     | 1.58              | NO CHANGE    |
| 27    | 1.883    | 1.85              | NO CHANGE    | 2.57     | 2.26              | NO CHANGE    |
| 28    | 6.902    | 5.39              | NO CHANGE    | 9.19     | 6.61              | NO CHANGE    |
| 29    | 15.324   | 15.13             | NO CHANGE    | 19.39    | 15.63             | NO CHANGE    |
| 30    | 1.672    | 1.62              | NO CHANGE    | 1.71     | 1.69              | NO CHANGE    |
| 31    | 6.689    | 3.13              | 2.14x faster | 7.02     | 4.23              | 1.66x faster |
| 32    | 9.18     | 8.13              | NO CHANGE    | 9.73     | 9.17              | NO CHANGE    |
| 33    |          |                   |              |          |                   |              |
| 34    | 22.425   | 18.66             | NO CHANGE    | 23.43    | 29.08             | NO CHANGE    |
| 35    | 23.101   | 18.83             | NO CHANGE    | 23.69    | 19.46             | NO CHANGE    |
| 36    | 10.674   | 5.15              | 2.07x faster | 10.85    | 5.26              | 2.06x faster |
| 37    | 0.845    | 0.79              | NO CHANGE    | 0.93     | 0.84              | NO CHANGE    |
| 38    | 0.465    | 0.43              | NO CHANGE    | 0.5      | 0.47              | NO CHANGE    |
| 39    | 0.386    | 0.37              | NO CHANGE    | 0.45     | 0.42              | NO CHANGE    |
| 40    | 1.546    | 1.59              | NO CHANGE    | 1.66     | 1.68              | NO CHANGE    |
| 41    | 0.222    | 0.2               | NO CHANGE    | 0.27     | 0.24              | NO CHANGE    |
| 42    | 0.207    | 0.19              | NO CHANGE    | 0.24     | 0.22              | NO CHANGE    |
| 43    | 0.237    | 0.22              | NO CHANGE    | 0.27     | 0.24              | NO CHANGE    |
+-------+----------+-------------------+--------------+----------+-------------------+--------------+
Methodology

1. Run the benchmark script (`bash run.sh`) from https://github.com/ClickHouse/ClickBench/tree/main/datafusion
2. Save the resulting `result.csv`: [result-fast_gby_hash.csv](https://github.com/apache/arrow-datafusion/files/12014914/result-fast_gby_hash.csv), [result-main.csv](https://github.com/apache/arrow-datafusion/files/12014915/result-main.csv)
3. Run the report: `datafusion-cli -f report.sql`

report.sql

create external table main(query int, run int, time float) stored as csv location 'result-main.csv';
create external table fast_gby_hash(query int, run int, time float) stored as csv location 'result-fast_gby_hash.csv';

--Comparison

SELECT
  main.query as query,
  main.min_time as main_min,
  round(fast_gby_hash.min_time, 2) as fast_gby_hash_min,

  CASE
    -- ignore results whose min times are within 50% of each other
    WHEN abs(1 - abs(main.min_time / fast_gby_hash.min_time)) < 0.5
      THEN 'NO CHANGE'
    WHEN main.min_time > fast_gby_hash.min_time
      THEN round(main.min_time / fast_gby_hash.min_time, 2) || 'x faster'
    ELSE
      round(fast_gby_hash.min_time / main.min_time, 2) || 'x slower'
  END as cmp,

  round(main.avg_time, 2) as main_avg,
  round(fast_gby_hash.avg_time,2) as fast_gby_hash_avg,
  CASE
    -- ignore results whose avg times are within 50% of each other
    WHEN abs(1 - abs(main.avg_time / fast_gby_hash.avg_time)) < 0.5
      THEN 'NO CHANGE'
    WHEN main.avg_time > fast_gby_hash.avg_time
      THEN round(main.avg_time / fast_gby_hash.avg_time, 2) || 'x faster'
    ELSE
      round(fast_gby_hash.avg_time / main.avg_time, 2) || 'x slower'
  END as cmp21
FROM
  (select query, min(time) as min_time, avg(time) as avg_time from main group by 1) as main
    JOIN
  (select query, min(time) as min_time, avg(time) as avg_time from fast_gby_hash group by 1) as fast_gby_hash
    USING (query)
ORDER BY
  query
;

What changes are included in this PR?

The code in this PR was written by myself and @Dandandan

  • Rewrite GroupedHashAggregateStream to use vectorized / multi-group updates
  • A new GroupsAccumulator trait with the new vectorized API for managing and updating group state
  • A generic implementation of GroupsAccumulator for all aggregates that have RowAccumulator variants
  • Fuzz testing of the new accumulate function
  • An adapter that implements GroupsAccumulator in terms of Accumulator (for slower, but simpler accumulators)
  • Significant documentation for all new code
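
The vectorized API can be illustrated with a short sketch. This is not the actual DataFusion trait (which operates on Arrow arrays and also handles nulls and filters); it is a simplified model with hypothetical names, using plain `Vec`s, that shows the core idea: one call updates the state of many groups at once instead of dispatching to one `Accumulator` object per group.

```rust
// A simplified sketch (NOT the real DataFusion trait) of the vectorized
// grouping idea: one call folds a whole batch into many groups at once,
// indexed by `group_indices`.
trait GroupsAccumulator {
    /// For each row i, fold values[i] into the state of group group_indices[i].
    /// `None` stands in for a null value (null handling is simplified here).
    fn update_batch(
        &mut self,
        values: &[Option<i64>],
        group_indices: &[usize],
        total_num_groups: usize,
    );
    /// Produce the final value for every group.
    fn evaluate(&self) -> Vec<i64>;
}

/// A vectorized COUNT accumulator: one counter slot per group.
struct CountGroups {
    counts: Vec<i64>,
}

impl GroupsAccumulator for CountGroups {
    fn update_batch(
        &mut self,
        values: &[Option<i64>],
        group_indices: &[usize],
        total_num_groups: usize,
    ) {
        // Grow the per-group state to cover any newly seen groups.
        self.counts.resize(total_num_groups, 0);
        for (value, &group_index) in values.iter().zip(group_indices) {
            if value.is_some() {
                self.counts[group_index] += 1;
            }
        }
    }

    fn evaluate(&self) -> Vec<i64> {
        self.counts.clone()
    }
}

fn main() {
    let mut acc = CountGroups { counts: vec![] };
    // 5 input rows hashed into 3 groups; the one null is not counted.
    acc.update_batch(
        &[Some(1), None, Some(3), Some(4), Some(5)],
        &[0, 0, 1, 2, 1],
        3,
    );
    println!("{:?}", acc.evaluate()); // [1, 2, 1]
}
```

Because the per-group state is a flat `Vec` indexed by group index, the inner loop is a simple array iteration with no hashing or per-group virtual dispatch, which is where much of the speedup comes from.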

Here is the list of RowAccumulators (aka accumulators that have specialized implementations).

  • CountRowAccumulator
  • MaxRowAccumulator
  • MinRowAccumulator
  • AvgRowAccumulator
  • SumRowAccumulator
  • BitAndRowAccumulator
  • BitOrRowAccumulator
  • BitXorRowAccumulator
  • BoolAndRowAccumulator
  • BoolOrRowAccumulator

Follow on work

I plan to remove the GroupedAggregateStream (and all RowAccumulators) as a follow on PR

Are these changes tested?

Yes -- both new and existing tests

Are there any user-facing changes?

Much faster performance -- see above

@github-actions bot added the physical-expr (Physical Expressions) and core (Core datafusion crate) labels Jul 10, 2023
@@ -16,100 +16,181 @@
// under the License.

//! Hash aggregation through row format
//!
Contributor Author:

This diff is going to be hard to read -- I recommend anyone interested look at the code directly here: https://github.com/alamb/arrow-datafusion/blob/alamb/fast_gby_hash/datafusion/core/src/physical_plan/aggregates/row_hash.rs

self.sums
    .resize_with(total_num_groups, || T::default_value());

// NullState dispatches / handles tracking nulls and groups that saw no values
Contributor Author:

I am quite pleased that most of the update logic for handling nulls / filters is encapsulated (and tested) in NullState::accumulate
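For readers following along, here is a hedged sketch of that encapsulation idea (simplified types and hypothetical signatures, not the real `NullState` API): one helper owns the null/filter dispatch and the "did this group see any value" bookkeeping, and each aggregator only supplies a closure for the valid-value case.

```rust
// Hypothetical simplification of the NullState::accumulate idea. The helper
// skips filtered-out rows and nulls, records which groups saw at least one
// valid value, and calls `value_fn` for every valid (group, value) pair.
struct NullState {
    seen_values: Vec<bool>,
}

impl NullState {
    fn accumulate<F: FnMut(usize, i64)>(
        &mut self,
        group_indices: &[usize],
        values: &[Option<i64>],
        opt_filter: Option<&[bool]>,
        total_num_groups: usize,
        mut value_fn: F,
    ) {
        self.seen_values.resize(total_num_groups, false);
        for (i, (&group_index, value)) in group_indices.iter().zip(values).enumerate() {
            // Skip rows the (optional) filter rejects, and skip nulls.
            let keep = opt_filter.map_or(true, |f| f[i]);
            if let (true, Some(v)) = (keep, value) {
                self.seen_values[group_index] = true;
                value_fn(group_index, *v);
            }
        }
    }
}

fn main() {
    let mut state = NullState { seen_values: vec![] };
    let mut sums = vec![0i64; 2];
    state.accumulate(
        &[0, 1, 0],
        &[Some(10), None, Some(5)],
        Some(&[true, true, false]), // the last row is filtered out
        2,
        |g, v| sums[g] += v,
    );
    println!("{:?} {:?}", sums, state.seen_values); // [10, 0] [true, false]
}
```

The aggregator-specific code stays a one-line closure, while the tricky filter/null combinations live (and are tested) in one place.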

group_indices,
values.nulls(), // ignore values
opt_filter,
|group_index| {
Contributor:

Should this use wrapping add as well?

Contributor Author:

I don't know what the implications are -- would that be faster (but not check overflow)? I am happy to change it but I honestly don't know

Contributor:

I suggested it mainly for consistency.
AFAIK wrapping_add doesn't check for overflow but wraps, which is the default already, but in debug mode it causes an exception.
Maybe we should remove the wrapping_add in the other places and "just" use + etc.
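For reference, the semantics under discussion can be demonstrated with plain std integer methods (no Arrow involved):

```rust
// `wrapping_add` never panics and wraps on overflow in every build profile.
// Plain `+` panics on overflow in debug builds and wraps in release builds
// (with default settings), which is the difference being discussed.
fn main() {
    let x: i64 = i64::MAX;
    // checked_add reports overflow explicitly instead of wrapping or panicking.
    assert_eq!(x.checked_add(1), None);
    // wrapping_add wraps to i64::MIN, in both debug and release builds.
    assert_eq!(x.wrapping_add(1), i64::MIN);
    // For non-overflowing inputs, `+` and wrapping_add agree.
    assert_eq!(5i64.wrapping_add(1), 5 + 1);
    println!("wrapped: {}", x.wrapping_add(1));
}
```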

Contributor Author:

When I tried to use wrapping_add then rustc told me I needed to check the return type. Given I never expect an overflow and this is on the hot path, I think we should not use wrapping_add

warning: unused return value of `core::num::<impl i64>::wrapping_add` that must be used
   --> datafusion/physical-expr/src/aggregate/count.rs:130:17
    |
130 |                 self.counts[group_index].wrapping_add(1);
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
    = note: this returns the result of the operation, without modifying the original
    = note: `#[warn(unused_must_use)]` on by default
help: use `let _ = ...` to ignore the resulting value
    |
130 |                 let _ = self.counts[group_index].wrapping_add(1);
    |                 +++++++

Maybe we should remove the wrapping_add in the other places and "just" use + etc.

What other places are you referring to? I didn't see any in this PR but maybe I am misunderstanding

Contributor:

We're using add_wrapping in sum, average:

sum.add_wrapping(new_value) etc.

Contributor Author:

Ah, I was confused -- I had tried wrapping_add (the std integer method) rather than add_wrapping (part of Arrow's ArrowNativeTypeOp) 🤦

I played around with it -- it turns out anything with a concrete Rust type (like i64) can use += but for code templated on ArrowNativeType I can't use +=; I have to use add_wrapping

This is how the code would look if I used add_wrapping, which I think looks a bit more tortured (though it is consistent). Do you think I should make the change?

diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 287970de29..cfb1713d8e 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -156,9 +156,10 @@ impl GroupsAccumulator for CountGroupsAccumulator {
                 .iter()
                 .zip(group_indices.iter())
                 .zip(partial_counts.iter())
-                .for_each(|((filter_value, &group_index), partial_count)| {
+                .for_each(|((filter_value, &group_index), &partial_count)| {
                     if let Some(true) = filter_value {
-                        self.counts[group_index] += partial_count;
+                        let count = &mut self.counts[group_index];
+                        *count = count.add_wrapping(partial_count);
                     }
                 }),
             None => group_indices.iter().zip(partial_counts.iter()).for_each(

.zip(partial_counts.iter())
.for_each(|((filter_value, &group_index), partial_count)| {
if let Some(true) = filter_value {
self.counts[group_index] += partial_count;
Contributor:

wrapping_add?

Contributor Author:

see above: #6904 (comment)

@@ -44,6 +49,9 @@ impl<T> VecAllocExt for Vec<T> {

self.push(x);
}
fn allocated_size(&self) -> usize {
std::mem::size_of::<T>() * self.capacity()
Contributor Author:

I refactored this calculation into its own function as leaving it inline in the group code made it harder to follow in my opinion

@@ -1022,6 +1112,224 @@ impl RowAccumulator for MinRowAccumulator {
}
}

trait MinMax {
Contributor Author:

@tustvold do you know of any pre-existing trait that will provide a min based on type?
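For illustration, the kind of trait being asked about might look like this sketch (hypothetical names, not an existing Arrow or DataFusion trait): a per-type identity value that a running min/max fold can start from.

```rust
// A minimal sketch of a "min/max identity per type" trait. The names here
// are hypothetical, purely for illustration.
trait MinMaxIdentity {
    /// The starting value for a running `min` (the type's maximum).
    fn min_identity() -> Self;
    /// The starting value for a running `max` (the type's minimum).
    fn max_identity() -> Self;
}

impl MinMaxIdentity for i64 {
    fn min_identity() -> Self { i64::MAX }
    fn max_identity() -> Self { i64::MIN }
}

/// Fold a slice down to its minimum, starting from the identity value,
/// so an empty slice yields the identity rather than panicking.
fn running_min<T: MinMaxIdentity + PartialOrd + Copy>(values: &[T]) -> T {
    values
        .iter()
        .fold(T::min_identity(), |acc, &v| if v < acc { v } else { acc })
}

fn main() {
    println!("{}", running_min(&[3i64, -2, 7])); // -2
}
```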

@@ -0,0 +1,879 @@
// Licensed to the Apache Software Foundation (ASF) under one
Contributor Author:

This file contains the templated, inner loops for the accumulators that handles filtering and nulls with specialized loops. I think it could be made faster with some more specialization but I think the complexity might have diminishing returns.
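The specialization idea can be sketched as follows (plain slices instead of Arrow arrays, hypothetical names): when the input has no nulls and no filter, a tight loop with no per-row branching runs; otherwise a general loop with the extra checks is used.

```rust
// Two of the specialized inner loops, sketched with plain slices.
// Fast path: every row is valid, so the loop body has no Option checks.
fn accumulate_all<F: FnMut(usize, i64)>(
    group_indices: &[usize],
    values: &[i64],
    mut value_fn: F,
) {
    for (&group_index, &value) in group_indices.iter().zip(values) {
        value_fn(group_index, value);
    }
}

// General path: a per-row null check is needed.
fn accumulate_nullable<F: FnMut(usize, i64)>(
    group_indices: &[usize],
    values: &[Option<i64>],
    mut value_fn: F,
) {
    for (&group_index, value) in group_indices.iter().zip(values) {
        if let Some(v) = value {
            value_fn(group_index, *v);
        }
    }
}

fn main() {
    let mut sums = vec![0i64; 2];
    accumulate_all(&[0, 1, 1], &[1, 2, 3], |g, v| sums[g] += v);

    let mut sums2 = vec![0i64; 2];
    accumulate_nullable(&[0, 0, 1], &[Some(1), None, Some(4)], |g, v| sums2[g] += v);

    println!("{:?} {:?}", sums, sums2); // [1, 5] [1, 4]
}
```

A dispatcher that inspects the null buffer and filter once per batch, then picks the right loop, keeps the branch out of the hot per-row path.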

.map(|array| array.slice(offset, length))
.collect();

if let Some(f) = filter_opt {
Contributor:

Unrelated to this PR, but seems weird/inefficient to me the filter works on sliced arrays rather than just on the original arrays.

Contributor Author:

🤔 I agree -- that might make a nice follow on PR if someone wants to optimize aggregates with filters. They aren't used in TPCH or ClickBench so this isn't likely to show up in any profiling we are doing

@alamb marked this pull request as ready for review July 11, 2023 11:14
@alamb

alamb commented Jul 11, 2023

I think this PR is ready to run. I am still working to gather some clickbench numbers, but otherwise I think this is ready for review

@alamb

alamb commented Jul 11, 2023

Given the size and scope of this PR I plan to leave it open for at least a few days to give anyone who wants a chance to review or comment. If you need more time please let me know

Also, thank you so much @Dandandan for all the help. It has been great working on this PR with you

@alamb

alamb commented Jul 11, 2023

(added clickbench results to the PR summary: TLDR they look quite promising :bowtie: )

Update: q32 now runs too where previously it failed due to OOM

Details

❯ CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 'hits.parquet';
0 rows in set. Query took 0.086 seconds.
❯ SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
+---------------------+-------------+---+---------------------+---------------------------+
| WatchID             | ClientIP    | c | SUM(hits.IsRefresh) | AVG(hits.ResolutionWidth) |
+---------------------+-------------+---+---------------------+---------------------------+
| 8566928176839891583 | -1402644643 | 2 | 0                   | 1368.0                    |
| 7224410078130478461 | -776509581  | 2 | 0                   | 1368.0                    |
| 6655575552203051303 | 1611957945  | 2 | 0                   | 1638.0                    |
| 7904046282518428963 | 1509330109  | 2 | 0                   | 1368.0                    |
| 7506464702820257539 | -1066330088 | 1 | 0                   | 1996.0                    |
| 6553204517012084433 | 74043427    | 1 | 0                   | 1087.0                    |
| 8893914839453858858 | -1598586988 | 1 | 0                   | 1990.0                    |
| 7282498940148684154 | -1598586988 | 1 | 0                   | 1990.0                    |
| 4993912885580444823 | -1598586988 | 1 | 0                   | 1990.0                    |
| 6971535228112507024 | 1953699426  | 1 | 0                   | 1011.0                    |
+---------------------+-------------+---+---------------------+---------------------------+
10 rows in set. Query took 57.068 seconds.

r4ntix and others added 6 commits July 13, 2023 07:33
* parallel csv scan

* add max line length

* Update according to review comments

* Update Configuration doc

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
…ps/decimals (apache#6939)

* Add additional test coverage for aggregates using dates/times/timestamps/decimals

* Add coverage for date32/date64
@alamb

alamb commented Jul 13, 2023

I wrote some additional tests and actually found another bug (related to bitand): #6952

@alamb

alamb commented Jul 13, 2023

a98b6a0 and 5ab75b1 are new -- the rest of the commits are from merging.

I may just merge this PR in as is and then fix the remaining bugs on master to avoid keeping it open for too much longer

@alamb

alamb commented Jul 13, 2023

I am going to run one final performance measurement and if that looks good merge this in and keep working on main

@alamb

alamb commented Jul 13, 2023

My most recent changes have slowed things down a little, but I have an idea of how to get the performance back. Let's merge this and proceed with development on main.

Thank you everyone for your help and encouragement

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ alamb_fast_gby_hash_pre ┃ alamb_fast_gby_hash ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                484.92ms │            536.15ms │  1.11x slower │
│ QQuery 2     │                148.89ms │            153.45ms │     no change │
│ QQuery 3     │                165.97ms │            162.14ms │     no change │
│ QQuery 4     │                114.07ms │            127.66ms │  1.12x slower │
│ QQuery 5     │                366.55ms │            378.28ms │     no change │
│ QQuery 6     │                 41.96ms │             38.40ms │ +1.09x faster │
│ QQuery 7     │                868.25ms │            895.61ms │     no change │
│ QQuery 8     │                246.07ms │            241.03ms │     no change │
│ QQuery 9     │                572.43ms │            563.44ms │     no change │
│ QQuery 10    │                317.69ms │            311.76ms │     no change │
│ QQuery 11    │                160.81ms │            167.52ms │     no change │
│ QQuery 12    │                156.69ms │            174.96ms │  1.12x slower │
│ QQuery 13    │                284.68ms │            312.62ms │  1.10x slower │
│ QQuery 14    │                 48.32ms │             50.55ms │     no change │
│ QQuery 15    │                 50.41ms │             52.45ms │     no change │
│ QQuery 16    │                149.03ms │            148.36ms │     no change │
│ QQuery 17    │                812.65ms │            882.47ms │  1.09x slower │
│ QQuery 18    │               1560.64ms │           1575.56ms │     no change │
│ QQuery 19    │                154.62ms │            162.10ms │     no change │
│ QQuery 20    │                305.76ms │            305.81ms │     no change │
│ QQuery 21    │               1028.46ms │           1057.46ms │     no change │
│ QQuery 22    │                 92.21ms │             88.00ms │     no change │
└──────────────┴─────────────────────────┴─────────────────────┴───────────────┘

@alamb merged commit e0cc8c8 into apache:main Jul 13, 2023
21 checks passed
@alamb deleted the alamb/fast_gby_hash branch July 13, 2023 14:24
@Dandandan

amazing 🚀

@mingmwang

@alamb @Dandandan @ozankabak
After this PR, the bottleneck of tpch q17 and q18 is the join ordering. I will start work on the CBO stats estimation next week and enhance the join ordering (Join Selection rule).

@alamb

alamb commented Jul 13, 2023

After this PR, the bottleneck of tpch q17 and q18 is the join ordering. I will start work on the CBO stats estimation next week and enhance the join ordering (Join Selection rule).

Awesome -- thank you @mingmwang

cc @ozankabak who I think has been thinking about join ordering as well
