From b1e25b3c4313a2512d721bcfbb35768b68abd439 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Mon, 8 Jun 2026 16:13:46 +0200 Subject: [PATCH] Ignore Polars dynamic predicates --- duckdb/polars_io.py | 107 ++++++++++++++++-- .../test_460_polars_lazy_dynamic_predicate.py | 101 +++++++++++++++++ 2 files changed, 199 insertions(+), 9 deletions(-) create mode 100644 tests/fast/arrow/test_460_polars_lazy_dynamic_predicate.py diff --git a/duckdb/polars_io.py b/duckdb/polars_io.py index 45582221..c5cad457 100644 --- a/duckdb/polars_io.py +++ b/duckdb/polars_io.py @@ -1,7 +1,7 @@ from __future__ import annotations # noqa: D100 -import contextlib import datetime +import io import json import typing from decimal import Decimal @@ -35,16 +35,95 @@ def _predicate_to_expression(predicate: pl.Expr) -> duckdb.Expression | None: """ # Serialize the Polars expression tree to JSON tree = json.loads(predicate.meta.serialize(format="json")) + return _tree_to_sql_expression(tree) + +def _tree_to_sql_expression(tree: _ExpressionTree) -> duckdb.Expression | None: + """Convert an already-parsed Polars expression tree to a DuckDB expression. + + Returns None if the tree contains a node we cannot translate to SQL. + """ try: - # Convert the tree to SQL - sql_filter = _pl_tree_to_sql(tree) - return duckdb.SQLExpression(sql_filter) + return duckdb.SQLExpression(_pl_tree_to_sql(tree)) except Exception: # If the conversion fails, we return None return None +# Polars "dynamic predicates" +# --------------------------- +# When a slice / TOP-N sits above a scan, polars' optimizer pushes a *dynamic +# predicate* into the scan, AND-ed onto any real filter. It is an internal +# optimizer node, not a materializable expression: it serializes as a `Display` +# node with `fmt_str` "dynamic_pred: ", and feeding it back into +# `DataFrame.filter()` panics polars (`unreachable!` in `expr_to_ir`). It is only +# an early-pruning hint -- the limit above the scan still runs -- so we drop it +# and keep the real predicate, which we MUST still apply (polars trusts the +# source and does not re-filter above it). Reported as duckdb-python#460; for +# polars-side context see: +# - https://github.com/pola-rs/polars/issues/21665 (why real + dynamic arrive AND-ed) +# - https://github.com/pola-rs/polars/issues/22252 (filter() panicking on un-lowerable nodes) +def _is_dynamic_predicate_node(node: typing.Any) -> bool: # noqa: ANN401 + """Return True if a serialized node is a dynamic predicate (see the note above). + + Detected by shape: a ``Display`` node whose ``fmt_str`` starts with ``dynamic_pred``. + """ + if not isinstance(node, dict): + return False + display = node.get("Display") + return ( + isinstance(display, dict) + and isinstance(display.get("fmt_str"), str) + and display["fmt_str"].startswith("dynamic_pred") + ) + + +def _tree_contains_dynamic_predicate(node: typing.Any) -> bool: # noqa: ANN401 + """Return True if the serialized expression tree contains a dynamic predicate anywhere.""" + if _is_dynamic_predicate_node(node): + return True + if isinstance(node, dict): + return any(_tree_contains_dynamic_predicate(child) for child in node.values()) + if isinstance(node, list): + return any(_tree_contains_dynamic_predicate(child) for child in node) + return False + + +def _strip_dynamic_predicates(tree: typing.Any) -> tuple[_ExpressionTree | None, bool]: # noqa: ANN401 + """Remove dynamic-predicate conjuncts from a serialized predicate tree. + + See the note above for what a dynamic predicate is and why we drop it. + + Returns ``(stripped_tree, removed)``. ``stripped_tree`` is ``None`` when the + predicate was purely dynamic. Raises ``NotImplementedError`` if a dynamic + predicate appears anywhere other than a top-level ``And`` conjunct — a shape + polars does not produce today, where the hint can neither be safely dropped + nor applied. + """ + if _is_dynamic_predicate_node(tree): + return None, True + if isinstance(tree, dict) and "BinaryExpr" in tree: + bin_expr = tree["BinaryExpr"] + if isinstance(bin_expr, dict) and bin_expr.get("op") == "And": + left, left_removed = _strip_dynamic_predicates(bin_expr["left"]) + right, right_removed = _strip_dynamic_predicates(bin_expr["right"]) + removed = left_removed or right_removed + if left is None: + return right, removed + if right is None: + return left, removed + return {"BinaryExpr": {**bin_expr, "left": left, "right": right}}, removed + if _tree_contains_dynamic_predicate(tree): + msg = "Cannot handle a polars dynamic predicate outside a top-level AND conjunct" + raise NotImplementedError(msg) + return tree, False + + +def _expression_from_tree(tree: _ExpressionTree) -> pl.Expr: + """Rebuild a polars expression from a serialized tree (inverse of meta.serialize).""" + return pl.Expr.deserialize(io.BytesIO(json.dumps(tree).encode()), format="json") + + def _pl_operation_to_sql(op: str) -> str: """Map Polars binary operation strings to SQL equivalents. @@ -286,6 +365,7 @@ def source_generator( batch_size: int | None, ) -> Iterator[pl.DataFrame]: duck_predicate = None + fallback_predicate = None relation_final = relation if with_columns is not None: cols = ",".join(map(_escape_sql_identifier, with_columns)) @@ -293,18 +373,27 @@ def source_generator( if n_rows is not None: relation_final = relation_final.limit(n_rows) if predicate is not None: - # We have a predicate, if possible, we push it down to DuckDB - with contextlib.suppress(AssertionError, KeyError): - duck_predicate = _predicate_to_expression(predicate) + # Strip any dynamic-predicate hint (see the dynamic-predicate note + # above); the real predicate must still be applied. + tree = json.loads(predicate.meta.serialize(format="json")) + real_tree, had_dynamic = _strip_dynamic_predicates(tree) + if real_tree is not None: + # We have a real predicate; if possible, push it down to DuckDB. + duck_predicate = _tree_to_sql_expression(real_tree) + if duck_predicate is None: + # Could not push it down: re-apply it polars-side. Rebuild the + # expression from the stripped tree so we never hand polars the + # dynamic node it cannot lower. + fallback_predicate = _expression_from_tree(real_tree) if had_dynamic else predicate # Try to pushdown filter, if one exists if duck_predicate is not None: relation_final = relation_final.filter(duck_predicate) results = relation_final.to_arrow_reader() if batch_size is None else relation_final.to_arrow_reader(batch_size) for record_batch in iter(results.read_next_batch, None): - if predicate is not None and duck_predicate is None: + if fallback_predicate is not None: # We have a predicate, but did not manage to push it down, we fallback here - yield pl.from_arrow(record_batch).filter(predicate) # type: ignore[arg-type,misc,unused-ignore] + yield pl.from_arrow(record_batch).filter(fallback_predicate) # type: ignore[arg-type,misc,unused-ignore] else: yield pl.from_arrow(record_batch) # type: ignore[misc,unused-ignore] diff --git a/tests/fast/arrow/test_460_polars_lazy_dynamic_predicate.py b/tests/fast/arrow/test_460_polars_lazy_dynamic_predicate.py new file mode 100644 index 00000000..74a6cc92 --- /dev/null +++ b/tests/fast/arrow/test_460_polars_lazy_dynamic_predicate.py @@ -0,0 +1,101 @@ +"""Regression test for duckdb-python issue #460 (half b). + +A LazyFrame produced by `rel.pl(lazy=True)` panics polars when a `sort` + +`limit` (a TOP-N / slice) is collected. With Polars >= 1.39 the planner pushes a +*dynamic predicate* down to the IO source. It arrives not as a real expression +but as a `Display` node (`fmt_str: "dynamic_pred: "`), and the fallback in +`duckdb/polars_io.py`'s `source_generator` tries to materialize it via +`pl.from_arrow(record_batch).filter(predicate)`. Polars's DSL->IR lowering hits +`unreachable!()` on that node shape and raises `pyo3_runtime.PanicException` +(`internal error: entered unreachable code`). + +Only reproduces on Polars >= 1.39 — earlier planners don't push a dynamic +predicate to the IO source, so the test is skipped below that version. +""" + +from __future__ import annotations + +import pytest + +import duckdb + +# Dynamic-predicate pushdown to IO sources starts at Polars 1.39; below that the +# bug is unreachable, so skip rather than xfail. +pl = pytest.importorskip("polars", minversion="1.39") +pytest.importorskip("pyarrow") + + +@pytest.mark.parametrize("engine", ["streaming", "in-memory"]) +def test_460_lazy_sort_limit_does_not_panic(engine: str) -> None: + df = pl.DataFrame({"x": [3, 1, 2]}) + conn = duckdb.connect() + try: + conn.register("df_registered", df) + lf = conn.sql("SELECT * FROM df_registered").pl(lazy=True) + # sort + limit makes polars push a (pure) dynamic predicate into the source. + out = lf.sort("x").limit(1).collect(engine=engine) + finally: + conn.close() + + assert out.to_dict(as_series=False) == {"x": [1]} + + +@pytest.mark.parametrize("engine", ["streaming", "in-memory"]) +def test_460_user_filter_combined_with_dynamic_predicate_is_not_dropped(engine: str) -> None: + # Polars ANDs the dynamic predicate onto the user's real filter into one + # pushed predicate. Stripping the hint must NOT drop the real filter, and + # polars does not re-filter above the source. Adversarial data: the global + # min-x row (x=1) fails the y>15 filter, so dropping the filter yields a + # different (wrong) row. + df = pl.DataFrame({"x": [3, 1, 2], "y": [100, 5, 50]}) + conn = duckdb.connect() + try: + conn.register("t", df) + lf = conn.sql("SELECT * FROM t").pl(lazy=True) + out = lf.filter(pl.col("y") > 15).sort("x").limit(1).collect(engine=engine) + finally: + conn.close() + + ref = df.lazy().filter(pl.col("y") > 15).sort("x").limit(1).collect() + assert out.to_dict(as_series=False) == ref.to_dict(as_series=False) + assert out.to_dict(as_series=False) == {"x": [2], "y": [50]} + + +# --- unit tests for the stripping logic ------------------------------------- + +import json # noqa: E402 + +from duckdb.polars_io import _strip_dynamic_predicates # noqa: E402 + +_DYN = {"Display": {"inputs": [{"Column": "x"}], "fmt_str": "dynamic_pred: abc-123"}} + + +def _tree(expr: pl.Expr) -> dict: + return json.loads(expr.meta.serialize(format="json")) + + +def test_strip_bare_dynamic_predicate_returns_none() -> None: + assert _strip_dynamic_predicates(_DYN) == (None, True) + + +def test_strip_leaves_real_predicate_untouched() -> None: + real = _tree(pl.col("y") > 15) + assert _strip_dynamic_predicates(real) == (real, False) + + +def test_strip_and_of_real_and_dynamic_keeps_real() -> None: + real = _tree(pl.col("y") > 15) + for tree in ( + {"BinaryExpr": {"left": real, "op": "And", "right": _DYN}}, + {"BinaryExpr": {"left": _DYN, "op": "And", "right": real}}, + ): + stripped, removed = _strip_dynamic_predicates(tree) + assert removed is True + assert stripped == real + + +def test_strip_raises_on_dynamic_outside_top_level_and() -> None: + # An OR with a dynamic predicate cannot be safely dropped or applied. + tree = {"BinaryExpr": {"left": _tree(pl.col("y") > 15), "op": "Or", "right": _DYN}} + with pytest.raises(NotImplementedError): + _strip_dynamic_predicates(tree)