
ENH: Support multi args window UDF for pandas backend #2035

Merged · 16 commits · Dec 10, 2019

Conversation

@icexelloss (Contributor) commented Nov 19, 2019

This PR addresses issue #1998

Currently, when using a UDAF with a rolling window, the pandas backend throws an exception:

import ibis
import pandas as pd
import numpy as np

from ibis.pandas.udf import udf
import ibis.expr.datatypes as dt

client = ibis.pandas.connect({'table': pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'key': ['a', 'a', 'a']})})
t = client.table('table')
w = ibis.trailing_window(preceding=1, order_by='key', group_by='key')
#w = ibis.window(group_by='key')


@udf.reduction(input_type=[dt.double, dt.double], output_type=dt.double)
def my_average(v, w):
    return np.average(v, weights=w)

t = t.mutate(new_col=my_average(t.a, t.b).over(w))

t.execute()

With this PR, this use case is supported.

The main idea of this PR is that instead of using groupby().rolling().apply(func) to compute the result, we use groupby().rolling().apply(len, raw=True) to get the size of each window, and then manually apply func to each window in a Python for-loop. This works around the issue that groupby().rolling().apply(func) can only take a function that operates on a single series.
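The two-step workaround can be sketched standalone like this (a minimal illustration, not the actual PR code; the trailing window of size 2, the column names, and the loop structure are assumptions):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1.0, 2.0, 3.0], 'b': [4.0, 5.0, 6.0], 'key': ['a', 'a', 'a']})

def my_average(v, w):
    return np.average(v, weights=w)

# Step 1: use rolling().apply(len) only to recover each window's size.
window_size = (
    df.groupby('key')['a']
    .rolling(2, min_periods=1)
    .apply(len, raw=True)
    .reset_index(drop=True)
    .astype(int)
)

# Step 2: slice each window manually and call the multi-argument UDF in
# a plain Python loop, which rolling().apply() cannot do by itself.
a, b = df['a'].values, df['b'].values
result = [
    my_average(a[end - size:end], b[end - size:end])
    for end, size in enumerate(window_size, start=1)
]
```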

Benchmarks

Before the change:

In [4]: %time pandas.time_low_card_window_analytics_udf()                                                                                                     
CPU times: user 898 ms, sys: 248 ms, total: 1.15 s
Wall time: 1.15 s

In [5]: %time pandas.time_high_card_window_analytics_udf()                                                                                                    
CPU times: user 12.4 s, sys: 324 ms, total: 12.7 s
Wall time: 12.8 s

In [6]: %time pandas.time_low_card_grouped_rolling()                                                                                                          
CPU times: user 7.99 s, sys: 1.95 s, total: 9.94 s
Wall time: 10.1 s

In [7]: %time pandas.time_high_card_grouped_rolling()                                                                                                         
CPU times: user 15.3 s, sys: 1.83 s, total: 17.1 s
Wall time: 17.3 s

In [8]: %time pandas.time_low_card_grouped_rolling_udf()                                                                                                      
CPU times: user 2min 1s, sys: 1.96 s, total: 2min 3s
Wall time: 2min 4s

In [9]: %time pandas.time_high_card_grouped_rolling_udf()                                                                                                     
CPU times: user 1min 41s, sys: 2.17 s, total: 1min 43s
Wall time: 1min 43s

After the change:

time pandas.time_low_card_window_analytics_udf()
CPU times: user 856 ms, sys: 235 ms, total: 1.09 s
Wall time: 1.08 s
time pandas.time_high_card_window_analytics_udf()
CPU times: user 12.7 s, sys: 316 ms, total: 13 s
Wall time: 13.1 s
time pandas.time_low_card_grouped_rolling()
CPU times: user 9.3 s, sys: 3.15 s, total: 12.4 s
Wall time: 13 s
time pandas.time_high_card_grouped_rolling()
CPU times: user 15.5 s, sys: 1.9 s, total: 17.4 s
Wall time: 17.5 s
time pandas.time_low_card_grouped_rolling_udf()
CPU times: user 1min 26s, sys: 1.74 s, total: 1min 27s
Wall time: 1min 28s
time pandas.time_high_card_grouped_rolling_udf()
CPU times: user 1min 4s, sys: 1.79 s, total: 1min 6s
Wall time: 1min 6s

@icexelloss (Contributor Author):

cc @jreback @toryhaavik

with warnings.catch_warnings():
    warnings.filterwarnings(
        "ignore", message=".+raw=True.+", category=FutureWarning
# get the DataFrame from which the operand originated
Contributor Author:

This is the original logic for the built-in aggregation functions.

def test_udaf_window_multi_params():
    @udf.reduction(['double', 'double'], 'double')
    def my_wm(v, w):
        print("v")
Contributor Author:

Remove these

@@ -530,25 +531,33 @@ def execute_udaf_node_groupby(op, *args, **kwargs):
#
# If the argument is not a SeriesGroupBy then keep
# repeating it until all groups are exhausted.

Contributor Author:

Remove white space

@@ -361,6 +364,7 @@ def agg(self, grouped_data, function, *args, **kwargs):
else:
# do mostly the same thing as if we did NOT have a grouping key,
# but don't call the callable just yet. See below where we call it.

Contributor Author:

Remove white space

@@ -52,15 +52,8 @@ def test_array_collect(t, df):
tm.assert_frame_equal(result, expected)


@pytest.mark.xfail(
Contributor Author:

This is now supported as well

@jreback (Contributor) left a comment:

some comments

import pandas as pd
from pandas import Series
Contributor:

prob can just use pd.Series

Contributor Author:

Removed

# create a generator for each input series
# the generator will yield a slice of the
# input series for each valid window
data = getattr(grouped_series, 'obj', grouped_series).values
Contributor:

you shouldn't use .values here as that coerces to an ndarray

rather, leave it as a Series and use .iloc

Contributor Author:

I chose to use an ndarray here because:

  • It preserves the same UDF API as before.
  • Passing a Series is about 15x slower than an ndarray.

I don't want to introduce an API change and a performance regression in this PR. I think we can have a separate chat about whether window UDFs should take an ndarray or a Series.

Contributor:

you shouldn't see any perf issues; must be something odd going on

does it currently take an ndarray?

Contributor Author:

Yes, it currently takes an ndarray (by using raw=True, I think)
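For context, a quick standalone check (not from the PR) of what rolling().apply hands to the function:

```python
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0])

seen = []
# With raw=True each complete window is passed as a NumPy ndarray;
# with raw=False it would arrive as a pandas Series instead.
s.rolling(2).apply(lambda x: seen.append(type(x).__name__) or 0.0, raw=True)
```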

Contributor:

ok can you create an issue to fix this, meaning to use iloc. .values is not type preserving and generally a bad idea.

Contributor Author:

Created: #2045

inputs = args if len(args) > 0 else [grouped_data]

input_gens = list(
    create_input_gen(arg, window_size)
Contributor:

what are you trying to do here?

Contributor Author:

Here I am creating a generator for each input so that later I can just call next(input). This hides the details of how next is implemented and unifies how we send data inputs and argument inputs to the user function.
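The idea can be sketched like this (hypothetical helper bodies; only the names come from the PR):

```python
import itertools
import numpy as np

def create_input_gen(data, window_sizes):
    # Yield one slice of `data` per window; window_sizes[i] is the
    # length of the window ending at row i (inclusive).
    for end, size in enumerate(window_sizes, start=1):
        yield data[end - size:end]

data = np.array([1.0, 2.0, 3.0])

# Series-like inputs advance window by window; scalar arguments are
# simply repeated, so the consumer can call next() uniformly on both.
gens = [create_input_gen(data, [1, 2, 2]), itertools.repeat(10.0)]
windows = [tuple(next(g) for g in gens) for _ in range(3)]
```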

result[mask] = valid_result
result.index = obj.index
else:
with warnings.catch_warnings():
Contributor:

i would move these out to 2 module level functions rather than nesting like this

Contributor Author:

Actually, this is not needed anymore because we don't use raw=True; it was used for UDFs, and UDFs are now handled separately.

ibis/pandas/execution/tests/test_arrays.py (resolved)
@@ -271,6 +272,87 @@ def my_mean(series):
tm.assert_frame_equal(result, expected)


def test_udaf_window_interval():
    @udf.reduction(['double'], 'double')
Contributor:

isn’t this defined above?

Contributor Author:

Removed

# the custom rolling logic.
result = aggcontext.agg(args[0], func, *args, **kwargs)
else:
iters = (
Contributor:

i would move this to a module level function

Contributor Author:

Done

ibis/pandas/udf.py (resolved)
@icexelloss force-pushed the pandas-backend-multi-args-udf branch 2 times, most recently from 700a22a to ee64a17 on November 25, 2019
@jreback jreback added the feature Features or general enhancements label Nov 27, 2019
@jreback jreback added this to the Next Feature Release milestone Nov 27, 2019
raw_window_size = windowed.apply(len, raw=True).reset_index(
    drop=True
)
mask = ~(raw_window_size.isna())
Contributor:

pls make this a separate function, this is too hard to grok inline here.

Contributor Author:

Done

@@ -271,6 +274,88 @@ def my_mean(series):
tm.assert_frame_equal(result, expected)


def test_udaf_window_interval():
Contributor:

are all of the window udf tests here or in test_window?

Contributor Author:

all window udf tests are here

@@ -100,6 +101,29 @@ def arguments_from_signature(signature, *args, **kwargs):
return args, new_kwargs


def create_gens_from_args_groupby(args):
Contributor:

can you type args

Contributor Author:

Added

""" Create generators for each args for groupby udaf.

If the arg is SeriesGroupBy, return a generator that outputs each group.
If the arg is not SeriesGroupBy, return a generator that repeats the arg.
Contributor:

what else could this be? can you type it

Contributor Author:

Improved docstring
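A rough sketch of what a helper matching that docstring could look like (assumed implementation; the actual code lives in ibis/pandas/udf.py):

```python
import itertools
from typing import Any, Iterator, List

import pandas as pd
from pandas.core.groupby import SeriesGroupBy

def create_gens_from_args_groupby(args: List[Any]) -> List[Iterator[Any]]:
    # SeriesGroupBy arguments yield one group's Series at a time;
    # anything else is repeated indefinitely, once per group.
    return [
        (data for _, data in arg) if isinstance(arg, SeriesGroupBy)
        else itertools.repeat(arg)
        for arg in args
    ]

s = pd.Series([1, 2, 3], index=['x', 'x', 'y'])
gens = create_gens_from_args_groupby([s.groupby(level=0), 10])
first_group = next(gens[0])   # the Series for group 'x'
constant = next(gens[1])      # the scalar, repeated for every group
```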

ibis/pandas/udf.py (resolved)
)

valid_result = pd.Series(valid_result)
valid_result.index = window_size.index
Contributor:

are you testing the output indexes are correct?

Contributor Author:

Yes, there are tests (test_udaf_window_interval) that cover out-of-order indices and make sure the output is correct.


valid_result = pd.Series(valid_result)
valid_result.index = window_size.index
result = pd.Series(np.repeat(None, len(obj)))
Contributor:

this is really strange to do, what are you trying?

result = pd.Series(np.repeat(None, len(obj)))
result[mask] = valid_result
result.index = obj.index
else:
result = method(windowed)
Contributor:

can you add some comments on what this case is (again, would be much better to split these out into free functions)

Contributor Author:

I have separated this into separate functions
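The scatter-back step under discussion can be illustrated standalone (toy data; the variable names mirror the snippet above, but this is not the PR code):

```python
import numpy as np
import pandas as pd

# UDF results exist only for rows whose window is valid; scatter them
# into a None-filled series, then restore the original (possibly
# out-of-order) index of the source data.
obj = pd.Series([10.0, 20.0, 30.0], index=[5, 3, 7])
mask = pd.Series([True, False, True])
valid_result = pd.Series([1.0, 2.0], index=[0, 2])  # positions where mask holds

result = pd.Series(np.repeat(None, len(obj)))
result[mask] = valid_result
result.index = obj.index
```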

@jreback (Contributor) commented Dec 3, 2019:

if you'd rebase I can look again

@icexelloss (Contributor Author):

@jreback Haven't addressed all comments. Will ping again when it's done.

@icexelloss icexelloss changed the title ENH: Support multi param window UDF for pandas backend ENH: Support multi arg window UDF for pandas backend Dec 4, 2019
@icexelloss icexelloss changed the title ENH: Support multi arg window UDF for pandas backend ENH: Support multi args window UDF for pandas backend Dec 4, 2019
@@ -326,6 +327,83 @@ def __init__(self, kind, *args, **kwargs):
)
self.construct_window = operator.methodcaller(kind, *args, **kwargs)

def _agg_built_in(self, frame, windowed, function, *args, **kwargs):
Contributor:

I would make both of these module-level functions (don't pass self; I think you can just pass max_lookback as a keyword arg).

Contributor Author:

I feel that these methods are very particular to the Window aggregation context and therefore probably belong in the class. I'm curious why module-level functions are better here?

Contributor Author:

Discussed offline. Moved to module level.

@icexelloss icexelloss added the pandas The pandas backend label Dec 4, 2019
@jreback (Contributor) left a comment:

looks good. can you add a release note (likely point to a new doc section) & a doc section (can be a followup PR).

@@ -313,6 +314,92 @@ def compute_window_spec_interval(_, expr):
return pd.tseries.frequencies.to_offset(value)


def _window_agg_built_in(
    frame, windowed, function, max_lookback, *args, **kwargs
Contributor:

ideally if you can type these arguments

Contributor Author:

Done

ibis/pandas/aggcontext.py (resolved)
ibis/pandas/aggcontext.py (resolved)


def _window_agg_udf(
    grouped_data, windowed, function, dtype, max_lookback, *args, **kwargs
Contributor:

can you type this

Contributor Author:

Done

):
    """Apply window aggregation with UDFs."""
# Use custom logic to compute the rolling window UDF instead of
Contributor:

include this in the doc-string under Notes

Contributor Author:

Done

# This is because pandas's rolling function doesn't support
# multi param UDFs.

def create_input_gen(grouped_series, window_size):
Contributor:

can you type

Contributor Author:

Done

obj = getattr(grouped_data, 'obj', grouped_data)
name = obj.name
if frame[name] is not obj:
    name = "{}_{}".format(name, ibis.util.guid())
Contributor:

can use an fstring here

Contributor Author:

Done

# TODO: see if we can do this in the caller, when the context
# is constructed rather than pulling out the data
columns = group_by + order_by + [name]
indexed_by_ordering = frame.loc[:, columns].set_index(order_by)
Contributor:

use frame[columns].set_index(...)

Contributor Author:

Done



def test_udaf_window_multi_args():
    @udf.reduction(['double', 'double'], 'double')
Contributor:

can you add the issue number as a comment (or this PR number)

Contributor Author:

Added

name = obj.name
if frame[name] is not obj:
    name = f"{name}_{ibis.util.guid()}"
    frame[name] = obj
Contributor:

do this like

frame = frame.assign(name=obj)

to avoid mutating the input

Contributor:

I see you are just copying original code (but this should change anyhow)

Contributor Author:

Done
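One note on the suggested frame.assign(name=obj): since the column name here is held in a variable, keyword unpacking is needed to use its value as the label (a small standalone illustration; the column name is made up):

```python
import pandas as pd

frame = pd.DataFrame({'a': [1, 2, 3]})
obj = pd.Series([4, 5, 6])
name = 'a_1f2e3d'  # e.g. a guid-suffixed column name

# frame.assign(name=obj) would create a column literally called 'name';
# unpacking a dict uses the value of `name` as the label, and assign
# returns a new frame without mutating the input.
frame2 = frame.assign(**{name: obj})
```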

ibis/pandas/aggcontext.py (resolved)
}
)
con = ibis.pandas.connect({'df': df})
t = con.table('df')
window = ibis.trailing_window(2, order_by='a', group_by='key')
expr = t.mutate(rolled=my_mean(t.b).over(window))
expr = t.mutate(
wm_b=my_wm(t.b, t.d).over(window), wm_c=my_wm(t.c, t.d).over(window)
Contributor:

I assume that this will work if we use different windows? can you add a test for that, or if it doesn't work can you test the exception.

Contributor Author:

Added

@@ -225,49 +250,107 @@ def my_corr2(lhs, **kwargs):
pass


def test_compose_udfs():
def test_compose_udfs(t2, df2):
Contributor:

what happens if the udaf raises an exception? are these caught in a reasonable way? just asking for a test (not even sure what this should do)

Contributor Author:

Added test_udf_error

@jreback (Contributor) left a comment:

lgtm. minor comment & can you add a note in release.rst? ping on green.

**kwargs,
)
try:
    return result.astype(self.dtype, copy=False)
Contributor:

may want to add a comment on when this can fail

@jreback jreback merged commit a95e32f into ibis-project:master Dec 10, 2019
@jreback (Contributor) commented Dec 10, 2019:

thanks @icexelloss very nice patch!
