Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++][Python] Non decomposable aggregation UDF #35515

Closed
icexelloss opened this issue May 9, 2023 · 1 comment · Fixed by #35514
Closed

[C++][Python] Non decomposable aggregation UDF #35515

icexelloss opened this issue May 9, 2023 · 1 comment · Fixed by #35514

Comments

@icexelloss
Copy link
Contributor

Describe the enhancement requested

Non decomposable aggregation is aggregation that cannot be split into consume/merge/finalize. This is often when the logic rewritten with external python libraries (numpy, pandas, statmodels, etc) and those either cannot be decomposed or not worthy the effect (these are often one-off function instead of reusable one). This PR implements the support for non decomposable aggregation UDFs.

The major issue with non decomposable UDF is that the UDF needs to see all data at once, unlike scalar UDF where UDF only needs to see a batch at a time. This makes non decomposable not so useful as it is same as collect all the data to a pd.DataFrame and apply the UDF on it. However, one very application of non decomposable UDF is with segmented aggregation. To refresh, segmented aggregation works on ordered data and passed one logic chunk at a time (e.g., all data with the same date). With segmented aggregation and non decomposable aggregation UDF, the user can apply any custom aggregation logic over large stream of ordered data, with the memory overhead of a single segment.

Component(s)

C++, Python

@icexelloss
Copy link
Contributor Author

take

@icexelloss icexelloss added this to the 13.0.0 milestone Jun 8, 2023
icexelloss added a commit that referenced this issue Jun 29, 2023
### Rationale for this change

In #35515,

I have implemented a Scalar version of the non decomposable UDF (Scalar as in SCALAR_AGGREGATE). I would like to support the Hash version of it (Hash as in HASH_AGGREGATE)

With this PR, user can register an aggregate UDF once with `pc.register_aggregate_function` and it can be used as both scalar aggregate function and hash aggregate function.

Example:

```
def median(x):
    return pa.scalar(np.nanmedian(x))

pc.register_aggregate_function(func=median, func_name='median_udf', ...)

table = ...
table.groupby("id").aggregate([("v", 'median_udf')])
```

### What changes are included in this PR?

The main changes are:
* In ResigterAggregateFunction (udf.cc), we now register the function both as a scalar aggregate function and a hash aggregate function (with signature adjustment for hash aggregate kernel because we need to append the grouping key)
* Implemented PythonUdfHashAggregateImpl, similar to the PythonUdfScalarAggregateImpl. In Consume, it will accumulate both the input batches and the group id array. In Merge, it will merge the input batches and group id array (with the group_id_mapping). In Finalize, it will apply groupings to the accumulated batches to create one record batch per group, then apply the UDF over each group.
* Some code clean up - `UdfWrapperCallback` objects are named `cb` (previously, `agg_cb` or `wrapper`) now and the user defined python function is now just called `function` (previously `agg_function`) 

For table.groupby().aggregate(...), the space complexity is O(n) where n is the size of the table (and therefore, is not very useful). However, this is more useful in the segmented aggregation case, where the space complexity of O(s), where s the size of the segments.

### Are these changes tested?
Added new test in test_udf.py (with table.group_by().aggregate() and test_substrait.py (with segmented aggregation)

### Are there any user-facing changes?
Yes with this change, user can call use registered aggregate UDF with `table.group_by().aggregate() ` or Acero's segmented aggregation.

### Checklist

- [x] Self Review
- [x] API Documentation

* Closes: #36252

Lead-authored-by: Li Jin <ice.xelloss@gmail.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
icexelloss added a commit that referenced this issue Aug 8, 2023
### Rationale for this change
In Arrow compute, there are four main types of functions: Scalar, Vector, ScalarAggregate and HashAggregate.

Some of the previous work added support for Scalar, ScalarAggregate(#35515) and HashAggregate(#36252). I think it makes sense to add support for vector function as well to complete all non-decomposable UDF kernel support.

Internally, we plan to extend Acero to implement a "SegmentVectorNode" which would use this API to invoke vector on a segment by segment basis, which will allow to use constant memory to compute things like "rank the value across all rows per segment using a python UDF".

### What changes are included in this PR?
The change includes is very similar to the support for aggregate function, which includes code to register the vector UDF, and a kernel that invokes the vector UDF on given inputs.

### Are these changes tested?
Yes. Added new test.

### Are there any user-facing changes?
Yes. This adds an user-facing API to register the vector function. 

* Closes: #36672

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
…ache#36673)

### Rationale for this change
In Arrow compute, there are four main types of functions: Scalar, Vector, ScalarAggregate and HashAggregate.

Some of the previous work added support for Scalar, ScalarAggregate(apache#35515) and HashAggregate(apache#36252). I think it makes sense to add support for vector function as well to complete all non-decomposable UDF kernel support.

Internally, we plan to extend Acero to implement a "SegmentVectorNode" which would use this API to invoke vector on a segment by segment basis, which will allow to use constant memory to compute things like "rank the value across all rows per segment using a python UDF".

### What changes are included in this PR?
The change includes is very similar to the support for aggregate function, which includes code to register the vector UDF, and a kernel that invokes the vector UDF on given inputs.

### Are these changes tested?
Yes. Added new test.

### Are there any user-facing changes?
Yes. This adds an user-facing API to register the vector function. 

* Closes: apache#36672

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant