
GH-36252: [Python] Add non decomposable hash aggregate UDF #36253

Merged
merged 11 commits into from
Jun 29, 2023

Conversation


@icexelloss icexelloss commented Jun 22, 2023

Rationale for this change

In #35515,

I implemented a scalar version of the non-decomposable aggregate UDF (scalar as in SCALAR_AGGREGATE). This PR adds support for the hash version (hash as in HASH_AGGREGATE).

With this PR, a user can register an aggregate UDF once with pc.register_aggregate_function and use it as both a scalar aggregate function and a hash aggregate function.

Example:

def median(x):
    return pa.scalar(np.nanmedian(x))

pc.register_aggregate_function(func=median, func_name='median_udf', ...)

table = ...
table.group_by("id").aggregate([("v", 'median_udf')])

What changes are included in this PR?

The main changes are:

  • In RegisterAggregateFunction (udf.cc), we now register the function as both a scalar aggregate function and a hash aggregate function (adjusting the signature for the hash aggregate kernel, because we need to append the grouping key)
  • Implemented PythonUdfHashAggregateImpl, analogous to PythonUdfScalarAggregateImpl. In Consume, it accumulates both the input batches and the group id array. In Merge, it merges the accumulated input batches and group id arrays (applying the group_id_mapping). In Finalize, it applies the groupings to the accumulated batches to create one record batch per group, then applies the UDF over each group.
  • Some code cleanup: UdfWrapperCallback objects are now named cb (previously agg_cb or wrapper), and the user-defined Python function is now just called function (previously agg_function)

For table.group_by().aggregate(...), the space complexity is O(n), where n is the size of the table (and therefore this path is not very memory-efficient). However, it is more useful in the segmented aggregation case, where the space complexity is O(s), where s is the size of a segment.
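The Consume/Merge/Finalize flow described above can be sketched in plain Python. This is an illustrative model of the algorithm, not the actual C++ implementation; all names here are invented:

```python
from collections import defaultdict

class HashAggSketch:
    """Toy model of a non-decomposable hash aggregate kernel state."""

    def __init__(self, udf):
        self.udf = udf
        self.values = []     # accumulated input chunks
        self.group_ids = []  # accumulated group id chunks, aligned with values

    def consume(self, batch_values, batch_group_ids):
        # Consume: accumulate both the input batch and its group id array.
        self.values.append(batch_values)
        self.group_ids.append(batch_group_ids)

    def merge(self, other, group_id_mapping):
        # Merge: remap the other state's local group ids to this state's
        # global ids, then concatenate the accumulated chunks.
        self.values.extend(other.values)
        for chunk in other.group_ids:
            self.group_ids.append([group_id_mapping[g] for g in chunk])

    def finalize(self):
        # Finalize: apply the groupings to get one value list per group,
        # then apply the UDF over each group.
        groups = defaultdict(list)
        for vals, gids in zip(self.values, self.group_ids):
            for v, g in zip(vals, gids):
                groups[g].append(v)
        return [self.udf(groups[g]) for g in sorted(groups)]

agg = HashAggSketch(udf=lambda xs: sum(xs) / len(xs))
agg.consume([1.0, 3.0], [0, 0])            # two rows of local group 0
other = HashAggSketch(udf=None)
other.consume([5.0], [0])                  # group 0 in the other state...
agg.merge(other, group_id_mapping={0: 1})  # ...maps to global group 1
result = agg.finalize()                    # [2.0, 5.0]
```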

Are these changes tested?

Added new tests in test_udf.py (with table.group_by().aggregate()) and in test_substrait.py (with segmented aggregation).

Are there any user-facing changes?

Yes. With this change, users can use a registered aggregate UDF with table.group_by().aggregate() or with Acero's segmented aggregation.

Checklist

  • Self Review
  • API Documentation

@@ -355,18 +536,10 @@ Status RegisterTabularFunction(PyObject* user_function, UdfWrapperCallback wrapp
wrapper, options, registry);
}

Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig,

This is inlined now.

@icexelloss

icexelloss commented Jun 22, 2023

@westonpace I would like to request a review on this PR. The code should be relatively straightforward and similar to #35514, so hopefully there is no confusion or surprises here.

For the implementation of the grouping, I used an approach similar to the GroupedListImpl aggregator and partition.cc.

For the registration, I decided to register both the scalar and hash kernels under one API, register_aggregate_function, because the scalar/hash distinction is not really something the user should have to worry about (from the user's point of view it is just "aggregation"; whether it is hash or scalar is a compute implementation detail).

More details in the PR description.

Let me know if that sounds OK to you.

@icexelloss icexelloss changed the title GH-36252: [Python] Compute hash aggregate udf GH-36252: [Python] Add non decomposable hash aggregate UDF Jun 22, 2023
@westonpace

@icexelloss I should have some time to take a look tomorrow


@westonpace westonpace left a comment


This is a good set of tests. It's nice and convenient that the same python implementation can work for both. I have a few minor suggestions / questions. I think the only concerning thing is that we need to restrict group sizes to things that will fit in a single batch. Do you think this will be a problem for your use cases?

const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
DCHECK_EQ(groups_array_data.offset, 0);
int64_t batch_num_values = groups_array_data.length;
const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
@westonpace (Member):

Why not just groups_array_data.GetValues<uint32_t>(1);?

@icexelloss:

Good catch - updated

}

num_values += other.num_values;
return Status::OK();
@westonpace (Member):

Does num_groups need to be updated here?

@icexelloss:

I don't think num_groups needs to be updated here. Reasoning, from the code in https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/kernels/hash_aggregate.cc#L233 and https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/groupby_aggregate_node.cc#L248:
(1) other hash kernel implementations update num_groups in Resize, and
(2) Resize is always called before Consume and Merge.

UdfContext udf_context{ctx->memory_pool(), table->num_rows()};

if (rb->num_rows() == 0) {
return Status::Invalid("Finalized is called with empty inputs");
@westonpace (Member):

Why is this a problem?

@icexelloss:

Good catch - I was being lazy here and didn't want to bother with empty aggregations. Now that I look at it, I can just return an empty result here. Will update.

@icexelloss:

Updated


ARROW_ASSIGN_OR_RAISE(auto table,
arrow::Table::FromRecordBatches(input_schema, values));
ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));
@westonpace (Member):

There are some cases where this won't be possible. For example, a string array can only hold 2GB of string data (regardless of how many elements it has), so no single group can have more than 2GB of string data. I don't know that this is fatal, but you may want to mention it in the user docs somewhere or wrap this failure with extra context.

@icexelloss icexelloss Jun 26, 2023

Co-authored-by: Weston Pace <weston.pace@gmail.com>
@icexelloss

I think the only concerning thing is that we need to restrict group sizes to things that will fit in a single batch. Do you think this will be a problem for your use cases?

Thanks @westonpace. Currently we only plan to use this with segmented aggregation (grouping inside a segment), so each group is not going to be very large; I don't think it will be a problem.

@icexelloss

icexelloss commented Jun 26, 2023

@westonpace This should be clean now (all comments addressed, CI green) - another look?

@icexelloss

Gentle ping @westonpace: anything else you want me to change here?


@westonpace westonpace left a comment


Minor wording suggestion. Otherwise this looks good.

std::vector<std::shared_ptr<DataType>> input_types,
std::shared_ptr<DataType> output_type)
: function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
Py_INCREF(function->obj());
@westonpace (Member):

These INCREFs still seem superfluous to me, but I don't think it's critical. We could test in a follow-up, using temporary function registries, whether we are preventing UDF functions from being garbage collected.

@icexelloss:

I agree with you. I plan to address this in #36000 but haven't gotten to it yet.

Co-authored-by: Weston Pace <weston.pace@gmail.com>
@icexelloss

Thanks @westonpace. I applied your suggestion and will merge once CI passes.

@icexelloss

CI failure is unrelated. Merging.

@icexelloss icexelloss merged commit baf17a2 into apache:main Jun 29, 2023
@conbench-apache-arrow

Conbench analyzed the 6 benchmark runs on commit baf17a20.

There was 1 benchmark result with an error:

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.


Successfully merging this pull request may close these issues.

[C++][Python] Non decomposable aggregation UDF (Hash version)