feat: introduce watermarks in ibis api
chloeh13q authored and jcrist committed Sep 12, 2023
1 parent 9e1109b commit eaaebb8
Showing 7 changed files with 152 additions and 17 deletions.
5 changes: 5 additions & 0 deletions ibis/backends/flink/__init__.py
@@ -28,6 +28,7 @@
from pyflink.table import TableEnvironment

import ibis.expr.types as ir
from ibis.expr.streaming import Watermark


class Backend(BaseBackend, CanCreateDatabase):
@@ -255,6 +256,7 @@ def create_table(
database: str | None = None,
catalog: str | None = None,
tbl_properties: dict | None = None,
watermark: Watermark | None = None,
temp: bool = False,
overwrite: bool = False,
) -> ir.Table:
@@ -289,6 +291,8 @@ def create_table(
Table properties used to create a table source/sink. The properties
are usually used to find and create the underlying connector. Accepts
dictionary of key-value pairs (key1=val1, key2=val2, ...).
watermark
Watermark strategy for the table, only applicable on sources.
temp
Whether a table is temporary or not
overwrite
@@ -331,6 +335,7 @@ def create_table(
table_name=name,
schema=schema,
tbl_properties=tbl_properties,
watermark=watermark,
temp=temp,
database=database,
catalog=catalog,
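
Taken together, the backend now threads an optional watermark through create_table and into the DDL layer. A minimal usage sketch, assuming an existing Flink backend connection and an illustrative filesystem source (neither is part of this diff):

    import ibis
    import ibis.expr.datatypes as dt

    # `con` is assumed to be a Flink backend connection created elsewhere.
    events = con.create_table(
        "events",
        schema=ibis.schema({"ts": dt.timestamp(scale=3), "v": dt.float64}),
        tbl_properties={
            "connector": "filesystem",      # illustrative connector settings
            "path": "/path/to/events.csv",
            "format": "csv",
        },
        watermark=ibis.watermark(
            time_col="ts", allowed_delay=ibis.interval(seconds=15)
        ),
    )
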
66 changes: 63 additions & 3 deletions ibis/backends/flink/ddl.py
@@ -4,6 +4,7 @@

import sqlglot as sg

import ibis.expr.schema as sch
from ibis.backends.base.sql.ddl import (
CreateTableWithSchema,
DropObject,
@@ -12,10 +13,48 @@
_is_quoted,
is_fully_qualified,
)
from ibis.backends.base.sql.registry import quote_identifier
from ibis.backends.base.sql.registry import quote_identifier, type_to_sql_string
from ibis.backends.flink.utils import translate_literal

if TYPE_CHECKING:
import ibis.expr.schema as sch
from ibis.expr.streaming import Watermark


def format_schema(schema):
elements = [
_format_schema_element(name, t) for name, t in zip(schema.names, schema.types)
]
return "({})".format(",\n ".join(elements))


def _format_schema_element(name, t):
return f"{quote_identifier(name, force=True)} {type_to_flink_sql_string(t)}"


def type_to_flink_sql_string(tval):
if tval.is_timestamp():
return f"timestamp({tval.scale})"
else:
return type_to_sql_string(tval)


def _format_watermark_strategy(watermark: Watermark) -> str:
if watermark.allowed_delay is None:
return watermark.time_col
return f"{watermark.time_col} - {translate_literal(watermark.allowed_delay.op())}"


def format_schema_with_watermark(
schema: sch.Schema, watermark: Watermark | None = None
) -> str:
elements = [
_format_schema_element(name, t) for name, t in zip(schema.names, schema.types)
]
if watermark is not None:
elements.append(
f"WATERMARK FOR {watermark.time_col} AS {_format_watermark_strategy(watermark)}"
)
return "({})".format(",\n ".join(elements))


class _CatalogAwareBaseQualifiedSQLStatement:
@@ -39,6 +78,7 @@ def __init__(
table_name: str,
schema: sch.Schema,
tbl_properties: dict,
watermark: Watermark | None = None,
database: str | None = None,
catalog: str | None = None,
temp: bool = False,
@@ -56,6 +96,7 @@ def __init__(
)
self.catalog = catalog
self.temp = temp
self.watermark = watermark

def _storage(self) -> str:
return f"STORED AS {self.format}" if self.format else None
@@ -77,7 +118,26 @@ def _create_line(self) -> str:

@property
def _pieces(self):
yield from super()._pieces
if self.partition is not None:
main_schema = self.schema
part_schema = self.partition
if not isinstance(part_schema, sch.Schema):
part_fields = {name: self.schema[name] for name in part_schema}
part_schema = sch.Schema(part_fields)

to_delete = {name for name in self.partition if name in self.schema}
fields = {
name: dtype
for name, dtype in main_schema.items()
if name not in to_delete
}
main_schema = sch.Schema(fields)

yield format_schema_with_watermark(main_schema, self.watermark)
yield f"PARTITIONED BY {format_schema(part_schema)}"
else:
yield format_schema_with_watermark(self.schema, self.watermark)

yield self._format_tbl_properties()


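
The helpers added above are what turn a Watermark into Flink's WATERMARK FOR ... AS ... clause inside the column list. A rough sketch of the fragment they should produce (the printed form is an expectation based on this diff, not captured output):

    import ibis
    import ibis.expr.datatypes as dt
    from ibis.backends.flink.ddl import format_schema_with_watermark

    schema = ibis.schema({"id": dt.int32, "ts": dt.timestamp(scale=3)})
    wm = ibis.watermark(time_col="ts", allowed_delay=ibis.interval(seconds=15))
    print(format_schema_with_watermark(schema, wm))
    # Roughly:
    # (`id` int,
    #  `ts` timestamp(3),
    #  WATERMARK FOR ts AS ts - INTERVAL '15' SECOND)

When allowed_delay is None, _format_watermark_strategy falls back to the bare time column, i.e. WATERMARK FOR ts AS ts.
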
61 changes: 50 additions & 11 deletions ibis/backends/flink/tests/test_ddl.py
@@ -3,6 +3,7 @@
import pytest
from py4j.protocol import Py4JJavaError

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch

@@ -22,13 +23,36 @@ def awards_players_schema():


@pytest.fixture
def awards_players_csv_connector_configs():
return {
"connector": "filesystem",
"path": "ci/ibis-testing-data/csv/awards_players.csv",
"format": "csv",
"csv.ignore-parse-errors": "true",
}
def functional_alltypes_schema():
return sch.Schema(
{
"id": dt.int32,
"bool_col": dt.bool,
"smallint_col": dt.int16,
"int_col": dt.int32,
"bigint_col": dt.int64,
"float_col": dt.float32,
"double_col": dt.float64,
"date_string_col": dt.string,
"string_col": dt.string,
"timestamp_col": dt.timestamp(scale=3),
"year": dt.int32,
"month": dt.int32,
}
)


@pytest.fixture
def csv_connector_configs():
def generate_csv_configs(csv_file):
return {
"connector": "filesystem",
"path": f"ci/ibis-testing-data/csv/{csv_file}.csv",
"format": "csv",
"csv.ignore-parse-errors": "true",
}

return generate_csv_configs


def test_list_tables(con):
Expand All @@ -37,25 +61,25 @@ def test_list_tables(con):


def test_create_table_from_schema(
con, awards_players_schema, temp_table, awards_players_csv_connector_configs
con, awards_players_schema, temp_table, csv_connector_configs
):
new_table = con.create_table(
temp_table,
schema=awards_players_schema,
tbl_properties=awards_players_csv_connector_configs,
tbl_properties=csv_connector_configs("awards_players"),
)
assert temp_table in con.list_tables()
assert new_table.schema() == awards_players_schema


@pytest.mark.parametrize("temp", [True, False])
def test_create_table(
con, awards_players_schema, temp_table, awards_players_csv_connector_configs, temp
con, awards_players_schema, temp_table, csv_connector_configs, temp
):
con.create_table(
temp_table,
schema=awards_players_schema,
tbl_properties=awards_players_csv_connector_configs,
tbl_properties=csv_connector_configs("awards_players"),
temp=temp,
)
assert temp_table in con.list_tables()
@@ -67,3 +91,18 @@ def test_create_table(
con.drop_table(temp_table, temp=temp)

assert temp_table not in con.list_tables()


def test_create_source_table_with_watermark(
con, functional_alltypes_schema, temp_table, csv_connector_configs
):
new_table = con.create_table(
temp_table,
schema=functional_alltypes_schema,
tbl_properties=csv_connector_configs("functional_alltypes"),
watermark=ibis.watermark(
time_col="timestamp_col", allowed_delay=ibis.interval(seconds=15)
),
)
assert temp_table in con.list_tables()
assert new_table.schema() == functional_alltypes_schema
1 change: 1 addition & 0 deletions ibis/backends/flink/tests/test_literals.py
@@ -51,6 +51,7 @@ def test_string_literals(value, expected):
param(
ibis.interval(months=50), "INTERVAL '04-02' YEAR TO MONTH", id="50months"
),
param(ibis.interval(seconds=5), "INTERVAL '5' SECOND", id="5seconds"),
],
)
def test_translate_interval_literal(value, expected):
3 changes: 2 additions & 1 deletion ibis/backends/flink/utils.py
@@ -239,7 +239,8 @@ def _translate_interval(value, dtype):
if len(nonzero_interval_segments) == 1:
unit = next(iter(nonzero_interval_segments))
value = nonzero_interval_segments[unit]
return f"'{value}' {unit.value}{format_precision(value, unit)}"
precision = _calculate_precision(value)
return f"'{value}' {unit.name}{format_precision(precision, unit)}"

# YEAR TO MONTH, DAY TO SECOND
return interval.format_as_string()
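
The utils.py change above matters because unit.value is the interval unit's short code (e.g. "s") rather than the unit name Flink SQL expects, and the precision is now recomputed from the value instead of passing the value itself. A small sketch of the behavior the new 5-second test case above pins down (assuming translate_literal imports without a running Flink cluster):

    import ibis
    from ibis.backends.flink.utils import translate_literal

    expr = ibis.interval(seconds=5)
    print(translate_literal(expr.op()))  # expected: INTERVAL '5' SECOND
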
24 changes: 22 additions & 2 deletions ibis/expr/api.py
@@ -22,6 +22,7 @@
from ibis.expr.deferred import Deferred
from ibis.expr.schema import Schema
from ibis.expr.sql import parse_sql, show_sql, to_sql
from ibis.expr.streaming import Watermark
from ibis.expr.types import (
DateValue,
Expr,
@@ -157,6 +158,7 @@
"trailing_range_window",
"trailing_window",
"union",
"watermark",
"where",
"window",
"preceding",
@@ -1496,9 +1498,9 @@ def difference(table: ir.Table, *rest: ir.Table, distinct: bool = True):
Parameters
----------
table:
table
A table expression
*rest:
*rest
Additional table expressions
distinct
Only diff distinct rows not occurring in the calling table
@@ -1544,6 +1546,24 @@ def difference(table: ir.Table, *rest: ir.Table, distinct: bool = True):
return table.difference(*rest, distinct=distinct) if rest else table


def watermark(time_col: str, allowed_delay: ir.IntervalScalar) -> Watermark:
"""Return a watermark object.
Parameters
----------
time_col
The timestamp column that will be used to generate watermarks in event time processing.
allowed_delay
Length of time that events are allowed to be late.
Returns
-------
Watermark
A watermark object.
"""
return Watermark(time_col=time_col, allowed_delay=allowed_delay)


e = ops.E().to_expr()
pi = ops.Pi().to_expr()

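
On its own, the new top-level helper just builds a value object that backends consume; the field access below follows the Watermark definition added in the next file:

    import ibis

    wm = ibis.watermark(time_col="ts", allowed_delay=ibis.interval(seconds=15))
    print(wm.time_col)       # "ts"
    print(wm.allowed_delay)  # the interval expression passed in
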
9 changes: 9 additions & 0 deletions ibis/expr/streaming.py
@@ -0,0 +1,9 @@
from __future__ import annotations

import ibis.expr.types as ir # noqa: TCH001
from ibis.common.grounds import Concrete


class Watermark(Concrete):
time_col: str
allowed_delay: ir.IntervalScalar
