Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): refactor the implementation of windowing #9200

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
86 changes: 39 additions & 47 deletions ibis/backends/flink/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,66 +145,58 @@ def _minimize_spec(start, end, spec):
spec.args["end_side"] = None
return spec

def visit_TumbleWindowingTVF(self, op, *, table, time_col, window_size, offset):
def visit_WindowAggregate(
self,
op,
*,
parent,
window_type,
time_col,
groups,
metrics,
window_size,
window_slide,
window_offset,
):
if window_type == "tumble":
assert window_slide is None

args = [
self.v[f"TABLE {table.this.sql(self.dialect)}"],
self.v[f"TABLE {parent.this.sql(self.dialect)}"],
# `time_col` has the table _alias_, instead of the table, but it is
# required to be bound to the table, this happens because of the
# way we construct the op in the tumble API using bind
#
# perhaps there's a better way to deal with this
self.f.descriptor(time_col.this),
window_slide,
window_size,
offset,
window_offset,
]

return sg.select(
sge.Column(
this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
)
).from_(
self.f.table(self.f.tumble(*filter(None, args))).as_(
table.alias_or_name, quoted=True
)
)
window_func = getattr(self.f, window_type)

def visit_HopWindowingTVF(
self, op, *, table, time_col, window_size, window_slide, offset
):
args = [
self.v[f"TABLE {table.this.sql(self.dialect)}"],
self.f.descriptor(time_col.this),
window_slide,
window_size,
offset,
]
return sg.select(
sge.Column(
this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
)
).from_(
self.f.table(self.f.hop(*filter(None, args))).as_(
table.alias_or_name, quoted=True
)
# create column references to new columns generated by Flink in the output
window_start = sg.column(
"window_start", table=parent.alias_or_name, quoted=True
)

def visit_CumulateWindowingTVF(
self, op, *, table, time_col, window_size, window_step, offset
):
args = [
self.v[f"TABLE {table.this.sql(self.dialect)}"],
self.f.descriptor(time_col.this),
window_step,
window_size,
offset,
]
return sg.select(
sge.Column(
this=STAR, table=sg.to_identifier(table.alias_or_name, quoted=True)
window_end = sg.column("window_end", table=parent.alias_or_name, quoted=True)

return (
sg.select(
window_start,
window_end,
*self._cleanup_names(groups),
*self._cleanup_names(metrics),
copy=False,
)
.from_(
self.f.table(window_func(*filter(None, args))).as_(
parent.alias_or_name, quoted=True
)
)
).from_(
self.f.table(self.f.cumulate(*filter(None, args))).as_(
table.alias_or_name, quoted=True
.group_by(
*self._generate_groups([window_start, window_end, *groups.values()])
)
)

Expand Down
68 changes: 50 additions & 18 deletions ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,25 @@
if TYPE_CHECKING:
from pyflink.table import StreamTableEnvironment

# Override the shared ``functional_alltypes`` schema for the Flink backend:
# Flink requires a watermark column to be of type TIMESTAMP(p) or
# TIMESTAMP_LTZ(p) with 0 <= p <= 3, so ``timestamp_col`` is narrowed to
# ``timestamp(3)`` here instead of the higher-level default.
TEST_TABLES["functional_alltypes"] = ibis.schema(
    {
        "id": "int32",
        "bool_col": "boolean",
        "tinyint_col": "int8",
        "smallint_col": "int16",
        "int_col": "int32",
        "bigint_col": "int64",
        "float_col": "float32",
        "double_col": "float64",
        "date_string_col": "string",
        "string_col": "string",
        "timestamp_col": "timestamp(3)",  # precision capped at 3 for watermarks
        "year": "int32",
        "month": "int32",
    }
)


def get_table_env(
local_env: bool,
Expand Down Expand Up @@ -152,24 +171,7 @@ def awards_players_schema():

@pytest.fixture
def functional_alltypes_schema():
return ibis.schema(
{
"id": "int32",
"bool_col": "boolean",
"tinyint_col": "int8",
"smallint_col": "int16",
"int_col": "int32",
"bigint_col": "int64",
"float_col": "float32",
"double_col": "float64",
"date_string_col": "string",
"string_col": "string",
"timestamp_col": "timestamp(3)", # overriding the higher level fixture with precision because Flink's
# watermark must use a field of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), where 'p' is from 0 to 3
"year": "int32",
"month": "int32",
}
)
return TEST_TABLES["functional_alltypes"]


@pytest.fixture
Expand All @@ -188,3 +190,33 @@ def generate_csv_configs(csv_file):
}

return generate_csv_configs


@pytest.fixture(scope="session")
def functional_alltypes_no_header(tmpdir_factory, data_dir):
    """Return the path to a header-less copy of the functional_alltypes CSV.

    The first line of the source CSV is discarded and the remaining rows are
    streamed verbatim into a fresh file under a session-scoped temp directory.
    NOTE(review): presumably the header must be stripped because the Flink
    ``csv`` format reads every line as data — confirm against the connector.
    """
    file = tmpdir_factory.mktemp("data") / "functional_alltypes.csv"
    with (
        open(data_dir / "csv" / "functional_alltypes.csv") as reader,
        open(str(file), mode="w") as writer,
    ):
        reader.readline()  # consume and discard the header line
        # stream the remaining rows in one C-level pass instead of a
        # per-line Python loop
        writer.writelines(reader)
    return file


@pytest.fixture(scope="session", autouse=True)
def functional_alltypes_with_watermark(con, functional_alltypes_no_header):
    """Create a session-wide streaming table carrying an event-time watermark.

    Registers a temporary table named ``functional_alltypes_with_watermark``
    backed by the header-less CSV file, with a 10-second watermark declared on
    ``timestamp_col`` so event-time based operations can be exercised in tests.
    """
    source_properties = {
        "connector": "filesystem",
        "path": functional_alltypes_no_header,
        "format": "csv",
    }
    return con.create_table(
        "functional_alltypes_with_watermark",
        schema=TEST_TABLES["functional_alltypes"],
        tbl_properties=source_properties,
        watermark=ibis.watermark("timestamp_col", ibis.interval(seconds=10)),
        temp=True,
    )
50 changes: 0 additions & 50 deletions ibis/backends/flink/tests/test_compiler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
from __future__ import annotations

from operator import methodcaller

import pytest
from pytest import param

import ibis
from ibis.common.deferred import _


def test_sum(simple_table, assert_sql):
expr = simple_table.a.sum()
Expand Down Expand Up @@ -103,48 +98,3 @@ def test_having(simple_table, assert_sql):
.aggregate(simple_table.b.sum().name("b_sum"))
)
assert_sql(expr)


@pytest.mark.parametrize(
    "method",
    [
        methodcaller("tumble", window_size=ibis.interval(minutes=15)),
        methodcaller(
            "hop",
            window_size=ibis.interval(minutes=15),
            window_slide=ibis.interval(minutes=1),
        ),
        methodcaller(
            "cumulate",
            window_size=ibis.interval(minutes=1),
            window_step=ibis.interval(seconds=10),
        ),
    ],
    ids=["tumble", "hop", "cumulate"],
)
def test_windowing_tvf(simple_table, method, assert_sql):
    """Each windowing TVF (tumble/hop/cumulate) compiles to snapshotted SQL.

    ``method`` invokes one windowing API on the ``window_by`` builder, using
    ``simple_table.i`` as the event-time column; ``assert_sql`` compares the
    generated SQL against the stored snapshot.
    """
    expr = method(simple_table.window_by(time_col=simple_table.i))
    assert_sql(expr)


def test_window_aggregation(simple_table, assert_sql):
    """Aggregating over a tumble-window TVF compiles to snapshotted SQL."""
    tumbled = simple_table.window_by(time_col=simple_table.i).tumble(
        window_size=ibis.interval(minutes=15)
    )
    expr = tumbled.group_by(["window_start", "window_end", "g"]).aggregate(
        mean=_.d.mean()
    )
    assert_sql(expr)


def test_window_topn(simple_table, assert_sql):
    """Row-number top-N within each tumble window compiles to snapshotted SQL."""
    tumbled = simple_table.window_by(time_col="i").tumble(
        window_size=ibis.interval(seconds=600),
    )
    selected = tumbled["a", "b", "c", "d", "g", "window_start", "window_end"]
    # rank rows within each window, highest ``g`` first
    ranking = ibis.row_number().over(
        group_by=["window_start", "window_end"], order_by=ibis.desc("g")
    )
    ranked = selected.mutate(rownum=ranking)
    expr = ranked[ranked.rownum <= 3]
    assert_sql(expr)
Loading