From 8e03f5fff787507d39981c524ee4c8ef5f23bb28 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 9 May 2026 15:38:43 +0800 Subject: [PATCH 1/3] [python] Add Projection utility and nested-projection API on ReadBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a small ``Projection`` abstraction with three concrete shapes (empty / top-level / nested) that maps an integer-index or integer-path projection onto the table's flat ``DataField`` list. Nested paths flatten into top-level fields whose names are the underscore-joined original path (``a_b`` for ``a.b``, with a ``_$N`` suffix on collisions) and whose IDs are inherited from the leaf so schema-evolution remapping by field ID continues to work. ReadBuilder grows two API surfaces: - ``with_projection(['struct.subfield'])`` — names with a dot are resolved into integer paths against the table schema. Top-level-only callers are unchanged. Unknown names continue to be silently skipped. - ``with_nested_projection([[1, 0], [1, 1]])`` — low-level integer-path entry point. ``read_type()`` materialises the projected fields via ``Projection`` when nested paths are set, otherwise keeps the existing top-level name-based path. This commit only adds the utility, the new API, and unit tests. The file readers and SplitRead still see only top-level fields — file- level nested pushdown and PK-side outer projection follow in subsequent commits. --- paimon-python/pypaimon/read/read_builder.py | 83 +++++- .../pypaimon/tests/test_projection_utility.py | 205 ++++++++++++++ .../test_read_builder_nested_projection.py | 124 ++++++++ paimon-python/pypaimon/utils/projection.py | 267 ++++++++++++++++++ 4 files changed, 673 insertions(+), 6 deletions(-) create mode 100644 paimon-python/pypaimon/tests/test_projection_utility.py create mode 100644 paimon-python/pypaimon/tests/test_read_builder_nested_projection.py create mode 100644 paimon-python/pypaimon/utils/projection.py diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py index d45a526a697e..524e5fabaf35 100644 --- a/paimon-python/pypaimon/read/read_builder.py +++ b/paimon-python/pypaimon/read/read_builder.py @@ -24,6 +24,7 @@ from pypaimon.read.table_scan import TableScan from pypaimon.schema.data_types import DataField from pypaimon.table.special_fields import SpecialFields +from pypaimon.utils.projection import Projection, _is_row_type class ReadBuilder: @@ -34,7 +35,14 @@ def __init__(self, table): self.table: FileStoreTable = table self._predicate: Optional[Predicate] = None + # ``_projection`` is the user-facing list of names from + # :meth:`with_projection`. ``_nested_paths`` is the canonical + # integer-path form (length-1 for top level, longer for nested + # ROW children) used by every consumer in this module. + # ``with_nested_projection`` populates only the latter; the two + # never both hold meaningful state simultaneously. self._projection: Optional[List[str]] = None + self._nested_paths: Optional[List[List[int]]] = None self._limit: Optional[int] = None def with_filter(self, predicate: Predicate) -> 'ReadBuilder': @@ -42,7 +50,19 @@ def with_filter(self, predicate: Predicate) -> 'ReadBuilder': return self def with_projection(self, projection: List[str]) -> 'ReadBuilder': + """Project to the given column names. + + Names containing a dot (e.g. ``"struct.subfield"``) walk into ROW + children and are translated into a nested projection. Top-level- + only callers see the same observable behaviour as before — the + dotted form is opt-in. Unknown names are silently skipped to + preserve the pre-existing contract. + """ self._projection = projection + if projection and any('.' in name for name in projection): + self._nested_paths = self._resolve_dotted_paths(projection) + else: + self._nested_paths = None return self def with_limit(self, limit: int) -> 'ReadBuilder': @@ -69,10 +89,61 @@ def new_predicate_builder(self) -> PredicateBuilder: def read_type(self) -> List[DataField]: table_fields = self.table.fields - if not self._projection: + if not self._projection and not self._nested_paths: return table_fields - else: - if self.table.options.row_tracking_enabled(): - table_fields = SpecialFields.row_type_with_row_tracking(table_fields) - field_map = {field.name: field for field in table_fields} - return [field_map[name] for name in self._projection if name in field_map] + + if self.table.options.row_tracking_enabled(): + table_fields = SpecialFields.row_type_with_row_tracking(table_fields) + + if self._nested_paths: + return Projection.of(self._nested_paths).project(table_fields) + + field_map = {field.name: field for field in table_fields} + return [field_map[name] for name in self._projection if name in field_map] + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _resolve_dotted_paths(self, names: List[str]) -> List[List[int]]: + """Translate dotted-name projection entries into integer paths + against the current table schema. Names without dots produce + length-1 paths. + """ + table_fields = self.table.fields + if self.table.options.row_tracking_enabled(): + table_fields = SpecialFields.row_type_with_row_tracking(table_fields) + top_index = {f.name: i for i, f in enumerate(table_fields)} + + paths: List[List[int]] = [] + for name in names: + if '.' not in name: + if name not in top_index: + # Silently skip unknown names — preserves the + # pre-existing contract from the plain top-level path. + continue + paths.append([top_index[name]]) + continue + parts = name.split('.') + top = parts[0] + if top not in top_index: + continue + path = [top_index[top]] + current_field = table_fields[path[0]] + ok = True + for part in parts[1:]: + if not _is_row_type(current_field.type): + ok = False + break + child_fields = current_field.type.fields + child_idx = next( + (i for i, f in enumerate(child_fields) if f.name == part), + -1) + if child_idx < 0: + ok = False + break + path.append(child_idx) + current_field = child_fields[child_idx] + if ok: + paths.append(path) + return paths diff --git a/paimon-python/pypaimon/tests/test_projection_utility.py b/paimon-python/pypaimon/tests/test_projection_utility.py new file mode 100644 index 000000000000..4d1c43eb1637 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_projection_utility.py @@ -0,0 +1,205 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest + +from pypaimon.schema.data_types import AtomicType, DataField, RowType +from pypaimon.utils.projection import (NestedProjection, Projection, + TopLevelProjection) + + +def _atomic(idx: int, name: str, type_name: str = 'BIGINT') -> DataField: + return DataField(idx, name, AtomicType(type_name)) + + +def _struct(idx: int, name: str, sub_fields) -> DataField: + return DataField(idx, name, RowType(False, list(sub_fields))) + + +def _three_top_fields(): + """schema: [pk: BIGINT, mv: ROW, val: STRING]""" + return [ + _atomic(1, 'pk'), + _struct(2, 'mv', [ + _atomic(10, 'latest_version'), + _atomic(11, 'latest_value', 'STRING'), + ]), + _atomic(3, 'val', 'STRING'), + ] + + +class TopLevelProjectionTest(unittest.TestCase): + + def test_factory_produces_top_level(self): + p = Projection.of([2, 0]) + self.assertIsInstance(p, TopLevelProjection) + self.assertFalse(p.is_nested()) + + def test_indexes_round_trip(self): + p = Projection.of([2, 0, 1]) + self.assertEqual(p.to_top_level_indexes(), [2, 0, 1]) + # nested form lifts each top-level index into a singleton path + self.assertEqual(p.to_nested_indexes(), [[2], [0], [1]]) + + def test_project_picks_fields_in_order(self): + fields = _three_top_fields() + res = Projection.of([2, 0]).project(fields) + self.assertEqual([f.name for f in res], ['val', 'pk']) + self.assertEqual([f.id for f in res], [3, 1]) + + def test_to_name_paths(self): + fields = _three_top_fields() + names = Projection.of([2, 0]).to_name_paths(fields) + self.assertEqual(names, [['val'], ['pk']]) + + def test_range_factory(self): + p = Projection.range(1, 4) + self.assertEqual(p.to_top_level_indexes(), [1, 2, 3]) + + def test_range_zero_or_negative_returns_empty(self): + self.assertFalse(Projection.range(2, 2).is_nested()) + self.assertEqual(Projection.range(2, 2).to_top_level_indexes(), []) + self.assertEqual(Projection.range(5, 1).to_top_level_indexes(), []) + + +class NestedProjectionTest(unittest.TestCase): + + def test_factory_produces_nested(self): + p = Projection.of([[1, 0], [1, 1]]) + self.assertIsInstance(p, NestedProjection) + self.assertTrue(p.is_nested()) + + def test_singleton_paths_reported_not_nested(self): + # paths of length 1 only — observable behaviour matches top level. + p = Projection.of([[2], [0]]) + self.assertIsInstance(p, NestedProjection) + self.assertFalse(p.is_nested()) + + def test_top_level_indexes_dedup_in_path_order(self): + p = Projection.of([[1, 0], [1, 1], [0]]) + self.assertEqual(p.to_top_level_indexes(), [1, 0]) + + def test_nested_indexes_round_trip(self): + p = Projection.of([[1, 0], [1, 1]]) + self.assertEqual(p.to_nested_indexes(), [[1, 0], [1, 1]]) + + def test_to_name_paths_walks_into_struct(self): + fields = _three_top_fields() + names = Projection.of([[1, 0], [1, 1], [0]]).to_name_paths(fields) + self.assertEqual( + names, + [['mv', 'latest_version'], ['mv', 'latest_value'], ['pk']]) + + def test_project_flattens_with_underscore_join(self): + fields = _three_top_fields() + res = Projection.of([[1, 0], [1, 1], [0]]).project(fields) + self.assertEqual( + [f.name for f in res], ['mv_latest_version', 'mv_latest_value', 'pk']) + + def test_project_preserves_leaf_field_id(self): + # Schema-evolution remapping is by field ID, so flattened nested + # fields must inherit the leaf's ID — not the parent struct's. + fields = _three_top_fields() + res = Projection.of([[1, 0], [1, 1]]).project(fields) + self.assertEqual([f.id for f in res], [10, 11]) + + def test_collision_dedup_via_dollar_suffix(self): + # Two leaves under different parents with the same final name. + sub_a = _atomic(20, 'x') + sub_b = _atomic(21, 'x') + fields = [ + _struct(1, 'a', [sub_a]), + _struct(2, 'b', [sub_b]), + ] + # path [0,0] -> 'a_x', path [1,0] -> 'b_x' (no collision yet). + res = Projection.of([[0, 0], [1, 0]]).project(fields) + self.assertEqual([f.name for f in res], ['a_x', 'b_x']) + + # When two collapse to the SAME name, the second gets `_$N`. + # Build two parents whose leaves have the same compound name. + sub_x_only = _atomic(30, 'x') + fields2 = [ + _struct(1, 'a', [sub_x_only]), + _atomic(2, 'a_x'), # plain top-level already named 'a_x'. + ] + res2 = Projection.of([[0, 0], [1]]).project(fields2) + # First path produces 'a_x'; second is already-existing 'a_x'. + # Collision → suffix on the second. + self.assertEqual(res2[0].name, 'a_x') + self.assertTrue(res2[1].name.startswith('a_x_$')) + + def test_project_rejects_non_row_step(self): + # Trying to walk into an atomic field must fail loudly. + fields = _three_top_fields() + with self.assertRaises(ValueError): + Projection.of([[0, 0]]).project(fields) + + def test_constructor_rejects_empty_paths_list(self): + with self.assertRaises(ValueError): + NestedProjection([]) + + def test_constructor_rejects_zero_length_path(self): + with self.assertRaises(ValueError): + NestedProjection([[]]) + + def test_dup_count_is_monotonic_across_distinct_collisions(self): + # ``_$N`` is a per-call monotonic counter — distinct collisions + # share the suffix space, they don't each restart at 0. + sub_x_1 = _atomic(20, 'x') + sub_x_2 = _atomic(21, 'x') + sub_y_1 = _atomic(30, 'y') + sub_y_2 = _atomic(31, 'y') + fields = [ + _atomic(1, 'a_x'), + _struct(2, 'a', [sub_x_1]), + _atomic(3, 'a_y'), + _struct(4, 'a', [sub_y_1]), # collides via path [3, 0] → a_y + _struct(5, 'a', [sub_x_2, sub_y_2]), # second a.x collision + ] + # Order: a_x (top) → keeps; [1, 0] flatten → a_x (collision) → a_x_$0 + # a_y (top) → keeps; [3, 0] flatten → a_y (collision) → a_y_$1 + res = Projection.of( + [[0], [1, 0], [2], [3, 0]]).project(fields) + self.assertEqual( + [f.name for f in res], ['a_x', 'a_x_$0', 'a_y', 'a_y_$1']) + + def test_of_rejects_mixed_int_and_path(self): + # Mixing top-level indexes and nested paths is a programming error; + # ``of`` should fail loudly at the call site instead of producing a + # broken projection that explodes downstream. + with self.assertRaises(TypeError): + Projection.of([1, [2, 3]]) + with self.assertRaises(TypeError): + Projection.of([[1, 2], 3]) + + +class EmptyProjectionTest(unittest.TestCase): + + def test_of_empty(self): + self.assertEqual(Projection.of([]).to_top_level_indexes(), []) + self.assertEqual(Projection.of([]).to_nested_indexes(), []) + + def test_empty_factory(self): + p = Projection.empty() + self.assertEqual(p.project(_three_top_fields()), []) + self.assertFalse(p.is_nested()) + self.assertEqual(p.to_name_paths(_three_top_fields()), []) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py b/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py new file mode 100644 index 000000000000..a781f149fa4b --- /dev/null +++ b/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py @@ -0,0 +1,124 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class _ReadBuilderTestBase(unittest.TestCase): + """Build a primary-key table whose value column is a ROW so we can + exercise both top-level and nested projection paths against the + actual ``ReadBuilder`` API.""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', False) + + struct_type = pa.struct([ + ('latest_version', pa.int64()), + ('latest_value', pa.string()), + ]) + cls.pa_schema = pa.schema([ + pa.field('pk', pa.int64(), nullable=False), + ('mv', struct_type), + ('val', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + cls.pa_schema, primary_keys=['pk'], + options={'bucket': '1', 'file.format': 'parquet'}) + cls.catalog.create_table('default.rb_nested', schema, False) + cls.table = cls.catalog.get_table('default.rb_nested') + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + +class ReadBuilderProjectionStateTest(_ReadBuilderTestBase): + + def test_no_projection_returns_full_schema(self): + rb = self.table.new_read_builder() + fields = rb.read_type() + names = [f.name for f in fields] + self.assertEqual(names, ['pk', 'mv', 'val']) + # Without an explicit projection the read_type must NOT inject + # row-tracking system columns; the raw table fields are returned + # verbatim. + self.assertNotIn('_ROW_ID', names) + self.assertNotIn('_SEQUENCE_NUMBER', names) + + def test_top_level_projection_unchanged(self): + rb = self.table.new_read_builder().with_projection(['val', 'pk']) + names = [f.name for f in rb.read_type()] + self.assertEqual(names, ['val', 'pk']) + # No nested paths derived; only names are stored. + self.assertIsNone(rb._nested_paths) + + def test_dotted_name_resolves_to_nested_path(self): + rb = self.table.new_read_builder().with_projection( + ['mv.latest_version', 'pk']) + # _nested_paths is populated; user-facing names are kept on _projection + self.assertIsNotNone(rb._nested_paths) + self.assertEqual(rb._nested_paths, [[1, 0], [0]]) + names = [f.name for f in rb.read_type()] + # Nested leaves get flattened to underscore-joined names. + self.assertEqual(names, ['mv_latest_version', 'pk']) + + def test_dotted_name_unknown_top_silently_skipped(self): + rb = self.table.new_read_builder().with_projection( + ['nope.x', 'val']) + # Only 'val' resolved; the dot trigger still populates _nested_paths. + self.assertEqual(rb._nested_paths, [[2]]) + names = [f.name for f in rb.read_type()] + self.assertEqual(names, ['val']) + + def test_dotted_name_unknown_subfield_silently_skipped(self): + rb = self.table.new_read_builder().with_projection( + ['mv.no_such_subfield', 'pk']) + # The bad path drops out, the plain name survives. + self.assertEqual(rb._nested_paths, [[0]]) + names = [f.name for f in rb.read_type()] + self.assertEqual(names, ['pk']) + + +class ReadBuilderProjectionFieldIdTest(_ReadBuilderTestBase): + + def test_nested_leaves_inherit_leaf_field_id(self): + rb = self.table.new_read_builder().with_projection( + ['mv.latest_version', 'mv.latest_value']) + leaf_ids = [f.id for f in rb.read_type()] + # Look up the actual leaf IDs from the table schema for assertion + mv_field = next(f for f in self.table.fields if f.name == 'mv') + sub_v = next(f for f in mv_field.type.fields + if f.name == 'latest_version') + sub_x = next(f for f in mv_field.type.fields + if f.name == 'latest_value') + self.assertEqual(leaf_ids, [sub_v.id, sub_x.id]) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/utils/projection.py b/paimon-python/pypaimon/utils/projection.py new file mode 100644 index 000000000000..c87da7ac2c7d --- /dev/null +++ b/paimon-python/pypaimon/utils/projection.py @@ -0,0 +1,267 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Column projection utilities. + +A projection maps a source row type to a flat list of ``DataField``: the +columns the user wants to read. Two flavours: + +* :class:`TopLevelProjection` selects fields by their top-level index. +* :class:`NestedProjection` accepts paths that walk into ROW children, e.g. + ``[[1, 0], [1, 2]]`` means "the 0th and 2nd children of the field at top + level index 1". The result is flattened into top-level fields whose + names are the underscore-joined original path (``a_b`` for ``a.b``, + with a ``_$N`` suffix on collisions) and whose IDs are inherited from + the leaf so schema-evolution remapping by field ID still works. +""" + +from abc import ABC, abstractmethod +from typing import List, Optional, Sequence + +from pypaimon.schema.data_types import DataField, RowType + + +class Projection(ABC): + """Abstract base for column projection.""" + + @abstractmethod + def project(self, row_type) -> List[DataField]: + """Apply the projection and return the resulting flat fields.""" + + @abstractmethod + def is_nested(self) -> bool: + """Whether any path goes deeper than the top level.""" + + @abstractmethod + def to_top_level_indexes(self) -> List[int]: + """Top-level positions touched by this projection. + + For nested projections, returns unique top-level indexes in path + order. Useful for fallback paths that can only push down at the + top level. + """ + + @abstractmethod + def to_nested_indexes(self) -> List[List[int]]: + """Return the projection as a list of paths, one per output field.""" + + @abstractmethod + def to_name_paths(self, row_type) -> List[List[str]]: + """Translate integer paths to field-name paths against ``row_type``. + + For a path ``[1, 0]`` against a row whose top-level field at index 1 + is a struct ``mv_col`` with sub-fields ``[LATEST_VERSION, ...]``, + returns ``[["mv_col", "LATEST_VERSION"], ...]``. Used by format + readers to push nested projection down to the underlying engine + (e.g. PyArrow's ``ds.field(*name_path)``). + """ + + # ------------------------------------------------------------------ + # Factories + # ------------------------------------------------------------------ + + @staticmethod + def empty() -> "Projection": + """The empty projection: no columns selected.""" + return _EmptyProjection() + + @staticmethod + def of(indexes_or_paths) -> "Projection": + """Build a projection from either ``int[]`` or ``int[][]``. + + Empty input returns :func:`empty`. The input must be uniformly + shaped — either all integers or all sequences of integers; mixing + the two raises ``TypeError`` so the failure is reported at the + ``of`` call site rather than as an opaque error deep in + ``project``. + """ + if not indexes_or_paths: + return _EmptyProjection() + first_is_path = isinstance(indexes_or_paths[0], (list, tuple)) + for entry in indexes_or_paths[1:]: + entry_is_path = isinstance(entry, (list, tuple)) + if entry_is_path != first_is_path: + raise TypeError( + "Projection.of expects either all top-level indexes " + "or all nested paths; got a mix") + if first_is_path: + return NestedProjection([list(p) for p in indexes_or_paths]) + return TopLevelProjection(list(indexes_or_paths)) + + @staticmethod + def range(start_inclusive: int, end_exclusive: int) -> "Projection": + """Top-level projection over a contiguous index range.""" + if end_exclusive <= start_inclusive: + return _EmptyProjection() + return TopLevelProjection(list(range(start_inclusive, end_exclusive))) + + +class _EmptyProjection(Projection): + + def project(self, row_type) -> List[DataField]: + return [] + + def is_nested(self) -> bool: + return False + + def to_top_level_indexes(self) -> List[int]: + return [] + + def to_nested_indexes(self) -> List[List[int]]: + return [] + + def to_name_paths(self, row_type) -> List[List[str]]: + return [] + + +class TopLevelProjection(Projection): + """Single-level projection: pick fields by their top-level index.""" + + def __init__(self, indexes: Sequence[int]): + self.indexes = list(indexes) + + def project(self, row_type) -> List[DataField]: + fields = _row_fields(row_type) + return [fields[i] for i in self.indexes] + + def is_nested(self) -> bool: + return False + + def to_top_level_indexes(self) -> List[int]: + return list(self.indexes) + + def to_nested_indexes(self) -> List[List[int]]: + return [[i] for i in self.indexes] + + def to_name_paths(self, row_type) -> List[List[str]]: + fields = _row_fields(row_type) + return [[fields[i].name] for i in self.indexes] + + +class NestedProjection(Projection): + """Projection over paths that may walk into ROW children. + + Each path navigates from a top-level field through successive ROW + children. A path of length 1 is equivalent to a top-level selection. + """ + + def __init__(self, paths: Sequence[Sequence[int]]): + if not paths: + raise ValueError("NestedProjection requires at least one path") + self.paths = [list(p) for p in paths] + for p in self.paths: + if len(p) == 0: + raise ValueError( + "Each projection path must have at least one index") + self._has_nested = any(len(p) > 1 for p in self.paths) + + def is_nested(self) -> bool: + return self._has_nested + + def to_top_level_indexes(self) -> List[int]: + # Preserve order, deduplicate. + seen = set() + out: List[int] = [] + for p in self.paths: + top = p[0] + if top not in seen: + seen.add(top) + out.append(top) + return out + + def to_nested_indexes(self) -> List[List[int]]: + return [list(p) for p in self.paths] + + def to_name_paths(self, row_type) -> List[List[str]]: + fields = _row_fields(row_type) + result: List[List[str]] = [] + for path in self.paths: + field = fields[path[0]] + names = [field.name] + for idx in path[1:]: + child_type = field.type + if not _is_row_type(child_type): + raise ValueError( + "Nested projection step expected a ROW type but got %s " + "for field '%s'" % (child_type, field.name)) + child_fields = _row_fields(child_type) + field = child_fields[idx] + names.append(field.name) + result.append(names) + return result + + def project(self, row_type) -> List[DataField]: + # ``dup_count`` is monotonic across the whole call: it increments + # whenever a name has to be suffixed, regardless of which base + # name caused the collision. The per-call counter only guarantees + # uniqueness, not that ``_$0`` always represents the first + # collision of any given base. + fields = _row_fields(row_type) + out: List[DataField] = [] + seen_names = set() + dup_count = 0 + for path in self.paths: + field = fields[path[0]] + name_parts = [field.name] + for idx in path[1:]: + child_type = field.type + if not _is_row_type(child_type): + raise ValueError( + "Nested projection step expected a ROW type but got %s " + "for field '%s'" % (child_type, field.name)) + child_fields = _row_fields(child_type) + field = child_fields[idx] + name_parts.append(field.name) + base_name = "_".join(name_parts) + final_name = base_name + while final_name in seen_names: + final_name = "%s_$%d" % (base_name, dup_count) + dup_count += 1 + seen_names.add(final_name) + # Keep the leaf field's ID so downstream schema-evolution + # remapping by field ID still works after rename. + out.append(DataField( + id=field.id, + name=final_name, + type=field.type, + description=getattr(field, 'description', None), + default_value=getattr(field, 'default_value', None), + )) + return out + + +def _row_fields(row_type) -> List[DataField]: + """Return the field list of a row-like type. Accepts a RowType, a plain + list of DataField, or anything else with a ``.fields`` attribute. + """ + if isinstance(row_type, list): + return row_type + fields: Optional[List[DataField]] = getattr(row_type, 'fields', None) + if fields is None: + raise ValueError( + "Projection target must be a RowType or have a .fields attribute, " + "got %s" % type(row_type).__name__) + return list(fields) + + +def _is_row_type(data_type) -> bool: + if isinstance(data_type, RowType): + return True + # Lightweight test stubs may report ``.fields`` without subclassing + # RowType — accept those too. + return getattr(data_type, 'fields', None) is not None From bddd66c67379ee8052d96ee42a9c7be8f66fee8b Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 9 May 2026 16:38:56 +0800 Subject: [PATCH 2/3] [python] Push down nested-field projection to PyArrow scanner for append-only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the ``Projection`` infrastructure from the previous commit through ``ReadBuilder.new_read`` → ``TableRead`` → ``SplitRead`` → ``FormatPyArrowReader`` so a request like ``with_projection(['mv.latest_version'])`` actually prunes the nested column at the file-read stage instead of materialising the parent struct and discarding the unwanted children. When at least one path has length > 1 the format reader switches from a list-form ``columns=...`` scanner to a dict-form one with ``ds.field(*path)`` expressions, mirroring the engine's own nested column-pruning surface. Sub-field schema evolution (a leaf renamed or removed) is detected up front via ``_path_exists_in_arrow_schema`` and the missing column is served as NULL — same shape as the existing top-level missing-field handling. SplitRead grows three small helpers and one bypass: * ``_nested_path_by_name`` indexes the user-facing flat names back onto their original-schema paths so ``file_reader_supplier`` can align ``nested_name_paths`` with the reader's ``ordered_read_fields``. * ``_get_fields_and_predicate`` checks the path's top-level name against the file schema instead of the flat name, otherwise nested fields would be filtered out before reaching the format reader. * ``_get_final_read_data_fields`` returns the user-facing flat names directly in nested mode; the trimmed-fields machinery cannot find matches by leaf field ID. * ``create_index_mapping`` returns identity in nested mode because the format reader already emits batches whose columns match ``read_fields`` exactly. Avro / Lance / Vortex / Blob raise ``NotImplementedError`` when a nested path reaches them — Avro's Python-side fallback ships in the next commit; the others have no nested-pruning support to mirror. Primary-key tables that go through ``MergeFileSplitRead`` likewise raise: an outer-projection layer that lets the merge function see complete parent structs is the next phase. Tests: ``test_nested_projection_e2e.py`` covers the dotted-name path, the low-level integer-path API, mixed projection ordering, the top-level fast path stays intact, and the PK-merge-path NotImplementedError guard. --- paimon-python/pypaimon/read/read_builder.py | 15 +- .../read/reader/format_pyarrow_reader.py | 96 +++++++-- paimon-python/pypaimon/read/split_read.py | 106 +++++++++- paimon-python/pypaimon/read/table_read.py | 32 ++- .../tests/test_nested_projection_e2e.py | 185 ++++++++++++++++++ 5 files changed, 411 insertions(+), 23 deletions(-) create mode 100644 paimon-python/pypaimon/tests/test_nested_projection_e2e.py diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py index 524e5fabaf35..ce95115431ef 100644 --- a/paimon-python/pypaimon/read/read_builder.py +++ b/paimon-python/pypaimon/read/read_builder.py @@ -80,9 +80,22 @@ def new_read(self) -> TableRead: return TableRead( table=self.table, predicate=self._predicate, - read_type=self.read_type() + read_type=self.read_type(), + nested_name_paths=self._nested_name_paths(), ) + def _nested_name_paths(self) -> Optional[List[List[str]]]: + """Resolve the current nested-projection state into a parallel list + of name paths against the underlying table schema. Returns ``None`` + if the user only requested top-level projection (or no projection). + """ + if not self._nested_paths: + return None + table_fields = self.table.fields + if self.table.options.row_tracking_enabled(): + table_fields = SpecialFields.row_type_with_row_tracking(table_fields) + return Projection.of(self._nested_paths).to_name_paths(table_fields) + def new_predicate_builder(self) -> PredicateBuilder: return PredicateBuilder(self.read_type()) diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py index ddfb368c8899..0c12d27e4f95 100644 --- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py +++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py @@ -48,33 +48,81 @@ class FormatPyArrowReader(RecordBatchReader): def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[DataField], push_down_predicate: Any, batch_size: int = 1024, - options: CoreOptions = None): + options: CoreOptions = None, + nested_name_paths: Optional[List[List[str]]] = None): file_path_for_pyarrow = file_io.to_filesystem_path(file_path) self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem) self._file_format = file_format self.read_fields = read_fields self._read_field_names = [f.name for f in read_fields] - # Identify which fields exist in the file and which are missing + # ``nested_name_paths`` is parallel to ``read_fields``; when + # any path has length > 1 the scanner is invoked with a + # ``{flat_name: ds.field(*path)}`` column dict. + if nested_name_paths is not None and len(nested_name_paths) != len(read_fields): + raise ValueError( + "nested_name_paths length {} does not match read_fields length {}".format( + len(nested_name_paths), len(read_fields))) + self._nested_name_paths = nested_name_paths + has_nested_path = bool( + nested_name_paths and any(len(p) > 1 for p in nested_name_paths)) + + # Identify which fields exist in the file and which are missing. + # For nested projection, "exists" is determined by walking the + # whole path against the file schema; sub-field schema evolution + # (a leaf renamed or removed) shows up as ``missing`` and is + # served as a NULL column, mirroring the top-level handling. file_schema = self.dataset.schema - file_schema_names = set(file_schema.names) - self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names] - self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names] - - # column name → VariantSchema for shredded columns that need assembly + if has_nested_path: + self.existing_fields = [] + self.missing_fields = [] + for f, path in zip(read_fields, nested_name_paths): + if _path_exists_in_arrow_schema(file_schema, path): + self.existing_fields.append(f.name) + else: + self.missing_fields.append(f.name) + else: + file_schema_names = set(file_schema.names) + self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names] + self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names] + + # column name → VariantSchema for shredded columns that need assembly. + # In nested mode we still want to reassemble shredded VARIANTs + # that were projected at the top level — only the columns actually + # reached via a length>1 path are skipped (those are sub-fields of + # some other struct, not VARIANTs themselves). self._shredded_schemas: Dict[str, VariantSchema] = {} if options is None or options.variant_shredding_enabled(): + top_level_names = set(file_schema.names) for name in self.existing_fields: + if name not in top_level_names: + continue field_type = file_schema.field(name).type if is_shredded_variant(field_type): self._shredded_schemas[name] = build_variant_schema(field_type) - # Only pass existing fields to PyArrow scanner to avoid errors - self.reader = self.dataset.scanner( - columns=self.existing_fields, - filter=push_down_predicate, - batch_size=batch_size - ).to_reader() + if has_nested_path: + # Dict-form columns let PyArrow read leaf fields out of nested + # structs without materialising the parent. The dict keys + # become the output column names — they're already flattened + # to ``a_b`` form by the upstream projection utility. + existing_set = set(self.existing_fields) + columns_dict = {} + for f, path in zip(read_fields, nested_name_paths): + if f.name in existing_set: + columns_dict[f.name] = ds.field(*path) + self.reader = self.dataset.scanner( + columns=columns_dict, + filter=push_down_predicate, + batch_size=batch_size + ).to_reader() + else: + # Only pass existing fields to PyArrow scanner to avoid errors + self.reader = self.dataset.scanner( + columns=self.existing_fields, + filter=push_down_predicate, + batch_size=batch_size + ).to_reader() self._output_schema = ( PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields else None @@ -169,3 +217,25 @@ def _cast_orc_time_columns(self, batch): def close(self): if self.reader is not None: self.reader = None + + +def _path_exists_in_arrow_schema(schema: pa.Schema, path: List[str]) -> bool: + """Walk ``path`` (a list of field names) through a PyArrow schema and + return whether every step exists. The first step is a top-level field + name; subsequent steps are struct child names. Missing leaves at any + depth (e.g. a renamed sub-field) yield ``False`` so the caller can + fall back to a NULL column instead of raising during scan setup. + """ + if not path: + return False + if path[0] not in schema.names: + return False + current_type = schema.field(path[0]).type + for name in path[1:]: + if not pa.types.is_struct(current_type): + return False + idx = current_type.get_field_index(name) + if idx < 0: + return False + current_type = current_type.field(idx).type + return True diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index c88f49f3b065..09281a17f64e 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -19,7 +19,7 @@ import os from abc import ABC, abstractmethod from functools import partial -from typing import Callable, List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Tuple from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.predicate import Predicate @@ -89,7 +89,8 @@ def __init__( predicate: Optional[Predicate], read_type: List[DataField], split: Split, - row_tracking_enabled: bool): + row_tracking_enabled: bool, + nested_name_paths: Optional[List[List[str]]] = None): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table @@ -98,6 +99,9 @@ def __init__( self.split = split self.row_tracking_enabled = row_tracking_enabled self.value_arity = len(read_type) + # Parallel to ``read_type``; each entry is the original-schema + # name path. ``None`` when no projection or top-level only. + self.nested_name_paths = nested_name_paths self.trimmed_primary_key = self.table.trimmed_primary_keys self.read_fields = read_type @@ -127,6 +131,21 @@ def __init__( else: self.predicate_for_reader = None + def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]: + """``{flat_name: original_path}`` for the rows of ``read_type`` + that go through a nested path (``len > 1``). ``None`` when no + such path exists, so callers stay on the top-level fast path. + """ + if not self.nested_name_paths: + return None + if not any(len(p) > 1 for p in self.nested_name_paths): + return None + out: Dict[str, List[str]] = {} + for f, path in zip(self.read_fields[:self.value_arity], + self.nested_name_paths): + out[f.name] = path + return out + def _push_down_predicate(self) -> Optional[Predicate]: if self.predicate is None: return None @@ -168,22 +187,46 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, end = r.to - file.first_row_id row_indices.extend(range(start, end + 1)) + # Map the parallel ``self.nested_name_paths`` (aligned with + # ``self.read_fields``) into the same order the format reader + # will see, indexed by output column name. Set to ``None`` when + # no path has length > 1 so the reader stays on its top-level + # fast path. + nested_path_by_name = self._nested_path_by_name() + has_nested = nested_path_by_name is not None + format_reader: RecordBatchReader if file_format == CoreOptions.FILE_FORMAT_AVRO: + if has_nested: + # Avro has no native nested column pruning; a Python-side + # walk-each-record fallback ships in a separate commit. + raise NotImplementedError( + "Nested-field projection on Avro is not yet supported") format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields, self.read_fields, read_arrow_predicate, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_BLOB: + if has_nested: + raise NotImplementedError( + "Nested-field projection is not supported on BLOB files") blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options) format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields, self.read_fields, read_arrow_predicate, blob_as_descriptor, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_LANCE: + if has_nested: + # Lance has no nested column pruning today; project the + # parent struct in full and extract sub-fields client-side. + raise NotImplementedError( + "Nested-field projection is not supported on Lance files") name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] format_reader = FormatLanceReader(self.table.file_io, file_path, ordered_read_fields, read_arrow_predicate, batch_size=batch_size, row_range=row_range, row_indices=row_indices) elif file_format == CoreOptions.FILE_FORMAT_VORTEX: + if has_nested: + raise NotImplementedError( + "Nested-field projection is not supported on Vortex files") name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] predicate_fields = _get_all_fields(self.push_down_predicate) if self.push_down_predicate else set() @@ -194,10 +237,15 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC: name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] + ordered_nested_paths = ( + [nested_path_by_name[f.name] for f in ordered_read_fields] + if has_nested else None + ) format_reader = FormatPyArrowReader( self.table.file_io, file_format, file_path, ordered_read_fields, read_arrow_predicate, batch_size=batch_size, - options=self.table.options) + options=self.table.options, + nested_name_paths=ordered_nested_paths) elif file_format in ('json', 'csv'): raise NotImplementedError( f"Reading '{file_format}' format is not yet supported in Python SDK. " @@ -251,6 +299,10 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, return reader def _get_fields_and_predicate(self, schema_id: int, read_fields): + # In nested mode the flat name (``mv_latest_version``) never + # appears in the file schema; reachability is decided by the + # path's top-level entry instead. + nested_path_by_name = self._nested_path_by_name() key = (schema_id, tuple(read_fields)) if key not in self.schema_id_2_fields: schema = self.table.schema_manager.get_schema(schema_id) @@ -262,7 +314,20 @@ def _get_fields_and_predicate(self, schema_id: int, read_fields): if self.table.is_primary_key_table: schema_field_names.add('_SEQUENCE_NUMBER') schema_field_names.add('_VALUE_KIND') - read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names] + + def _is_reachable(name: str) -> bool: + if name in schema_field_names: + return True + if nested_path_by_name is not None: + path = nested_path_by_name.get(name) + if path: + return path[0] in schema_field_names + return False + + read_file_fields = [ + read_field for read_field in read_fields + if _is_reachable(read_field) + ] read_predicate = trim_predicate_by_fields(self.push_down_predicate, read_file_fields) read_arrow_predicate = read_predicate.to_arrow() if read_predicate else None self.schema_id_2_fields[key] = (read_file_fields, read_arrow_predicate) @@ -300,6 +365,10 @@ def _create_key_value_fields(self, value_field: List[DataField]): return all_data_fields def create_index_mapping(self): + if self._nested_path_by_name() is not None: + # Format reader already emits flat columns aligned with + # ``read_fields``; the id-based remap can't see leaf IDs. + return None base_index_mapping = self._create_base_index_mapping(self.read_fields, self._get_read_data_fields()) trimmed_key_mapping, _ = self._get_trimmed_fields(self._get_read_data_fields(), self._get_all_data_fields()) if base_index_mapping is None: @@ -341,6 +410,10 @@ def _create_base_index_mapping(self, table_fields: List[DataField], data_fields: return None def _get_final_read_data_fields(self) -> List[str]: + if self._nested_path_by_name() is not None: + # Trimmed-fields filters by ID and drops nested leaves; + # hand the format reader the user-facing flat names directly. + return self._remove_partition_fields(list(self.read_fields)) _, trimmed_fields = self._get_trimmed_fields( self._get_read_data_fields(), self._get_all_data_fields() ) @@ -395,6 +468,23 @@ def _create_partition_info(self): return PartitionInfo(partition_mapping, self.split.partition) def _construct_partition_mapping(self) -> List[int]: + if self._nested_path_by_name() is not None: + # Nested fields carry leaf IDs that don't match top-level + # data-field IDs, so the trimmed-fields machinery can't see + # them. Build the mapping directly from ``self.read_fields``: + # entries whose flat name equals a partition key get a + # negative index, the rest get a sequential read index. + partition_names = self.table.partition_keys + mapping = [0] * (len(self.read_fields) + 1) + p_count = 0 + for i, field in enumerate(self.read_fields): + if field.name in partition_names: + partition_index = partition_names.index(field.name) + mapping[i] = -(partition_index + 1) + p_count += 1 + else: + mapping[i] = (i - p_count) + 1 + return mapping _, trimmed_fields = self._get_trimmed_fields( self._get_read_data_fields(), self._get_all_data_fields() ) @@ -537,13 +627,17 @@ def __init__( predicate: Optional[Predicate], read_type: List[DataField], split: Split, - row_tracking_enabled: bool): + row_tracking_enabled: bool, + nested_name_paths: Optional[List[List[str]]] = None): self.row_ranges = None actual_split = split if isinstance(split, IndexedSplit): self.row_ranges = split.row_ranges() actual_split = split.data_split() - super().__init__(table, predicate, read_type, actual_split, row_tracking_enabled) + super().__init__( + table, predicate, read_type, actual_split, row_tracking_enabled, + nested_name_paths=nested_name_paths, + ) def _push_down_predicate(self) -> Optional[Predicate]: # Data evolution: files may have different schemas, so we don't push predicate diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 40cc337aaa4f..4f53e49126f3 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -40,7 +40,8 @@ def __init__( table, predicate: Optional[Predicate], read_type: List[DataField], - include_row_kind: bool = False + include_row_kind: bool = False, + nested_name_paths: Optional[List[List[str]]] = None, ): from pypaimon.table.file_store_table import FileStoreTable @@ -48,6 +49,10 @@ def __init__( self.predicate = predicate self.read_type = read_type self.include_row_kind = include_row_kind + # Parallel to ``read_type``; each entry is the original-schema + # name path for the field. ``None`` when no projection (or only + # top-level projection) is set. + self.nested_name_paths = nested_name_paths def to_iterator(self, splits: List[Split]) -> Iterator: def _record_generator(): @@ -268,6 +273,15 @@ def to_torch( def _create_split_read(self, split: Split) -> SplitRead: if self.table.is_primary_key_table and not split.raw_convertible: + if self.nested_name_paths and any( + len(p) > 1 for p in self.nested_name_paths): + # The merge function needs full parent structs; outer + # projection that walks the path to recover leaves is a + # separate change. Project the parent struct and extract + # client-side until then. + raise NotImplementedError( + "Nested-field projection on primary-key tables that " + "require a merge read is not yet supported") return MergeFileSplitRead( table=self.table, predicate=self.predicate, @@ -276,12 +290,23 @@ def _create_split_read(self, split: Split) -> SplitRead: row_tracking_enabled=False ) elif self.table.options.data_evolution_enabled(): + if self.nested_name_paths and any( + len(p) > 1 for p in self.nested_name_paths): + # Multi-file union for data-evolution tables matches files + # by top-level field ID; a nested ``read_field`` carries + # its leaf ID, which never matches and would silently + # produce all-NULL columns. Refuse loudly until the + # union path is taught to walk paths. + raise NotImplementedError( + "Nested-field projection on data-evolution tables is " + "not yet supported") return DataEvolutionSplitRead( table=self.table, predicate=self.predicate, read_type=self.read_type, split=split, - row_tracking_enabled=True + row_tracking_enabled=True, + nested_name_paths=self.nested_name_paths, ) else: return RawFileSplitRead( @@ -289,7 +314,8 @@ def _create_split_read(self, split: Split) -> SplitRead: predicate=self.predicate, read_type=self.read_type, split=split, - row_tracking_enabled=self.table.options.row_tracking_enabled() + row_tracking_enabled=self.table.options.row_tracking_enabled(), + nested_name_paths=self.nested_name_paths, ) @staticmethod diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py new file mode 100644 index 000000000000..d04e06f10840 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py @@ -0,0 +1,185 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class _AppendOnlyNestedBase(unittest.TestCase): + """Append-only table whose ``mv`` column is a nested struct, used to + exercise file-level Parquet/ORC pushdown of nested projection.""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', False) + + cls.pa_schema = pa.schema([ + ('id', pa.int64()), + ('mv', pa.struct([ + ('latest_version', pa.int64()), + ('latest_value', pa.string()), + ])), + ('val', pa.string()), + ]) + cls.rows = [ + {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'}, + {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}, + {'id': 3, 'mv': {'latest_version': 300, 'latest_value': 'c'}, 'val': 'z'}, + ] + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_table(self, name: str, file_format: str = 'parquet'): + identifier = 'default.{}'.format(name) + schema = Schema.from_pyarrow_schema( + self.pa_schema, + options={'bucket': '-1', 'file.format': file_format}, + ) + self.catalog.create_table(identifier, schema, False) + table = self.catalog.get_table(identifier) + wb = table.new_batch_write_builder() + w = wb.new_write() + w.write_arrow(pa.Table.from_pylist(self.rows, schema=self.pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + return table + + +class AppendOnlyNestedParquetTest(_AppendOnlyNestedBase): + """Parquet path uses PyArrow's dict-form scanner with ``ds.field(*path)`` + to push the nested column read into the engine.""" + + def test_dotted_name_returns_just_the_leaf(self): + table = self._create_table('ao_dotted_leaf') + rb = table.new_read_builder().with_projection(['mv.latest_version']) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + self.assertEqual( + got, + [{'mv_latest_version': 100}, + {'mv_latest_version': 200}, + {'mv_latest_version': 300}]) + + def test_mixed_nested_and_top_level_preserves_order(self): + table = self._create_table('ao_mixed_order') + rb = table.new_read_builder().with_projection( + ['mv.latest_version', 'val', 'mv.latest_value']) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + self.assertEqual( + got, + [{'mv_latest_version': 100, 'val': 'x', 'mv_latest_value': 'a'}, + {'mv_latest_version': 200, 'val': 'y', 'mv_latest_value': 'b'}, + {'mv_latest_version': 300, 'val': 'z', 'mv_latest_value': 'c'}]) + + def test_top_level_only_projection_unchanged(self): + """A projection without dots must keep the existing top-level + path — file-level pushdown still asks for plain column names, + no dict-form scanner.""" + table = self._create_table('ao_top_level_unchanged') + rb = table.new_read_builder().with_projection(['val', 'id']) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + self.assertEqual( + got, + [{'val': 'x', 'id': 1}, + {'val': 'y', 'id': 2}, + {'val': 'z', 'id': 3}]) + + def test_partitioned_table_with_nested_projection(self): + """Partition-aware reads have a separate path-mapping helper from + the non-partitioned fast path; regress the case where it dropped + non-nested top-level columns alongside the projected leaf.""" + identifier = 'default.ao_partitioned' + pa_schema = pa.schema([ + ('part', pa.string()), + ('id', pa.int64()), + ('mv', pa.struct([ + ('latest_version', pa.int64()), + ('latest_value', pa.string()), + ])), + ('val', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=['part'], + options={'bucket': '-1', 'file.format': 'parquet'}, + ) + self.catalog.create_table(identifier, schema, False) + table = self.catalog.get_table(identifier) + wb = table.new_batch_write_builder() + w = wb.new_write() + w.write_arrow(pa.Table.from_pylist([ + {'part': 'A', 'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'}, + {'part': 'B', 'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}, + ], schema=pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + + # Mixed projection: nested leaf, a non-partition top-level column, + # and the partition column itself. + rb = table.new_read_builder().with_projection( + ['part', 'mv.latest_version', 'val']) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + got_sorted = sorted(got, key=lambda r: r['part']) + self.assertEqual( + got_sorted, + [{'part': 'A', 'mv_latest_version': 100, 'val': 'x'}, + {'part': 'B', 'mv_latest_version': 200, 'val': 'y'}]) + + def test_pk_table_merge_split_with_nested_projection_raises(self): + # Phase 2b lands the append-only path only; PK + nested needs an + # outer-projection wrapper that ships in a follow-up commit. Until + # then, the call must refuse loudly rather than silently corrupt + # the merge function input. Two commits on the same PK force the + # split out of the raw-convertible fast path into the merge + # reader, which is where the guard lives. + identifier = 'default.pk_nested_unsupported' + schema = Schema.from_pyarrow_schema( + self.pa_schema, + primary_keys=['id'], + options={'bucket': '1', 'file.format': 'parquet'}, + ) + self.catalog.create_table(identifier, schema, False) + table = self.catalog.get_table(identifier) + for batch in (self.rows, self.rows): # two overlapping commits + wb = table.new_batch_write_builder() + w = wb.new_write() + w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + + rb = table.new_read_builder().with_projection(['mv.latest_version']) + splits = rb.new_scan().plan().splits() + # ``to_arrow`` materialises the split read; the merge path is what + # raises, so do it eagerly here rather than waiting for the first + # batch. + with self.assertRaises(NotImplementedError): + rb.new_read().to_arrow(splits) + + +if __name__ == '__main__': + unittest.main() From 682625477d006cbe4b14fbb745a439103982377f Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 9 May 2026 16:52:28 +0800 Subject: [PATCH 3/3] [python] Implement Python-side nested-projection fallback for Avro MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fastavro has no native nested column pruning, so the reader walks each record dict step-by-step using the same ``nested_name_paths`` the previous commit threaded through ``SplitRead``. Top-level-only projection keeps the existing ``record.get(name)`` fast path. The walk helper returns ``None`` when any path segment is missing or hits a non-dict value, which lets sub-field schema evolution surface as a NULL column instead of an exception — same shape as the Parquet/ORC missing-leaf handling. --- .../read/reader/format_avro_reader.py | 41 +++++++++++++++++-- paimon-python/pypaimon/read/split_read.py | 12 +++--- .../tests/test_nested_projection_e2e.py | 25 +++++++++++ 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py index 5dd27389443e..1fe41468e801 100644 --- a/paimon-python/pypaimon/read/reader/format_avro_reader.py +++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py @@ -35,7 +35,8 @@ class FormatAvroReader(RecordBatchReader): """ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField], - push_down_predicate: Any, batch_size: int = 1024): + push_down_predicate: Any, batch_size: int = 1024, + nested_name_paths: Optional[List[List[str]]] = None): file_path_for_io = file_io.to_filesystem_path(file_path) self._file = file_io.filesystem.open_input_file(file_path_for_io) self._avro_reader = fastavro.reader(self._file) @@ -47,13 +48,30 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full projected_data_fields = [full_fields_map[name] for name in read_fields] self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields) + # ``nested_name_paths`` is parallel to ``read_fields``. Top-level + # entries are length-1 paths and use the existing ``record.get`` + # fast path; longer paths walk the record dict step-by-step. The + # path's first segment must be a real top-level Avro field — + # ``_get_fields_and_predicate`` upstream guarantees this. + if nested_name_paths is not None and len(nested_name_paths) != len(read_fields): + raise ValueError( + "nested_name_paths length {} does not match read_fields length {}".format( + len(nested_name_paths), len(read_fields))) + self._nested_name_paths = nested_name_paths + self._has_nested = bool( + nested_name_paths and any(len(p) > 1 for p in nested_name_paths)) + def read_arrow_batch(self) -> Optional[RecordBatch]: pydict_data = {name: [] for name in self._fields} records_in_batch = 0 for record in self._avro_reader: - for col_name in self._fields: - pydict_data[col_name].append(record.get(col_name)) + if self._has_nested: + for col_name, path in zip(self._fields, self._nested_name_paths): + pydict_data[col_name].append(_walk_avro_record(record, path)) + else: + for col_name in self._fields: + pydict_data[col_name].append(record.get(col_name)) records_in_batch += 1 if records_in_batch >= self._batch_size: break @@ -76,3 +94,20 @@ def close(self): if self._file: self._file.close() self._file = None + + +def _walk_avro_record(record, path: List[str]): + """Walk a list of field names through an avro record dict, returning + the leaf value or ``None`` if any segment is missing or hits a + non-dict value. ``record`` is the top-level fastavro dict; nested + record fields surface as nested dicts. + """ + current = record + for name in path: + if current is None: + return None + if isinstance(current, dict): + current = current.get(name) + continue + return None + return current diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 09281a17f64e..b68d2f9e36c7 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -197,13 +197,13 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, format_reader: RecordBatchReader if file_format == CoreOptions.FILE_FORMAT_AVRO: - if has_nested: - # Avro has no native nested column pruning; a Python-side - # walk-each-record fallback ships in a separate commit. - raise NotImplementedError( - "Nested-field projection on Avro is not yet supported") + avro_nested_paths = ( + [nested_path_by_name[name] for name in read_file_fields] + if has_nested else None + ) format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields, - self.read_fields, read_arrow_predicate, batch_size=batch_size) + self.read_fields, read_arrow_predicate, batch_size=batch_size, + nested_name_paths=avro_nested_paths) elif file_format == CoreOptions.FILE_FORMAT_BLOB: if has_nested: raise NotImplementedError( diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py index d04e06f10840..87ab3935f435 100644 --- a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py +++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py @@ -150,6 +150,31 @@ def test_partitioned_table_with_nested_projection(self): [{'part': 'A', 'mv_latest_version': 100, 'val': 'x'}, {'part': 'B', 'mv_latest_version': 200, 'val': 'y'}]) + def test_avro_nested_projection_python_fallback(self): + """Avro has no native nested column pruning; the reader walks + each fastavro record dict by path and assembles the column + client-side.""" + table = self._create_table('ao_avro_nested', file_format='avro') + rb = table.new_read_builder().with_projection(['mv.latest_version', 'val']) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + self.assertEqual( + got, + [{'mv_latest_version': 100, 'val': 'x'}, + {'mv_latest_version': 200, 'val': 'y'}, + {'mv_latest_version': 300, 'val': 'z'}]) + + def test_avro_top_level_projection_unchanged(self): + """Top-level-only projection on Avro stays on the existing + ``record.get(name)`` fast path.""" + table = self._create_table('ao_avro_top', file_format='avro') + rb = table.new_read_builder().with_projection(['val', 'id']) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + self.assertEqual( + got, + [{'val': 'x', 'id': 1}, + {'val': 'y', 'id': 2}, + {'val': 'z', 'id': 3}]) + def test_pk_table_merge_split_with_nested_projection_raises(self): # Phase 2b lands the append-only path only; PK + nested needs an # outer-projection wrapper that ships in a follow-up commit. Until