[python] Implement aggregation merge engine in pypaimon#7952
Open
TheR1sing3un wants to merge 6 commits into
Open
[python] Implement aggregation merge engine in pypaimon#7952TheR1sing3un wants to merge 6 commits into
TheR1sing3un wants to merge 6 commits into
Conversation
Lays down the per-field aggregation abstraction needed by the upcoming
aggregation merge engine. Mirrors Java's FieldAggregator +
FieldAggregatorFactory SPI in shape: an abstract base class with
agg / reset / retract, plus a module-global registry keyed by
identifier ("sum", "last_value", ...).
retract() raises NotImplementedError by default. pypaimon's
AggregateMergeFunction will reject DELETE / UPDATE_BEFORE rows up
front (same approach as PR apache#7745 for partial-update), so no
aggregator's retract path is reachable from the read pipeline.
The hook is kept so a future PR can wire retract through without
touching every subclass.
The registry is populated by concrete aggregators registering
themselves at import time. This commit only adds the framework;
concrete aggregators land in the next commit.
Tested by: pypaimon/tests/test_field_aggregator_registry.py
(register/create/unknown-identifier/retract/reset)
Implements the 9 most commonly-used field aggregators on top of the registry added in the previous commit: primary_key / last_value / last_non_null_value / first_value / first_non_null_value / sum / max / min / bool_or / bool_and Each class mirrors the value semantics of its Java counterpart in paimon-core/.../mergetree/compact/aggregate/. sum / max / min rely on Python's native +/< so int / float / Decimal / str / date all work through one code path; type validation (numeric for sum, boolean for bool_or/bool_and) happens at factory time so configuration errors surface at merge-function construction rather than mid-read. Remaining Java aggregators (product / listagg / collect / merge_map / nested_update / theta_sketch / hll_sketch / roaring_bitmap_*) are intentionally deferred — the registry will report them as unsupported so users see a clear error rather than a silent fallback. The aggregate package __init__ now eagerly imports this module so the registrations always happen before any lookup. Tested by: pypaimon/tests/test_field_aggregators.py (26 cases covering value semantics, null handling, reset, type validation and registration sanity check).
Adds AggregateMergeFunction, the third MergeFunction implementation after DeduplicateMergeFunction and PartialUpdateMergeFunction. Each non-PK column is reduced across rows for the same primary key by the field aggregator configured in table options (fields.<field>.aggregate-function), falling back to fields.default-aggregate-function and finally to last_non_null_value when nothing is set — same precedence as Java. Wiring: * split_read.py dispatches MergeEngine.AGGREGATE to a new AggregateMergeFunction built via build_field_aggregators(), which resolves the per-column aggregator identifier and instantiates each through the registry from the previous commit. * merge_engine_support.check_supported now lets aggregation through. The aggregation-specific unsupported-option guard (out-of-scope aggregators, retract opt-ins, sequence.field on aggregation tables) lands in the follow-up commit; until then misconfigurations surface either at merge-function construction (unknown aggregator → ValueError) or at first DELETE / UPDATE_BEFORE row (NotImplementedError). * test_partial_update_e2e.py drops the stale test_aggregation_engine_raises_not_implemented placeholder now that aggregation actually runs. Intentionally not implemented yet (mirrors the partial-update PR's scoping approach so the contract is explicit rather than partial): * retract handling for DELETE / UPDATE_BEFORE rows; * the 14 remaining Java aggregators (product / listagg / collect / merge_map* / nested_update* / theta_sketch / hll_sketch / roaring_bitmap_*); * sequence-field special-casing on aggregation tables. Tested by: * test_aggregation_merge_function.py — 16 unit cases (per-field agg composition, null handling, reset, decoupled output, retract-row rejection, resolve_agg_func_name / build_field_aggregators). * test_aggregation_e2e.py — 6 end-to-end cases on real PK tables (sum across commits, mixed aggregators, null semantics, disjoint keys, table-default aggregator, implicit last_non_null_value).
…cope options The previous commit wires AggregateMergeFunction into the read path but only the merge-function-level rejections fire on misconfigured tables (unknown aggregator → ValueError at construction, retract row → NotImplementedError at first DELETE). That leaves several configuration classes that Java treats as well-defined but the Python port doesn't implement — and which would silently produce a wrong answer rather than fail. Adds aggregation_unsupported_options() to merge_engine_support and calls it from the AGGREGATE branch of check_supported() so the following options surface a NotImplementedError at TableRead construction time, before any row is read: * aggregation.remove-record-on-delete=true * fields.<f>.ignore-retract=true * sequence.field (Java would force last_value on the sequence field; pypaimon doesn't special-case it, so the safer answer is to refuse the table than diverge silently) * fields.<f>.sequence-group * fields.<f>.aggregate-function set to an aggregator the port hasn't implemented yet (collect, nested_update, ...) * fields.default-aggregate-function set to an out-of-scope aggregator Supported aggregator identifiers are duplicated in merge_engine_support so the guard has no import-time dependency on the read-pipeline modules; comment in both files flags the duplication so a future addition keeps both sides in sync. Tested by: 7 new cases in test_aggregation_e2e.py — one per rejected configuration plus a positive case ensuring a supported aggregator (sum) does not trip the guard.
aggregators.py and test_field_aggregators.py docstrings both said "9 aggregators" but the actually-registered set is 10 (primary_key placeholder plus 9 value aggregators). The EXPECTED frozenset in RegistrationTest already lists 10. Reword the docstrings to match the implementation so a future reader counting from the docstring doesn't add an 11th and chase a phantom registration.
fb7d38a to
ba20cf4
Compare
JingsongLi
approved these changes
May 27, 2026
Contributor
JingsongLi
left a comment
There was a problem hiding this comment.
LGTM. Solid port of the Java AggregateMergeFunction to pypaimon.
Highlights:
- The
FieldAggregatorframework with registry pattern is extensible and mirrors the Java architecture well - The 9 built-in aggregators cover the most common use cases
merge_engine_supportguard correctly rejects unsupported configurations (retract, sequence-field, out-of-scope aggregators) with clear error messages- Good test coverage: unit tests for individual aggregators + e2e tests for the full merge pipeline
The scoping decision to defer retract handling and less-common aggregators is pragmatic and well-documented.
+1
leaves12138
requested changes
May 27, 2026
Contributor
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for the implementation. I found one correctness issue in the read-path wiring for partitioned primary-key tables; please fix before merge.
Validation I ran locally:
PYTHONPATH=. python -m pytest -q pypaimon/tests/test_field_aggregators.py pypaimon/tests/test_field_aggregator_registry.py pypaimon/tests/test_aggregation_merge_function.py-> 47 passedPYTHONPATH=. python -m pytest -q pypaimon/tests/test_aggregation_e2e.py pypaimon/tests/test_partial_update_e2e.py-> 28 passed
JingsongLi
requested changes
May 27, 2026
…gators The read-path was passing ``trimmed_primary_key`` (partition columns stripped) to ``build_field_aggregators``. ``value_fields`` still carries those partition columns, so any PK column that was also a partition column went unrecognised as PK and fell through to the configured ``fields.default-aggregate-function``. With e.g. PK ``[p, id]``, partition ``[p]``, default ``sum``, writing the same ``(p, id)`` twice yielded ``p = p1 + p2`` instead of staying constant. Switch to ``self.table.primary_keys`` (full list) so partition-PK columns get the identity aggregator. Adds a regression test reproducing the exact scenario from the review comment.
Member
Author
|
@JingsongLi @leaves12138 Thanks for your review, please review again! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Port Java's
AggregateMergeFunctionto pypaimon, following the shape of #7745.Ships the
FieldAggregatorframework, 9 value aggregators (sum/max/min/last_value/last_non_null_value/first_value/first_non_null_value/bool_or/bool_and) plus theprimary_keyplaceholder, theAggregateMergeFunctionwired intoMergeFileSplitRead._build_merge_function, and amerge_engine_supportguard that rejects retract opt-ins, sequence fields and out-of-scope aggregator identifiers (collect/nested_update/theta_sketch/roaring_bitmap_*/ ...).Retract handling (DELETE / UPDATE_BEFORE) and the remaining 14 Java aggregators are intentionally deferred to follow-up PRs, mirroring #7745's scoping.
Tests
test_field_aggregator_registry.py,test_field_aggregators.py,test_aggregation_merge_function.pytest_aggregation_e2e.pycovers positive merges (sum across commits, mixed aggregators, null semantics, disjoint keys, default aggregator) plus the unsupported-option guard.