diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py index f6fb6f8f4ce3..ec8ef02e3753 100644 --- a/paimon-python/pypaimon/daft/daft_datasource.py +++ b/paimon-python/pypaimon/daft/daft_datasource.py @@ -18,6 +18,7 @@ from __future__ import annotations +from dataclasses import dataclass import logging from typing import TYPE_CHECKING, Any from urllib.parse import urlparse @@ -37,6 +38,7 @@ from collections.abc import AsyncIterator from pypaimon.common.predicate import Predicate + from pypaimon.read.table_read import TableRead from pypaimon.read.split import Split from pypaimon.table.file_store_table import FileStoreTable @@ -50,6 +52,16 @@ PAIMON_FILE_FORMAT_AVRO = "avro" +@dataclass(frozen=True, slots=True) +class _ReadPushdownState: + reader_predicate: Predicate | None + planning_predicate: Predicate | None + requested_columns: list[str] | None + task_columns: list[str] | None + read_columns: list[str] | None + source_limit: int | None + + class _PaimonPKSplitTask(DataSourceTask): """DataSourceTask for PK-table splits that require LSM-tree merge. @@ -60,14 +72,16 @@ class _PaimonPKSplitTask(DataSourceTask): def __init__( self, - table: FileStoreTable, + table_read: TableRead, split: Split, schema: Schema, + output_columns: list[str] | None = None, blob_column_names: set[str] | None = None, ) -> None: - self._table = table + self._table_read = table_read self._split = split self._schema = schema + self._output_columns = output_columns self._blob_column_names = blob_column_names or set() @property @@ -75,9 +89,10 @@ def schema(self) -> Schema: return self._schema async def read(self) -> AsyncIterator[RecordBatch]: - table_read = self._table.new_read_builder().new_read() - reader = table_read.to_arrow_batch_reader([self._split]) + reader = self._table_read.to_arrow_batch_reader([self._split]) for batch in iter(reader.read_next_batch, None): + if self._output_columns is not None: + batch = batch.select(self._output_columns) if self._blob_column_names: batch = _convert_blob_columns(batch, self._blob_column_names) rb = RecordBatch.from_arrow_record_batches([batch], batch.schema) @@ -174,6 +189,7 @@ def __init__( ) self._paimon_predicate: Predicate | None = None + self._remaining_filters: list[PyExpr] | None = None @property def name(self) -> str: @@ -197,6 +213,7 @@ def push_filters(self, filters: list[PyExpr]) -> tuple[list[PyExpr], list[PyExpr pushed_filters, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, filters) self._paimon_predicate = paimon_predicate + self._remaining_filters = remaining_filters if pushed_filters: logger.debug( @@ -213,18 +230,19 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] read_table = self._table.copy({"blob-as-descriptor": "true"}) read_builder = read_table.new_read_builder() + read_pushdowns = self._read_pushdown_state(read_table, pushdowns) - if pushdowns.columns is not None: - read_builder = read_builder.with_projection(list(pushdowns.columns)) + if read_pushdowns.requested_columns is not None: + read_builder = read_builder.with_projection(read_pushdowns.requested_columns) - if pushdowns.limit is not None: - read_builder = read_builder.with_limit(pushdowns.limit) + if read_pushdowns.source_limit is not None: + read_builder = read_builder.with_limit(read_pushdowns.source_limit) - if self._paimon_predicate is not None: - read_builder = read_builder.with_filter(self._paimon_predicate) + if read_pushdowns.planning_predicate is not None: + read_builder = read_builder.with_filter(read_pushdowns.planning_predicate) logger.debug( "Applied Paimon filter pushdown predicate: %s", - self._paimon_predicate, + read_pushdowns.planning_predicate, ) if self._table.partition_keys and pushdowns.partition_filters is None: @@ -291,7 +309,18 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] len(split.files), reason, ) - yield _PaimonPKSplitTask(read_table, split, self._schema, self._blob_column_names) + yield _PaimonPKSplitTask( + self._fallback_read_builder( + read_table, + read_pushdowns.read_columns, + read_pushdowns.source_limit, + read_pushdowns.reader_predicate, + ).new_read(), + split, + self._project_schema(read_pushdowns.task_columns), + read_pushdowns.task_columns, + self._blob_column_names, + ) def _build_file_uri(self, file_path: str) -> str: """Reconstruct a full URI from a (potentially scheme-stripped) file_path.""" @@ -314,3 +343,157 @@ def _build_partition_values(self, split: Split) -> daft.recordbatch.RecordBatch if not arrays: return None return daft.recordbatch.RecordBatch.from_pydict(arrays) + + def _valid_output_columns(self, columns: list[str] | None) -> list[str] | None: + if columns is None: + return None + schema_names = {field.name for field in self._schema} + return [name for name in columns if name in schema_names] + + def _task_columns( + self, + table: FileStoreTable, + output_columns: list[str] | None, + pushdowns: Pushdowns, + ) -> list[str] | None: + if output_columns is None: + return None + + task_columns = list(output_columns) + filter_required_column_names = getattr(pushdowns, "filter_required_column_names", None) + required_fields = filter_required_column_names() if filter_required_column_names else set() + return self._append_existing_columns(table, task_columns, required_fields) + + def _fallback_read_columns( + self, + table: FileStoreTable, + task_columns: list[str] | None, + paimon_predicate: Predicate | None, + ) -> list[str] | None: + if task_columns is None: + return None + + read_columns = list(task_columns) + if paimon_predicate is not None: + from pypaimon.read.push_down_utils import _get_all_fields + + return self._append_existing_columns(table, read_columns, _get_all_fields(paimon_predicate)) + + return read_columns + + @staticmethod + def _append_existing_columns( + table: FileStoreTable, + columns: list[str], + required_fields: set[str], + ) -> list[str]: + if not required_fields: + return columns + existing = set(columns) + columns.extend( + field.name + for field in table.fields + if field.name in required_fields and field.name not in existing + ) + return columns + + def _project_schema(self, columns: list[str] | None) -> Schema: + if columns is None: + return self._schema + field_map = {field.name: field for field in self._schema} + return Schema.from_field_name_and_types( + [(name, field_map[name].dtype) for name in columns if name in field_map] + ) + + def _fallback_read_builder( + self, + table: FileStoreTable, + read_columns: list[str] | None, + limit: int | None, + predicate: Predicate | None, + ) -> Any: + read_builder = table.new_read_builder() + if read_columns is not None: + read_builder = read_builder.with_projection(read_columns) + if limit is not None: + read_builder = read_builder.with_limit(limit) + if predicate is not None: + read_builder = read_builder.with_filter(predicate) + return read_builder + + def _read_pushdown_state( + self, + table: FileStoreTable, + pushdowns: Pushdowns, + ) -> _ReadPushdownState: + reader_predicate, filters_consumed = self._pushdown_filter_state(pushdowns) + planning_predicate = self._planning_predicate(reader_predicate) + requested_columns = self._valid_output_columns(pushdowns.columns) + task_columns = self._task_columns(table, requested_columns, pushdowns) + read_columns = self._fallback_read_columns(table, task_columns, reader_predicate) + source_limit = self._source_limit( + pushdowns, + reader_predicate, + planning_predicate, + filters_consumed, + ) + return _ReadPushdownState( + reader_predicate=reader_predicate, + planning_predicate=planning_predicate, + requested_columns=requested_columns, + task_columns=task_columns, + read_columns=read_columns, + source_limit=source_limit, + ) + + def _pushdown_filter_state(self, pushdowns: Pushdowns) -> tuple[Predicate | None, bool]: + if self._remaining_filters is not None: + return self._paimon_predicate, not self._remaining_filters + if pushdowns.filters is None: + return None, True + + py_expr = getattr(pushdowns.filters, "_expr", pushdowns.filters) + _, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, [py_expr]) + return paimon_predicate, not remaining_filters + + def _planning_predicate(self, pushdown_predicate: Predicate | None) -> Predicate | None: + if pushdown_predicate is None: + return None + if not self._can_plan_predicate(pushdown_predicate): + return None + if self._paimon_predicate is not None or self._requires_fallback_reader(): + return pushdown_predicate + return None + + @staticmethod + def _source_limit( + pushdowns: Pushdowns, + reader_predicate: Predicate | None, + planning_predicate: Predicate | None, + filters_consumed: bool, + ) -> int | None: + if pushdowns.limit is None: + return None + if pushdowns.partition_filters is not None: + return None + if not filters_consumed: + return None + if reader_predicate is not None and planning_predicate is None: + return None + return pushdowns.limit + + def _requires_fallback_reader(self) -> bool: + return not self._is_parquet or self._has_blob_columns or self._table.is_primary_key_table + + def _can_plan_predicate(self, predicate: Predicate) -> bool: + # Missing value null-count stats make isNull unsafe for scan planning. + if not self._predicate_contains_is_null(predicate): + return True + return self._table.is_primary_key_table and not self._table.options.deletion_vectors_enabled() + + def _predicate_contains_is_null(self, predicate: Predicate) -> bool: + if predicate.method == "isNull": + return True + if predicate.method in ("and", "or"): + return any(self._predicate_contains_is_null(child) for child in predicate.literals or []) + return False diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py index e9e32efa9336..2eee5ba749a6 100644 --- a/paimon-python/pypaimon/tests/daft/daft_data_test.py +++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py @@ -24,6 +24,7 @@ from __future__ import annotations +import asyncio import decimal import pyarrow as pa @@ -57,6 +58,40 @@ def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None table_commit.close() +async def _read_paimon_source_batches( + table, + filter_expr=None, + columns=None, + limit=None, + call_push_filters=True, +): + from daft import context, runners + from daft.daft import StorageConfig + from daft.io.pushdowns import Pushdowns + + from pypaimon.daft.daft_datasource import PaimonDataSource + + io_config = context.get_context().daft_planning_config.default_io_config + storage_config = StorageConfig(runners.get_or_create_runner().name != "ray", io_config) + source = PaimonDataSource(table, storage_config=storage_config, catalog_options={}) + + if filter_expr is not None and call_push_filters: + pushed_filters, remaining_filters = source.push_filters([filter_expr._expr]) + assert pushed_filters + assert not remaining_filters + + batches = [] + fallback_task_count = 0 + pushdowns = Pushdowns(filters=filter_expr, columns=columns, limit=limit) + async for task in source.get_tasks(pushdowns): + if type(task).__name__ == "_PaimonPKSplitTask": + fallback_task_count += 1 + async for batch in task.read(): + batches.append(batch.to_pydict()) + assert fallback_task_count > 0 + return batches + + # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @@ -312,6 +347,193 @@ def test_read_paimon_pk_table_deduplication(pk_table): assert id1_row[0][1] == "new_a" +def test_read_paimon_pk_fallback_applies_pushed_filter(pk_table): + """Fallback PK reads must apply filters pushed into the Paimon source.""" + table, _ = pk_table + batch1 = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "name": pa.array(["old_a", "old_b"], pa.string()), + "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, batch1) + + batch2 = pa.table( + { + "id": pa.array([1], pa.int64()), + "name": pa.array(["new_a"], pa.string()), + "dt": pa.array(["2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, batch2) + + batches = asyncio.run(_read_paimon_source_batches(table, filter_expr=col("id") == 1)) + ids = [value for batch in batches for value in batch["id"]] + names = [value for batch in batches for value in batch["name"]] + + assert ids == [1] + assert names == ["new_a"] + + +def test_read_paimon_pk_fallback_filters_before_projection(pk_table): + """Fallback reads keep filter columns available for Daft's upper filter.""" + table, _ = pk_table + batch1 = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "name": pa.array(["old_a", "old_b"], pa.string()), + "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, batch1) + + batch2 = pa.table( + { + "id": pa.array([1], pa.int64()), + "name": pa.array(["new_a"], pa.string()), + "dt": pa.array(["2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, batch2) + + batches = asyncio.run( + _read_paimon_source_batches( + table, + filter_expr=col("id") == 1, + columns=["name"], + ) + ) + + assert batches == [{"name": ["new_a"], "id": [1]}] + + +def test_read_paimon_fallback_plans_pushdown_filter_without_push_filters(local_paimon_catalog): + """Fallback planning must use Pushdowns.filters even if push_filters was not called.""" + catalog, _ = local_paimon_catalog + schema = pypaimon.Schema.from_pyarrow_schema( + pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ]), + options={ + "file.format": "avro", + "source.split.target-size": "800b", + "source.split.open-file-cost": "600b", + }, + ) + catalog.create_table("test_db.avro_pushdown_filter", schema, ignore_if_exists=False) + table = catalog.get_table("test_db.avro_pushdown_filter") + _write_to_paimon(table, pa.table({"id": [1], "name": ["first"]})) + _write_to_paimon(table, pa.table({"id": [999], "name": ["match"]})) + + batches = asyncio.run( + _read_paimon_source_batches( + table, + filter_expr=col("id") == 999, + limit=1, + call_push_filters=False, + ) + ) + + assert batches == [{"id": [999], "name": ["match"]}] + + +def test_read_paimon_fallback_keeps_limit_above_remaining_filter(local_paimon_catalog): + """Fallback reads must not apply limit before Daft evaluates remaining filters.""" + catalog, _ = local_paimon_catalog + schema = pypaimon.Schema.from_pyarrow_schema( + pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ]), + options={"file.format": "avro"}, + ) + catalog.create_table("test_db.avro_remaining_filter_limit", schema, ignore_if_exists=False) + table = catalog.get_table("test_db.avro_remaining_filter_limit") + _write_to_paimon(table, pa.table({"id": [1, 999], "name": ["first", "match"]})) + + result = _read_table(table).where(~(col("id") == 1)).limit(1).to_pydict() + + assert result == {"id": [999], "name": ["match"]} + + +def test_read_paimon_keeps_limit_above_partition_filter(append_only_table): + """Scan planning must not apply limit before datasource partition pruning.""" + table, _ = append_only_table + _write_to_paimon( + table, + pa.table( + { + "id": pa.array([1], pa.int64()), + "name": pa.array(["first"], pa.string()), + "value": pa.array([1.0], pa.float64()), + "dt": pa.array(["2024-01-01"], pa.string()), + } + ), + ) + _write_to_paimon( + table, + pa.table( + { + "id": pa.array([2], pa.int64()), + "name": pa.array(["match"], pa.string()), + "value": pa.array([2.0], pa.float64()), + "dt": pa.array(["2024-01-02"], pa.string()), + } + ), + ) + + result = _read_table(table).where(col("dt") == "2024-01-02").limit(1).to_pydict() + + assert result["id"] == [2] + + +def test_read_paimon_pk_fallback_filter_then_project_dataframe(pk_table): + """Daft may keep the filter above the source while pushing projection.""" + table, _ = pk_table + batch1 = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "name": pa.array(["old_a", "old_b"], pa.string()), + "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, batch1) + + batch2 = pa.table( + { + "id": pa.array([1], pa.int64()), + "name": pa.array(["new_a"], pa.string()), + "dt": pa.array(["2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, batch2) + + result = _read_table(table).where(col("id") == 1).select("name").to_pydict() + + assert result == {"name": ["new_a"]} + + +def test_read_paimon_pk_fallback_applies_limit(pk_table): + """Fallback PK reads must use the same limit as the planning read builder.""" + table, _ = pk_table + data = pa.table( + { + "id": pa.array([1, 2, 3], pa.int64()), + "name": pa.array(["a", "b", "c"], pa.string()), + "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"], pa.string()), + } + ) + _write_to_paimon(table, data) + _write_to_paimon(table, data) + + batches = asyncio.run(_read_paimon_source_batches(table, limit=1)) + ids = [value for batch in batches for value in batch["id"]] + + assert len(ids) == 1 + + # --------------------------------------------------------------------------- # Filter pushdown # ---------------------------------------------------------------------------