[lake/paimon] Partition expiration never fires in Paimon lake table when managed by Tiering Service #2861
Search before asking
- I searched in the issues and found nothing similar.
Description
Task Overview:
Fix partition expiration in the Fluss Tiering Service for Paimon lake tables. Currently, when a Paimon lake table is configured with partition.expiration-time, partition expiration never fires during Tiering Service execution, even after many tiering rounds.
Root Cause:
This is caused by a lifecycle mismatch between the Tiering Service and Paimon's partition expiration mechanism.
In PaimonLakeCommitter, every tiering round creates a brand-new TableCommitImpl via fileStoreTable.newCommit(...). Paimon's TableCommitImpl in turn constructs a fresh PartitionExpire each time, whose lastCheck is initialized to approximately now:

    // PartitionExpire constructor
    long checkIntervalSeconds = checkInterval.toMillis() / 1000;
    if (checkIntervalSeconds > 0) {
        rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
    }
    this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);

The expire() method has three trigger conditions:

    List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
        if (checkInterval.isZero() // (1)
                || now.isAfter(lastCheck.plus(checkInterval)) // (2)
                || (endInputCheckPartitionExpire // (3)
                        && Long.MAX_VALUE == commitIdentifier)) {
            // ... do expire
        }
    }

Since lastCheck is just reset on every tiering round, condition (2) is always false within the lifetime of a single short-lived PartitionExpire. checkInterval is never zero by default, so condition (1) doesn't help either.
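
To make the lifecycle effect concrete, here is a minimal, self-contained sketch that models only the timing state behind condition (2). It is not Paimon code: the class `ExpireCheckSketch`, the nested `CheckState`, and both helper methods are hypothetical names, the 10-minute round gap and 1-hour check interval are illustrative values, and it assumes the check time advances to `now` after each expiration pass.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class ExpireCheckSketch {

    /** Simplified stand-in for the timing state of PartitionExpire (hypothetical, not the Paimon class). */
    static final class CheckState {
        private final Duration checkInterval;
        private LocalDateTime lastCheck;

        CheckState(Duration checkInterval, LocalDateTime createdAt, long rndSeconds) {
            this.checkInterval = checkInterval;
            // Mirrors the quoted constructor: lastCheck starts at creation time minus an offset.
            this.lastCheck = createdAt.minusSeconds(rndSeconds);
        }

        /** Evaluates only condition (2); returns true when an expiration pass would run. */
        boolean fires(LocalDateTime now) {
            if (now.isAfter(lastCheck.plus(checkInterval))) {
                lastCheck = now; // assumption: the check time advances after a pass
                return true;
            }
            return false;
        }
    }

    /** A new CheckState per round, like a TableCommitImpl created on every tiering round. */
    static int firesWithFreshStatePerRound(
            int rounds, Duration roundGap, Duration checkInterval, long rndSeconds) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 0, 0);
        int fired = 0;
        for (int i = 0; i < rounds; i++) {
            CheckState state = new CheckState(checkInterval, now, rndSeconds);
            if (state.fires(now)) {
                fired++;
            }
            now = now.plus(roundGap);
        }
        return fired;
    }

    /** One CheckState reused across all rounds, like a long-lived committer. */
    static int firesWithLongLivedState(
            int rounds, Duration roundGap, Duration checkInterval, long rndSeconds) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 0, 0);
        CheckState state = new CheckState(checkInterval, now, rndSeconds);
        int fired = 0;
        for (int i = 0; i < rounds; i++) {
            if (state.fires(now)) {
                fired++;
            }
            now = now.plus(roundGap);
        }
        return fired;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(10);     // illustrative tiering round gap
        Duration interval = Duration.ofHours(1);   // illustrative check interval
        System.out.println("fresh per round: " + firesWithFreshStatePerRound(100, gap, interval, 0));
        System.out.println("long-lived:      " + firesWithLongLivedState(100, gap, interval, 0));
    }
}
```

With a fresh state per round the count stays at zero no matter how many rounds run, because `lastCheck.plus(checkInterval)` is always in the future at the moment of the check. The reused state starts firing once the accumulated round gaps exceed the interval.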
In a standard Flink streaming job, TableCommitImpl is long-lived, so condition (2) is eventually satisfied. Additionally, Paimon provides end-input.check-partition-expire: when this option is enabled and the batch job ends, it commits with commitIdentifier = Long.MAX_VALUE (BatchWriteBuilder.COMMIT_IDENTIFIER), which triggers condition (3).
The Tiering Service has neither property:
- TableCommitImpl/PartitionExpire is recreated on every tiering round, so condition (2) is never satisfied.
- commitIdentifier is always Long.MAX_VALUE (the same COMMIT_IDENTIFIER), but endInputCheckPartitionExpire defaults to false, so condition (3) never fires.
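
The two points above can be checked by evaluating the three-condition predicate in isolation. The sketch below copies the boolean expression from the quoted expire() method into a pure function. The class `ExpireTriggerSketch` and the method `shouldExpire` are hypothetical names, and the 1-hour interval is an illustrative value, not a setting read from Paimon.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class ExpireTriggerSketch {

    /** Same boolean expression as the quoted expire() guard, with all state passed in explicitly. */
    static boolean shouldExpire(
            Duration checkInterval,
            LocalDateTime lastCheck,
            LocalDateTime now,
            boolean endInputCheckPartitionExpire,
            long commitIdentifier) {
        return checkInterval.isZero()                                    // (1)
                || now.isAfter(lastCheck.plus(checkInterval))            // (2)
                || (endInputCheckPartitionExpire                         // (3)
                        && Long.MAX_VALUE == commitIdentifier);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 0, 0);
        Duration interval = Duration.ofHours(1); // illustrative, non-zero

        // A tiering round as described: lastCheck was just set, flag left at false.
        boolean tieringRound = shouldExpire(interval, now, now, false, Long.MAX_VALUE);

        // Same inputs with the end-input flag set to true.
        boolean withFlag = shouldExpire(interval, now, now, true, Long.MAX_VALUE);

        System.out.println("tiering round, flag=false: " + tieringRound);
        System.out.println("tiering round, flag=true:  " + withFlag);
    }
}
```

With the flag at false all three conditions are false even though commitIdentifier equals Long.MAX_VALUE. Only the flag value differs between the two calls.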
Willingness to contribute
- I'm willing to submit a PR!