[C++] Implement Hash Aggregation query execution node #21501

Closed
asfimport opened this issue Mar 25, 2019 · 3 comments

asfimport commented Mar 25, 2019

Dear all,

I wonder what the best way forward is for implementing GroupBy kernels. Initially this was part of

https://issues.apache.org/jira/browse/ARROW-4124

but is not contained in the current implementation as far as I can tell.

It seems that the part of group by that just returns indices could be conveniently implemented with the HashKernel. That seems useful in any case. Is that indeed the best way forward, and should this be done?
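For illustration only, here is a minimal sketch of the "indices only" part of a group-by: hash each key and assign a dense group id the first time it is seen, similar in spirit to dictionary-encoding a column. It assumes a single `int64_t` key column and uses `std::unordered_map` in place of Arrow's HashKernel. `GroupIds` and `ComputeGroupIds` are hypothetical names, not Arrow APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Output of grouping a single key column (hypothetical type, not an Arrow API).
struct GroupIds {
  std::vector<int32_t> ids;      // ids[i] = dense group id of row i
  std::vector<int64_t> uniques;  // uniques[g] = key of group g, first-seen order
};

// Hashes each key and hands out the next free id the first time a key is
// seen, so group ids are dense in [0, uniques.size()).
GroupIds ComputeGroupIds(const std::vector<int64_t>& keys) {
  GroupIds out;
  out.ids.reserve(keys.size());
  std::unordered_map<int64_t, int32_t> table;  // key -> group id
  for (int64_t key : keys) {
    auto result = table.emplace(key, static_cast<int32_t>(out.uniques.size()));
    if (result.second) out.uniques.push_back(key);  // first occurrence: new group
    out.ids.push_back(result.first->second);
  }
  assert(out.ids.size() == keys.size());
  return out;
}
```

For keys `{5, 7, 5, 9, 7}` this yields ids `{0, 1, 0, 2, 1}` and uniques `{5, 7, 9}`; the uniques play the role of "the hash table of original keys" that the aggregation output needs.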

GroupBy + Aggregate could then be implemented either with that plus the Take kernel plus aggregation (which involves more memory copies than necessary), or as part of the aggregate kernel. The latter is probably preferable; any thoughts on that?
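To make the memory-copy trade-off concrete, the following sketch (hypothetical function names, plain `std::vector` stand-ins for Arrow arrays, SUM as the example aggregate) contrasts the two options given per-row group ids. The Take-style path materializes a copy of every value into a per-group buffer before aggregating; the fused path scatters each value directly into its group's accumulator.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Option A (hypothetical): gather each group's values into its own buffer,
// as a Take-based plan would, then aggregate each buffer.
// Every input value is copied once before it is summed.
std::vector<double> SumByGroupViaTake(const std::vector<double>& values,
                                      const std::vector<int32_t>& group_ids,
                                      int32_t num_groups) {
  assert(values.size() == group_ids.size());
  std::vector<std::vector<double>> gathered(num_groups);
  for (std::size_t i = 0; i < values.size(); ++i) {
    assert(group_ids[i] >= 0 && group_ids[i] < num_groups);
    gathered[group_ids[i]].push_back(values[i]);  // the extra copy
  }
  std::vector<double> sums(num_groups, 0.0);
  for (int32_t g = 0; g < num_groups; ++g) {
    for (double v : gathered[g]) sums[g] += v;
  }
  return sums;
}

// Option B (hypothetical): aggregate inside the kernel by scattering each
// value straight into its group's accumulator; no intermediate buffers.
std::vector<double> SumByGroupFused(const std::vector<double>& values,
                                    const std::vector<int32_t>& group_ids,
                                    int32_t num_groups) {
  assert(values.size() == group_ids.size());
  std::vector<double> sums(num_groups, 0.0);
  for (std::size_t i = 0; i < values.size(); ++i) {
    assert(group_ids[i] >= 0 && group_ids[i] < num_groups);
    sums[group_ids[i]] += values[i];
  }
  return sums;
}
```

Both return the same sums; option B simply avoids allocating and filling `gathered`.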

Am I missing any other JIRAs related to this?

Best, Philipp.

Reporter: Philipp Moritz / @pcmoritz

Related issues:

Note: This issue was originally created as ARROW-5002. Please see the migration documentation for further details.


Francois Saint-Jacques / @fsaintjacques:
The Take kernel should be left out of the solution; as you mentioned, we want to minimize memory copies. My intent is to extend the AggregateFunction class with ConsumeWithFilter and ConsumeWithGroups as follows:

```cpp
class AggregateFunction {
 public:
  virtual ~AggregateFunction() = default;

  /// \brief Consume an array into a state.
  /// `SELECT AGG(x) FROM T;`
  virtual Status Consume(const Array& input, void* state) const = 0;

  /// \brief Consume an array with a mask into a state.
  /// `SELECT AGG(x) FROM T WHERE pred;`
  virtual Status ConsumeWithFilter(const Array& input, const Array& mask, void* state) const = 0;

  /// \brief Consume an array with a scatter group into states.
  /// `SELECT k, AGG(x) FROM T GROUP BY k;`
  /// `SELECT k, AGG(x) FROM T WHERE pred GROUP BY k;`
  virtual Status ConsumeWithGroups(const Array& input, const Array& groups, IndexableState* states) const = 0;
};
```
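As a rough illustration of how a concrete aggregate might fill in the three proposed entry points, here is a self-contained SUM sketch. It deliberately does not use Arrow: plain vectors stand in for `Array`, a `std::vector<bool>` for the filter mask, and a `std::vector<SumState>` for the proposed `IndexableState`. `SumAggregate`, `SumState` and the type aliases are hypothetical names, and methods return `void` instead of `Status` to keep it short.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for Arrow types.
using Values = std::vector<double>;         // plays the role of `Array`
using Mask = std::vector<bool>;             // boolean filter array
using GroupIdArray = std::vector<int32_t>;  // groups[i] = group id of row i

struct SumState {
  double sum = 0.0;
};

class SumAggregate {
 public:
  // SELECT SUM(x) FROM T;
  void Consume(const Values& input, SumState* state) const {
    for (double v : input) state->sum += v;
  }

  // SELECT SUM(x) FROM T WHERE pred;  only rows with mask[i] == true count.
  void ConsumeWithFilter(const Values& input, const Mask& mask,
                         SumState* state) const {
    assert(input.size() == mask.size());
    for (std::size_t i = 0; i < input.size(); ++i) {
      if (mask[i]) state->sum += input[i];
    }
  }

  // SELECT k, SUM(x) FROM T GROUP BY k;  scatters each row into the state
  // of its group, growing the state vector when a new group id appears.
  void ConsumeWithGroups(const Values& input, const GroupIdArray& groups,
                         std::vector<SumState>* states) const {
    assert(input.size() == groups.size());
    for (std::size_t i = 0; i < input.size(); ++i) {
      assert(groups[i] >= 0);
      std::size_t g = static_cast<std::size_t>(groups[i]);
      if (g >= states->size()) states->resize(g + 1);
      (*states)[g].sum += input[i];
    }
  }
};
```

The `WHERE pred GROUP BY k` case could be handled either by a fourth method taking both a mask and group ids, or by having the group-id producer skip filtered rows; the sketch leaves that choice open.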

The GroupBy kernel would emit an array of indices (it also needs to provide the hash table of the original keys). One desirable property of the GroupBy kernel is that it is not a full barrier to running the aggregates; in other words, you can run GroupBy and the aggregates in parallel, assuming there is a final consolidating phase (which is the barrier). See section 4.4 of "Morsel-driven parallelism: a NUMA-aware query evaluation framework for the many-core age".
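The two-phase shape described above can be sketched as follows, under simplifying assumptions: `int64_t` keys, SUM of `double` values, and hypothetical names throughout. Phase 1 aggregates each morsel (a contiguous slice of rows) into its own private hash table; phase 2, the only barrier, merges the partial tables. To keep the sketch dependency-free, morsels are processed in a loop here, but each `AggregateMorsel` call touches no shared state and could run on its own thread.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

using PartialSums = std::unordered_map<int64_t, double>;  // key -> running sum

// Phase 1: aggregate rows [begin, end) into a private table.
PartialSums AggregateMorsel(const std::vector<int64_t>& keys,
                            const std::vector<double>& values,
                            std::size_t begin, std::size_t end) {
  PartialSums local;
  for (std::size_t i = begin; i < end; ++i) local[keys[i]] += values[i];
  return local;
}

// Phase 2 (the barrier): fold all partial tables into the final result.
PartialSums MergePartials(const std::vector<PartialSums>& partials) {
  PartialSums result;
  for (const PartialSums& partial : partials) {
    for (const auto& kv : partial) result[kv.first] += kv.second;
  }
  return result;
}

// Splits the input into morsels, runs phase 1 on each, then phase 2.
PartialSums TwoPhaseSum(const std::vector<int64_t>& keys,
                        const std::vector<double>& values,
                        std::size_t morsel_size) {
  assert(keys.size() == values.size() && morsel_size > 0);
  std::vector<PartialSums> partials;
  for (std::size_t begin = 0; begin < keys.size(); begin += morsel_size) {
    std::size_t end = std::min(begin + morsel_size, keys.size());
    partials.push_back(AggregateMorsel(keys, values, begin, end));
  }
  return MergePartials(partials);
}
```

The merge step only needs the aggregate to be combinable (SUM of partial SUMs); aggregates like MEAN would carry (sum, count) pairs through phase 1 instead.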

As a side note, we should specialize the GroupBy on the group key types: e.g. for any primitive type with a bit width <= 16 (assuming a single-column expression), we can use a fixed-size array, and no hashing is required.
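A minimal sketch of that specialization for `int16_t` keys: since there are only 65536 possible key values, a direct-address table with one slot per value replaces the hash table, so there is no hash function and no collision handling. The function name is hypothetical; the table costs 65536 × 4 bytes = 256 KiB of scratch memory per call (kept on the heap here).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Assigns dense group ids to int16 keys using a direct-address table.
// `uniques` receives the distinct keys in first-seen order.
std::vector<int32_t> GroupIdsInt16(const std::vector<int16_t>& keys,
                                   std::vector<int16_t>* uniques) {
  assert(uniques != nullptr);
  // Shift keys from [-32768, 32767] to slot indices [0, 65535].
  constexpr int32_t kOffset =
      -static_cast<int32_t>(std::numeric_limits<int16_t>::min());
  std::vector<int32_t> slot(1 << 16, -1);  // -1 means "key not seen yet"
  std::vector<int32_t> ids;
  ids.reserve(keys.size());
  for (int16_t key : keys) {
    int32_t& group = slot[static_cast<std::size_t>(key + kOffset)];
    if (group < 0) {  // first occurrence: allocate the next group id
      group = static_cast<int32_t>(uniques->size());
      uniques->push_back(key);
    }
    ids.push_back(group);
  }
  return ids;
}
```

For 8-bit keys the same idea needs only 256 slots, which is small enough to keep in cache.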


Wes McKinney / @wesm:
I think what we are discussing here is called "hash aggregation" in the database literature and is a type of execution node. This is related to ARROW-3978, where we need to be able to compute group ordinal indexes via hashing for multiple keys.
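For the multi-key case, one straightforward approach (a sketch, not what ARROW-3978 necessarily settled on) is to hash the tuple of key values per row and map each distinct tuple to a dense ordinal. The example assumes a hypothetical `GROUP BY k1, k2` with an `int64_t` and a `std::string` column; the hash combiner follows the well-known `boost::hash_combine` recipe, and all names are illustrative.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Composite key for GROUP BY k1, k2 (hypothetical column types).
using CompositeKey = std::pair<int64_t, std::string>;

// Combines the per-column hashes (boost::hash_combine-style mixing).
struct CompositeKeyHash {
  std::size_t operator()(const CompositeKey& k) const {
    std::size_t h = std::hash<int64_t>{}(k.first);
    h ^= std::hash<std::string>{}(k.second) + 0x9e3779b9 + (h << 6) + (h >> 2);
    return h;
  }
};

// Returns one dense group ordinal per row; equal (k1, k2) pairs share an
// ordinal, and ordinals are assigned in first-seen order.
std::vector<int32_t> GroupOrdinals(const std::vector<int64_t>& k1,
                                   const std::vector<std::string>& k2) {
  assert(k1.size() == k2.size());
  std::unordered_map<CompositeKey, int32_t, CompositeKeyHash> table;
  std::vector<int32_t> ordinals;
  ordinals.reserve(k1.size());
  for (std::size_t i = 0; i < k1.size(); ++i) {
    // table.size() is evaluated before insertion, so a new key gets the next id.
    auto result = table.emplace(CompositeKey(k1[i], k2[i]),
                                static_cast<int32_t>(table.size()));
    ordinals.push_back(result.first->second);
  }
  return ordinals;
}
```

A production engine would more likely hash each column in a vectorized pass and combine the per-column hash arrays, rather than build a tuple per row as done here for clarity.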

I'm not sure about the "ConsumeWithGroups" API. What do other analytic database engines (Impala, Clickhouse, etc.) do?


Wes McKinney / @wesm:
I renamed the issue. I need to be able to execute hash aggregations in the next few months, so I will be working on implementing the appropriate machinery for this under arrow/compute (since hash aggregations need to compose with array/kernel expressions).
