
BUG: Fix same window op with different window size on table lead to incorrect results for pyspark backend #2414

Merged: 4 commits into ibis-project:master on Oct 2, 2020

Conversation

@LeeTZ (Contributor) commented Sep 30, 2020:

Overview

This PR fixes #2412, where the same window op with different window sizes on a table leads to incorrect results for the pyspark backend.
A refactor of the window param in pyspark translation is proposed.

Example

Let's say we create two windows with different sizes to calculate Mean() over the same column of a table:

# Assumed imports (the test relies on the suite's client fixture):
import ibis
import pandas.testing as tm

def test_multiple_windows(client):
    table = client.table('time_indexed_table')
    window1 = ibis.trailing_window(
        preceding=ibis.interval(hours=1), order_by='time', group_by='key'
    )
    window2 = ibis.trailing_window(
        preceding=ibis.interval(hours=2), order_by='time', group_by='key'
    )
    result = table.mutate(
        mean_1h=table['value'].mean().over(window1),
        mean_2h=table['value'].mean().over(window2),
    ).compile()
    result_pd = result.toPandas()

    df = table.compile().toPandas()
    expected_win_1 = (
        df.set_index('time')
        .groupby('key')
        .value.rolling('1h', closed='both')
        .mean()
        .rename('mean_1h')
    ).reset_index(drop=True)
    expected_win_2 = (
        df.set_index('time')
        .groupby('key')
        .value.rolling('2h', closed='both')
        .mean()
        .rename('mean_2h')
    ).reset_index(drop=True)
    tm.assert_series_equal(result_pd['mean_1h'], expected_win_1)
    tm.assert_series_equal(result_pd['mean_2h'], expected_win_2)

Currently, this fails: the compiled result in result_pd is wrong, and the two columns, despite their different window sizes, contain identical values.

The Problem

The two columns are the same because the value we store in scope is wrong: the second window is never compiled.

Here we are calculating Mean() over two different windows, 1h and 2h. Note that in compile_window_op, we call

result = t.translate(operand, scope, timecontext, window=pyspark_window, context=context)
return result

passing window as a param and applying the window inside the dispatched compile function, e.g. compile_aggregator.

This is actually wrong. Since we pass window as a param to compile_aggregator, Mean() is compiled to Col<... over 1h window> and cached. When we translate the second window (2h), translate() first looks Mean() up in scope, and Mean() is already compiled to Col<... over 1h window>. Because the scope key for Mean() carries no window information, the 2h window is never translated, and we get an incorrect result: two windows of the same size.
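
To make the failure mode concrete, here is a minimal runnable sketch (illustrative only, not the actual ibis Scope API; all names are made up) of a translation cache whose key ignores the window:

# Illustrative only: a cache keyed by the op alone loses the window,
# so every later window gets the first compiled column back.
scope = {}

def translate(op, window):
    if op in scope:                  # key carries no window information
        return scope[op]             # the 2h lookup hits the cached 1h column
    result = f"Col<{op} over {window} window>"  # stand-in for real compilation
    scope[op] = result
    return result

print(translate("Mean(value)", "1h"))  # Col<Mean(value) over 1h window>
print(translate("Mean(value)", "2h"))  # Col<Mean(value) over 1h window>  (the bug)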

The Change

To make our scope cache correct and reliable, we should lift the .over(window) call up into compile_window_op.
This is a little complicated:

There are operations like Lead, Lag, NotAny, NotAll, Rank, etc. that take window as a param and run .over(window) inside their dispatch functions. Moving .over(window) out of these ops raises two issues (a sketch of the resulting structure follows this list):

  1. compile_rank and compile_dense_rank do:
F.dense_rank().over(window).astype('long') - 1

So if we move .over(window) to compile_window_op, we have to post-process the result to apply the typecast.

  2. compile_notany and compile_notall do:
if context is None:

    def fn(col):
        return ~(F.max(col))

    return compile_aggregator(
        t, expr, scope, timecontext, fn=fn, context=context, **kwargs
    )
else:
    return ~compile_any(
        t,
        expr,
        scope,
        timecontext,
        context=context,
        window=window,
        **kwargs,
    )

The negation ~ is tricky to lift out: we need to call .over(window) first and only then negate. The solution we propose is a rewrite in pre-translation that passes the plain Any/All op to translation, then negates the result back after the window operation.
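
Putting the two cases together, here is a hedged sketch of the lifted compile_window_op structure (simplified, not the exact change; operand, res_op, and pyspark_window follow the names used in the snippets below, and the negate() rewrite helper is an assumption about the op API):

import ibis.expr.operations as ops

def compile_window_op_sketch(t, operand, res_op, scope, timecontext,
                             pyspark_window, context=None):
    # Sketch only: translate the operand without the window, then apply
    # .over(window) here, so the scope cache never stores a
    # window-specific column under a window-free key.
    if isinstance(res_op, (ops.NotAll, ops.NotAny)):
        # Rewrite to the non-negated op, apply the window, negate back.
        operand = res_op.negate().to_expr()  # assumed rewrite helper
        result = t.translate(operand, scope, timecontext, context=context)
        return ~result.over(pyspark_window)
    elif isinstance(res_op, (ops.MinRank, ops.DenseRank, ops.RowNumber)):
        # Rank/row-number results must be cast to long and shifted to 0-based.
        result = t.translate(operand, scope, timecontext, context=context)
        return result.over(pyspark_window).astype('long') - 1
    else:
        result = t.translate(operand, scope, timecontext, context=context)
        return result.over(pyspark_window)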

How is this tested

For window operations, cases are covered in ibis/tests/all/test_window.py. For multi-window, tests are added in ibis/pyspark/tests/test_window.py (actually a re-enable of a test previously marked as an expected failure).

# For NotAll and NotAny, negation must be applied after .over(window).
# Here we rewrite the node to its negation, and negate it back after
# translation and the window operation.
negated = False
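
Presumably the flag is consumed after the window is applied; the consuming code is outside this hunk, so the following lines are an assumption based on the PR description:

# Assumed continuation (not shown in this hunk): undo the rewrite
# once the window has been applied.
if negated:
    result = ~result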
@icexelloss (Contributor) commented Sep 30, 2020:

Let's have cleaner branching like this:

if isinstance(op, (ops.NotAll, ops.NotAny)):
    return ...
elif isinstance(op, (ops.MinRank, ops.DenseRank, ops.RowNumber)):
    return ...
else:
    return ...

@LeeTZ (Author) replied:

That's indeed more readable. However, I'd argue that it introduces duplication:

result = t.translate(operand, scope, timecontext, context=context).over(
    pyspark_window
)

would be repeated three times.
The motivation for the current structure is the pre-processing, translation, post-processing flow.

@LeeTZ (Author) replied:
I revised it as you suggested, for readability.

@icexelloss (Contributor) left a review:
Looks good in general; minor comments.

@jreback added the pyspark (The Apache PySpark backend), backends - spark, and window functions (Issues or PRs related to window functions) labels on Oct 1, 2020
elif isinstance(res_op, (ops.MinRank, ops.DenseRank, ops.RowNumber)):
    # result must be cast to long type for rank / rownumber
    return (
        t.translate(operand, scope, timecontext, context=context)
@icexelloss (Contributor) commented:
Factor this out into a variable:
t.translate(operand, scope, timecontext, context=context)
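
For context, a hedged sketch of the factored form the reviewer is asking for (the variable name is assumed):

# Sketch only: bind the translated operand once, then apply the window
# and the rank-specific cast to the single variable.
src_column = t.translate(operand, scope, timecontext, context=context)
return src_column.over(pyspark_window).astype('long') - 1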

@LeeTZ changed the title from "BUG: Same window op with different window size on table lead to incorrect results for pyspark backend" to "BUG: Fix same window op with different window size on table lead to incorrect results for pyspark backend" on Oct 1, 2020
@jreback added this to the Next Bugfix Release milestone on Oct 1, 2020
@jreback (Contributor) commented Oct 1, 2020:

lgtm ping on green

@LeeTZ (Author) commented Oct 1, 2020:

@jreback Green now

@icexelloss (Contributor) left a review:
Reviewed another round.

@icexelloss (Contributor) left a review:
LGTM

@jreback merged commit ae01cd2 into ibis-project:master on Oct 2, 2020
@jreback (Contributor) commented Oct 2, 2020:

thanks @LeeTZ
