[python] Add self-contained Ray datasource and top-level read_paimon/write_paimon API #7740
TheR1sing3un wants to merge 7 commits into apache:master
Conversation
…write_paimon API
The existing RayDatasource needs the caller to first build a TableRead
(via Catalog -> Table -> ReadBuilder -> TableRead) and then pass it in
together with the planned splits. That is fine for the
TableRead.to_ray() helper, but it means there is no single-line API for
"read this Paimon table into a Ray Dataset" — every user has to repeat
the catalog/table/builder boilerplate.
Mirror Iceberg's IcebergDatasource: make RayDatasource self-contained
and add a top-level facade.
* RayDatasource(table_identifier, catalog_options, predicate=None,
projection=None, limit=None) is the new constructor. The catalog,
table, splits and read_type are loaded lazily via @property, so the
object is cheap to instantiate and easy to ship across Ray workers.
* The legacy entry point (TableRead.to_ray) keeps working through a
new RayDatasource._from_table_read(table_read, splits) classmethod
that wraps an already-resolved (table_read, splits) pair without a
second catalog round-trip.
* Refactor the read closure into a module-level _paimon_read_task
generator that yields one Arrow table per batch. This avoids the
closure-over-self serialization overhead (ray-project/ray#49107) and
keeps memory proportional to one batch instead of the whole chunk.
Add the new pypaimon.ray module:
* pypaimon.ray.read_paimon(table_identifier, catalog_options, *,
filter=None, projection=None, limit=None, ray_remote_args=None,
concurrency=None, override_num_blocks=None, **read_args) ->
ray.data.Dataset.
* pypaimon.ray.write_paimon(dataset, table_identifier, catalog_options,
*, overwrite=False, concurrency=None, ray_remote_args=None) -> None.
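A minimal sketch of the facade's argument handling, assuming only what the signature above and the `override_num_blocks < 1` guard in the test list state. Everything past the guard is a stub for illustration; the real function builds a `RayDatasource` and returns a `ray.data.Dataset`.

```python
from typing import Any, Dict, Optional

# Hedged sketch of read_paimon's keyword-only signature and its
# override_num_blocks guard. The stub dict return value is for
# illustration only and is NOT the real API's return type.
def read_paimon(table_identifier: str,
                catalog_options: Dict[str, str], *,
                filter=None, projection=None, limit: Optional[int] = None,
                ray_remote_args=None, concurrency=None,
                override_num_blocks: Optional[int] = None,
                **read_args: Any):
    # Guard exercised by test_read_paimon_invalid_override_num_blocks.
    if override_num_blocks is not None and override_num_blocks < 1:
        raise ValueError("override_num_blocks must be >= 1")
    # Stand-in for: build RayDatasource, hand off to Ray Data.
    return {"table": table_identifier, "limit": limit,
            "blocks": override_num_blocks}
```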
Tests:
* New pypaimon/tests/ray_integration_test.py covers basic read,
column projection, predicate pushdown, empty table, basic write,
overwrite, primary-key upsert + read-back, and the
override_num_blocks<1 guard.
* Existing ray_data_test / ray_sink_test (which exercise
TableRead.to_ray and PaimonDatasink directly) continue to pass via
the _from_table_read bridge — no behavioural change for those
paths.
Three review comments from JingsongLi on apache#7740:

1. Extract _ensure_planned() helper. The splits and read_type properties each duplicated `if self._x is None: self._plan()`; both now route through a single _ensure_planned() that runs the ReadBuilder plan once and populates both fields together.

2. _from_table_read no longer bypasses __init__ via cls.__new__. Added a private _resolved=(table, splits, read_type) sentinel parameter to __init__; when supplied, the catalog/identifier path is skipped and the pre-resolved values are used directly. _from_table_read now forwards through __init__, so any future field added to __init__ is automatically initialized for both construction paths. Validation added for the public path (table_identifier and catalog_options are required when _resolved is None).

3. Added test_read_paimon_with_limit to ray_integration_test.py: writes 10 rows across two partitions (forcing two raw-convertible splits) and asserts that limit=3 causes the scan plan to drop the second split, so the resulting Ray Dataset row count is < 10. The full unbounded read serves as a sanity baseline (count == 10). The assertion uses `< 10` rather than `== N` because Paimon's scan-time limit is a per-split cap (whole-split granularity), not a row-exact hard limit; row-exact short-circuiting in the reader is a separate follow-up.

Tests: pypaimon/tests/ray_integration_test.py 9/9 pass. Lint: flake8 clean.
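The plan-once caching that review comment 1 describes can be sketched as follows. This is a hypothetical stand-in (the class name, split values, and `plan_calls` counter are invented for the sketch); only the shape — two properties routing through one `_ensure_planned()` that populates both cached fields together — follows the comment.

```python
# Sketch of the _ensure_planned() pattern: the None-check lives in one
# place, and a single (stubbed) planning step fills both fields, so
# neither property can observe a half-planned state.
class RayDatasourceSketch:
    def __init__(self):
        self._splits = None
        self._read_type = None
        self.plan_calls = 0  # instrumentation for this sketch only

    def _ensure_planned(self):
        if self._splits is None:
            self.plan_calls += 1
            # Stand-in for ReadBuilder.new_scan().plan(): the real code
            # derives the splits and the read row type from the plan.
            self._splits = ["split-0", "split-1"]
            self._read_type = "row_type"

    @property
    def splits(self):
        self._ensure_planned()
        return self._splits

    @property
    def read_type(self):
        self._ensure_planned()
        return self._read_type
```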
Thanks for the review! Addressed all three comments in c4e86cd.
    def __init__(
        self,
        table_identifier: Optional[str] = None,
        catalog_options: Optional[Dict[str, str]] = None,
Perhaps we should do some abstraction, such as providing a SplitProvider to obtain splits.
Do you mean extracting the logic that finds the matching read splits from an identifier, predicate, limit, and projection into a separate method, so it can be reused later? For example, for a high-level integration with the Daft engine?
I just mean you should do some abstraction, introduce SplitProvider ABC, and implement two implementations.
Thank you for your reminder. I'll make it happen
done~
Per JingsongLi's feedback on apache#7740 (line 79), the catalog -> table -> ReadBuilder -> Scan -> splits chain that lived inline in RayDatasource is now hidden behind a SplitProvider ABC with two implementations:

- CatalogSplitProvider: builds the chain from a table identifier and catalog options. Used by the public read_paimon facade. Caches the ReadBuilder plan so splits / read_type are resolved together exactly once, mirroring the previous _ensure_planned helper.
- PreResolvedSplitProvider: wraps an already-resolved (table, splits, read_type, predicate) tuple. Used by the TableRead.to_ray bridge to skip the catalog round-trip.

RayDatasource.__init__ now takes a SplitProvider via a keyword-only split_provider= argument; the public table_identifier / catalog_options path constructs CatalogSplitProvider for callers transparently. The previous _resolved sentinel is removed in favor of this cleaner injection point. display_name() is added to the ABC so RayDatasource.get_name() can stay provider-agnostic instead of branching on isinstance.

Tests: split_provider_test.py covers both implementations: lazy planning + caching, identifier/options validation, propagation of predicate / projection / limit (limit verified by split-pruning on a multi-commit table), and the pre-resolved passthrough. All existing ray_data_test / ray_sink_test / ray_integration_test cases continue to pass through the bridge.
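The shape of that abstraction can be sketched with stdlib `abc`. This is a hedged stand-in, not the committed code: the method bodies, the split strings, and the validation message are invented; only the structure (one ABC, a lazily-planning catalog-backed provider, a pre-resolved passthrough, and a `display_name()` on the ABC) follows the commit message.

```python
from abc import ABC, abstractmethod

class SplitProvider(ABC):
    """Sketch of the SplitProvider ABC described above."""

    @abstractmethod
    def splits(self): ...

    @abstractmethod
    def display_name(self) -> str: ...

class CatalogSplitProvider(SplitProvider):
    """Builds the catalog -> table -> ReadBuilder -> scan chain lazily."""

    def __init__(self, table_identifier, catalog_options):
        if not table_identifier or catalog_options is None:
            raise ValueError("table_identifier and catalog_options are required")
        self._identifier = table_identifier
        self._splits = None  # planned at most once

    def splits(self):
        if self._splits is None:
            # Stand-in for ReadBuilder.new_scan().plan().splits().
            self._splits = [f"{self._identifier}-split-0"]
        return self._splits

    def display_name(self):
        return f"Paimon({self._identifier})"

class PreResolvedSplitProvider(SplitProvider):
    """Wraps splits that were already planned elsewhere (to_ray bridge)."""

    def __init__(self, splits):
        self._splits = list(splits)

    def splits(self):
        return self._splits

    def display_name(self):
        return "Paimon(pre-resolved)"
```

With this in place, `RayDatasource.get_name()` can simply delegate to `display_name()` instead of branching on `isinstance`.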
Force-pushed 944d144 to b863ad9.
Drop the inline scan args (table_identifier / catalog_options / predicate / projection / limit) from RayDatasource.__init__. Once the SplitProvider abstraction was introduced, these duplicated CatalogSplitProvider's constructor and created a footgun where passing both a split_provider= kwarg and the inline args would silently drop the inline args.

The constructor is now `RayDatasource(split_provider)`. Callers build the provider themselves:

- pypaimon.ray.read_paimon constructs CatalogSplitProvider.
- TableRead.to_ray constructs PreResolvedSplitProvider.

The _from_table_read classmethod is removed since the bridge is now a plain two-line caller-side construct.
Now that RayDatasource is a thin shell over a SplitProvider, the public-looking @property wrappers around the provider's API were duplication, not abstraction. Remove them and call the provider directly from the two internal users (estimate_inmemory_data_size, get_read_tasks).

Removed:

- split_provider property (no caller anywhere in the repo).
- table / splits / read_type / predicate properties (only used inside this class, mirroring SplitProvider one-for-one; they would force every future SplitProvider method addition to add a parallel property).

Also drop two dead defensive checks in get_read_tasks:

- hasattr(split, 'merged_row_count'): defined on the Split ABC, so the check is always true.
- hasattr(split, 'row_count'): abstract Split property, always true.

The hasattr guards on file_size / file_paths stay: those are not on the ABC, and the only Split implementation today (DataSplit) carries them as concrete attributes.
The previous refactor hoisted the per-task read function to module level. The justifications given in the PR description -- "avoids closure-over-self serialization overhead" and "memory proportional to one batch" -- did not actually depend on module-level placement: the nested form already used default-arg early binding to avoid capturing self, and was already a generator. Module-level only added a small pickle-stability benefit at the cost of an extra public-looking name. Inline it back as a nested function with the same default-arg binding pattern. partial(_read_task, chunk_splits) replaces the longer partial(_paimon_read_task, chunk_splits, table=..., predicate=..., ...) call site.
Minimise the read-task-related diff vs master: keep the original inner function name (_get_read_task), type annotations, docstring, and the intermediate `get_read_task = partial(_get_read_task, ...)` step as-is. The only change in the read-task area is now where its inputs (table, predicate, read_type, splits, schema) come from -- they're sourced from the SplitProvider instead of self.table_read.
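The default-arg early-binding pattern the two commits above rely on can be sketched in plain Python. This is a hypothetical illustration: the names (`_get_read_task`, `table`, `read_type`) mirror the commit messages, but the bodies are stand-ins. The point is that defaults are evaluated once at `def` time, binding the values rather than the instance, so the nested function has an empty closure and never drags `self` into Ray's serialization.

```python
from functools import partial

class DatasourceSketch:
    def __init__(self, table, read_type):
        self.table = table
        self.read_type = read_type

    def get_read_tasks(self, chunked_splits):
        # Default args are evaluated here, once, capturing the *values*
        # self.table / self.read_type -- not self. The function body
        # references only its parameters, so __closure__ stays None.
        def _get_read_task(splits, table=self.table, read_type=self.read_type):
            for split in splits:
                yield (table, read_type, split)

        # One partially-applied task per chunk, as in the real code's
        # get_read_task = partial(_get_read_task, ...) step.
        return [partial(_get_read_task, splits) for splits in chunked_splits]
```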
Purpose
Today, reading a Paimon table into a Ray Dataset requires the caller to first build a `TableRead` by hand via the Catalog → Table → ReadBuilder → TableRead chain.
That works for the existing `TableRead.to_ray()` helper, but it forces every user to repeat the same catalog → table → builder boilerplate. The Iceberg integration has long had a single-line `IcebergDatasource(table_identifier, catalog_options, ...)`, and that's the missing surface here. This PR makes `RayDatasource` self-contained and adds a top-level facade so reading and writing a Paimon table from Ray is one call.

Datasource refactor
- `RayDatasource(table_identifier, catalog_options, predicate=None, projection=None, limit=None)` is the new constructor. The catalog, table, splits and `read_type` are loaded lazily via `@property` (`table`, `splits`, `read_type`), so the object is cheap to instantiate and easy to ship across Ray workers.
- The legacy entry point, `TableRead.to_ray()`, keeps working through a new `RayDatasource._from_table_read(table_read, splits)` classmethod that wraps an already-resolved `(table_read, splits)` pair without a second catalog round-trip.
- The read closure is refactored into a module-level `_paimon_read_task` generator that yields one Arrow table per batch. This avoids the closure-over-`self` serialization overhead documented in ray-project/ray#49107 and keeps memory proportional to one batch instead of the whole chunk.

New module: `pypaimon.ray`

- `read_paimon(table_identifier, catalog_options, *, filter=None, projection=None, limit=None, ray_remote_args=None, concurrency=None, override_num_blocks=None, **read_args) -> ray.data.Dataset`
- `write_paimon(dataset, table_identifier, catalog_options, *, overwrite=False, concurrency=None, ray_remote_args=None) -> None`

Linked issue
N/A. Surfaced when wiring pypaimon into a Ray-based ingestion pipeline alongside Iceberg, where users expected a `read_paimon`/`write_paimon` facade analogous to `read_iceberg`/`write_iceberg`.

Tests
New `pypaimon/tests/ray_integration_test.py` (8 cases):

- `test_read_paimon_basic`: round-trip of three rows.
- `test_read_paimon_with_projection`: projection narrowing.
- `test_read_paimon_with_filter`: predicate pushdown via `PredicateBuilder`.
- `test_read_paimon_empty_table`: empty datasets work.
- `test_write_paimon_basic`: write-then-read round-trip via the facade.
- `test_write_paimon_overwrite`: `overwrite=True` replaces data.
- `test_read_paimon_primary_key`: PK upsert + merge-on-read row count and values.
- `test_read_paimon_invalid_override_num_blocks`: guards `override_num_blocks < 1`.

Existing `ray_data_test` / `ray_sink_test` (which exercise `TableRead.to_ray()` and `PaimonDatasink` directly) continue to pass via the `_from_table_read` bridge: no behavioural change for those paths.

Local: `pytest pypaimon/tests/{ray_data_test,ray_sink_test,ray_integration_test}.py` → 26 passed; `flake8 --config=dev/cfg.ini` clean.

API and format
Public Python API additions (`pypaimon.ray.read_paimon`, `pypaimon.ray.write_paimon`). The existing `RayDatasource` constructor signature changes shape (it now takes `table_identifier` + `catalog_options` instead of `table_read` + `splits`). The only in-tree caller of the old form, `TableRead.to_ray()`, is migrated to `RayDatasource._from_table_read()` in the same change. No external project should be importing `RayDatasource` directly today, but the bridge classmethod is kept available for callers that do.

No file format change.
Documentation
Public docstrings on `RayDatasource`, `read_paimon`, and `write_paimon` describe the new contract and parameters.

Generative AI disclosure
Drafted with assistance from an AI coding tool; the design follows Iceberg's `IcebergDatasource` pattern, and every behavioural guarantee made by the new facade is exercised by a test in `ray_integration_test.py`.