Skip to content

Commit

Permalink
feat(clickhouse): enable support for working window functions
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Aug 29, 2022
1 parent c1b72ba commit 310a5a8
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 37 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.4"
services:
clickhouse:
image: clickhouse/clickhouse-server:22-alpine
image: clickhouse/clickhouse-server:22.6.6.16-alpine
ports:
- 8123:8123
- 9000:9000
Expand Down
39 changes: 21 additions & 18 deletions ibis/backends/clickhouse/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import ibis.expr.operations as ops
import ibis.expr.types as ir
import ibis.util as util
from ibis.backends.base.sql.registry import binary_infix
from ibis.backends.base.sql.registry import binary_infix, window
from ibis.backends.clickhouse.datatypes import serialize
from ibis.backends.clickhouse.identifiers import quote_identifier

Expand Down Expand Up @@ -685,6 +685,13 @@ def _struct_field(translator, expr):
return f"{translator.translate(op.arg)}.`{op.field}`"


def _nth_value(translator, expr):
op = expr.op()
arg = translator.translate(op.arg)
nth = translator.translate(op.nth)
return f"nth_value({arg}, ({nth}) + 1)"


# TODO: clickhouse uses different string functions
# for ascii and utf-8 encodings,

Expand Down Expand Up @@ -848,6 +855,17 @@ def _struct_field(translator, expr):
ops.Clip: _clip,
ops.StructField: _struct_field,
ops.StructColumn: _struct_column,
ops.Window: window.window,
ops.RowNumber: lambda *args: 'row_number()',
ops.DenseRank: lambda *args: 'dense_rank()',
ops.MinRank: lambda *args: 'rank()',
ops.Lag: window.shift_like('lagInFrame'),
ops.Lead: window.shift_like('leadInFrame'),
ops.FirstValue: _unary('first_value'),
ops.LastValue: _unary('last_value'),
ops.NthValue: _nth_value,
ops.Window: window.window,
ops.NTile: window.ntile,
}


Expand Down Expand Up @@ -896,28 +914,13 @@ def _day_of_week_index(translator, expr):


# Operations the ClickHouse backend cannot translate; each maps to
# _raise_error so translation fails loudly instead of emitting bad SQL.
# NOTE: ops.CumeDist appeared twice in the original list; the duplicate
# entry is removed here (membership is all that matters, so behavior is
# unchanged).
_unsupported_ops_list = [
    ops.DecimalPrecision,
    ops.DecimalScale,
    ops.BaseConvert,
    ops.CumeDist,
    ops.CumulativeSum,
    ops.CumulativeMin,
    ops.CumulativeMax,
    ops.CumulativeMean,
    ops.CumulativeAny,
    ops.CumulativeAll,
    ops.IdenticalTo,
    ops.PercentRank,
    ops.ReductionVectorizedUDF,
]
# Every unsupported op shares the same error-raising translator.
_unsupported_ops = {k: _raise_error for k in _unsupported_ops_list}

Expand Down
112 changes: 94 additions & 18 deletions ibis/backends/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,28 @@ def calc_zscore(s):
lambda t, win: t.float_col.lead().over(win),
lambda t: t.float_col.shift(-1),
id='lead',
marks=pytest.mark.broken(
["clickhouse"],
reason="upstream is broken; returns all nulls",
),
),
param(
lambda t, win: t.id.rank().over(win),
lambda t: t.id.rank(method='min').astype('int64') - 1,
id='rank',
marks=pytest.mark.broken(
["clickhouse"],
reason="upstream is broken",
),
),
param(
lambda t, win: t.id.dense_rank().over(win),
lambda t: t.id.rank(method='dense').astype('int64') - 1,
id='dense_rank',
marks=pytest.mark.broken(
["clickhouse"],
reason="upstream is broken",
),
),
param(
lambda t, win: t.id.percent_rank().over(win),
Expand All @@ -52,12 +64,13 @@ def calc_zscore(s):
)
).reset_index(drop=True, level=[0]),
id='percent_rank',
marks=pytest.mark.notyet(["clickhouse"]),
),
param(
lambda t, win: t.id.cume_dist().over(win),
lambda t: t.id.rank(method='min') / t.id.transform(len),
id='cume_dist',
marks=pytest.mark.notimpl(["pyspark"]),
marks=pytest.mark.notimpl(["clickhouse", "pyspark"]),
),
param(
lambda t, win: t.float_col.ntile(buckets=7).over(win),
Expand Down Expand Up @@ -99,7 +112,13 @@ def calc_zscore(s):
lambda _, win: ibis.row_number().over(win),
lambda t: t.cumcount(),
id='row_number',
marks=pytest.mark.notimpl(["pandas"]),
marks=[
pytest.mark.notimpl(["pandas"]),
pytest.mark.broken(
["clickhouse"],
reason="upstream implementation cannot handle subtraction",
),
],
),
param(
lambda t, win: t.double_col.cumsum().over(win),
Expand Down Expand Up @@ -143,7 +162,14 @@ def calc_zscore(s):
),
id='cumnotany',
marks=pytest.mark.notyet(
("duckdb", 'impala', 'postgres', 'mysql', 'sqlite'),
(
"clickhouse",
"duckdb",
'impala',
'postgres',
'mysql',
'sqlite',
),
reason="notany() over window not supported",
),
),
Expand All @@ -167,7 +193,14 @@ def calc_zscore(s):
),
id='cumnotall',
marks=pytest.mark.notyet(
("duckdb", 'impala', 'postgres', 'mysql', 'sqlite'),
(
"clickhouse",
"duckdb",
'impala',
'postgres',
'mysql',
'sqlite',
),
reason="notall() over window not supported",
),
),
Expand Down Expand Up @@ -204,7 +237,7 @@ def calc_zscore(s):
),
],
)
@pytest.mark.notimpl(["clickhouse", "dask", "datafusion"])
@pytest.mark.notimpl(["dask", "datafusion"])
def test_grouped_bounded_expanding_window(
backend, alltypes, df, result_fn, expected_fn
):
Expand Down Expand Up @@ -244,14 +277,21 @@ def test_grouped_bounded_expanding_window(
id='mean_udf',
marks=[
pytest.mark.notimpl(
["duckdb", "impala", "mysql", "postgres", "sqlite"]
[
"clickhouse",
"duckdb",
"impala",
"mysql",
"postgres",
"sqlite",
]
)
],
),
],
)
# Some backends do not support non-grouped window specs
@pytest.mark.notimpl(["clickhouse", "dask", "datafusion"])
@pytest.mark.notimpl(["dask", "datafusion"])
def test_ungrouped_bounded_expanding_window(
backend, alltypes, df, result_fn, expected_fn
):
Expand All @@ -271,7 +311,7 @@ def test_ungrouped_bounded_expanding_window(
backend.assert_series_equal(left, right)


@pytest.mark.notimpl(["clickhouse", "dask", "datafusion", "pandas"])
@pytest.mark.notimpl(["dask", "datafusion", "pandas"])
def test_grouped_bounded_following_window(backend, alltypes, df):
window = ibis.window(
preceding=0,
Expand Down Expand Up @@ -326,7 +366,7 @@ def test_grouped_bounded_following_window(backend, alltypes, df):
),
],
)
@pytest.mark.notimpl(["clickhouse", "dask", "datafusion"])
@pytest.mark.notimpl(["dask", "datafusion"])
def test_grouped_bounded_preceding_window(backend, alltypes, df, window_fn):
window = window_fn(alltypes)

Expand Down Expand Up @@ -363,7 +403,15 @@ def test_grouped_bounded_preceding_window(backend, alltypes, df, window_fn):
lambda gb: (gb.double_col.transform('mean')),
id='mean_udf',
marks=pytest.mark.notimpl(
["dask", "duckdb", "impala", "mysql", "postgres", "sqlite"]
[
"clickhouse",
"dask",
"duckdb",
"impala",
"mysql",
"postgres",
"sqlite",
]
),
),
],
Expand All @@ -377,7 +425,7 @@ def test_grouped_bounded_preceding_window(backend, alltypes, df, window_fn):
param(False, id='unordered'),
],
)
@pytest.mark.notimpl(["clickhouse", "datafusion"])
@pytest.mark.notimpl(["datafusion"])
def test_grouped_unbounded_window(
backend, alltypes, df, result_fn, expected_fn, ordered
):
Expand Down Expand Up @@ -417,7 +465,12 @@ def test_grouped_unbounded_window(
lambda df: pd.Series([df.double_col.mean()] * len(df.double_col)),
True,
id='ordered-mean',
marks=pytest.mark.notimpl(["dask", "impala", "pandas"]),
marks=[
pytest.mark.notimpl(["dask", "impala", "pandas"]),
pytest.mark.broken(
["clickhouse"], reason="upstream appears broken"
),
],
),
param(
lambda t, win: t.double_col.mean().over(win),
Expand All @@ -432,6 +485,7 @@ def test_grouped_unbounded_window(
id='ordered-mean_udf',
marks=pytest.mark.notimpl(
[
"clickhouse",
"dask",
"duckdb",
"impala",
Expand All @@ -448,7 +502,14 @@ def test_grouped_unbounded_window(
False,
id='unordered-mean_udf',
marks=pytest.mark.notimpl(
["duckdb", "impala", "mysql", "postgres", "sqlite"]
[
"clickhouse",
"duckdb",
"impala",
"mysql",
"postgres",
"sqlite",
]
),
),
# Analytic ops
Expand All @@ -471,14 +532,16 @@ def test_grouped_unbounded_window(
lambda df: df.float_col.shift(-1),
True,
id='ordered-lead',
marks=pytest.mark.notimpl(["dask"]),
marks=pytest.mark.notimpl(["clickhouse", "dask"]),
),
param(
lambda t, win: t.float_col.lead().over(win),
lambda df: df.float_col.shift(-1),
False,
id='unordered-lead',
marks=pytest.mark.notimpl(["dask", "mysql", "pyspark"]),
marks=pytest.mark.notimpl(
["clickhouse", "dask", "mysql", "pyspark"]
),
),
param(
lambda t, win: calc_zscore(t.double_col).over(win),
Expand All @@ -487,6 +550,7 @@ def test_grouped_unbounded_window(
id='ordered-zscore_udf',
marks=pytest.mark.notimpl(
[
"clickhouse",
"dask",
"duckdb",
"impala",
Expand All @@ -504,13 +568,21 @@ def test_grouped_unbounded_window(
False,
id='unordered-zscore_udf',
marks=pytest.mark.notimpl(
["duckdb", "impala", "mysql", "postgres", "pyspark", "sqlite"]
[
"clickhouse",
"duckdb",
"impala",
"mysql",
"postgres",
"pyspark",
"sqlite",
]
),
),
],
)
# Some backends do not support non-grouped window specs
@pytest.mark.notimpl(["clickhouse", "datafusion"])
@pytest.mark.notimpl(["datafusion"])
def test_ungrouped_unbounded_window(
backend, alltypes, df, con, result_fn, expected_fn, ordered
):
Expand Down Expand Up @@ -541,7 +613,11 @@ def test_ungrouped_unbounded_window(
backend.assert_series_equal(left, right)


@pytest.mark.notimpl(["clickhouse", "dask", "datafusion", "impala", "pandas"])
@pytest.mark.notimpl(["dask", "datafusion", "impala", "pandas"])
@pytest.mark.notyet(
["clickhouse"],
reason="RANGE OFFSET frame for 'DB::ColumnNullable' ORDER BY column is not implemented", # noqa: E501
)
def test_grouped_bounded_range_window(backend, alltypes, df):
# Explanation of the range window spec below:
#
Expand Down

0 comments on commit 310a5a8

Please sign in to comment.