FEAT: Support Ibis interval for window in pyspark backend #2409

Merged

Conversation

@LeeTZ (Contributor) commented Sep 26, 2020

Overview

Currently, a window size defined by ibis.interval is broken in the pyspark backend. This PR fixes that.

The Problem

In the pyspark backend, take a table generated from the following df:

from datetime import datetime, timezone

df_time_indexed = client._session.createDataFrame(
    [
        [datetime(2017, 1, 2, 5, tzinfo=timezone.utc), 1, 1.0],
        [datetime(2017, 1, 2, 6, tzinfo=timezone.utc), 1, 3.0],
    ],
    ['time', 'key', 'value'],
)

and calculating a mean over a trailing_window ordered by the time column:

window = ibis.trailing_window(
    preceding=ibis.interval(hours=2), order_by='time', group_by='key'
)
result = table.mutate(
    mean_2h=table['value'].mean().over(window)
).compile()

results in the following error:

-> pyspark_window = pyspark_window.rowsBetween(start, end)  (compile_window_op in ibis/pyspark/compiler.py)
TypeError: Arguments with datatype interval<int8>(unit='h') and int64 are not comparable

The Fix

  1. When compiling a window op in the pyspark backend, cast TimestampColumn ordering keys to long (seconds since epoch); Spark needs this to filter and compare on time.
  2. Add support for passing an ibis interval as preceding and following; None, IntervalScalar, and int are now supported as bounds for a pyspark window. A rough sketch of both steps follows.
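
A minimal sketch of the two steps, assuming an active SparkSession; the names below are illustrative, not the exact code in ibis/pyspark/compiler.py:

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.window import Window


def interval_to_seconds(interval: pd.Timedelta) -> int:
    # An executed ibis IntervalScalar is a pandas Timedelta; .value is in
    # nanoseconds, while Spark's rangeBetween expects seconds.
    return int(interval.value / 1e9)


preceding = interval_to_seconds(pd.Timedelta(hours=2))  # 7200

spark_window = (
    Window.partitionBy('key')
    # Step 1: cast the timestamp ordering key to long (seconds since epoch)
    # so it is comparable with the integer range bounds.
    .orderBy(F.col('time').cast('long'))
    # Step 2: express the trailing 2h window as a range in seconds.
    .rangeBetween(-preceding, Window.currentRow)
)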

Tests

Tests are added in ibis/pyspark/tests/test_window.py.

Follow-ups

Once this is fixed, the next step is to add logic for context adjustment in the pyspark backend. To keep this PR easy to review, context adjustment will be done in follow-up PRs.

@LeeTZ LeeTZ force-pushed the tiezheng/fix_pyspark_time_indexed_window branch from 322f13d to 7a73fa1 on September 28, 2020 14:13
  ordering_keys = [
-     key.to_expr().get_name()
+     F.col(key.to_expr().get_name()).cast('long')
Contributor:

This looks odd. Why are we getting the op from the expr and then using to_expr to get the expr back from the op again?

Contributor:

ordering_keys = [
    F.col(expr.get_name()).cast('long')
    if isinstance(expr, types.TimestampColumn)
    else expr.get_name()
    for expr in order_by
]

Does this work?

Contributor Author (LeeTZ):

This doesn't work but I get your point that we don't need to call op and to_expr at the same time. I refactored a little.

if window.following is None:
    end = Window.unboundedFollowing
elif isinstance(window.following, ir.IntervalScalar):
    end = int(execute(window.following).value / 1e9)
Contributor:

This duplicates the logic for window.preceding. Can you refactor?

Contributor Author (LeeTZ):

Sure, I added a function to canonicalize these intervals (roughly sketched below).
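
A rough sketch of such a helper, pieced together from the bound-handling hunks shown later in this review; the name, signature, and import paths are assumptions, and execute is the backend helper that materializes an ibis scalar to a pandas value:

import ibis.common.exceptions as com
import ibis.expr.types as ir


def _canonicalize_interval(interval):
    """Convert a preceding/following bound into what Spark expects.

    None passes through (unbounded), an IntervalScalar becomes an integer
    number of seconds, and a plain int is used as-is.
    """
    if interval is None:
        return None
    if isinstance(interval, ir.IntervalScalar):
        # `execute` is the backend helper used in the diff hunks below;
        # it yields a pandas Timedelta whose .value is in nanoseconds
        return int(execute(interval).value / 1e9)
    if isinstance(interval, int):
        return interval
    raise com.UnsupportedOperationError(
        f'type {type(interval)} is not supported in preceding/following in window'
    )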


def test_time_indexed_window(client):
    table = client.table('time_indexed_table')
    window1 = ibis.trailing_window(
Contributor:

Can you add tests for a forward (following) window as well?

Contributor Author (LeeTZ):

Sure, I added one that tests following and compares the result against a df produced by a plain pyspark window (a sketch of the shape of such a test is below).
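
A hedged sketch of what such a test might look like; the fixture, table name, and window bounds are assumptions rather than the exact contents of ibis/pyspark/tests/test_window.py:

import ibis
import pyspark.sql.functions as F
from pandas.testing import assert_frame_equal
from pyspark.sql.window import Window


def test_window_with_following(client):
    table = client.table('time_indexed_table')
    window = ibis.range_window(
        preceding=ibis.interval(hours=1),
        following=ibis.interval(hours=1),
        order_by='time',
        group_by='key',
    )
    result = table.mutate(mean=table['value'].mean().over(window)).execute()

    # Build the expected frame with a plain PySpark window over the same range.
    spark_table = table.compile()
    spark_window = (
        Window.partitionBy('key')
        .orderBy(F.col('time').cast('long'))
        .rangeBetween(-3600, 3600)
    )
    expected = spark_table.withColumn(
        'mean', F.mean(spark_table['value']).over(spark_window)
    ).toPandas()

    assert_frame_equal(result, expected)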

@icexelloss (Contributor) left a comment:

High level looks good. Left some comments.

)

# These ops need type cast after running over window
Contributor Author (LeeTZ):

As we agreed, .over(window) should be lifted into this function instead of being passed via kwargs.
That certainly works for compile_aggregator, but there are other ops dispatched at this line, for example lead, lag, row_number, first, last, etc. We used to pass window as a param to them.
I removed the window param from these functions, but some ops, like the three listed here, do a type cast after running over the window. Let me know if this change makes sense to you, or whether there is a better way to do it? @icexelloss

Contributor:

I see. This is probably why over(window) was not in compile_window_op originally.

I don't think what you have here is much better than the original implementation, actually, so I think we should roll back to it.

Contributor Author (LeeTZ):

I agree that branching too much isn't a good idea. I reverted and commented below for thoughts.

-def compile_notany(
-    t, expr, scope, timecontext, *, context=None, window=None, **kwargs
-):
+def compile_notany(t, expr, scope, timecontext, *, context=None, **kwargs):
Contributor Author (LeeTZ):

https://github.com/ibis-project/ibis/pull/2409/checks?check_run_id=1178201082#step:4:1284
We removed window here, and tests seem to be failing for this and for compile_notall. Given the comments here, could you give some suggestions on how we should refactor? @icexelloss

Contributor:

This function looks fine and I don't think we need to change this. What exactly failed and how?

Contributor Author (LeeTZ):

The stacktrace is:

 pyspark.sql.utils.AnalysisException: "grouping expressions sequence is empty, and 'functional_alltypes.`index`' is not an aggregate function. Wrap '(min((functional_alltypes.`double_col` = CAST(0 AS DOUBLE))) AS `_w0`)' in windowing function(s) or wrap 'functional_alltypes.`index`'

Contributor Author (LeeTZ):

I debugged into this issue.
For col_in_selection_order in compile_selection_op, the correct value should be:

['index', 'Unnamed: 0', 'id', 'bool_col', 'tinyint_col', 'smallint_col', 'int_col', 'bigint_col', 'float_col', 'double_col', 'date_string_col', 'string_col', 'timestamp_col', 'year', 'month', Column<b'(NOT min((double_col = 0)) OVER (PARTITION BY string_col ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS `val`'>]

while the wrong value is

['index', 'Unnamed: 0', 'id', 'bool_col', 'tinyint_col', 'smallint_col', 'int_col', 'bigint_col', 'float_col', 'double_col', 'date_string_col', 'string_col', 'timestamp_col', 'year', 'month', Column<b'(NOT min((double_col = 0))) OVER (PARTITION BY string_col ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `val`'>]

Basically we need to call .over(window) inside the F.col calls, and note that there is a negate operation we should also take care of. The proposed fix of lifting .over(window) into compile_window_op breaks this behavior.

Contributor Author (LeeTZ):

As per discussion with @icexelloss, NotAny must be compiled to ~compile(Any), otherwise table aggregation breaks. We also have to keep NotAny calling compile_aggregator because it can be used in a table aggregation, e.g.

table.groupby('key').aggregate(new_col=table['bool'].not_any())

So we cannot easily change the rule for compiling NotAny to reorder where .over(window) is inserted.
Let me just revert the change that removes window and address this in follow-ups.

@LeeTZ LeeTZ force-pushed the tiezheng/fix_pyspark_time_indexed_window branch from 171637e to 41e2a39 on September 29, 2020 19:03
@LeeTZ (Contributor Author) commented Sep 29, 2020

Overview

I reverted the refactoring that removed window from the params, and added a flag to translate that forces translation without using the cached result in scope.

The Problem

The problem with passing window as a param is that the same op with different window sizes will not be compiled twice.
For example, in the test case in test_timecontext.py we calculate Mean() over two different windows, 1h and 2h.
Since we pass window as a param to compile_aggregator, Mean() will be compiled once, and the result will be Col<... over 1h window>.
When we translate the second window (2h), translate first looks Mean() up in scope. Mean() is already compiled to Col<... over 1h window>, so the 2h window will not be translated again and we get the incorrect result of two windows with the same range.

A perfect fix would be removing window from the params and lifting the .over(window) logic into compile_window_op. But this is a little complicated:
There are operations like Lead, Lag, NotAny, NotAll, Rank, etc. that take window as a param and run .over(window) inside the dispatch functions that compile them. If we move .over(window) outside of these ops, there are several issues:

  1. compile_rank and compile_dense_rank do:
F.dense_rank().over(window).astype('long') - 1

So if we move .over(window) into compile_window_op, we would have to branch on the type of op and do this typecast there. Too much branching and multiple isinstance calls is not a good implementation.
2. compile_notany and compile_notall do:

    if context is None:

        def fn(col):
            return ~(F.max(col))

        return compile_aggregator(
            t, expr, scope, timecontext, fn=fn, context=context, **kwargs
        )
    else:
        return ~compile_any(
            t,
            expr,
            scope,
            timecontext,
            context=context,
            window=window,
            **kwargs,
        )

The negation ~ is very tricky to lift out of here. Also, according to the stacktrace of the failing tests in a previous commit on this PR,
Column<b'(NOT min((double_col = 0)) OVER (PARTITION BY string_col ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS val'> is what we should get,
but if we lift .over(window) outside of compile_notany, the column is compiled to Column<b'(NOT min((double_col = 0))) OVER (PARTITION BY string_col ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS val'>.
Note the difference in the placement of the parentheses: we need to call .over(window) inside the F.col calls, and there is a negate operation we should also take care of. The proposed fix of lifting .over(window) into compile_window_op breaks this behavior.
3. A solution might be: in compile_notany, don't negate compile_any and don't call compile_aggregator, but instead implement the full window-aware compile logic in compile_notany and the other functions causing issues. This is not a good solution: not only does it bring more code duplication, it also breaks behavior. We have to keep NotAny calling compile_aggregator because it can be used in a table aggregation, e.g.

table.groupby('key').aggregate(new_col=table['bool'].not_any())

So we cannot easily change the rule for compiling NotAny to reorder where .over(window) is inserted.

The Change

Given the complexity of removing window from the params, I propose adding a flag to translate that forces translation to not trust the result cached in scope, so that the same op with different windows is translated twice with different window ranges and we get the correct result. This sacrifices a little performance but guarantees correctness. For now the flag only applies to the translate calls in compile_window_op; a loose sketch of the idea follows.
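
A loose sketch of the proposed flag, with assumed names (the real translator signature, scope API, and dispatch table may differ):

def translate(self, expr, scope, timecontext, skip_scope_check=False, **kwargs):
    # When skip_scope_check is set (only from compile_window_op), ignore any
    # previously compiled result for this op, so the same aggregation compiled
    # under two different windows yields two different Spark columns.
    op = expr.op()
    if not skip_scope_check:
        cached = scope.get(op)  # hypothetical cache lookup
        if cached is not None:
            return cached
    formatter = self._registry[type(op)]  # hypothetical dispatch table
    return formatter(self, expr, scope, timecontext, **kwargs)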

Follow-ups

We should definitely think about how to refactor all window-related op compilation. I will create an issue to track this.

@LeeTZ LeeTZ closed this Sep 29, 2020
@LeeTZ LeeTZ reopened this Sep 29, 2020
@icexelloss icexelloss changed the title Support time indexed windowing in pyspark backend FEAT: Support time based windowing in pyspark backend Sep 29, 2020
@LeeTZ LeeTZ changed the title FEAT: Support time based windowing in pyspark backend FEAT: Support Ibis interval for window in pyspark backend Sep 29, 2020
@LeeTZ (Contributor Author) commented Sep 29, 2020

As per offline discussion with Li, I am keeping this PR simple (just supporting an Ibis interval for pyspark windows) and will create another PR to do the refactor.

@icexelloss (Contributor) commented Sep 29, 2020

Given the complexity of removing window in param, I proposed to add a flag in translate to force translation to not trust result cached in scope

Sorry, I am -1 on this idea. If we cannot trust the cache, then a fundamental invariant of the compile algorithm breaks (an op uniquely determines its compilation result).

Per discussion, we agree the best path forward is to lift the pyspark .over(window) call out of the compilation of the aggregators (e.g., Min, Max, NotAny, etc.) and into the compilation of the window op itself.

This increases the complexity of compile_window_op, but at least the complexity is local to that function; "local complexity" is better than the "global complexity" that the flag solution would introduce. A rough sketch of that direction follows.
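
A rough sketch of that direction, not code from this PR; build_spark_window is a hypothetical helper and @compiles is the pyspark compiler's registration decorator referenced elsewhere in this thread:

import ibis.expr.operations as ops


@compiles(ops.WindowOp)
def compile_window_op(t, expr, scope, timecontext, **kwargs):
    op = expr.op()
    # Translate the wrapped aggregation without any window, so Min/Max/NotAny
    # rules just return a plain aggregate column...
    agg_col = t.translate(op.expr, scope, timecontext, **kwargs)
    # ...and apply the window exactly once, here.
    pyspark_window = build_spark_window(op.window)  # hypothetical helper
    return agg_col.over(pyspark_window)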

"""
if isinstance(interval, ir.IntervalScalar):
# value is in nanosecond
return int(execute(interval).value / 1e9)
@icexelloss (Contributor) commented Sep 29, 2020:

Do we have a translation rule for IntervalScalar? Perhaps we can add a rule for IntervalScalar and do

@compiles(IntervalScalar)
def compile_interval_scalar(interval):
    return execute(interval)

timedelta = t.translate(interval, ...) # This is a pd.Timedelta
second_since_epoch = int(timedelta.value / 1e9)

@LeeTZ (Contributor Author) commented Sep 29, 2020:

IntervalScalar is not an op (operations type), while all the other @compiles rules compile ops. I find it a little strange to do it that way.

Contributor Author (LeeTZ):

op is Literal

Contributor:

Hmm, I agree with @icexelloss here; shouldn't we just be able to translate?

Contributor:

Or should this be a common method shared with other backends?

Contributor Author (LeeTZ):

I added this to the compiles(Literal) rule (a rough sketch of the idea is below).
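
A hedged sketch of the idea; the exact shape of the pyspark compile_literal rule is assumed, and execute is the same backend helper seen in the hunks above:

import ibis.expr.datatypes as dt
import ibis.expr.operations as ops


@compiles(ops.Literal)
def compile_literal(t, expr, scope, timecontext, **kwargs):
    op = expr.op()
    if isinstance(op.dtype, dt.Interval):
        # Materialize the interval literal to a pandas Timedelta so callers
        # (e.g. the window bound handling) can read .value in nanoseconds.
        return execute(expr)
    # ... existing handling for the other literal types ...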

pytestmark = pytest.mark.pyspark


def test_time_indexed_window(client):
@icexelloss (Contributor) commented Sep 29, 2020:

Can you parameterize this test by the window bounds?

(trailing_window(ibis.interval(hours=1), ...), Window.partitionBy(...).rangeBetween(...))

and

(ibis.range_window(..., ...), 
Window.partitionBy(...).rangeBetween(...))

Contributor Author (LeeTZ):

We cannot simply put a pyspark Window object in the params because that can trigger "'NoneType' object has no attribute '_jvm'" when the Spark JVM is not initialized yet. Instead, I put the range in the params (sketched below).
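
A sketch of the workaround with illustrative parameter values: the expected Spark range bounds are passed as plain tuples, and the pyspark Window is only built inside the test body, after the Spark session fixture has initialized the JVM:

import ibis
import pytest
import pyspark.sql.functions as F
from pyspark.sql.window import Window


@pytest.mark.parametrize(
    ('ibis_window', 'spark_range'),
    [
        (
            ibis.trailing_window(
                ibis.interval(hours=1), order_by='time', group_by='key'
            ),
            (-3600, 0),
        ),
        (
            ibis.range_window(
                preceding=ibis.interval(hours=2),
                following=ibis.interval(hours=1),
                order_by='time',
                group_by='key',
            ),
            (-7200, 3600),
        ),
    ],
)
def test_window_bounds(client, ibis_window, spark_range):
    # Window.partitionBy(...) requires a live JVM, so it is constructed here
    # rather than in the parametrize list.
    spark_window = (
        Window.partitionBy('key')
        .orderBy(F.col('time').cast('long'))
        .rangeBetween(*spark_range)
    )
    ...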

@icexelloss (Contributor) left a comment:

Reviewed again.

@jreback jreback added the "pyspark (The Apache PySpark backend)" and "backends - spark" labels Sep 30, 2020

    if isinstance(interval, ir.IntervalScalar):
        return int(execute(interval).value / 1e9)
Contributor:

Is there a concern if this is too big here? E.g., I think the resolution on Spark is only ms?

Contributor Author (LeeTZ):

Here, for the window bounds, Spark uses seconds since epoch; execute gives us a Timedelta whose value is in nanoseconds (see the example below).
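
For example (assuming pandas Timedelta semantics):

import pandas as pd

td = pd.Timedelta(hours=2)
td.value              # 7200000000000 nanoseconds
int(td.value / 1e9)   # 7200 seconds, the value handed to rangeBetween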

        return int(execute(interval).value / 1e9)
    elif isinstance(interval, int):
        return interval
    else:
Contributor:

just raise

Contributor Author (LeeTZ):

Sure, I removed the else here.

"""
if isinstance(interval, ir.IntervalScalar):
# value is in nanosecond
return int(execute(interval).value / 1e9)
Contributor:

Hmm, I agree with @icexelloss here; shouldn't we just be able to translate?

        return interval
    else:
        raise com.UnsupportedOperationError(
            'type {} is not supported in preceding/following in window'.format(
Contributor:

use f-strings
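
For instance, something like this (the interpolated expression is an assumption, since the format argument is truncated in this hunk):

raise com.UnsupportedOperationError(
    f'type {type(interval)} is not supported in preceding/following in window'
)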

Contributor Author (LeeTZ):

+1

"""
if isinstance(interval, ir.IntervalScalar):
# value is in nanosecond
return int(execute(interval).value / 1e9)
Contributor:

Or should this be a common method shared with other backends?



# TODO: multi windows don't update scope correctly
@pytest.mark.xfail
Contributor:

Can you add this in the reason for the xfail and create an issue for this (and list it here)?

Contributor:

+1

Contributor Author (LeeTZ):

No problem, this is just the next issue I am going to fix: #2412.

@jreback (Contributor) left a comment:

LGTM, minor comment in tests. Can you add a release note with this PR number? Ping on green.



# TODO: multi windows don't update scope correctly
@pytest.mark.xfail(reason='Issue #2412', strict=True)
Contributor:

Can you add a description here as well?

@LeeTZ (Contributor Author) commented Sep 30, 2020

@icexelloss @jreback CI is green for the tests now, but failed for impala, clickhouse, and docbuild. Can we ignore that and proceed, or should I run it again?

@jreback jreback added this to the Next Feature Release milestone Sep 30, 2020
@jreback jreback merged commit 67bca4e into ibis-project:master Sep 30, 2020
@jreback (Contributor) commented Sep 30, 2020

yep this is fine

@icexelloss (Contributor) left a comment:

LGTM
