Skip to content

Commit

Permalink
Address comments and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
icexelloss committed Nov 25, 2019
1 parent f9af69a commit 700a22a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 34 deletions.
23 changes: 19 additions & 4 deletions benchmarks/benchmarks.py
Expand Up @@ -203,10 +203,10 @@ def setup(self):
def my_mean(series):
return series.mean()

self.low_card_grouped_rolling_udf = my_mean(t.value).over(
self.low_card_grouped_rolling_udf_mean = my_mean(t.value).over(
low_card_rolling_window
)
self.high_card_grouped_rolling_udf = my_mean(t.value).over(
self.high_card_grouped_rolling_udf_mean = my_mean(t.value).over(
high_card_rolling_window
)

Expand All @@ -225,6 +225,18 @@ def my_zscore(series):
high_card_window
)

@udf.reduction(['double', 'double'], 'double')
def my_wm(v, w):
    # Weighted-mean reduction UDF: first column is the values,
    # second column is the weights.
    return np.average(v, weights=w)

self.low_card_grouped_rolling_udf_wm = my_wm(t.value, t.value).over(
    low_card_rolling_window
)

# BUG FIX: this previously rolled over ``low_card_rolling_window``,
# so the "high cardinality" benchmark never exercised the
# high-cardinality window at all.
self.high_card_grouped_rolling_udf_wm = my_wm(t.value, t.value).over(
    high_card_rolling_window
)

def time_high_cardinality_group_by(self):
    # Benchmark: execute the group-by expression built in setup() whose
    # grouping key has high cardinality (``time_``-prefixed methods are
    # presumably ASV timing benchmarks — confirm against the harness).
    self.high_card_group_by.execute()

Expand Down Expand Up @@ -264,5 +276,8 @@ def time_high_card_grouped_rolling_udf(self):
def time_low_card_window_analytics_udf(self):
    # Benchmark: execute the analytics (z-score) UDF over the
    # low-cardinality window expression prepared in setup().
    self.low_card_window_analytics_udf.execute()

def time_high_card_window_analytics_udf(self):
self.high_card_window_analytics_udf.execute()
def time_high_card_grouped_rolling_udf_wm(self):
    # Benchmark: execute the weighted-mean reduction UDF over the
    # grouped rolling window with a high-cardinality grouping key.
    self.high_card_grouped_rolling_udf_wm.execute()

def time_low_card_grouped_rolling_udf_wm(self):
    # Benchmark: execute the weighted-mean reduction UDF over the
    # grouped rolling window with a low-cardinality grouping key.
    self.low_card_grouped_rolling_udf_wm.execute()
15 changes: 3 additions & 12 deletions ibis/pandas/aggcontext.py
Expand Up @@ -218,11 +218,9 @@
import functools
import itertools
import operator
import warnings

import numpy as np
import pandas as pd
from pandas import Series
from pandas.core.groupby import SeriesGroupBy

import ibis
Expand Down Expand Up @@ -365,7 +363,6 @@ def agg(self, grouped_data, function, *args, **kwargs):
else:
# do mostly the same thing as if we did NOT have a grouping key,
# but don't call the callable just yet. See below where we call it.

if callable(function):
method = operator.methodcaller(
'apply', make_applied_function(function, args, kwargs)
Expand Down Expand Up @@ -435,9 +432,7 @@ def create_input_gen(grouped_series, window_size):
drop=True
)
mask = ~(raw_window_size.isna())
window_size = raw_window_size[~(raw_window_size.isna())].astype(
'i8'
)
window_size = raw_window_size[mask].astype('i8')
window_size_array = window_size.values

# If there is no args, then the UDF only takes a single
Expand All @@ -449,7 +444,7 @@ def create_input_gen(grouped_series, window_size):

input_gens = list(
create_input_gen(arg, window_size)
if isinstance(arg, (Series, SeriesGroupBy))
if isinstance(arg, (pd.Series, SeriesGroupBy))
else itertools.repeat(arg)
for arg in inputs
)
Expand All @@ -465,11 +460,7 @@ def create_input_gen(grouped_series, window_size):
result[mask] = valid_result
result.index = obj.index
else:
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message=".+raw=True.+", category=FutureWarning
)
result = method(windowed)
result = method(windowed)
index = result.index
result.index = pd.MultiIndex.from_arrays(
[frame.index]
Expand Down
25 changes: 14 additions & 11 deletions ibis/pandas/tests/test_udf.py
@@ -1,3 +1,5 @@
import collections

import numpy as np
import pandas as pd
import pandas.util.testing as tm
Expand Down Expand Up @@ -273,18 +275,19 @@ def test_udaf_window():


def test_udaf_window_interval():
@udf.reduction(['double'], 'double')
def my_mean(series):
return series.mean()

df = pd.DataFrame(
{
"time": pd.date_range(
start='20190105', end='20190101', freq='-1D'
),
"key": [1, 2, 1, 2, 1],
"value": np.arange(5),
}
collections.OrderedDict(
[
(
"time",
pd.date_range(
start='20190105', end='20190101', freq='-1D'
),
),
("key", [1, 2, 1, 2, 1]),
("value", np.arange(5)),
]
)
)

con = ibis.pandas.connect({'df': df})
Expand Down
35 changes: 28 additions & 7 deletions ibis/pandas/udf.py
Expand Up @@ -101,6 +101,29 @@ def arguments_from_signature(signature, *args, **kwargs):
return args, new_kwargs


def create_gens_from_args_groupby(args):
    """Create one iterator per argument for a groupby UDAF call.

    For each element of ``args``:

    * if it is a ``SeriesGroupBy``, produce a generator yielding the
      data of each group in turn (the group key is discarded);
    * otherwise, produce ``itertools.repeat(arg)`` so the scalar value
      is re-supplied once per group, however many groups there are.

    Parameters
    ----------
    args : Tuple[object, ...]
        Positional arguments destined for the UDAF.

    Returns
    -------
    Generator[Iterator]
        A lazy generator of iterators, one per input argument.
        Note: the value is a generator expression, not a tuple —
        the previous docstring's ``Tuple[Generator]`` was incorrect.
    """
    return (
        (data for _, data in arg)
        if isinstance(arg, SeriesGroupBy)
        else itertools.repeat(arg)
        for arg in args
    )


@rule_to_python_type.register(dt.Array)
def array_rule(rule):
return (list,)
Expand Down Expand Up @@ -531,21 +554,19 @@ def execute_udaf_node_groupby(op, *args, **kwargs):
#
# If the argument is not a SeriesGroupBy then keep
# repeating it until all groups are exhausted.

aggcontext = kwargs.pop('aggcontext', None)
assert aggcontext is not None, 'aggcontext is None'

if isinstance(aggcontext, Window):
# Call the func differently for Window because of
# the custom rolling logic.
result = aggcontext.agg(args[0], func, *args, **kwargs)
else:
iters = (
(data for _, data in arg)
if isinstance(arg, SeriesGroupBy)
else itertools.repeat(arg)
for arg in args[1:]
)
iters = create_gens_from_args_groupby(args[1:])
funcsig = signature(func)

# TODO: Unify calling convension here to be more like
# window
def aggregator(first, *rest, **kwargs):
# map(next, *rest) gets the inputs for the next group
# TODO: might be inefficient to do this on every call
Expand Down

0 comments on commit 700a22a

Please sign in to comment.