Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: Fix reduction UDFs over ungrouped, bounded windows on Pandas backend #2395

Merged

Conversation

timothydijamco
Copy link
Contributor

@timothydijamco timothydijamco commented Sep 17, 2020

Overview

Currently, trying to apply a reduction UDF over an ungrouped, bounded window on the Pandas backend will hit an error.

This is because the execution rule for reduction UDFs (over an ungrouped aggregation context) doesn't handle the case that the aggregation context could be a bounded window (assumes it is unbounded).

This PR adds logic to handle this case.

Edit: Incidental improvements

This PR also ended up making some improvements to aggregation contexts (ibis/pandas/aggcontext.py) and how they are used to execute reduction/analytic UDFs (ibis/pandas/udf.py) to simplify things and increase consistency:

  1. Removed branching in execute_udaf_node_no_groupby (now simply calling aggcontext.agg regardless of the specific type of AggregationContext)
  2. Because of (1), we are using aggcontext.agg in more cases. This surfaced an error, which I fixed by no longer passing kwargs to aggcontext.agg (in execute_udaf_node_no_groupby and execute_udaf_node_groupby).
    This is because reduction/analytic UDFs already have kwargs applied when they were created. Trying to pass kwargs to the functions again leads to an error (~unexpected keyword argument) in some cases.
  3. No longer pass duplicate columns to aggcontext.agg.
    In other words, always do something like this:
    aggcontext.agg(args[0], func, *args[1:], **kwargs)
    rather than this:
    aggcontext.agg(args[0], func, *args, **kwargs)

Example

Code

import pandas as pd
import ibis
from ibis.udf.vectorized import reduction

client = ibis.pandas.connect({'test_table': df})

table = client.table('test_table')

@reduction(
    input_type=['double'],
    output_type='double'
)
def my_mean_udf(col):
    return col.mean()

# Ungrouped, bounded window
win = ibis.window(
    following=0,
    order_by=table.id
)

expr = table.mutate(
    val=my_mean_udf(table.value_col).over(win)
)

result = expr.execute()

print(result.to_markdown())

Output

Before

...

~/ibis/ibis/pandas/execution/window.py in execute_window_op(op, data, window, scope, timecontext, aggcontext, clients, **kwargs)
    356         **kwargs,
    357     )
--> 358     series = post_process(result, data, ordering_keys, grouping_keys)
    359     assert len(data) == len(
    360         series

~/ibis/ibis/pandas/execution/window.py in _post_process_order_by(series, parent, order_by, group_by)
     67     if len(names) > 1:
     68         series = series.reorder_levels(names)
---> 69     series = series.iloc[index.argsort(kind='mergesort')]
     70     return series
     71 

AttributeError: 'numpy.float64' object has no attribute 'iloc'

After

id value_col val
0 0 4 4
1 1 1 2.5
2 2 2 2.33333
3 3 3 2.5

No error, and the result is correct

Testing

I added a new test for ungrouped, bounded windows in test_window.py.

Readability changes

I also renamed the tests in test_window.py to make it more clear which variant of windows each test is handling. This is what the list of tests in test_window.py now look like:

  • test_grouped_bounded_expanding_window
  • test_ungrouped_bounded_expanding_window (new in this PR)
  • test_grouped_bounded_following_window
  • test_grouped_bounded_preceding_windows
  • test_grouped_unbounded_window
  • test_ungrouped_unbounded_window

I'm open to any suggestions to try to simplify this, since test_window.py keeps growing. I've found this difficult to simplify further because there are so many combinations of windows and operations over windows (several dimensions: grouped/ungrouped, bounded/unbounded, ordered/unordered, reduction/analytic, udf/non-udf). To make things worse, specific permutations may need to be xfailed on certain backends, so there's a minimum amount explicitness required for test parameters (to let us xfail very specific permutations).

@jreback jreback added window functions Issues or PRs related to window functions pandas The pandas backend labels Sep 17, 2020
@jreback jreback added this to the Next Bugfix Release milestone Sep 17, 2020
Copy link
Contributor

@jreback jreback left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

ibis/pandas/udf.py Outdated Show resolved Hide resolved
ibis/pandas/udf.py Outdated Show resolved Hide resolved
ibis/pandas/udf.py Outdated Show resolved Hide resolved
Copy link
Contributor

@icexelloss icexelloss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed one around

Copy link
Contributor

@icexelloss icexelloss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. +1

ibis/tests/all/test_window.py Outdated Show resolved Hide resolved
@icexelloss
Copy link
Contributor

Sorry revoked my approved. There is a small typo otherwise LGTM.

@timothydijamco
Copy link
Contributor Author

timothydijamco commented Sep 17, 2020

I updated the comments to be more accurate and also a little more concise. I also updated when an AssertionError is thrown to be more accurate. These changes are mostly because I realized that the general area of code I am changing in ibis/pandas/udf.py is also used by the Aggregation node (not just windows), so I've made the comments and logic reflect that now

I also fixed the typo in test_window.py

@timothydijamco
Copy link
Contributor Author

CI is failing--looks like there is a use case of creating a custom aggcontext (link). Let me look into how to handle this

@timothydijamco
Copy link
Contributor Author

OK, to handle custom AggregationContexts, I removed the AssertionError that I was raising when the type of the AggregationContext is unknown. In place of raising an error, I am now just calling the custom AggregationContext's agg method.

Note:

For the most part, this is exactly how the execution rules handled custom AggregationContexts before this PR.

The only difference is that before this PR, when handling a grouped aggregation, the execution rule would pre-process the data. Below is what the execution rule did before this PR in the case that the AggregationContext type is Summarize, Transform, or a custom AggregationContext:

ibis/ibis/pandas/udf.py

Lines 242 to 252 in 899804c

else:
iters = create_gens_from_args_groupby(args[1:])
# TODO: Unify calling convension here to be more like
# window
def aggregator(first, *rest, **kwargs):
# map(next, *rest) gets the inputs for the next group
# TODO: might be inefficient to do this on every call
return func(first, *map(next, rest))
result = aggcontext.agg(args[0], aggregator, *iters, **kwargs)

In this PR, I'm changing this so that custom AggregationContexts don't get this pre-processing. This is what the execution rule for custom AggregationContexts looks like in this PR:

ibis/ibis/pandas/udf.py

Lines 283 to 286 in 7239cd6

else:
# We must be aggregating over a custom AggregationContext. We'll
# call its .agg method and let it handle any further custom logic.
return aggcontext.agg(args[0], func, *args, **kwargs)

I believe this is more proper. I don't think we should pre-process any data passed to custom AggregationContexts, because custom AggregationContexts can/should be written to do any processing it needs

@jreback
Copy link
Contributor

jreback commented Sep 21, 2020

@icexelloss if you'd have a look / approve when ready. ignore the 1 failing part of the CI for now.

ibis/pandas/udf.py Outdated Show resolved Hide resolved
ibis/pandas/udf.py Outdated Show resolved Hide resolved
ibis/pandas/udf.py Outdated Show resolved Hide resolved
@timothydijamco
Copy link
Contributor Author

timothydijamco commented Sep 23, 2020

Changes:

  1. Removed branching in execute_udaf_node_no_groupby (now simply calling aggcontext.agg regardless of the specific type of AggregationContext)
  2. Because of (1), we are using aggcontext.agg in more cases. This surfaced an error, which I fixed by no longer passing kwargs to aggcontext.agg (in execute_udaf_node_no_groupby and execute_udaf_node_groupby).
    This is because reduction/analytic UDFs already have kwargs applied when they were created. Trying to pass kwargs to the functions again leads to an error (~unexpected keyword argument) in some cases.
  3. No longer pass duplicate columns to aggcontext.agg.
    In other words, always do something like this:
    aggcontext.agg(args[0], func, *args[1:], **kwargs)
    rather than this:
    aggcontext.agg(args[0], func, *args, **kwargs)

ibis/pandas/udf.py Outdated Show resolved Hide resolved
@icexelloss
Copy link
Contributor

@timothydijamco Can you also add #2395 (comment) to the overview section?

Copy link
Contributor

@icexelloss icexelloss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. +1

@jreback jreback merged commit 4c14a3b into ibis-project:master Sep 25, 2020
@jreback
Copy link
Contributor

jreback commented Sep 25, 2020

thanks @timothydijamco

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
pandas The pandas backend window functions Issues or PRs related to window functions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants