
ENH: Allow more flexible return type for UDFs #2776

Merged: 14 commits into ibis-project:master, May 18, 2021

Conversation

emilyreff7 (Contributor) commented May 12, 2021

Overview

This PR adds support for more flexible return types of single- and multi-column UDFs.

Proposed Changes

Before this PR:

Supported return types for single column UDF: pd.Series
Supported return types for multi column UDF: Tuple[pd.Series]
Supported return types for multi column aggregation UDF: tuple of scalar

After this PR:

Supported return types for single column UDF: pd.Series, list, np.ndarray
Supported return types for multi column UDF: Tuple[pd.Series], List[pd.Series], Tuple[np.ndarray], List[np.ndarray]
Supported return types for multi column aggregation UDF: tuple of scalar, list of scalar, np.ndarray of scalar

The proposed return types make the UDF API a bit more flexible for the user, and are also unambiguous when the return type of the UDF is not an Array type.

Tests

Added tests in test_vectorized_udf.py to exercise the above return types for elementwise, analytic, and reduction UDFs.
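As a sketch of the single-column coercion this PR describes, the helper below normalizes all three supported return forms to a pd.Series. The name `coerce_to_series` and its exact logic are illustrative, mirroring the behavior listed above rather than ibis's actual internals:

```python
import numpy as np
import pandas as pd

def coerce_to_series(result):
    # Hypothetical helper: accept the three return types supported
    # after this PR (pd.Series, list, np.ndarray) and normalize to
    # a pd.Series. Not the exact ibis implementation.
    if isinstance(result, pd.Series):
        return result
    if isinstance(result, (list, np.ndarray)):
        return pd.Series(result)
    raise TypeError(f"Unsupported UDF return type: {type(result)}")

# All three supported forms normalize to the same Series:
s_from_list = coerce_to_series([1.0, 2.0, 3.0])
s_from_array = coerce_to_series(np.array([1.0, 2.0, 3.0]))
s_from_series = coerce_to_series(pd.Series([1.0, 2.0, 3.0]))
```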

emilyreff7 changed the title from "Allow more flexible return type for UDFs" to "ENH: Allow more flexible return type for UDFs" on May 12, 2021
def demean_struct_tuple_np_array(v, w):
    assert isinstance(v, pd.Series)
    assert isinstance(w, pd.Series)
    return np.array(v - v.mean()), np.array(w - w.mean())
Contributor:

Is it too hard to parameterize the return shape / type?

emilyreff7 (Author):

You mean like define a single UDF and then have the test cases pass in the return type / shape (e.g. 'tuple', 'np.ndarray') and have the UDF figure out how to structure the output?

icexelloss (Contributor) commented May 12, 2021:

More like generating a different test fixture for each return shape:

demean_struct_udfs = [
    create_demean_struct_udf(output_shape=np.array),
    create_demean_struct_udf(output_shape=tuple),
    create_demean_struct_udf(output_shape=pd.Series),
    create_demean_struct_udf(output_shape=pd.DataFrame)
]

icexelloss (Contributor) commented May 12, 2021:

Or something like:

demean_struct_udfs = [
    create_demean_struct_udf(output_shape=lambda v1, v2: (v1.values, v2.values)),  # tuple of numpy arrays
    create_demean_struct_udf(output_shape=lambda v1, v2: pd.DataFrame({'v1': v1, 'v2': v2})),  # dataframe with matching names
]

emilyreff7 (Author):

After trying this out, I think there are pros and cons to this approach. The pro is that the list (e.g. demean_struct_udfs) makes it very clear which set of UDFs and return types we are testing, and it would be easy to extend in the future. The con is that in order for create_demean_struct_udf to return a function that is decorated with the UDF annotation and then wrapped in the lambda that formats the result, we have to change the signature of the original UDF to a regular function. That affects other tests that reuse these UDFs; for example, they also have to call the create function to use them.
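The factory pattern under discussion could look roughly like this. The names `create_demean_struct_udf` and `result_formatter` follow the review sketch above and are illustrative, not the merged code, and the UDF decorator is omitted here to keep the sketch self-contained:

```python
import numpy as np
import pandas as pd

def create_demean_struct_udf(result_formatter):
    # Build a demean UDF whose output shape is chosen by the caller.
    # Sketch of the parameterized-fixture idea from the review; the
    # real version would also apply the UDF decorator.
    def demean_struct(v, w):
        return result_formatter(v - v.mean(), w - w.mean())
    return demean_struct

# One UDF body, several return shapes:
as_tuple_of_arrays = create_demean_struct_udf(
    lambda v1, v2: (v1.values, v2.values)
)
as_dataframe = create_demean_struct_udf(
    lambda v1, v2: pd.DataFrame({'v1': v1, 'v2': v2})
)

v = pd.Series([1.0, 2.0, 3.0])
w = pd.Series([4.0, 5.0, 6.0])
t = as_tuple_of_arrays(v, w)   # tuple of np.ndarray
df = as_dataframe(v, w)        # 2-column DataFrame
```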

result = self.func(*args, **kwargs)

if isinstance(self.output_type, dt.Struct):
    if isinstance(result, pd.DataFrame) or (
Contributor:

We should probably have a function "coerce_struct_output" to capture this logic here.

Contributor:

Actually, why don't we do

if isinstance(self.output_type, dt.Struct):
    result = coerce_to_dataframe(result, self.output_type.names)
else:
    # coerce non struct output to standard form

emilyreff7 (Author):

Sounds good, I can encapsulate some of this logic to make it cleaner.

Contributor:

Before encapsulating, maybe first explain why the array check is needed here?

emilyreff7 (Author):

Because if the intended return type of the UDF is an array, then we would mis-shape the result data here by coercing to DataFrame or Series.

For example, if result is
[np.array([1, 2, 3]), np.array([4, 5, 6])] with column names 'A' and 'B',
and the output type of my UDF is Struct(Array, Array),
then what I want to return in my result is something like:

index       A           B
0       [1, 2, 3]   [4, 5, 6]

Without this check, we would instead return

index         A            B
0             1            4
1             2            5
2             3            6

The same is true for single-col UDFs.
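The two shapes described above can be reproduced directly with pandas. This is purely illustrative of why the array check matters, not a piece of the PR itself:

```python
import numpy as np
import pandas as pd

result = [np.array([1, 2, 3]), np.array([4, 5, 6])]

# With the array check: each array is kept as a single cell, so the
# frame has one row whose cells hold whole arrays.
wrapped = pd.DataFrame({'A': [result[0]], 'B': [result[1]]})

# Without the check: naive coercion spreads each array down a column,
# giving three rows of scalars instead.
unwrapped = pd.DataFrame({'A': result[0], 'B': result[1]})
```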

Contributor:

Can we pass the output type into coerce_to_dataframe to do the right thing?

emilyreff7 (Author):

Unfortunately if we check the type directly inside this function, we get a circular dependency:

ibis/__init__.py:5: in <module>
    import ibis.expr.types as ir
ibis/expr/types.py:11: in <module>
    import ibis.util as util
ibis/util.py:27: in <module>
    import ibis.expr.datatypes as dt
ibis/expr/datatypes.py:146: in <module>
    class Null(DataType):
ibis/expr/datatypes.py:147: in Null
    scalar = ir.NullScalar
E   AttributeError: module 'ibis.expr.types' has no attribute 'NullScalar'

emilyreff7 (Author):

Avoided the circular dependency by moving this function into schema.py

[Resolved review thread on ibis/udf/vectorized.py]
@@ -290,11 +397,13 @@ def foo1(v, amount):
     return v + 1


-@pytest.mark.only_on_backends(['pandas', 'pyspark', 'dask'])
+@pytest.mark.only_on_backends(['pandas', 'pyspark'])
+@pytest.mark.xfail_backends(['dask'])
emilyreff7 (Author):

Do we already have an existing issue for the dask destructure refactoring work that would be required here? @gerrymanoim

)(_format_struct_udf_return_type(demean_struct, result_formatter))


demean_struct_udfs = [
Contributor:

I think DataFrames with matching names are also easy to support?

Contributor:

Perhaps also add a test for mismatching names (error out)

emilyreff7 (Author):

Based on our implementation I believe we already support this: https://github.com/ibis-project/ibis/blob/master/ibis/util.py#L119

Actually, if demean_struct returns a 2-column pd.DataFrame, the test will currently pass whether or not the returned df has matching names, because it just sets the names at the end.

Contributor:

I see, so if there are no matching names, we will just match by position?

emilyreff7 (Author):

Correct, based on the current implementation, coerce_to_dataframe just cares if the result DataFrame has the correct number of columns based on output_type.names.
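That positional matching could be sketched as follows. This is a hypothetical, simplified `coerce_to_dataframe` mirroring the behavior described in the thread, not ibis's actual implementation:

```python
import pandas as pd

def coerce_to_dataframe(data, column_names):
    # Simplified sketch: coerce a multi-column UDF result to a
    # DataFrame, then overwrite column names positionally, as the
    # discussion above describes.
    if isinstance(data, pd.DataFrame):
        result = data
    elif isinstance(data, (list, tuple)):
        result = pd.concat([pd.Series(v) for v in data], axis=1)
    else:
        raise ValueError(f"Cannot coerce to DataFrame: {data}")
    if len(result.columns) != len(column_names):
        raise ValueError("Wrong number of columns")
    result.columns = column_names  # names are matched by position only
    return result

# A DataFrame with "wrong" names still passes; names are overwritten:
df = coerce_to_dataframe(
    pd.DataFrame({'x': [1, 2], 'y': [3, 4]}), ['A', 'B']
)
```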

)(_format_struct_udf_return_type(mean_struct, result_formatter))


mean_struct_udfs = [
Contributor:

Series can also be supported here I think:

pd.Series([v1, v2])

emilyreff7 (Author):

Updated to include a test for this as well.

emilyreff7 (Author):

Hmm maybe this isn't directly supported in the dask backend, will have to take a look.

jreback added the labels feature (Features or general enhancements) and udf (Issues related to user-defined functions) on May 17, 2021
jreback (Contributor) left a comment:

some comments, also pls rebase

[Resolved review threads on ibis/backends/tests/test_vectorized_udf.py and ibis/expr/schema.py]
    # Promote scalar to Series
    result = pd.concat([pd.Series([v]) for v in data], axis=1)
else:
    raise ValueError(f"Cannot coerce to DataFrame: {data}")
Contributor:

test hits this?

Contributor:

is this just a scalar?

emilyreff7 (Author):

For example, if we don't add np.ndarray here (which I think we might as well, as you suggest below), a test that returns an np.ndarray of np.ndarray would hit this ValueError. Other than that, I'm not sure what other non-zero-length type besides DataFrame, Series, list, tuple, or np.ndarray could realistically make it through here, given that this function is only called on multi-column UDFs. I don't believe there are any unit tests for this function, but I could add one.

[Resolved review threads on ibis/expr/schema.py and ibis/udf/vectorized.py]

if isinstance(self.output_type, dt.Struct):
    if isinstance(result, pd.DataFrame) or (
        isinstance(result[0], (pd.Series, list, np.ndarray))
Contributor:

does a 0-len return type hit here? (it would error), tests?

emilyreff7 (Author):

Functionally this should never happen because it's guarded by

if isinstance(self.output_type, dt.Struct)

which by definition means the output should not be 0-len.

elif isinstance(expr, (ir.DestructScalar, ir.StructScalar)):
    # Here there are two cases: if this is a groupby aggregate,
    # then the result is a Series of tuple/list, or
    # if this is a non-grouped aggregate, then the result
-   return ibis.util.coerce_to_dataframe(result, expr.type().names)
+   return coerce_to_dataframe(
+       result, expr.type().names, expr.type().types
Contributor:

just passing expr.type()?

emilyreff7 (Author):

Yep, can do that. I saw that dask defines a similar function, so I wanted to be consistent with it, but it probably doesn't matter too much: https://github.com/ibis-project/ibis/blob/master/ibis/backends/dask/execution/util.py#L153

@functools.wraps(func)
def wrapped(*args, **kwargs):
    result = func(*args, **kwargs)
-   return coerce_to_dataframe(result, output_cols)
+   return coerce_to_dataframe(result, output_cols, output_types)
Contributor:

Do we need to coerce for struct scalar?

jreback added this to the Next release milestone on May 17, 2021
jreback (Contributor) left a comment:

@emilyreff7 can you also add a release note (also let's create a follow-on issue for updating the docs proper for all the fun things UDFs can do now)

jreback (Contributor) left a comment:

lgtm

icexelloss (Contributor) left a comment:

LGTM

@@ -269,7 +269,7 @@ def aggregator(first, *rest):
    # because this is the inner loop and we do not want
    # to wrap a scalar value with a series.
    if isinstance(op._output_type, dt.Struct):
        return coerce_to_dataframe(result, op._output_type.names)
Contributor:

Can we remove this since coercion happens in the udf wrapper?

Contributor:

Same for the pyspark wrapper

emilyreff7 (Author):

Removed both

    return coerce_to_series
# Case 4: Array output type, reduction UDF -> coerce to np.ndarray
elif isinstance(self.output_type, dt.Array):
    return coerce_to_np_array
Contributor:

Do we have a test to cover this?

emilyreff7 (Author):

Yes this test hits it: https://github.com/ibis-project/ibis/blob/master/ibis/backends/dask/tests/test_udf.py#L487
I can add a more general one for all backends in test_vectorized_udf.py
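The "Case 4" coercion under discussion amounts to something like the sketch below. The helper name `coerce_to_np_array` comes from the quoted snippet, but this body is illustrative rather than the exact ibis code:

```python
import numpy as np
import pandas as pd

def coerce_to_np_array(result):
    # Illustrative sketch: coerce an Array-typed reduction UDF result
    # to np.ndarray, accepting the container types discussed above.
    if isinstance(result, np.ndarray):
        return result
    if isinstance(result, (list, tuple, pd.Series)):
        return np.asarray(result)
    raise TypeError(f"Cannot coerce to np.ndarray: {type(result)}")

arr = coerce_to_np_array(pd.Series([1.5, 2.5]))
```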

jreback merged commit b46c60b into ibis-project:master on May 18, 2021
jreback (Contributor) commented May 18, 2021:

thanks @emilyreff7 very nice!

Labels
feature Features or general enhancements udf Issues related to user-defined functions
5 participants