98 changes: 91 additions & 7 deletions paimon-python/pypaimon/read/read_builder.py
@@ -24,6 +24,7 @@
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
from pypaimon.table.special_fields import SpecialFields
from pypaimon.utils.projection import Projection, _is_row_type


class ReadBuilder:
@@ -34,15 +35,34 @@ def __init__(self, table):

self.table: FileStoreTable = table
self._predicate: Optional[Predicate] = None
# ``_projection`` is the user-facing list of names from
# :meth:`with_projection`. ``_nested_paths`` is the canonical
# integer-path form (length-1 for top level, longer for nested
# ROW children) that every consumer in this module reads.
# ``with_nested_projection`` populates only the latter; when both
# are set (dotted names passed to ``with_projection``), the
# integer paths take precedence.
self._projection: Optional[List[str]] = None
self._nested_paths: Optional[List[List[int]]] = None
self._limit: Optional[int] = None

def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
self._predicate = predicate
return self

def with_projection(self, projection: List[str]) -> 'ReadBuilder':
"""Project to the given column names.

Names containing a dot (e.g. ``"struct.subfield"``) walk into ROW
children and are translated into a nested projection. Top-level-
only callers see the same observable behaviour as before — the
dotted form is opt-in. Unknown names are silently skipped to
preserve the pre-existing contract.
"""
self._projection = projection
if projection and any('.' in name for name in projection):
self._nested_paths = self._resolve_dotted_paths(projection)
else:
self._nested_paths = None
return self
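
A minimal usage sketch of the dotted form, assuming a hypothetical table whose schema is `(id INT, address ROW<city STRING, zip STRING>)`; it uses only the `ReadBuilder` entry points visible in this diff:

```python
# Hypothetical schema: id INT, address ROW<city STRING, zip STRING>.
builder = ReadBuilder(table).with_projection(["id", "address.city"])

# read_type() should now contain ``id`` unchanged plus ``address``
# pruned to its ``city`` child (via ``Projection.of(...).project``).
read = builder.new_read()
```
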

def with_limit(self, limit: int) -> 'ReadBuilder':
Expand All @@ -60,19 +80,83 @@ def new_read(self) -> TableRead:
return TableRead(
table=self.table,
predicate=self._predicate,
read_type=self.read_type()
read_type=self.read_type(),
nested_name_paths=self._nested_name_paths(),
)

def _nested_name_paths(self) -> Optional[List[List[str]]]:
"""Resolve the current nested-projection state into a parallel list
of name paths against the underlying table schema. Returns ``None``
if the user only requested top-level projection (or no projection).
"""
if not self._nested_paths:
return None
table_fields = self.table.fields
if self.table.options.row_tracking_enabled():
table_fields = SpecialFields.row_type_with_row_tracking(table_fields)
return Projection.of(self._nested_paths).to_name_paths(table_fields)
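
For the hypothetical schema above, the intermediate values would plausibly look like this (a sketch of the assumed `Projection` behaviour, not verified output):

```python
# Assumed intermediate state for ["id", "address.city"]:
#   self._nested_paths == [[0], [1, 0]]
# and the resolved parallel name paths handed to TableRead:
#   [["id"], ["address", "city"]]
```
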

def new_predicate_builder(self) -> PredicateBuilder:
return PredicateBuilder(self.read_type())

def read_type(self) -> List[DataField]:
table_fields = self.table.fields

if not self._projection:
if not self._projection and not self._nested_paths:
return table_fields
else:
if self.table.options.row_tracking_enabled():
table_fields = SpecialFields.row_type_with_row_tracking(table_fields)
field_map = {field.name: field for field in table_fields}
return [field_map[name] for name in self._projection if name in field_map]

if self.table.options.row_tracking_enabled():
table_fields = SpecialFields.row_type_with_row_tracking(table_fields)

if self._nested_paths:
return Projection.of(self._nested_paths).project(table_fields)

field_map = {field.name: field for field in table_fields}
return [field_map[name] for name in self._projection if name in field_map]

# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

def _resolve_dotted_paths(self, names: List[str]) -> List[List[int]]:
"""Translate dotted-name projection entries into integer paths
against the current table schema. Names without dots produce
length-1 paths.
"""
table_fields = self.table.fields
if self.table.options.row_tracking_enabled():
table_fields = SpecialFields.row_type_with_row_tracking(table_fields)
top_index = {f.name: i for i, f in enumerate(table_fields)}

paths: List[List[int]] = []
for name in names:
if '.' not in name:
if name not in top_index:
# Silently skip unknown names — preserves the
# pre-existing contract from the plain top-level path.
continue
paths.append([top_index[name]])
continue
parts = name.split('.')
top = parts[0]
if top not in top_index:
continue
path = [top_index[top]]
current_field = table_fields[path[0]]
ok = True
for part in parts[1:]:
if not _is_row_type(current_field.type):
ok = False
break
child_fields = current_field.type.fields
child_idx = next(
(i for i, f in enumerate(child_fields) if f.name == part),
-1)
if child_idx < 0:
ok = False
break
path.append(child_idx)
current_field = child_fields[child_idx]
if ok:
paths.append(path)
return paths
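
A walk-through of `_resolve_dotted_paths` against the same hypothetical schema, tracing each branch of the loop:

```python
# Schema: id INT (index 0), address ROW<city STRING, zip STRING> (index 1).
# _resolve_dotted_paths(["id", "address.city", "address.country", "ghost"])
#   "id"              -> [0]      length-1 path, plain top-level column
#   "address.city"    -> [1, 0]   walks into the ROW child
#   "address.country" -> dropped  unknown ROW child (child_idx < 0)
#   "ghost"           -> dropped  unknown top-level name
# Result: [[0], [1, 0]]
```
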
41 changes: 38 additions & 3 deletions paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -35,7 +35,8 @@ class FormatAvroReader(RecordBatchReader):
"""

def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
push_down_predicate: Any, batch_size: int = 1024):
push_down_predicate: Any, batch_size: int = 1024,
nested_name_paths: Optional[List[List[str]]] = None):
file_path_for_io = file_io.to_filesystem_path(file_path)
self._file = file_io.filesystem.open_input_file(file_path_for_io)
self._avro_reader = fastavro.reader(self._file)
@@ -47,13 +48,30 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full
projected_data_fields = [full_fields_map[name] for name in read_fields]
self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)

# ``nested_name_paths`` is parallel to ``read_fields``. Top-level
# entries are length-1 paths and use the existing ``record.get``
# fast path; longer paths walk the record dict step-by-step. The
# path's first segment must be a real top-level Avro field —
# ``_get_fields_and_predicate`` upstream guarantees this.
if nested_name_paths is not None and len(nested_name_paths) != len(read_fields):
raise ValueError(
"nested_name_paths length {} does not match read_fields length {}".format(
len(nested_name_paths), len(read_fields)))
self._nested_name_paths = nested_name_paths
self._has_nested = bool(
nested_name_paths and any(len(p) > 1 for p in nested_name_paths))

def read_arrow_batch(self) -> Optional[RecordBatch]:
pydict_data = {name: [] for name in self._fields}
records_in_batch = 0

for record in self._avro_reader:
for col_name in self._fields:
pydict_data[col_name].append(record.get(col_name))
if self._has_nested:
for col_name, path in zip(self._fields, self._nested_name_paths):
pydict_data[col_name].append(_walk_avro_record(record, path))
else:
for col_name in self._fields:
pydict_data[col_name].append(record.get(col_name))
records_in_batch += 1
if records_in_batch >= self._batch_size:
break
Expand All @@ -76,3 +94,20 @@ def close(self):
if self._file:
self._file.close()
self._file = None


def _walk_avro_record(record, path: List[str]):
"""Walk a list of field names through an avro record dict, returning
the leaf value or ``None`` if any segment is missing or hits a
non-dict value. ``record`` is the top-level fastavro dict; nested
record fields surface as nested dicts.
"""
current = record
for name in path:
if current is None:
return None
if isinstance(current, dict):
current = current.get(name)
continue
return None
return current
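
A few examples of the helper's behaviour on a hand-built fastavro-style dict (values are illustrative):

```python
record = {"id": 7, "address": {"city": "Berlin", "zip": "10115"}}

_walk_avro_record(record, ["id"])                  # -> 7
_walk_avro_record(record, ["address", "city"])     # -> "Berlin"
_walk_avro_record(record, ["address", "country"])  # -> None (missing leaf)
_walk_avro_record(record, ["id", "x"])             # -> None (hits a non-dict value)
```
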
96 changes: 83 additions & 13 deletions paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -48,33 +48,81 @@ class FormatPyArrowReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_format: str, file_path: str,
read_fields: List[DataField],
push_down_predicate: Any, batch_size: int = 1024,
options: CoreOptions = None):
options: CoreOptions = None,
nested_name_paths: Optional[List[List[str]]] = None):
file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem)
self._file_format = file_format
self.read_fields = read_fields
self._read_field_names = [f.name for f in read_fields]

# Identify which fields exist in the file and which are missing
# ``nested_name_paths`` is parallel to ``read_fields``; when
# any path has length > 1 the scanner is invoked with a
# ``{flat_name: ds.field(*path)}`` column dict.
if nested_name_paths is not None and len(nested_name_paths) != len(read_fields):
raise ValueError(
"nested_name_paths length {} does not match read_fields length {}".format(
len(nested_name_paths), len(read_fields)))
self._nested_name_paths = nested_name_paths
has_nested_path = bool(
nested_name_paths and any(len(p) > 1 for p in nested_name_paths))

# Identify which fields exist in the file and which are missing.
# For nested projection, "exists" is determined by walking the
# whole path against the file schema; sub-field schema evolution
# (a leaf renamed or removed) shows up as ``missing`` and is
# served as a NULL column, mirroring the top-level handling.
file_schema = self.dataset.schema
file_schema_names = set(file_schema.names)
self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names]
self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names]

# column name → VariantSchema for shredded columns that need assembly
if has_nested_path:
self.existing_fields = []
self.missing_fields = []
for f, path in zip(read_fields, nested_name_paths):
if _path_exists_in_arrow_schema(file_schema, path):
self.existing_fields.append(f.name)
else:
self.missing_fields.append(f.name)
else:
file_schema_names = set(file_schema.names)
self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names]
self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names]

# column name → VariantSchema for shredded columns that need assembly.
# In nested mode we still want to reassemble shredded VARIANTs
# that were projected at the top level — only the columns actually
# reached via a length>1 path are skipped (those are sub-fields of
# some other struct, not VARIANTs themselves).
self._shredded_schemas: Dict[str, VariantSchema] = {}
if options is None or options.variant_shredding_enabled():
top_level_names = set(file_schema.names)
for name in self.existing_fields:
if name not in top_level_names:
continue
field_type = file_schema.field(name).type
if is_shredded_variant(field_type):
self._shredded_schemas[name] = build_variant_schema(field_type)

# Only pass existing fields to PyArrow scanner to avoid errors
self.reader = self.dataset.scanner(
columns=self.existing_fields,
filter=push_down_predicate,
batch_size=batch_size
).to_reader()
if has_nested_path:
# Dict-form columns let PyArrow read leaf fields out of nested
# structs without materialising the parent. The dict keys
# become the output column names — they're already flattened
# to ``a_b`` form by the upstream projection utility.
existing_set = set(self.existing_fields)
columns_dict = {}
for f, path in zip(read_fields, nested_name_paths):
if f.name in existing_set:
columns_dict[f.name] = ds.field(*path)
self.reader = self.dataset.scanner(
columns=columns_dict,
filter=push_down_predicate,
batch_size=batch_size
).to_reader()
else:
# Only pass existing fields to PyArrow scanner to avoid errors
self.reader = self.dataset.scanner(
columns=self.existing_fields,
filter=push_down_predicate,
batch_size=batch_size
).to_reader()

self._output_schema = (
PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields else None
@@ -169,3 +217,25 @@ def _cast_orc_time_columns(self, batch):
def close(self):
if self.reader is not None:
self.reader = None


def _path_exists_in_arrow_schema(schema: pa.Schema, path: List[str]) -> bool:
"""Walk ``path`` (a list of field names) through a PyArrow schema and
return whether every step exists. The first step is a top-level field
name; subsequent steps are struct child names. Missing leaves at any
depth (e.g. a renamed sub-field) yield ``False`` so the caller can
fall back to a NULL column instead of raising during scan setup.
"""
if not path:
return False
if path[0] not in schema.names:
return False
current_type = schema.field(path[0]).type
for name in path[1:]:
if not pa.types.is_struct(current_type):
return False
idx = current_type.get_field_index(name)
if idx < 0:
return False
current_type = current_type.field(idx).type
return True
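
A standalone sketch combining the schema walk with the dict-form scanner columns used above; the parquet path and schema are hypothetical, while `ds.field(*path)` and the `columns` dict are standard PyArrow dataset APIs:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Hypothetical file schema: id INT32, address STRUCT<city, zip>.
schema = pa.schema([
    ("id", pa.int32()),
    ("address", pa.struct([("city", pa.string()), ("zip", pa.string())])),
])
assert _path_exists_in_arrow_schema(schema, ["address", "city"])
assert not _path_exists_in_arrow_schema(schema, ["address", "country"])

# Read only the ``city`` leaf and surface it under a flattened name,
# without materialising the whole ``address`` struct.
dataset = ds.dataset("part-0.parquet", format="parquet")  # hypothetical path
reader = dataset.scanner(
    columns={"address_city": ds.field("address", "city")},
    batch_size=1024,
).to_reader()
```
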