From 84bac370b0e5f6d07efd466e64fb2e1ecc88c64e Mon Sep 17 00:00:00 2001
From: Chloe He
Date: Wed, 15 May 2024 15:22:02 -0700
Subject: [PATCH] refactor(api): refactor the implementation of windowing

Replace the per-kind windowing TVF operations (TumbleWindowingTVF,
HopWindowingTVF, CumulateWindowingTVF) with a single WindowAggregate
relation that carries the window type, time column, group keys, metrics,
window size, window step and offset.

Table.window_by() now takes the window type, time column, size, step and
offset and returns a TumbleTable or HopTable; their aggregate()/agg()
methods build the WindowAggregate node. The Flink compiler replaces the
three visit_*WindowingTVF methods with visit_WindowAggregate, which looks
up the windowing function by window type.

HopTable.aggregate() forwards window_step to WindowAggregate; without it
the op falls back to window_step=None and the compiler filters the
argument out of the generated HOP call.

The cumulate window API and the standalone windowing TVF tests are
removed.
---
 ibis/backends/flink/compiler.py            |  72 +++------
 ibis/backends/flink/tests/test_compiler.py |  24 ---
 ibis/expr/format.py                        |   1 -
 ibis/expr/operations/temporal_windows.py   |  71 +++------
 ibis/expr/types/relations.py               | 107 ++++++++-----
 ibis/expr/types/temporal_windows.py        | 178 ++++++++-------------
 ibis/tests/expr/test_temporal_windows.py   |  76 ---------
 7 files changed, 177 insertions(+), 352 deletions(-)

diff --git a/ibis/backends/flink/compiler.py b/ibis/backends/flink/compiler.py
index 211ce3c9882aa..092cb08bc88df 100644
--- a/ibis/backends/flink/compiler.py
+++ b/ibis/backends/flink/compiler.py
@@ -142,66 +142,46 @@ def _minimize_spec(start, end, spec):
             spec.args["end_side"] = None
         return spec
 
-    def visit_TumbleWindowingTVF(self, op, *, table, time_col, window_size, offset):
+    def visit_WindowAggregate(
+        self,
+        op,
+        *,
+        parent,
+        window_type,
+        time_col,
+        groups,
+        metrics,
+        window_size,
+        window_step,
+        offset,
+    ):
+        if window_type == "tumble":
+            assert window_step is None
+
         args = [
-            self.v[f"TABLE {table.this.sql(self.dialect)}"],
+            self.v[f"TABLE {parent.this.sql(self.dialect)}"],
             # `time_col` has the table _alias_, instead of the table, but it is
             # required to be bound to the table, this happens because of the
             # way we construct the op in the tumble API using bind
             #
             # perhaps there's a better way to deal with this
             self.f.descriptor(time_col.this),
+            window_step,
             window_size,
             offset,
         ]
 
-        return sg.select(
-            sge.Column(
-                this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
-            )
-        ).from_(
-            self.f.table(self.f.tumble(*filter(None, args))).as_(
-                table.alias_or_name, quoted=True
-            )
-        )
-
-    def visit_HopWindowingTVF(
-        self, op, *, table, time_col, window_size, window_slide, offset
-    ):
-        args = [
-            self.v[f"TABLE {table.this.sql(self.dialect)}"],
-            self.f.descriptor(time_col.this),
-            window_slide,
-            window_size,
-            offset,
-        ]
-        return sg.select(
-            sge.Column(
-                this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
-            )
-        ).from_(
-            self.f.table(self.f.hop(*filter(None, args))).as_(
-                table.alias_or_name, quoted=True
-            )
-        )
+        window_func = getattr(self.f, window_type)
 
-    def visit_CumulateWindowingTVF(
-        self, op, *, table, time_col, window_size, window_step, offset
-    ):
-        args = [
-            self.v[f"TABLE {table.this.sql(self.dialect)}"],
-            self.f.descriptor(time_col.this),
-            window_step,
-            window_size,
-            offset,
-        ]
         return sg.select(
-            sge.Column(
-                this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
-            )
+            sg.column("window_start", table=parent.alias_or_name, quoted=True),
+            sg.column("window_end", table=parent.alias_or_name, quoted=True),
+            *self._cleanup_names(groups),
+            *self._cleanup_names(metrics),
+            copy=False,
         ).from_(
-            self.f.table(self.f.cumulate(*filter(None, args))).as_(
-                table.alias_or_name, quoted=True
+            self.f.table(window_func(*filter(None, args))).as_(
+                parent.alias_or_name, quoted=True
             )
         )
 
diff --git a/ibis/backends/flink/tests/test_compiler.py b/ibis/backends/flink/tests/test_compiler.py
index cdfe9a996d92c..c5096d5b9451c 100644
--- a/ibis/backends/flink/tests/test_compiler.py
+++ b/ibis/backends/flink/tests/test_compiler.py
@@ -1,7 +1,5 @@
 from __future__ import annotations
 
-from operator import methodcaller
-
 import pytest
 from pytest import param
 
@@ -105,28 +103,6 @@ def test_having(simple_table, assert_sql):
     assert_sql(expr)
 
 
-@pytest.mark.parametrize(
-    "method",
-    [
-        methodcaller("tumble", window_size=ibis.interval(minutes=15)),
-        methodcaller(
-            "hop",
-            window_size=ibis.interval(minutes=15),
-            window_slide=ibis.interval(minutes=1),
-        ),
-        methodcaller(
-            "cumulate",
-            window_size=ibis.interval(minutes=1),
-            window_step=ibis.interval(seconds=10),
-        ),
-    ],
-    ids=["tumble", "hop", "cumulate"],
-)
-def test_windowing_tvf(simple_table, method, assert_sql):
-    expr = method(simple_table.window_by(time_col=simple_table.i))
-    assert_sql(expr)
-
-
 def test_window_aggregation(simple_table, assert_sql):
     expr = (
         simple_table.window_by(time_col=simple_table.i)
diff --git a/ibis/expr/format.py b/ibis/expr/format.py
index 00b93ae5f2f6d..b427bdc9d8091 100644
--- a/ibis/expr/format.py
+++ b/ibis/expr/format.py
@@ -211,7 +211,6 @@ def fmt(op, **kwargs):
 
 
 @fmt.register(ops.Relation)
-@fmt.register(ops.WindowingTVF)
 def _relation(op, parent=None, **kwargs):
     if parent is None:
         top = f"{op.__class__.__name__}\n"
diff --git a/ibis/expr/operations/temporal_windows.py b/ibis/expr/operations/temporal_windows.py
index 8eec01e257137..f48d037ff81f3 100644
--- a/ibis/expr/operations/temporal_windows.py
+++ b/ibis/expr/operations/temporal_windows.py
@@ -1,67 +1,38 @@
 from __future__ import annotations
 
-from typing import Optional
+from typing import Literal, Optional
 
 from public import public
 
 import ibis.expr.datatypes as dt
 from ibis.common.annotations import attribute
+from ibis.common.collections import FrozenOrderedDict
 from ibis.expr.operations.core import Column, Scalar  # noqa: TCH001
-from ibis.expr.operations.relations import Relation
+from ibis.expr.operations.relations import Relation, Unaliased
 from ibis.expr.schema import Schema
 
 
 @public
-class WindowingTVF(Relation):
-    """Generic windowing table-valued function."""
-
-    # TODO(kszucs): rename to `parent`
-    table: Relation
-    time_col: Column[dt.Timestamp]  # enforce timestamp column type here
+class WindowAggregate(Relation):
+    parent: Relation
+    window_type: Literal["tumble", "hop"]
+    time_col: Unaliased[Column]
+    groups: FrozenOrderedDict[str, Unaliased[Column]]
+    metrics: FrozenOrderedDict[str, Unaliased[Scalar]]
+    window_size: Scalar[dt.Interval]
+    window_step: Optional[Scalar[dt.Interval]] = None
+    offset: Optional[Scalar[dt.Interval]] = None
 
     @attribute
     def values(self):
-        return self.table.fields
+        return FrozenOrderedDict({**self.groups, **self.metrics})
 
-    @property
+    @attribute
     def schema(self):
-        names = list(self.table.schema.names)
-        types = list(self.table.schema.types)
-
-        # The return value of windowing TVF is a new relation that includes all columns
-        # of original relation as well as additional 3 columns named “window_start”,
-        # “window_end”, “window_time” to indicate the assigned window
-
-        # TODO(kszucs): this looks like an implementation detail leaked from the
-        # flink backend
-        names.extend(["window_start", "window_end", "window_time"])
-        # window_start, window_end, window_time have type TIMESTAMP(3) in Flink
-        types.extend([dt.timestamp(scale=3)] * 3)
-
-        return Schema.from_tuples(list(zip(names, types)))
-
-
-@public
-class TumbleWindowingTVF(WindowingTVF):
-    """TUMBLE window table-valued function."""
-
-    window_size: Scalar[dt.Interval]
-    offset: Optional[Scalar[dt.Interval]] = None
-
-
-@public
-class HopWindowingTVF(WindowingTVF):
-    """HOP window table-valued function."""
-
-    window_size: Scalar[dt.Interval]
-    window_slide: Scalar[dt.Interval]
-    offset: Optional[Scalar[dt.Interval]] = None
-
-
-@public
-class CumulateWindowingTVF(WindowingTVF):
-    """CUMULATE window table-valued function."""
-
-    window_size: Scalar[dt.Interval]
-    window_step: Scalar[dt.Interval]
-    offset: Optional[Scalar[dt.Interval]] = None
+        field_pairs = {
+            "window_start": dt.timestamp,
+            "window_end": dt.timestamp,
+            **{k: v.dtype for k, v in self.groups.items()},
+            **{k: v.dtype for k, v in self.metrics.items()},
+        }
+        return Schema(field_pairs)
diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py
index 3b9dc3257093b..28e49f2cf031e 100644
--- a/ibis/expr/types/relations.py
+++ b/ibis/expr/types/relations.py
@@ -57,10 +57,10 @@ def _regular_join_method(
     def f(  # noqa: D417
         self: ir.Table,
         right: ir.Table,
-        predicates: str
-        | Sequence[
-            str | tuple[str | ir.Column, str | ir.Column] | ir.BooleanValue
-        ] = (),
+        predicates: (
+            str
+            | Sequence[str | tuple[str | ir.Column, str | ir.Column] | ir.BooleanValue]
+        ) = (),
         *,
         lname: str = "",
         rname: str = "{name}_right",
@@ -2136,10 +2136,12 @@ def select(
     )
     def relabel(
         self,
-        substitutions: Mapping[str, str]
-        | Callable[[str], str | None]
-        | str
-        | Literal["snake_case", "ALL_CAPS"],
+        substitutions: (
+            Mapping[str, str]
+            | Callable[[str], str | None]
+            | str
+            | Literal["snake_case", "ALL_CAPS"]
+        ),
     ) -> Table:
         """Deprecated in favor of `Table.rename`."""
         if isinstance(substitutions, Mapping):
@@ -2148,11 +2150,13 @@ def relabel(
 
     def rename(
         self,
-        method: str
-        | Callable[[str], str | None]
-        | Literal["snake_case", "ALL_CAPS"]
-        | Mapping[str, str]
-        | None = None,
+        method: (
+            str
+            | Callable[[str], str | None]
+            | Literal["snake_case", "ALL_CAPS"]
+            | Mapping[str, str]
+            | None
+        ) = None,
         /,
         **substitutions: str,
     ) -> Table:
@@ -2972,17 +2976,19 @@ def describe(
     def join(
         left: Table,
         right: Table,
-        predicates: str
-        | Sequence[
+        predicates: (
             str
-            | ir.BooleanColumn
-            | Literal[True]
-            | Literal[False]
-            | tuple[
-                str | ir.Column | ir.Deferred,
-                str | ir.Column | ir.Deferred,
+            | Sequence[
+                str
+                | ir.BooleanColumn
+                | Literal[True]
+                | Literal[False]
+                | tuple[
+                    str | ir.Column | ir.Deferred,
+                    str | ir.Column | ir.Deferred,
+                ]
             ]
-        ] = (),
+        ) = (),
         how: JoinKind = "inner",
         *,
         lname: str = "",
@@ -3520,9 +3526,9 @@ def pivot_longer(
         *,
         names_to: str | Iterable[str] = "name",
         names_pattern: str | re.Pattern = r"(.+)",
-        names_transform: Callable[[str], ir.Value]
-        | Mapping[str, Callable[[str], ir.Value]]
-        | None = None,
+        names_transform: (
+            Callable[[str], ir.Value] | Mapping[str, Callable[[str], ir.Value]] | None
+        ) = None,
         values_to: str = "value",
         values_transform: Callable[[ir.Value], ir.Value] | Deferred | None = None,
     ) -> Table:
@@ -4496,25 +4502,38 @@ def relocate(
 
         return relocated
 
-    def window_by(self, time_col: ir.Value) -> WindowedTable:
-        """Create a windowing table-valued function (TVF) expression.
-
-        Windowing table-valued functions (TVF) assign rows of a table to windows
-        based on a time attribute column in the table.
-
-        Parameters
-        ----------
-        time_col
-            Column of the table that will be mapped to windows.
-
-        Returns
-        -------
-        WindowedTable
-            WindowedTable expression.
-        """
-        from ibis.expr.types.temporal_windows import WindowedTable
-
-        return WindowedTable(self, time_col)
+    def window_by(
+        self,
+        window_type: Literal["tumble", "hop"],
+        time_col: str | ir.Value,
+        window_size: ir.IntervalScalar,
+        window_step: ir.IntervalScalar | None = None,
+        offset: ir.IntervalScalar | None = None,
+    ) -> WindowedTable:
+        from ibis.expr.types.temporal_windows import HopTable, TumbleTable
+
+        time_col = next(iter(self.bind(time_col)))
+
+        if window_type == "tumble":
+            if window_step is not None:
+                raise com.IbisInputError(
+                    "Tumble windows are non-overlapping and the window step is assumed "
+                    "to be the same as the window size. If you want to create overlapping "
+                    "windows, specify `window_type='hop'`."
+                )
+            return TumbleTable(self, time_col, window_size=window_size, offset=offset)
+        elif window_type == "hop":
+            return HopTable(
+                self,
+                time_col,
+                window_size=window_size,
+                window_step=window_step,
+                offset=offset,
+            )
+        else:
+            raise com.IbisInputError(
+                f"`window_type` must be `tumble` or `hop`, got {window_type}"
+            )
 
 
 @public
diff --git a/ibis/expr/types/temporal_windows.py b/ibis/expr/types/temporal_windows.py
index 74560d0f36a2a..1100a37005a92 100644
--- a/ibis/expr/types/temporal_windows.py
+++ b/ibis/expr/types/temporal_windows.py
@@ -7,129 +7,85 @@
 import ibis.common.exceptions as com
 import ibis.expr.operations as ops
 import ibis.expr.types as ir
-from ibis.expr.types.relations import bind
+from ibis.common.grounds import Concrete
+from ibis.expr.types.relations import unwrap_aliases
 
 if TYPE_CHECKING:
-    from ibis.expr.types import Table
+    from collections.abc import Sequence
 
 
 @public
-class WindowedTable:
+class WindowedTable(Concrete):
     """An intermediate table expression to hold windowing information."""
 
-    def __init__(self, table: ir.Table, time_col: ir.Value):
-        self.table = table
-        self.time_col = next(bind(table, time_col))
+    table: ops.Relation
+    time_col: ops.Column
 
-        if self.time_col is None:
-            raise com.IbisInputError(
-                "Window aggregations require `time_col` as an argument"
-            )
+    def __init__(self, time_col: ops.Column, **kwargs):
+        if not time_col:
+            raise com.IbisInputError("No time column provided")
+        super().__init__(time_col=time_col, **kwargs)
 
-    def tumble(
-        self,
-        window_size: ir.IntervalScalar,
-        offset: ir.IntervalScalar | None = None,
-    ) -> Table:
-        """Compute a tumble table valued function.
-
-        Tumbling windows have a fixed size and do not overlap. The size of the windows is
-        determined by `window_size`, optionally shifted by a duration specified by `offset`.
-
-        Parameters
-        ----------
-        window_size
-            Width of the tumbling windows.
-        offset
-            An optional parameter to specify the offset which window start should be shifted by.
-
-        Returns
-        -------
-        Table
-            Table expression after applying tumbling table-valued function.
-        """
-        time_col = next(bind(self.table, self.time_col))
-        return ops.TumbleWindowingTVF(
-            table=self.table,
-            time_col=time_col,
-            window_size=window_size,
-            offset=offset,
-        ).to_expr()
 
-    def hop(
+@public
+class TumbleTable(WindowedTable):
+    window_size: ir.IntervalScalar
+    offset: ir.IntervalScalar | None = None
+
+    def aggregate(
         self,
-        window_size: ir.IntervalScalar,
-        window_slide: ir.IntervalScalar,
-        offset: ir.IntervalScalar | None = None,
-    ):
-        """Compute a hop table valued function.
-
-        Hopping windows have a fixed size and can be overlapping if the slide is smaller than the
-        window size (in which case elements can be assigned to multiple windows). Hopping windows
-        are also known as sliding windows. The size of the windows is determined by `window_size`,
-        how frequently a hopping window is started is determined by `window_slide`, and windows can
-        be optionally shifted by a duration specified by `offset`.
-
-        For example, you could have windows of size 10 minutes that slides by 5 minutes. With this,
-        you get every 5 minutes a window that contains the events that arrived during the last 10 minutes.
-
-        Parameters
-        ----------
-        window_size
-            Width of the hopping windows.
-        window_slide
-            The duration between the start of sequential hopping windows.
-        offset
-            An optional parameter to specify the offset which window start should be shifted by.
-
-        Returns
-        -------
-        Table
-            Table expression after applying hopping table-valued function.
-        """
-        time_col = next(bind(self.table, self.time_col))
-        return ops.HopWindowingTVF(
-            table=self.table,
-            time_col=time_col,
-            window_size=window_size,
-            window_slide=window_slide,
-            offset=offset,
+        metrics: Sequence[ir.Scalar] | None = (),
+        by: Sequence[ir.Value] | None = (),
+        **kwargs: ir.Value,
+    ) -> ir.Table:
+        table = self.table.to_expr()
+        groups = table.bind(by)
+        metrics = table.bind(metrics, **kwargs)
+
+        groups = unwrap_aliases(groups)
+        metrics = unwrap_aliases(metrics)
+
+        return ops.WindowAggregate(
+            self.table,
+            "tumble",
+            self.time_col,
+            groups=groups,
+            metrics=metrics,
+            window_size=self.window_size,
+            offset=self.offset,
         ).to_expr()
 
-    def cumulate(
+    agg = aggregate
+
+
+@public
+class HopTable(WindowedTable):
+    window_size: ir.IntervalScalar
+    window_step: ir.IntervalScalar
+    offset: ir.IntervalScalar | None = None
+
+    def aggregate(
         self,
-        window_size: ir.IntervalScalar,
-        window_step: ir.IntervalScalar,
-        offset: ir.IntervalScalar | None = None,
-    ):
-        """Compute a cumulate table valued function.
-
-        Cumulate windows don't have a fixed size and do overlap. Cumulate windows assign elements to windows
-        that cover rows within an initial interval of step size and expand to one more step size (keep window
-        start fixed) every step until the max window size.
-
-        For example, you could have a cumulating window for 1 hour step and 1 day max size, and you will get
-        windows: [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) for every day.
-
-        Parameters
-        ----------
-        window_size
-            Max width of the cumulating windows.
-        window_step
-            A duration specifying the increased window size between the end of sequential cumulating windows.
-        offset
-            An optional parameter to specify the offset which window start should be shifted by.
-
-        Returns
-        -------
-        Table
-            Table expression after applying cumulate table-valued function.
-        """
-        time_col = next(bind(self.table, self.time_col))
-        return ops.CumulateWindowingTVF(
-            table=self.table,
-            time_col=time_col,
-            window_size=window_size,
-            window_step=window_step,
-            offset=offset,
+        metrics: Sequence[ir.Scalar] | None = (),
+        by: Sequence[ir.Value] | None = (),
+        **kwargs: ir.Value,
+    ) -> ir.Table:
+        table = self.table.to_expr()
+        groups = table.bind(by)
+        metrics = table.bind(metrics, **kwargs)
+
+        groups = unwrap_aliases(groups)
+        metrics = unwrap_aliases(metrics)
+
+        return ops.WindowAggregate(
+            self.table,
+            "hop",
+            self.time_col,
+            groups=groups,
+            metrics=metrics,
+            window_size=self.window_size,
+            window_step=self.window_step,
+            offset=self.offset,
         ).to_expr()
+
+    agg = aggregate
diff --git a/ibis/tests/expr/test_temporal_windows.py b/ibis/tests/expr/test_temporal_windows.py
index 928c26726d841..e69de29bb2d1d 100644
--- a/ibis/tests/expr/test_temporal_windows.py
+++ b/ibis/tests/expr/test_temporal_windows.py
@@ -1,76 +0,0 @@
-from __future__ import annotations
-
-import datetime
-
-import pytest
-
-import ibis
-import ibis.common.exceptions as com
-import ibis.expr.datatypes as dt
-import ibis.expr.operations as ops
-from ibis import selectors as s
-from ibis.common.annotations import ValidationError
-from ibis.common.deferred import _
-
-
-def test_tumble_tvf_schema(schema, table):
-    expr = table.window_by(time_col=table.i).tumble(
-        window_size=ibis.interval(minutes=15)
-    )
-    expected_schema = ibis.schema(
-        schema
-        + [
-            ("window_start", dt.Timestamp(scale=3)),
-            ("window_end", dt.Timestamp(scale=3)),
-            ("window_time", dt.Timestamp(scale=3)),
-        ]
-    )
-    assert expr.schema() == expected_schema
-
-
-@pytest.mark.parametrize("wrong_type_window_size", ["60", 60])
-def test_create_tumble_tvf_with_wrong_scalar_type(table, wrong_type_window_size):
-    with pytest.raises(ValidationError, match=".* is not coercible to a .*"):
-        table.window_by(time_col=table.i).tumble(window_size=wrong_type_window_size)
-
-
-def test_create_tumble_tvf_with_nonexistent_time_col(table):
-    with pytest.raises(com.IbisTypeError, match="Column .* is not found in table"):
-        table.window_by(time_col=table["nonexistent"]).tumble(
-            window_size=datetime.timedelta(seconds=60)
-        )
-
-
-def test_create_tumble_tvf_with_nonscalar_window_size(schema):
-    schema.append(("l", "interval"))
-    table = ibis.table(schema, name="table")
-    with pytest.raises(ValidationError, match=".* is not coercible to a .*"):
-        table.window_by(time_col=table.i).tumble(window_size=table.l)
-
-
-def test_create_tumble_tvf_with_non_timestamp_time_col(table):
-    with pytest.raises(ValidationError, match=".* is not coercible to a .*"):
-        table.window_by(time_col=table.e).tumble(window_size=ibis.interval(minutes=15))
-
-
-def test_create_tumble_tvf_with_str_time_col(table):
-    expr = table.window_by(time_col="i").tumble(window_size=ibis.interval(minutes=15))
-    assert isinstance(expr.op(), ops.TumbleWindowingTVF)
-    assert expr.op().time_col == table.i.op()
-
-
-@pytest.mark.parametrize("deferred", [_["i"], _.i])
-def test_create_tumble_tvf_with_deferred_time_col(table, deferred):
-    expr = table.window_by(time_col=deferred.resolve(table)).tumble(
-        window_size=ibis.interval(minutes=15)
-    )
-    assert isinstance(expr.op(), ops.TumbleWindowingTVF)
-    assert expr.op().time_col == table.i.op()
-
-
-def test_create_tumble_tvf_with_selector_time_col(table):
-    expr = table.window_by(time_col=s.c("i")).tumble(
-        window_size=ibis.interval(minutes=15)
-    )
-    assert isinstance(expr.op(), ops.TumbleWindowingTVF)
-    assert expr.op().time_col == table.i.op()