Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 98 additions & 9 deletions duckdb/polars_io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations # noqa: D100

import contextlib
import datetime
import io
import json
import typing
from decimal import Decimal
Expand Down Expand Up @@ -35,16 +35,95 @@ def _predicate_to_expression(predicate: pl.Expr) -> duckdb.Expression | None:
"""
# Serialize the Polars expression tree to JSON
tree = json.loads(predicate.meta.serialize(format="json"))
return _tree_to_sql_expression(tree)


def _tree_to_sql_expression(tree: _ExpressionTree) -> duckdb.Expression | None:
"""Convert an already-parsed Polars expression tree to a DuckDB expression.

Returns None if the tree contains a node we cannot translate to SQL.
"""
try:
# Convert the tree to SQL
sql_filter = _pl_tree_to_sql(tree)
return duckdb.SQLExpression(sql_filter)
return duckdb.SQLExpression(_pl_tree_to_sql(tree))
except Exception:
# If the conversion fails, we return None
return None


# Polars "dynamic predicates"
# ---------------------------
# When a slice / TOP-N sits above a scan, polars' optimizer pushes a *dynamic
# predicate* into the scan, AND-ed onto any real filter. It is an internal
# optimizer node, not a materializable expression: it serializes as a `Display`
# node with `fmt_str` "dynamic_pred: <uuid>", and feeding it back into
# `DataFrame.filter()` panics polars (`unreachable!` in `expr_to_ir`). It is only
# an early-pruning hint -- the limit above the scan still runs -- so we drop it
# and keep the real predicate, which we MUST still apply (polars trusts the
# source and does not re-filter above it). Reported as duckdb-python#460; for
# polars-side context see:
# - https://github.com/pola-rs/polars/issues/21665 (why real + dynamic arrive AND-ed)
# - https://github.com/pola-rs/polars/issues/22252 (filter() panicking on un-lowerable nodes)
def _is_dynamic_predicate_node(node: typing.Any) -> bool: # noqa: ANN401
"""Return True if a serialized node is a dynamic predicate (see the note above).

Detected by shape: a ``Display`` node whose ``fmt_str`` starts with ``dynamic_pred``.
"""
if not isinstance(node, dict):
return False
display = node.get("Display")
return (
isinstance(display, dict)
and isinstance(display.get("fmt_str"), str)
and display["fmt_str"].startswith("dynamic_pred")
)


def _tree_contains_dynamic_predicate(node: typing.Any) -> bool: # noqa: ANN401
"""Return True if the serialized expression tree contains a dynamic predicate anywhere."""
if _is_dynamic_predicate_node(node):
return True
if isinstance(node, dict):
return any(_tree_contains_dynamic_predicate(child) for child in node.values())
if isinstance(node, list):
return any(_tree_contains_dynamic_predicate(child) for child in node)
return False


def _strip_dynamic_predicates(tree: typing.Any) -> tuple[_ExpressionTree | None, bool]: # noqa: ANN401
"""Remove dynamic-predicate conjuncts from a serialized predicate tree.

See the note above for what a dynamic predicate is and why we drop it.

Returns ``(stripped_tree, removed)``. ``stripped_tree`` is ``None`` when the
predicate was purely dynamic. Raises ``NotImplementedError`` if a dynamic
predicate appears anywhere other than a top-level ``And`` conjunct — a shape
polars does not produce today, where the hint can neither be safely dropped
nor applied.
"""
if _is_dynamic_predicate_node(tree):
return None, True
if isinstance(tree, dict) and "BinaryExpr" in tree:
bin_expr = tree["BinaryExpr"]
if isinstance(bin_expr, dict) and bin_expr.get("op") == "And":
left, left_removed = _strip_dynamic_predicates(bin_expr["left"])
right, right_removed = _strip_dynamic_predicates(bin_expr["right"])
removed = left_removed or right_removed
if left is None:
return right, removed
if right is None:
return left, removed
return {"BinaryExpr": {**bin_expr, "left": left, "right": right}}, removed
if _tree_contains_dynamic_predicate(tree):
msg = "Cannot handle a polars dynamic predicate outside a top-level AND conjunct"
raise NotImplementedError(msg)
return tree, False


def _expression_from_tree(tree: _ExpressionTree) -> pl.Expr:
"""Rebuild a polars expression from a serialized tree (inverse of meta.serialize)."""
return pl.Expr.deserialize(io.BytesIO(json.dumps(tree).encode()), format="json")


def _pl_operation_to_sql(op: str) -> str:
"""Map Polars binary operation strings to SQL equivalents.

Expand Down Expand Up @@ -286,25 +365,35 @@ def source_generator(
batch_size: int | None,
) -> Iterator[pl.DataFrame]:
duck_predicate = None
fallback_predicate = None
relation_final = relation
if with_columns is not None:
cols = ",".join(map(_escape_sql_identifier, with_columns))
relation_final = relation_final.project(cols)
if n_rows is not None:
relation_final = relation_final.limit(n_rows)
if predicate is not None:
# We have a predicate, if possible, we push it down to DuckDB
with contextlib.suppress(AssertionError, KeyError):
duck_predicate = _predicate_to_expression(predicate)
# Strip any dynamic-predicate hint (see the dynamic-predicate note
# above); the real predicate must still be applied.
tree = json.loads(predicate.meta.serialize(format="json"))
real_tree, had_dynamic = _strip_dynamic_predicates(tree)
if real_tree is not None:
# We have a real predicate; if possible, push it down to DuckDB.
duck_predicate = _tree_to_sql_expression(real_tree)
if duck_predicate is None:
# Could not push it down: re-apply it polars-side. Rebuild the
# expression from the stripped tree so we never hand polars the
# dynamic node it cannot lower.
fallback_predicate = _expression_from_tree(real_tree) if had_dynamic else predicate
# Try to pushdown filter, if one exists
if duck_predicate is not None:
relation_final = relation_final.filter(duck_predicate)
results = relation_final.to_arrow_reader() if batch_size is None else relation_final.to_arrow_reader(batch_size)

for record_batch in iter(results.read_next_batch, None):
if predicate is not None and duck_predicate is None:
if fallback_predicate is not None:
# We have a predicate, but did not manage to push it down, we fallback here
yield pl.from_arrow(record_batch).filter(predicate) # type: ignore[arg-type,misc,unused-ignore]
yield pl.from_arrow(record_batch).filter(fallback_predicate) # type: ignore[arg-type,misc,unused-ignore]
else:
yield pl.from_arrow(record_batch) # type: ignore[misc,unused-ignore]

Expand Down
101 changes: 101 additions & 0 deletions tests/fast/arrow/test_460_polars_lazy_dynamic_predicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Regression test for duckdb-python issue #460 (half b).

A LazyFrame produced by `rel.pl(lazy=True)` panics polars when a `sort` +
`limit` (a TOP-N / slice) is collected. With Polars >= 1.39 the planner pushes a
*dynamic predicate* down to the IO source. It arrives not as a real expression
but as a `Display` node (`fmt_str: "dynamic_pred: <uuid>"`), and the fallback in
`duckdb/polars_io.py`'s `source_generator` tries to materialize it via
`pl.from_arrow(record_batch).filter(predicate)`. Polars's DSL->IR lowering hits
`unreachable!()` on that node shape and raises `pyo3_runtime.PanicException`
(`internal error: entered unreachable code`).

Only reproduces on Polars >= 1.39 — earlier planners don't push a dynamic
predicate to the IO source, so the test is skipped below that version.
"""

from __future__ import annotations

import pytest

import duckdb

# Dynamic-predicate pushdown to IO sources starts at Polars 1.39; below that the
# bug is unreachable, so skip rather than xfail.
pl = pytest.importorskip("polars", minversion="1.39")
pytest.importorskip("pyarrow")


@pytest.mark.parametrize("engine", ["streaming", "in-memory"])
def test_460_lazy_sort_limit_does_not_panic(engine: str) -> None:
df = pl.DataFrame({"x": [3, 1, 2]})
conn = duckdb.connect()
try:
conn.register("df_registered", df)
lf = conn.sql("SELECT * FROM df_registered").pl(lazy=True)
# sort + limit makes polars push a (pure) dynamic predicate into the source.
out = lf.sort("x").limit(1).collect(engine=engine)
finally:
conn.close()

assert out.to_dict(as_series=False) == {"x": [1]}


@pytest.mark.parametrize("engine", ["streaming", "in-memory"])
def test_460_user_filter_combined_with_dynamic_predicate_is_not_dropped(engine: str) -> None:
# Polars ANDs the dynamic predicate onto the user's real filter into one
# pushed predicate. Stripping the hint must NOT drop the real filter, and
# polars does not re-filter above the source. Adversarial data: the global
# min-x row (x=1) fails the y>15 filter, so dropping the filter yields a
# different (wrong) row.
df = pl.DataFrame({"x": [3, 1, 2], "y": [100, 5, 50]})
conn = duckdb.connect()
try:
conn.register("t", df)
lf = conn.sql("SELECT * FROM t").pl(lazy=True)
out = lf.filter(pl.col("y") > 15).sort("x").limit(1).collect(engine=engine)
finally:
conn.close()

ref = df.lazy().filter(pl.col("y") > 15).sort("x").limit(1).collect()
assert out.to_dict(as_series=False) == ref.to_dict(as_series=False)
assert out.to_dict(as_series=False) == {"x": [2], "y": [50]}


# --- unit tests for the stripping logic -------------------------------------

import json # noqa: E402

from duckdb.polars_io import _strip_dynamic_predicates # noqa: E402

_DYN = {"Display": {"inputs": [{"Column": "x"}], "fmt_str": "dynamic_pred: abc-123"}}


def _tree(expr: pl.Expr) -> dict:
return json.loads(expr.meta.serialize(format="json"))


def test_strip_bare_dynamic_predicate_returns_none() -> None:
assert _strip_dynamic_predicates(_DYN) == (None, True)


def test_strip_leaves_real_predicate_untouched() -> None:
real = _tree(pl.col("y") > 15)
assert _strip_dynamic_predicates(real) == (real, False)


def test_strip_and_of_real_and_dynamic_keeps_real() -> None:
real = _tree(pl.col("y") > 15)
for tree in (
{"BinaryExpr": {"left": real, "op": "And", "right": _DYN}},
{"BinaryExpr": {"left": _DYN, "op": "And", "right": real}},
):
stripped, removed = _strip_dynamic_predicates(tree)
assert removed is True
assert stripped == real


def test_strip_raises_on_dynamic_outside_top_level_and() -> None:
# An OR with a dynamic predicate cannot be safely dropped or applied.
tree = {"BinaryExpr": {"left": _tree(pl.col("y") > 15), "op": "Or", "right": _DYN}}
with pytest.raises(NotImplementedError):
_strip_dynamic_predicates(tree)
Loading