-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT: Add time context in scope in execution for pandas backend
#2306
FEAT: Add time context in scope in execution for pandas backend
#2306
Conversation
scope in execution for pandas backendscope in execution for pandas backend
ibis/pandas/core.py
Outdated
| @@ -554,3 +610,113 @@ def canonicalize_context( | |||
| f'begin time {begin} must be before or equal' f' to end time {end}' | |||
| ) | |||
| return begin, end | |||
|
|
|||
|
|
|||
| class TimeContextRelation(enum.Enum): | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps move this to a timecontext module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, will move this to an upper level (ibis level) so that we could reuse this module for other backends
ibis/pandas/core.py
Outdated
| NONOVERLAP = 3 | ||
|
|
||
|
|
||
| def compare_timecontext(cur_context: TimeContext, old_context: TimeContext): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to a timecontext module?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, same as above.
ibis/pandas/core.py
Outdated
| if op not in scope: | ||
| return None | ||
| # for ops without timecontext | ||
| if timecontext is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency, we should perhaps use an timecontext place holder for the timecontext is None case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean have an entry of {"value":None, "timecontext":None}?
ibis/pandas/execution/window.py
Outdated
| @@ -161,14 +161,17 @@ def trim_with_timecontext(data, timecontext: Optional[TimeContext]): | |||
| if not timecontext: | |||
| return data | |||
|
|
|||
| df = data.reset_index() | |||
| df = data.reset_index(level=1) | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK this is tricky. Here, data is a Series of MultiIndex (int, Timestamp) where int is some index (0,1,2,3 ...) and Timestamp column is our time column. When we filter by time, the index is also filtered so we will have something like:
index time value
2 2010-01-02 2
3 2010-01-03 3
As you can see the index may not start from 0. And later if we put this Series back to generate result. There will be NaN and NaT filling in, since pandas will align rows according to index number.
Therefore, all changes in this function is trying to reindex the int inside the Multiindex after filtering, to guarantee the correctness of our result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I think this needs to be explained in the code, otherwise it's kind of confusing.
Also, is it always the case that time is an index here? What if this is not a moving window, e.g.
win = window(group_by='id')
What's the behavior of this code in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or perhaps
win = window(preceding=interval(days=1), group_by='id', order_by='some_other_time_column')
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or
win = window(preceding=10, group_by='id', order_by='time')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I think this function is not very clear about what the expected input and output. It would be helpful to clarify.
ibis/pandas/execution/window.py
Outdated
| data, | ||
| order_by, | ||
| group_by=group_by, | ||
| timecontext=new_timecontext, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename new_timecontext to adjusted_timecontext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
ibis/pandas/execution/window.py
Outdated
| factory=OrderedDict, | ||
| ) | ||
| additional_scope = {} | ||
| for t in operand.op().root_tables(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why changing the list comprehension to for loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might be better ways but let me explain why, scope_item() now return with a dict {op: {"value":xx, "timecontext":xx}}, how can I turn it into (key, value) like (t, source) in a one-linere we had previously?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your code is the same as
toolz.merge(
set_scope_item(t, source, adjusted_timecontext)
for t in
operand.op().root_tables()
)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
ibis/pandas/core.py
Outdated
| return TimeContextRelation.OVERLAP | ||
|
|
||
|
|
||
| def scope_item(op, result, timecontext): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we lift scope related code to its own module so the pyspark backend can also reuse it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, will move this to Ibis level
ibis/pandas/core.py
Outdated
| # in result Dataframe, cannot use cached result with different time | ||
| # context to optimize calculation. For simplicity, we skip these ops | ||
| # for now. | ||
| if isinstance(op, ops.TableColumn) or isinstance(op, ops.TableNode): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a xfail test for projection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per discussion offline. I think we should just enable this for all nodes instead of leaf nodes.
ibis/pandas/core.py
Outdated
| @@ -220,7 +264,7 @@ def execute_with_scope( | |||
| **kwargs, | |||
| ), | |||
| **kwargs, | |||
| )[op] | |||
| )[op]['value'] | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the value here is for a context that is larger than the one that user requested?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked again, this seems unlikely to happen because execute_until_in_scope would return with a dict with op as the only key. This is not reading from the scope. Time context should be already taken care of in execution phases in execute_until_in_scope
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think execute_until_in_scope used to return the scope after execution. Is that still the case? If so, shouldn't the code here be:
result = get_scope_item(scope, op, timecontext)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it makes sense, however, execute_until_in_scope is a recursive call and it always returns the executed result of the expr, instead of the whole scope. Before this change, the return value is just {op: computed}. So I think we don't need to call get method again here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
ibis/pandas/core.py
Outdated
| @@ -330,7 +380,7 @@ def execute_until_in_scope( | |||
|
|
|||
| # pass our computed arguments to this node's execute_node implementation | |||
| data = [ | |||
| new_scope[arg.op()] if hasattr(arg, 'op') else arg | |||
| get_scope(new_scope, arg.op()) if hasattr(arg, 'op') else arg | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think get_scope_item and set_scope_item is better. You basically want a get and set method to get/set item from scope with a op and timecontext
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I will change these signature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High level looks good. Let's try to decouple timecontext scope and execution modules. I think timecontext and scope module can be reused by other backends as well.
ibis/pandas/core.py
Outdated
| @@ -343,7 +394,7 @@ def execute_until_in_scope( | |||
| **kwargs, | |||
| ) | |||
| computed = post_execute_(op, result, timecontext=timecontext) | |||
| return {op: computed} | |||
| return set_scope_item(op, computed, timecontext) | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the first arg to set_scope_item should be scope
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored and changed arg accrodingly
ibis/common/scope.py
Outdated
| from ibis.timecontext.util import TimeContextRelation, compare_timecontext | ||
|
|
||
|
|
||
| def set_scope_item(op, result, timecontext: Optional[TimeContext]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a "set" method. A set method should change the state of object and returns nothing.
This set method would make more sense to me
def set_scope_item(scope, op, timecontext, value):
scope[op] = {'value': result, 'timecontext': timecontext}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I first name it as scope_item for the same reason. Now it is refactored and this is addressed.
ibis/pandas/core.py
Outdated
| timecontext, | ||
| ) | ||
|
|
||
| if get_scope_item(scope, op, timecontext) is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change the order to be after Literal check here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, this is a bug.
ibis/pandas/core.py
Outdated
| @@ -330,7 +381,7 @@ def execute_until_in_scope( | |||
|
|
|||
| # pass our computed arguments to this node's execute_node implementation | |||
| data = [ | |||
| new_scope[arg.op()] if hasattr(arg, 'op') else arg | |||
| get_scope_item(new_scope, arg.op()) if hasattr(arg, 'op') else arg | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing timecontext here when calling get_scope_item?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is the same bug as the previous comment, op was directly executed and not add to op result in a None in getting time context. Will fix it
ibis/timecontext/util.py
Outdated
| TIME_COL = 'time' | ||
|
|
||
|
|
||
| class TimeContextRelation(enum.Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps call this ibis/expr/timecontext.py?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, renamed the file.
ibis/timecontext/adjustment.py
Outdated
| from ibis.expr.typing import TimeContext | ||
|
|
||
|
|
||
| def adjust_context_asof_join( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably define dispatch methods for extending.
@singledispatch
def adjust_timecontext(op, timecontext):
raise NotImplementedError()
@adjust_timecontext.register(AsOfJoin):
def adjust_timecontext_asof_join(op, timecontext):
return ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order for this to work, I added an extra arg clients in compute_time_context dispatch, we will need to pass clients so that in these dispatch function at expr level, we could run execute on op args.
ibis/timecontext/util.py
Outdated
|
|
||
| Returns | ||
| ------- | ||
| result : Enum[TimeContextRelation] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the type for this is just TimeContextRelation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed another around. Left some comments.
ibis/expr/scope.py
Outdated
|
|
||
| `scope` in Scope class is the main cache. It is a dictionary mapping | ||
| ibis node instances to concrete data, and the time context associate | ||
| with it(if any). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you update here
ibis/expr/scope.py
Outdated
| ------- | ||
| Scope: a new Scope instance with op in it. | ||
| """ | ||
| item = namedtuple('item', ['value', 'timecontext']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be ScopeItem and created in the module (the definition)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at the top of this module, do
ScopeItem= namedtuple(....)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks really good. just a small change in the impl of Scope and should be good
ibis/expr/scope.py
Outdated
| ------- | ||
| Scope: a new Scope instance with op in it. | ||
| """ | ||
| item = namedtuple('item', ['value', 'timecontext']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at the top of this module, do
ScopeItem= namedtuple(....)
ibis/expr/scope.py
Outdated
|
|
||
| class Scope: | ||
| def __init__(self, items: Dict[str, ScopeItem] = None): | ||
| self.items = items or {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
save this as self._items, then you can make a .items() method which return iter(self._items) and then Scope becomes more like a dictionary
ibis/expr/scope.py
Outdated
| a new Scope instance with items in two scope merged. | ||
| """ | ||
| result = Scope() | ||
| for op, v in self._get_items().items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you can fix this as above
d706fd1
to
f9e2b46
Compare
…dd_time_context_in_scope
f9e2b46
to
8b83630
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one small additional comment, then lgtm. ping on green.
ibis/expr/scope.py
Outdated
| """ | ||
| result = Scope() | ||
| for op in self.items(): | ||
| result._items[op] = self._items[op] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on Scope I would define __getitem__(self, op) and __setitem__(self, op) as public methods (and then use them here)
def __getitem__(self, op):
return self._items[op]
def __setitem__(self, op, value):
self._items[op] = value
ibis/expr/scope.py
Outdated
| result = Scope() | ||
|
|
||
| for op in self.items(): | ||
| result.__setitem__(op, self.__getitem__(op)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use
result[op] = self[op]
ibis/expr/scope.py
Outdated
| """ | ||
| result = Scope() | ||
| for op in self.items(): | ||
| result.__setitem__(op, self.__getitem__(op)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
result[op] = self[op]
ibis/expr/scope.py
Outdated
| def __getitem__(self, op): | ||
| return self._items[op] | ||
|
|
||
| def __setitem__(self, op, value): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should return None, type value as Any, op is ScopeItem?
ibis/expr/scope.py
Outdated
| def items(self): | ||
| return iter(self._items) | ||
|
|
||
| def __getitem__(self, op): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
op is ScopeItem? returns -> Any
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the diff between __getitem__ and get?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
__getitem__ returns a ScopeItem like {'value': 'foo', 'timecontext': None}
get is an api to return the value, which is foo in the case above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. But be careful this is exposing implementation details
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping on green
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
@LeeTZ if you can add a release note. pls ping when green. |
|
Ping @jreback for green. Thank you all for reviewing! |
|
awesome @LeeTZ very nice! |
What is the change
This PR purpose a change to the data structure
scopein pandas execution. Time context is added toscopeand getting, setting scope methods are changed accordingly.Existing mechanism
Scope key is the
opassociated with each expression.Scope value is a
pd.DataFrameorpd.Seriesthat is the result of executing keyop.Set scope kv pair: This is done in multiple places:
in
execute_until_in_scope,new_scopeis a combination of scopes of all children(calculating recursively), and scope frompre_execute.in
window.pywindow has a similar log of setting scope kv pair.in
selection.pyselction is addingadditional_scopetoscopeby remapping columns to data. Seecompute_projection_scalar_expr,compute_projection_column_expretc.Although this kv is set in multiple places by different code pieces, conceptually, it's just
scope[op] = result.Get scope kv pair:
execute_until_in_scopeusingdata = [ new_scope[arg.op()] if hasattr(arg, 'op') else arg for arg in computable_args ]that retrives computed data from scope, and pass on toexecute_nodeimplementation.execute_util_in_scopethat return anopwithout execution ifopis already inscope.Scope miss: If there is a miss in getting
scope[op], we will execute op.Proposed mechanism
Scope key is still
opassociated with each expression.Scope value is another key-value map
-
value:pd.DataFrameorpd.Seriesthat is the result of executing keyop, this is same as the original scope value.-
timecontext: of typeTimeContext, the time context associated with the data stored invalueNote that the idea is: data should be determined by both
opandtimecontext. So the data structure above is conceptually same as making(op, timecontext)as the key for scope. But that may increase the time complexity in querying, so we maketimecontextas another key ofop. See following getting and setting logic for details.Set scope kv pair: before setting the value
opinscopewe need to perform the following check first:opis inscopeyetopinscope, settimecontextto be the currenttimecontext(None if timecontext is not present), setvalueto be theDataFrameorSeriesof the actual data.timecontextstored inscopeforopasold_timecontext, and compare it with currenttimecontext:timecontextis same asold_timecontext, do nothing.timecontextis a subset ofold_timecontext, that means we already cache a larger range of data. Do nothing and we will trim data in later execution process.timecontextis a superset ofold_timecontext, that means we need to update cache. Setvalueto be the current data and settimecontextto be the currenttimecontextforop.timecontextis neither a subset nor a superset ofold_timcontext, If current time context is neither a subset nor a superset of old_timcontext, but they overlap, or not overlap at all. For example, this will happen when there is a window that looks forward, over a window that looks back. So in this case, we should not trust the data stored either, and go on to execute this node. For simplicity, we update the cache in this case as well.Get scope kv pair: getting will change from
scope[op]toget_scope(scope, op, timecontext)in get calls.Scope miss: If there is a miss in getting
scope[op], we will executeopto get the result.Restrictions
scopeto cache leaf table nodes now. We may assume that if we cache a large time range that is the superset of all other time contexts, we could always use this cache and trim data to their associated time context later. But this assumption doesn't hold true for all cases. This is because some ops cannot use the cached results with a different time context to get the correct result. For example, agroupbyfollowed bycount, if we use a larger or smaller from the cache, we will probably get an error in the result. Such ops global aggregation, ops whose result is depending on other rows in result Dataframe, cannot use the cached result with different context to optimize calculation. For simplicity, we skip these ops for now and will address these in followups.How is this change tested
Unit tests are added in
ibis/pandas/tests/test_timecontext.pyto test if time context comparison works. Also, concrete examples that use this feature likewindowinmutateand multi-window tests, are added as well. See inline comments for the explanation of each test case.