Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 195 additions & 12 deletions paimon-python/pypaimon/daft/daft_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import annotations

from dataclasses import dataclass
import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
Expand All @@ -37,6 +38,7 @@
from collections.abc import AsyncIterator

from pypaimon.common.predicate import Predicate
from pypaimon.read.table_read import TableRead
from pypaimon.read.split import Split
from pypaimon.table.file_store_table import FileStoreTable

Expand All @@ -50,6 +52,16 @@
PAIMON_FILE_FORMAT_AVRO = "avro"


@dataclass(frozen=True, slots=True)
class _ReadPushdownState:
reader_predicate: Predicate | None
planning_predicate: Predicate | None
requested_columns: list[str] | None
task_columns: list[str] | None
read_columns: list[str] | None
source_limit: int | None


class _PaimonPKSplitTask(DataSourceTask):
"""DataSourceTask for PK-table splits that require LSM-tree merge.

Expand All @@ -60,24 +72,27 @@ class _PaimonPKSplitTask(DataSourceTask):

def __init__(
self,
table: FileStoreTable,
table_read: TableRead,
split: Split,
schema: Schema,
output_columns: list[str] | None = None,
blob_column_names: set[str] | None = None,
) -> None:
self._table = table
self._table_read = table_read
self._split = split
self._schema = schema
self._output_columns = output_columns
self._blob_column_names = blob_column_names or set()

@property
def schema(self) -> Schema:
return self._schema

async def read(self) -> AsyncIterator[RecordBatch]:
table_read = self._table.new_read_builder().new_read()
reader = table_read.to_arrow_batch_reader([self._split])
reader = self._table_read.to_arrow_batch_reader([self._split])
for batch in iter(reader.read_next_batch, None):
if self._output_columns is not None:
batch = batch.select(self._output_columns)
if self._blob_column_names:
batch = _convert_blob_columns(batch, self._blob_column_names)
rb = RecordBatch.from_arrow_record_batches([batch], batch.schema)
Expand Down Expand Up @@ -174,6 +189,7 @@ def __init__(
)

self._paimon_predicate: Predicate | None = None
self._remaining_filters: list[PyExpr] | None = None

@property
def name(self) -> str:
Expand All @@ -197,6 +213,7 @@ def push_filters(self, filters: list[PyExpr]) -> tuple[list[PyExpr], list[PyExpr
pushed_filters, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, filters)

self._paimon_predicate = paimon_predicate
self._remaining_filters = remaining_filters

if pushed_filters:
logger.debug(
Expand All @@ -213,18 +230,19 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]
read_table = self._table.copy({"blob-as-descriptor": "true"})

read_builder = read_table.new_read_builder()
read_pushdowns = self._read_pushdown_state(read_table, pushdowns)

if pushdowns.columns is not None:
read_builder = read_builder.with_projection(list(pushdowns.columns))
if read_pushdowns.requested_columns is not None:
read_builder = read_builder.with_projection(read_pushdowns.requested_columns)

if pushdowns.limit is not None:
read_builder = read_builder.with_limit(pushdowns.limit)
if read_pushdowns.source_limit is not None:
read_builder = read_builder.with_limit(read_pushdowns.source_limit)

if self._paimon_predicate is not None:
read_builder = read_builder.with_filter(self._paimon_predicate)
if read_pushdowns.planning_predicate is not None:
read_builder = read_builder.with_filter(read_pushdowns.planning_predicate)
logger.debug(
"Applied Paimon filter pushdown predicate: %s",
self._paimon_predicate,
read_pushdowns.planning_predicate,
)

if self._table.partition_keys and pushdowns.partition_filters is None:
Expand Down Expand Up @@ -291,7 +309,18 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]
len(split.files),
reason,
)
yield _PaimonPKSplitTask(read_table, split, self._schema, self._blob_column_names)
yield _PaimonPKSplitTask(
self._fallback_read_builder(
read_table,
read_pushdowns.read_columns,
read_pushdowns.source_limit,
read_pushdowns.reader_predicate,
).new_read(),
split,
self._project_schema(read_pushdowns.task_columns),
read_pushdowns.task_columns,
self._blob_column_names,
)

def _build_file_uri(self, file_path: str) -> str:
"""Reconstruct a full URI from a (potentially scheme-stripped) file_path."""
Expand All @@ -314,3 +343,157 @@ def _build_partition_values(self, split: Split) -> daft.recordbatch.RecordBatch
if not arrays:
return None
return daft.recordbatch.RecordBatch.from_pydict(arrays)

def _valid_output_columns(self, columns: list[str] | None) -> list[str] | None:
    """Keep only the requested columns that exist in the source schema.

    Args:
        columns: Column names requested by the pushdown, or ``None``.

    Returns:
        ``None`` when ``columns`` is ``None``; otherwise the names from
        ``columns`` found in ``self._schema``, in their original order.
    """
    if columns is not None:
        known_names = {schema_field.name for schema_field in self._schema}
        return [column for column in columns if column in known_names]
    return None

def _task_columns(
    self,
    table: FileStoreTable,
    output_columns: list[str] | None,
    pushdowns: Pushdowns,
) -> list[str] | None:
    """Return the output columns extended with filter-required columns.

    Args:
        table: Table whose fields decide which required names can be added.
        output_columns: Validated projection, or ``None`` for no projection.
        pushdowns: Pushdowns object; its ``filter_required_column_names``
            callable is used when the attribute exists and is truthy.

    Returns:
        ``None`` when there is no projection, otherwise a new list built
        from ``output_columns`` via ``_append_existing_columns``.
    """
    if output_columns is None:
        return None

    # Work on a copy so the caller's projection list is left alone.
    projected = list(output_columns)
    # Not every Pushdowns object exposes this hook, hence the getattr.
    names_provider = getattr(pushdowns, "filter_required_column_names", None)
    if names_provider:
        needed = names_provider()
    else:
        needed = set()
    return self._append_existing_columns(table, projected, needed)

def _fallback_read_columns(
    self,
    table: FileStoreTable,
    task_columns: list[str] | None,
    paimon_predicate: Predicate | None,
) -> list[str] | None:
    """Return the projection the fallback reader must load.

    Args:
        table: Table whose fields decide which predicate fields can be added.
        task_columns: Columns the task outputs, or ``None`` for all columns.
        paimon_predicate: Predicate evaluated by the reader, if any.

    Returns:
        ``None`` when ``task_columns`` is ``None``; otherwise a new list
        holding ``task_columns``, extended with the predicate's fields when
        a predicate is given.
    """
    if task_columns is None:
        return None

    projection = list(task_columns)
    if paimon_predicate is None:
        return projection

    # Imported lazily, only on the path that actually needs it.
    from pypaimon.read.push_down_utils import _get_all_fields

    predicate_fields = _get_all_fields(paimon_predicate)
    return self._append_existing_columns(table, projection, predicate_fields)

@staticmethod
def _append_existing_columns(
    table: FileStoreTable,
    columns: list[str],
    required_fields: set[str],
) -> list[str]:
    """Return ``columns`` followed by the required fields it is missing.

    Only names that are actual fields of ``table`` are appended, in the
    table's field order. The input list is never modified; a new list is
    always returned, so callers may safely pass a list they still hold.

    Args:
        table: Table whose ``fields`` define the valid, ordered names.
        columns: Columns already selected.
        required_fields: Names that must also be present when they exist
            on the table.

    Returns:
        A new list with ``columns`` first, then the missing required fields.
    """
    # Copy first: the caller's list must not change as a side effect.
    result = list(columns)
    if not required_fields:
        return result
    already_selected = set(result)
    result.extend(
        field.name
        for field in table.fields
        if field.name in required_fields and field.name not in already_selected
    )
    return result

def _project_schema(self, columns: list[str] | None) -> Schema:
    """Build a schema restricted to ``columns``.

    Args:
        columns: Names to keep, or ``None`` to keep the full schema.

    Returns:
        ``self._schema`` unchanged when ``columns`` is ``None``; otherwise
        a schema built from the listed names that exist in ``self._schema``,
        in the order given by ``columns``.
    """
    if columns is None:
        return self._schema

    fields_by_name = {schema_field.name: schema_field for schema_field in self._schema}
    # Names missing from the schema are dropped rather than raising.
    selected = [
        (column, fields_by_name[column].dtype)
        for column in columns
        if column in fields_by_name
    ]
    return Schema.from_field_name_and_types(selected)

def _fallback_read_builder(
    self,
    table: FileStoreTable,
    read_columns: list[str] | None,
    limit: int | None,
    predicate: Predicate | None,
) -> Any:
    """Create a read builder on ``table`` with the given pushdowns applied.

    Projection, limit, and filter are applied in that order. Any argument
    that is ``None`` is skipped.

    Args:
        table: Table that supplies ``new_read_builder()``.
        read_columns: Projection to apply, if any.
        limit: Row limit to apply, if any.
        predicate: Filter to apply, if any.

    Returns:
        The configured read builder.
    """
    builder = table.new_read_builder()
    pushdown_steps = (
        ("with_projection", read_columns),
        ("with_limit", limit),
        ("with_filter", predicate),
    )
    for method_name, argument in pushdown_steps:
        if argument is not None:
            # Each with_* call returns the builder to keep using.
            builder = getattr(builder, method_name)(argument)
    return builder

def _read_pushdown_state(
    self,
    table: FileStoreTable,
    pushdowns: Pushdowns,
) -> _ReadPushdownState:
    """Resolve every pushdown decision for one scan into a single record.

    Args:
        table: Table the scan reads from; used to resolve column names.
        pushdowns: Pushdowns supplied for this scan.

    Returns:
        A ``_ReadPushdownState`` with the predicates, column lists, and
        limit derived from ``pushdowns`` and this source's filter state.
    """
    # Predicates come first: the column lists and the limit depend on them.
    reader_pred, all_filters_pushed = self._pushdown_filter_state(pushdowns)
    planning_pred = self._planning_predicate(reader_pred)

    # Each column list is derived from the previous one.
    requested = self._valid_output_columns(pushdowns.columns)
    per_task = self._task_columns(table, requested, pushdowns)
    for_reader = self._fallback_read_columns(table, per_task, reader_pred)

    limit = self._source_limit(pushdowns, reader_pred, planning_pred, all_filters_pushed)

    return _ReadPushdownState(
        reader_predicate=reader_pred,
        planning_predicate=planning_pred,
        requested_columns=requested,
        task_columns=per_task,
        read_columns=for_reader,
        source_limit=limit,
    )

def _pushdown_filter_state(self, pushdowns: Pushdowns) -> tuple[Predicate | None, bool]:
    """Return the Paimon predicate and whether all filters were converted.

    When ``self._remaining_filters`` has been recorded, that stored result
    is reused. Otherwise ``pushdowns.filters`` is converted here.

    Args:
        pushdowns: Pushdowns whose ``filters`` are converted when no stored
            result exists.

    Returns:
        ``(predicate, consumed)``, where ``consumed`` is ``True`` when no
        filter was left unconverted (or there were no filters at all).
    """
    leftover = self._remaining_filters
    if leftover is not None:
        return self._paimon_predicate, not leftover

    filters = pushdowns.filters
    if filters is None:
        return None, True

    # Use the wrapped ``_expr`` when the filter object has one.
    raw_expr = getattr(filters, "_expr", filters)
    _, unconverted, predicate = convert_filters_to_paimon(self._table, [raw_expr])
    return predicate, not unconverted

def _planning_predicate(self, pushdown_predicate: Predicate | None) -> Predicate | None:
    """Return the predicate to apply during scan planning, if any.

    The predicate is used for planning only when it passes
    ``_can_plan_predicate`` and either a Paimon predicate is already stored
    on this source or the source requires the fallback reader.

    Args:
        pushdown_predicate: Candidate predicate, or ``None``.

    Returns:
        ``pushdown_predicate`` when it should be used for planning,
        otherwise ``None``.
    """
    if pushdown_predicate is None or not self._can_plan_predicate(pushdown_predicate):
        return None

    has_stored_predicate = self._paimon_predicate is not None
    if has_stored_predicate or self._requires_fallback_reader():
        return pushdown_predicate
    return None

@staticmethod
def _source_limit(
    pushdowns: Pushdowns,
    reader_predicate: Predicate | None,
    planning_predicate: Predicate | None,
    filters_consumed: bool,
) -> int | None:
    """Return the limit to push to the source, or ``None`` to push none.

    The limit is withheld when any of the following holds:

    * no limit was requested;
    * partition filters are present;
    * some filters were not consumed;
    * a reader predicate exists but is not used as the planning predicate.

    Args:
        pushdowns: Pushdowns carrying ``limit`` and ``partition_filters``.
        reader_predicate: Predicate evaluated by the reader, if any.
        planning_predicate: Predicate applied during planning, if any.
        filters_consumed: Whether every filter was converted.

    Returns:
        ``pushdowns.limit`` when pushing it is allowed, otherwise ``None``.
    """
    requested_limit = pushdowns.limit
    if requested_limit is None:
        return None

    predicate_not_planned = reader_predicate is not None and planning_predicate is None
    limit_blocked = (
        pushdowns.partition_filters is not None
        or not filters_consumed
        or predicate_not_planned
    )
    return None if limit_blocked else requested_limit

def _requires_fallback_reader(self) -> bool:
    """Tell whether reads must go through the fallback reader.

    That is the case when the data is not Parquet, the table has blob
    columns, or the table is a primary-key table.
    """
    if not self._is_parquet:
        return True
    return self._has_blob_columns or self._table.is_primary_key_table

def _can_plan_predicate(self, predicate: Predicate) -> bool:
    """Tell whether ``predicate`` may be used for scan planning.

    Predicates without an ``isNull`` term are always accepted. Predicates
    with one are accepted only for primary-key tables that do not have
    deletion vectors enabled.
    """
    # Missing value null-count stats make isNull unsafe for scan planning.
    if self._predicate_contains_is_null(predicate):
        table = self._table
        return table.is_primary_key_table and not table.options.deletion_vectors_enabled()
    return True

def _predicate_contains_is_null(self, predicate: Predicate) -> bool:
    """Tell whether ``predicate`` has an ``isNull`` term in its and/or tree.

    Only ``and`` / ``or`` nodes are descended into, and their ``literals``
    are treated as child predicates. Any other node is checked by its own
    ``method`` alone.
    """
    # Explicit stack instead of recursion; children are pushed reversed so
    # nodes are visited in the same left-to-right, depth-first order.
    pending = [predicate]
    while pending:
        current = pending.pop()
        if current.method == "isNull":
            return True
        if current.method in ("and", "or"):
            pending.extend(reversed(list(current.literals or [])))
    return False
Loading
Loading