
FEAT: Add destructure support for analytic and reduction UDF #2487

Merged

Conversation

@icexelloss (Contributor) commented Oct 21, 2020

What is this change

This PR is a follow-up to #2473. It adds destructure support for analytic and reduction UDFs:

  • Support for groupby aggregation UDF
  • Support for window analytic UDF (unbounded window)
  • Support for window aggregation UDF (moving window)

Examples:

  • groupby aggregation UDF
    Define and use a UDF with multiple return columns:

    >>> @reduction(
    ...     input_type=[dt.double],
    ...     output_type=dt.Struct(['mean', 'std'], [dt.double, dt.double])
    ... )
    ... def mean_and_std(v):
    ...     return v.mean(), v.std()
    >>>
    >>> # create aggregation columns "mean" and "std"
    >>> table = table.groupby('key').aggregate(
    ...     mean_and_std(table['v']).destructure()
    ... )
  • window analytic UDF
    >>> @analytic(
    ...     input_type=[dt.double],
    ...     output_type=dt.Struct(['demean', 'zscore'], [dt.double, dt.double])
    ... )
    ... def demean_and_zscore(v):
    ...     mean = v.mean()
    ...     std = v.std()
    ...     return v - mean, (v - mean) / std
    >>>
    >>> win = ibis.window(preceding=None, following=None, group_by='key')
    >>> # add two columns "demean" and "zscore"
    >>> table = table.mutate(
    ...     demean_and_zscore(table['v']).over(win).destructure()
    ... )
  • window aggregation UDF
    >>> @reduction(
    ...     input_type=[dt.double],
    ...     output_type=dt.Struct(['mean', 'std'], [dt.double, dt.double])
    ... )
    ... def mean_and_std(v):
    ...     return v.mean(), v.std()
    >>>
    >>> win = ibis.window(
    ...     preceding=ibis.interval(hours=1), following=None,
    ...     group_by='key', order_by='time'
    ... )
    >>> # add two moving-window columns "mean" and "std"
    >>> table = table.mutate(
    ...     mean_and_std(table['v']).over(win).destructure()
    ... )

Note

Many of these changes don't work in the PySpark backend because support for struct in pandas_udf is limited (it only works for elementwise UDFs as of Spark 3.0).

Tests

Added tests in test_vectorized_udf.py
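
For flavor, a hedged sketch of the kind of test this adds (connection setup, test name, and assertion are illustrative, not the PR's exact code):

    import pandas as pd
    import ibis
    import ibis.expr.datatypes as dt
    from ibis.udf.vectorized import reduction

    def test_groupby_aggregate_destructure():
        df = pd.DataFrame({'key': ['a', 'a', 'b'], 'v': [1.0, 2.0, 3.0]})
        t = ibis.pandas.connect({'t': df}).table('t')

        @reduction(
            input_type=[dt.double],
            output_type=dt.Struct(['mean', 'std'], [dt.double, dt.double]),
        )
        def mean_and_std(v):
            return v.mean(), v.std()

        expr = t.groupby('key').aggregate(mean_and_std(t['v']).destructure())
        result = expr.execute()
        # The struct fields land as separate columns alongside the key.
        assert set(result.columns) == {'key', 'mean', 'std'}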

@icexelloss changed the title from "Add destruct support for analytic and reduction UDF" to "FEAT: Add destructure support for analytic and reduction UDF" Oct 21, 2020
max_lookback=None,
output_type=None,
@icexelloss (author):

Added this so we can check what the expected output_type is.

# Instead, we need to use "apply", which can return a non-numeric
# type, e.g., a tuple of two doubles.
if isinstance(self.output_type, dt.Struct):
    return grouped_data.apply(function, *args, **kwargs)
@icexelloss (author):

This is because transform cannot take a function that returns non-numeric data.
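
To illustrate the distinction, a standalone pandas sketch (not code from this PR):

    import pandas as pd

    df = pd.DataFrame({'key': ['a', 'a', 'b'], 'v': [1.0, 2.0, 3.0]})
    grouped = df.groupby('key')['v']

    # apply may return an arbitrary object per group, e.g. a
    # (mean, std) tuple -> a Series of tuples indexed by key.
    by_apply = grouped.apply(lambda s: (s.mean(), s.std()))

    # transform must produce output that aligns elementwise with the
    # input and coerces to the column dtype, so a tuple-returning
    # reducer raises here:
    # grouped.transform(lambda s: (s.mean(), s.std()))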

@@ -863,14 +863,22 @@ class StructColumn(AnyColumn, StructValue):
     pass  # noqa: E701,E302


-class DestructColumn(AnyColumn):
-    """ Class that represents a destruct column.
+class DestructValue(AnyValue):
@icexelloss (author):

This mimics the class hierarchy of Struct.
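
For readers unfamiliar with the hierarchy being mirrored, a self-contained sketch of the shape of the change (stub classes stand in for the ibis.expr.types bases; the real classes carry more machinery):

    # Stand-ins for the ibis.expr.types base classes.
    class AnyValue: ...
    class AnyScalar(AnyValue): ...
    class AnyColumn(AnyValue): ...

    # Struct expressions form a Value/Scalar/Column triple
    # (StructValue, StructScalar, StructColumn); the Destruct
    # classes mirror that triple:
    class DestructValue(AnyValue):
        """A struct-typed value to be destructured into separate columns."""

    class DestructScalar(AnyScalar, DestructValue):
        pass

    class DestructColumn(AnyColumn, DestructValue):
        pass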

@datapythonista added the udf (Issues related to user-defined functions) label Oct 22, 2020
@icexelloss added the pandas (The pandas backend) and pyspark (The Apache PySpark backend) labels Oct 22, 2020
for metric in op.metrics
]

if isinstance(metric, ir.DestructValue):
Contributor (reviewer):

Hmm, can you make a single function to do this? (You can still call coerce_to_dataframe.) Conceptually this should be something like:

pieces = [
    coerce_to_output_type(
        metric,
        execute(metric, scope=scope, timecontext=timecontext, **kwargs),
    )
    for metric in op.metrics
]

@icexelloss (author):

Good point. I made a new function coerce_to_output for this.
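
A hedged sketch of what such a helper can look like (the PR's actual coerce_to_output may differ in detail; expr.type(), expr.get_name(), and dt.Struct are existing ibis APIs, and coerce_to_dataframe is the helper discussed later in this thread):

    import pandas as pd
    import ibis.expr.datatypes as dt
    from ibis.util import coerce_to_dataframe

    def coerce_to_output(result, expr):
        """Coerce a raw backend result to match the expression's type.

        Struct-typed (destructurable) expressions become a DataFrame
        with one column per struct field; everything else becomes a
        named Series (or a scalar wrapped in one).
        """
        output_type = expr.type()
        if isinstance(output_type, dt.Struct):
            return coerce_to_dataframe(result, output_type.names)
        elif isinstance(result, pd.Series):
            return result.rename(expr.get_name())
        else:
            return pd.Series([result], name=expr.get_name())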

@@ -146,8 +147,7 @@ def compute_projection_column_expr(
         name=result_name,
     )
 elif isinstance(expr, ir.DestructColumn):
-    result.columns = expr.type().names
-    return result
+    return coerce_to_dataframe(result, expr.type().names)
Contributor (reviewer):

I think this can also be much cleaner if you do the above.

@icexelloss (author):

Updated


 if not group_by and not order_by:
-    aggcontext = agg_ctx.Summarize()
+    aggcontext = agg_ctx.Summarize(parent=parent, output_type=output_type)
Contributor (reviewer):

Is output_type now required? (I wouldn't object if it was.)

@icexelloss (author) commented Oct 26, 2020:

It is not. It probably should be, but currently this is created here: https://github.com/ibis-project/ibis/blob/master/ibis/backends/pandas/core.py#L207 and the expr there can be a TableExpr, so it's not clear what the output type would even mean in that case.

@@ -249,7 +250,21 @@ def execute_udaf_node_groupby(op, *args, aggcontext, **kwargs):
 def aggregator(first, *rest):
     # map(next, *rest) gets the inputs for the next group
     # TODO: might be inefficient to do this on every call
-    return func(first, *map(next, rest))
+    result = func(first, *map(next, rest))
Contributor (reviewer):

Same as above.

@icexelloss (author):

This one I didn't change, because it's in the inner loop and a bit too messy to make coerce_to_output work with.

Contributor (reviewer):

OK. Can you create an issue to fix this up (as your comment indicates)?

@icexelloss (author):

Sounds good! Added #2496

 for e in self.by + self.metrics:
-    names.append(e.get_name())
-    types.append(e.type())
+    if isinstance(e, ir.DestructValue):
Contributor (reviewer):

Can you add a comment here for future readers?

@icexelloss (author):

Added

@icexelloss (author):

@jreback Thanks! I addressed all comments. This is ready for another look.


@@ -73,6 +74,36 @@ def is_one_of(values: Sequence[T], t: Type[U]) -> Iterator[bool]:
 all_of = toolz.compose(all, is_one_of)


+def coerce_to_dataframe(data: Any, names: List[str]) -> pd.DataFrame:
Contributor (reviewer):

Since this is only used in the pandas backend, can you put it right above coerce_to_output, or even better integrate the two so you don't need this (e.g. just use coerce_to_output, with this as a helper)? Is this possible?

@icexelloss (author):

Actually, this is also used by the pyspark backend to handle struct output from UDFs, so I left it at top level (ibis.util). Do you think we should still move it to the pandas backend namespace?
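
For orientation, a sketch of what a coercion helper like this plausibly does (illustrative; the PR's exact branches may differ):

    from typing import Any, List
    import pandas as pd

    def coerce_to_dataframe(data: Any, names: List[str]) -> pd.DataFrame:
        """Coerce a UDF result into a DataFrame with the given columns."""
        if isinstance(data, pd.DataFrame):
            # Returned as-is: the caller gets back the very same object.
            result = data
        elif isinstance(data, pd.Series):
            # A Series of tuples (one per group) expands to one row each.
            result = data.apply(pd.Series)
        elif isinstance(data, (tuple, list)):
            if all(isinstance(v, pd.Series) for v in data):
                # A tuple of Series (e.g. an analytic UDF's outputs)
                # becomes one column per element.
                result = pd.concat(data, axis=1)
            else:
                # A tuple of scalars becomes a single row.
                result = pd.DataFrame([data])
        else:
            raise ValueError(f'cannot coerce {type(data)} to a DataFrame')
        result.columns = names
        return result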

    pd.DataFrame
    """
    if isinstance(data, pd.DataFrame):
        result = data
Contributor (reviewer):

Yeah, this is not a great function, because it sometimes returns the same object and sometimes a new one.

@icexelloss (author):

Out of curiosity, why is it bad to return the same object sometimes and a new one other times?
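
One standard answer (general Python reasoning, not specific to this thread): callers can't know whether mutating the result also mutates the argument, so aliasing bugs hide at call sites. A minimal sketch:

    import pandas as pd

    def coerce(data):
        # Returns its input unchanged for DataFrames, a new object otherwise.
        if isinstance(data, pd.DataFrame):
            return data
        return pd.DataFrame({'a': data})

    df = pd.DataFrame({'a': [1, 2]})
    out = coerce(df)
    out['a'] = 0               # also mutates df, because out *is* df
    assert df['a'].iloc[0] == 0

    out2 = coerce([1, 2])      # fresh object; mutating it leaves the input alone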

@jreback jreback added this to the Next Feature Release milestone Oct 26, 2020
@jreback (Contributor) commented Oct 26, 2020

Needs a whatsnew note.

@jreback jreback merged commit 400b079 into ibis-project:master Oct 26, 2020
@jreback (Contributor) commented Oct 26, 2020

Thanks @icexelloss. Please do a follow-on PR for the whatsnew note.
