[core] chain table support special partition expire.#7643
Conversation
e79ac90 to
408f4cc
Compare
|
Hello @JingsongLi , could you PTAL at this PR when you have a moment? I'd really appreciate your feedback. |
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for the detailed implementation and tests. I found one correctness issue in the explicit Flink action path:
ExpirePartitionsAction still builds a PartitionExpireStrategy from the user-provided expireStrategy argument and calls FileStore.newPartitionExpire(..., expireStrategy). However, the chain-table branch in AbstractFileStore.newPartitionExpire(String, FileStoreTable, Duration, Duration, PartitionExpireStrategy) ignores that argument and always returns newChainTablePartitionExpire(...).
This means an explicit action call with expireStrategy = update-time (or a custom strategy) against a chain table will silently run the chain table's values-time expiration instead of rejecting the unsupported strategy. The schema validation added in this PR catches persisted/dynamic table options, but it does not cover this action path because the action passes the strategy only as an argument to the overload.
Could we add the same validation in this overload before creating ChainTablePartitionExpire (for example, require the provided strategy to be PartitionValuesTimeExpireStrategy), or route the action through dynamic options so SchemaValidation rejects non-values-time consistently?
JingsongLi
left a comment
There was a problem hiding this comment.
This is a significant and well-thought-out feature. The segment-based expiration algorithm is the right approach for preserving chain integrity. Comments:
Design:
-
Segment abstraction: The "one snapshot + its dependent deltas = atomic unit" model is clean. The documentation in the chain-table.md is excellent and clearly explains the algorithm.
-
Interface extraction: Extracting
PartitionExpireto an interface withNormalPartitionExpireas the original implementation is a good refactoring pattern. All existing call sites only use interface methods — no compatibility impact. -
Delta-before-snapshot ordering: Dropping deltas before their snapshot partition ensures the commit pre-check always passes. This is a critical correctness detail — well handled.
Concerns:
-
Anchor selection edge case: "If fewer than 2 snapshots fall before the cutoff, nothing is expired." What about the case where there are 100 expired snapshots but only 1 is before cutoff? The algorithm description says we need at least 2 before cutoff to expire the ones before the anchor. This seems correct but make sure the test covers the boundary (exactly 2 snapshots before cutoff — the earliest is expired, the second becomes anchor).
-
Group partition awareness: The PR description mentions a fix to
ChainTableCommitPreCallbackfor group-partition awareness. This is a bug fix bundled with a feature — consider whether it should be a separate commit for easier bisection. -
Performance: For tables with many partitions, the sorting + filtering per group could be expensive during each commit. Is this cached or does it re-scan manifest state each time?
-
SchemaValidationchanges: The change file list includesSchemaValidation.java. What validation is being added/changed? Ensure partition expiration options are validated at table creation time.
Good work overall. Please confirm test coverage for the boundary conditions (exactly 2 snapshots before cutoff, groups with no snapshot branch partitions).
|
Please rebase master. |
408f4cc to
63a35e5
Compare
Already add validation in AbstractFileStore.newPartitionExpire. |
OK |
Purpose
This PR implements partition expiration for chain tables. Chain tables store data across snapshot
and delta branches, where delta partitions depend on their nearest earlier snapshot partition as an
anchor for merge-on-read. Standard partition expiration cannot be applied directly because dropping
a snapshot partition without considering its dependent deltas would break the chain integrity.
Changes
New: Segment-based partition expiration (
ChainTablePartitionExpire)Introduces a segment-based expiration algorithm that preserves chain integrity. A segment consists
of one snapshot partition and all delta partitions whose time falls between that snapshot and the
next snapshot. The segment is the atomic unit of expiration.
Algorithm per group:
now - partition.expiration-time).kept as anchor for its dependent deltas).
expirable segments together with their associated delta partitions.
Refactored:
PartitionExpireinterface extractionExtracted
PartitionExpirefrom a concrete class into an interface with three methods:expire(long),isValueExpiration(), andisValueAllExpired(Collection<BinaryRow>).The original implementation is preserved as
NormalPartitionExpire. All existing consumers(Flink/Spark procedures, actions,
TableCommitImpl,ConflictDetection) use only the interfacemethods — no compatibility impact.
Fixed:
ChainTableCommitPreCallbackgroup partition awarenessThe commit pre-callback that validates snapshot partition drops was not group-partition aware.
It used full partition comparators and triangular predicates, which could match partitions across
different groups and produce incorrect pre/next snapshot lookups. Refactored to:
ChainPartitionProjectorto extract group and chain dimensions.Tests
maxExpireNumlimits number of expired segmentsisValueAllExpired: anchor partitions not reported as expiredisValueAllExpired: groups with < 2 snapshots retain allisValueAllExpired: cross-group mixed scenariosisValueAllExpired: partitions after cutoff not expired