Skip to content

Commit

Permalink
refactor(api): refactor the implementation of windowing
Browse files Browse the repository at this point in the history
  • Loading branch information
Chloe He committed May 15, 2024
1 parent 142c105 commit 84bac37
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 352 deletions.
72 changes: 26 additions & 46 deletions ibis/backends/flink/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,66 +142,46 @@ def _minimize_spec(start, end, spec):
spec.args["end_side"] = None
return spec

def visit_TumbleWindowingTVF(self, op, *, table, time_col, window_size, offset):
def visit_WindowAggregate(

Check warning on line 145 in ibis/backends/flink/compiler.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/compiler.py#L145

Added line #L145 was not covered by tests
self,
op,
*,
parent,
window_type,
time_col,
groups,
metrics,
window_size,
window_step,
offset,
):
if window_type == "tumble":
assert window_step is None

Check warning on line 159 in ibis/backends/flink/compiler.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/compiler.py#L159

Added line #L159 was not covered by tests

args = [
self.v[f"TABLE {table.this.sql(self.dialect)}"],
self.v[f"TABLE {parent.this.sql(self.dialect)}"],
# `time_col` has the table _alias_, instead of the table, but it is
# required to be bound to the table, this happens because of the
# way we construct the op in the tumble API using bind
#
# perhaps there's a better way to deal with this
self.f.descriptor(time_col.this),
window_step,
window_size,
offset,
]

return sg.select(
sge.Column(
this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
)
).from_(
self.f.table(self.f.tumble(*filter(None, args))).as_(
table.alias_or_name, quoted=True
)
)

def visit_HopWindowingTVF(
self, op, *, table, time_col, window_size, window_slide, offset
):
args = [
self.v[f"TABLE {table.this.sql(self.dialect)}"],
self.f.descriptor(time_col.this),
window_slide,
window_size,
offset,
]
return sg.select(
sge.Column(
this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
)
).from_(
self.f.table(self.f.hop(*filter(None, args))).as_(
table.alias_or_name, quoted=True
)
)
window_func = getattr(self.f, window_type)

Check warning on line 174 in ibis/backends/flink/compiler.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/compiler.py#L174

Added line #L174 was not covered by tests

def visit_CumulateWindowingTVF(
self, op, *, table, time_col, window_size, window_step, offset
):
args = [
self.v[f"TABLE {table.this.sql(self.dialect)}"],
self.f.descriptor(time_col.this),
window_step,
window_size,
offset,
]
return sg.select(
sge.Column(
this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
)
sg.column("window_start", table=parent.alias_or_name, quoted=True),
sg.column("window_end", table=parent.alias_or_name, quoted=True),
*self._cleanup_names(groups),
*self._cleanup_names(metrics),
copy=False,
).from_(
self.f.table(self.f.cumulate(*filter(None, args))).as_(
table.alias_or_name, quoted=True
self.f.table(window_func(*filter(None, args))).as_(
parent.alias_or_name, quoted=True
)
)

Expand Down
24 changes: 0 additions & 24 deletions ibis/backends/flink/tests/test_compiler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import annotations

from operator import methodcaller

import pytest
from pytest import param

Expand Down Expand Up @@ -105,28 +103,6 @@ def test_having(simple_table, assert_sql):
assert_sql(expr)


@pytest.mark.parametrize(
"method",
[
methodcaller("tumble", window_size=ibis.interval(minutes=15)),
methodcaller(
"hop",
window_size=ibis.interval(minutes=15),
window_slide=ibis.interval(minutes=1),
),
methodcaller(
"cumulate",
window_size=ibis.interval(minutes=1),
window_step=ibis.interval(seconds=10),
),
],
ids=["tumble", "hop", "cumulate"],
)
def test_windowing_tvf(simple_table, method, assert_sql):
expr = method(simple_table.window_by(time_col=simple_table.i))
assert_sql(expr)


def test_window_aggregation(simple_table, assert_sql):
expr = (
simple_table.window_by(time_col=simple_table.i)
Expand Down
1 change: 0 additions & 1 deletion ibis/expr/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ def fmt(op, **kwargs):


@fmt.register(ops.Relation)
@fmt.register(ops.WindowingTVF)
def _relation(op, parent=None, **kwargs):
if parent is None:
top = f"{op.__class__.__name__}\n"
Expand Down
71 changes: 21 additions & 50 deletions ibis/expr/operations/temporal_windows.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,38 @@
from __future__ import annotations

from typing import Optional
from typing import Literal, Optional

from public import public

import ibis.expr.datatypes as dt
from ibis.common.annotations import attribute
from ibis.common.collections import FrozenOrderedDict
from ibis.expr.operations.core import Column, Scalar # noqa: TCH001
from ibis.expr.operations.relations import Relation
from ibis.expr.operations.relations import Relation, Unaliased
from ibis.expr.schema import Schema


@public
class WindowingTVF(Relation):
"""Generic windowing table-valued function."""

# TODO(kszucs): rename to `parent`
table: Relation
time_col: Column[dt.Timestamp] # enforce timestamp column type here
class WindowAggregate(Relation):
parent: Relation
window_type: Literal["tumble", "hop"]
time_col: Unaliased[Column]
groups: FrozenOrderedDict[str, Unaliased[Column]]
metrics: FrozenOrderedDict[str, Unaliased[Scalar]]
window_size: Scalar[dt.Interval]
window_step: Optional[Scalar[dt.Interval]] = None
offset: Optional[Scalar[dt.Interval]] = None

@attribute
def values(self):
return self.table.fields
return FrozenOrderedDict({**self.groups, **self.metrics})

Check warning on line 28 in ibis/expr/operations/temporal_windows.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/operations/temporal_windows.py#L28

Added line #L28 was not covered by tests

@property
@attribute
def schema(self):
names = list(self.table.schema.names)
types = list(self.table.schema.types)

# The return value of windowing TVF is a new relation that includes all columns
# of original relation as well as additional 3 columns named “window_start”,
# “window_end”, “window_time” to indicate the assigned window

# TODO(kszucs): this looks like an implementation detail leaked from the
# flink backend
names.extend(["window_start", "window_end", "window_time"])
# window_start, window_end, window_time have type TIMESTAMP(3) in Flink
types.extend([dt.timestamp(scale=3)] * 3)

return Schema.from_tuples(list(zip(names, types)))


@public
class TumbleWindowingTVF(WindowingTVF):
"""TUMBLE window table-valued function."""

window_size: Scalar[dt.Interval]
offset: Optional[Scalar[dt.Interval]] = None


@public
class HopWindowingTVF(WindowingTVF):
"""HOP window table-valued function."""

window_size: Scalar[dt.Interval]
window_slide: Scalar[dt.Interval]
offset: Optional[Scalar[dt.Interval]] = None


@public
class CumulateWindowingTVF(WindowingTVF):
"""CUMULATE window table-valued function."""

window_size: Scalar[dt.Interval]
window_step: Scalar[dt.Interval]
offset: Optional[Scalar[dt.Interval]] = None
field_pairs = {
"window_start": dt.timestamp,
"window_end": dt.timestamp,
**{k: v.dtype for k, v in self.groups.items()},
**{k: v.dtype for k, v in self.metrics.items()},
}
return Schema(field_pairs)

Check warning on line 38 in ibis/expr/operations/temporal_windows.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/operations/temporal_windows.py#L38

Added line #L38 was not covered by tests
107 changes: 63 additions & 44 deletions ibis/expr/types/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def _regular_join_method(
def f( # noqa: D417
self: ir.Table,
right: ir.Table,
predicates: str
| Sequence[
str | tuple[str | ir.Column, str | ir.Column] | ir.BooleanValue
] = (),
predicates: (
str
| Sequence[str | tuple[str | ir.Column, str | ir.Column] | ir.BooleanValue]
) = (),
*,
lname: str = "",
rname: str = "{name}_right",
Expand Down Expand Up @@ -2136,10 +2136,12 @@ def select(
)
def relabel(
self,
substitutions: Mapping[str, str]
| Callable[[str], str | None]
| str
| Literal["snake_case", "ALL_CAPS"],
substitutions: (
Mapping[str, str]
| Callable[[str], str | None]
| str
| Literal["snake_case", "ALL_CAPS"]
),
) -> Table:
"""Deprecated in favor of `Table.rename`."""
if isinstance(substitutions, Mapping):
Expand All @@ -2148,11 +2150,13 @@ def relabel(

def rename(
self,
method: str
| Callable[[str], str | None]
| Literal["snake_case", "ALL_CAPS"]
| Mapping[str, str]
| None = None,
method: (
str
| Callable[[str], str | None]
| Literal["snake_case", "ALL_CAPS"]
| Mapping[str, str]
| None
) = None,
/,
**substitutions: str,
) -> Table:
Expand Down Expand Up @@ -2972,17 +2976,19 @@ def describe(
def join(
left: Table,
right: Table,
predicates: str
| Sequence[
predicates: (
str
| ir.BooleanColumn
| Literal[True]
| Literal[False]
| tuple[
str | ir.Column | ir.Deferred,
str | ir.Column | ir.Deferred,
| Sequence[
str
| ir.BooleanColumn
| Literal[True]
| Literal[False]
| tuple[
str | ir.Column | ir.Deferred,
str | ir.Column | ir.Deferred,
]
]
] = (),
) = (),
how: JoinKind = "inner",
*,
lname: str = "",
Expand Down Expand Up @@ -3520,9 +3526,9 @@ def pivot_longer(
*,
names_to: str | Iterable[str] = "name",
names_pattern: str | re.Pattern = r"(.+)",
names_transform: Callable[[str], ir.Value]
| Mapping[str, Callable[[str], ir.Value]]
| None = None,
names_transform: (
Callable[[str], ir.Value] | Mapping[str, Callable[[str], ir.Value]] | None
) = None,
values_to: str = "value",
values_transform: Callable[[ir.Value], ir.Value] | Deferred | None = None,
) -> Table:
Expand Down Expand Up @@ -4496,25 +4502,38 @@ def relocate(

return relocated

def window_by(self, time_col: ir.Value) -> WindowedTable:
"""Create a windowing table-valued function (TVF) expression.
Windowing table-valued functions (TVF) assign rows of a table to windows
based on a time attribute column in the table.
Parameters
----------
time_col
Column of the table that will be mapped to windows.
Returns
-------
WindowedTable
WindowedTable expression.
"""
from ibis.expr.types.temporal_windows import WindowedTable

return WindowedTable(self, time_col)
def window_by(
self,
window_type: Literal["tumble", "hop"],
time_col: str | ir.Value,
window_size: ir.IntervalScalar,
window_step: ir.IntervalScalar | None = None,
offset: ir.IntervalScalar | None = None,
) -> WindowedTable:
from ibis.expr.types.temporal_windows import HopTable, TumbleTable

Check warning on line 4513 in ibis/expr/types/relations.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/types/relations.py#L4513

Added line #L4513 was not covered by tests

time_col = next(iter(self.bind(time_col)))

Check warning on line 4515 in ibis/expr/types/relations.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/types/relations.py#L4515

Added line #L4515 was not covered by tests

if window_type == "tumble":
if window_step is not None:
raise com.IbisInputError(

Check warning on line 4519 in ibis/expr/types/relations.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/types/relations.py#L4519

Added line #L4519 was not covered by tests
"Tumble windows are non-overlapping and the window step is assumed "
"to be the same as the window size. If you want to create overlapping "
"windows, specify `window_type='hop'`."
)
return TumbleTable(self, time_col, window_size=window_size, offset=offset)

Check warning on line 4524 in ibis/expr/types/relations.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/types/relations.py#L4524

Added line #L4524 was not covered by tests
elif window_type == "hop":
return HopTable(

Check warning on line 4526 in ibis/expr/types/relations.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/types/relations.py#L4526

Added line #L4526 was not covered by tests
self,
time_col,
window_size=window_size,
window_step=window_step,
offset=offset,
)
else:
raise com.IbisInputError(

Check warning on line 4534 in ibis/expr/types/relations.py

View check run for this annotation

Codecov / codecov/patch

ibis/expr/types/relations.py#L4534

Added line #L4534 was not covered by tests
f"`window_type` must be `tumble` or `hop`, got {window_type}"
)


@public
Expand Down
Loading

0 comments on commit 84bac37

Please sign in to comment.