From 6c63480a8bcf088f383f3b4f8300004b3b3ebc80 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 27 Apr 2026 10:20:36 +0800 Subject: [PATCH 1/6] [core] Check next snapshot's time for snapshotTimeRetain expiration protection Previously, a snapshot was expired if its own timestamp exceeded snapshotTimeRetain. This could cause premature expiration when snapshotRetainMin is small. Now we check the next snapshot's timestamp instead, ensuring each snapshot is protected for the full snapshotTimeRetain duration. --- .../paimon/table/ExpireSnapshotsImpl.java | 7 ++- .../paimon/operation/ExpireSnapshotsTest.java | 62 +++++++++++++++++-- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index beeb6ae2b510..917e2dd75a28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -136,10 +136,11 @@ public int expire() { for (long id = min; id < maxExclusive; id++) { // Early exit the loop for 'snapshot.time-retained' - // (the maximum time of snapshots to retain) + // A snapshot can only be expired if its next snapshot has been alive + // longer than snapshotTimeRetain, providing stronger protection try { - Snapshot snapshot = snapshotManager.tryGetSnapshot(id); - if (olderThanMills <= snapshot.timeMillis()) { + Snapshot nextSnapshot = snapshotManager.tryGetSnapshot(id + 1); + if (olderThanMills <= nextSnapshot.timeMillis()) { return expireUntil(earliest, id); } } catch (FileNotFoundException e) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 01e0473e88fa..35ae859800d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -538,13 +538,63 @@ public void testExpireWithTime() throws Exception { expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); - for (int i = 1; i <= latestSnapshotId; i++) { - if (snapshotManager.snapshotExists(i)) { - assertThat(snapshotManager.snapshot(i).timeMillis()) - .isBetween(expireMillis - 1000, expireMillis); - assertSnapshot(i, allData, snapshotPositions); - } + // snapshots 1-4 should be expired, snapshot 5 is retained because its next + // snapshot (6) is within the time window + for (int i = 1; i <= 4; i++) { + assertThat(snapshotManager.snapshotExists(i)).isFalse(); + } + for (int i = 5; i <= latestSnapshotId; i++) { + assertThat(snapshotManager.snapshotExists(i)).isTrue(); + assertSnapshot(i, allData, snapshotPositions); + } + + store.assertCleaned(); + } + + @Test + public void testExpireWithTimeProtectsEachSnapshot() throws Exception { + // Even with a small retainMin, each snapshot should be protected by + // snapshotTimeRetain: a snapshot can only be expired when its next + // snapshot has been alive longer than snapshotTimeRetain. + ExpireConfig.Builder builder = ExpireConfig.builder(); + builder.snapshotRetainMin(1) + .snapshotRetainMax(Integer.MAX_VALUE) + .snapshotTimeRetain(Duration.ofMillis(5000)); + ExpireSnapshots expire = store.newExpire(builder.build()); + + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + + // create 5 snapshots quickly + commit(5, allData, snapshotPositions); + + // expire immediately - no snapshot should be expired because each + // snapshot's next snapshot is still within the time window + expire.config(builder.build()).expire(); + + for (int i = 1; i <= 5; i++) { + assertThat(snapshotManager.snapshotExists(i)).isTrue(); + assertSnapshot(i, allData, snapshotPositions); + } + + // wait for snapshotTimeRetain to pass + Thread.sleep(5500); + + // create one more snapshot so snapshot 5 has a "next" + commit(1, allData, snapshotPositions); + + // expire again - now snapshots 1-4 can be expired (their next snapshots + // are older than 5000ms), but snapshot 5 is still protected because its + // next snapshot (6) was just created + expire.config(builder.build()).expire(); + + for (int i = 1; i <= 4; i++) { + assertThat(snapshotManager.snapshotExists(i)).isFalse(); } + assertThat(snapshotManager.snapshotExists(5)).isTrue(); + assertThat(snapshotManager.snapshotExists(6)).isTrue(); + assertSnapshot(5, allData, snapshotPositions); + assertSnapshot(6, allData, snapshotPositions); store.assertCleaned(); } From 0d13ec99aa1fb599e74eeca1d8b5fea328192d9c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 27 Apr 2026 14:56:45 +0800 Subject: [PATCH 2/6] fix --- .../flink/procedure/ExpireSnapshotsProcedureITCase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java index bfec37fb0c7c..3eb3335d6605 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java @@ -72,12 +72,12 @@ public void testExpireSnapshotsProcedure() throws Exception { + "', retain_min => 3)"); checkSnapshots(snapshotManager, 4, 6); - // older_than => timestamp of snapshot 6, expected snapshots (6) + // older_than => timestamp of snapshot 6, expected snapshots (5, 6) sql( "CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '" + ts6.toString() + "')"); - checkSnapshots(snapshotManager, 6, 6); + checkSnapshots(snapshotManager, 5, 6); } @ParameterizedTest @@ -157,7 +157,7 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti .run(); checkSnapshots(snapshotManager, 4, 6); - // older_than => timestamp of snapshot 6, expected snapshots (6) + // older_than => timestamp of snapshot 6, expected snapshots (5, 6) createAction( ExpireSnapshotsAction.class, "expire_snapshots", @@ -173,7 +173,7 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti Boolean.toString(forceStartFlinkJob)) .withStreamExecutionEnvironment(env) .run(); - checkSnapshots(snapshotManager, 6, 6); + checkSnapshots(snapshotManager, 5, 6); } @Test From 5cabbfb1d1c02a6268ef8669d1866ec1cba6b783 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 7 May 2026 20:12:51 +0800 Subject: [PATCH 3/6] fix tests --- .../paimon/table/ExpireSnapshotsImpl.java | 13 +++-- .../paimon/operation/ExpireSnapshotsTest.java | 53 ++++++++++++++++--- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 917e2dd75a28..ef3796f20df3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot; @@ -63,6 +64,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { private final TagManager tagManager; private ExpireConfig expireConfig; + private Supplier currentTimeMillis = System::currentTimeMillis; public ExpireSnapshotsImpl( SnapshotManager snapshotManager, @@ -82,6 +84,11 @@ public ExpireSnapshotsImpl( this.fileExecutor = snapshotDeletion.fileExecutor(); } + @VisibleForTesting + public void setCurrentTimeMillis(Supplier currentTimeMillis) { + this.currentTimeMillis = currentTimeMillis; + } + @Override public ExpireSnapshots config(ExpireConfig expireConfig) { this.expireConfig = expireConfig; @@ -95,7 +102,7 @@ public int expire() { int retainMin = expireConfig.getSnapshotRetainMin(); int maxDeletes = expireConfig.getSnapshotMaxDeletes(); long olderThanMills = - System.currentTimeMillis() - expireConfig.getSnapshotTimeRetain().toMillis(); + currentTimeMillis.get() - expireConfig.getSnapshotTimeRetain().toMillis(); Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (latestSnapshotId == null) { @@ -163,7 +170,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { private int innerExpireUntil(long earliestId, long endExclusiveId) throws ExecutionException, InterruptedException { - long startTime = System.currentTimeMillis(); + long startTime = currentTimeMillis.get(); if (endExclusiveId <= earliestId) { // No expire happens: @@ -269,7 +276,7 @@ private int innerExpireUntil(long earliestId, long endExclusiveId) } writeEarliestHint(endExclusiveId); - long duration = System.currentTimeMillis() - startTime; + long duration = currentTimeMillis.get() - startTime; LOG.info( "Finished expire snapshots, duration {} ms, range is [{}, {})", duration, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 35ae859800d8..800f286691b0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -524,16 +524,22 @@ public void testExpireWithTime() throws Exception { builder.snapshotRetainMin(1) .snapshotRetainMax(Integer.MAX_VALUE) .snapshotTimeRetain(Duration.ofMillis(1000)); - ExpireSnapshots expire = store.newExpire(builder.build()); + ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(builder.build()); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); commit(5, allData, snapshotPositions); - Thread.sleep(1500); + for (int i = 1; i <= 5; i++) { + rewriteSnapshotTime(i, 0); + } commit(5, allData, snapshotPositions); - long expireMillis = System.currentTimeMillis(); - // expire twice to check for idempotence + for (int i = 6; i <= 10; i++) { + rewriteSnapshotTime(i, 2000); + } + // expire at time 2500, olderThanMills = 1500 + expire.setCurrentTimeMillis(() -> 2500L); + // expire twice to check for idempotence expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); @@ -560,16 +566,20 @@ public void testExpireWithTimeProtectsEachSnapshot() throws Exception { builder.snapshotRetainMin(1) .snapshotRetainMax(Integer.MAX_VALUE) .snapshotTimeRetain(Duration.ofMillis(5000)); - ExpireSnapshots expire = store.newExpire(builder.build()); + ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(builder.build()); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); // create 5 snapshots quickly commit(5, allData, snapshotPositions); + for (int i = 1; i <= 5; i++) { + rewriteSnapshotTime(i, 0); + } // expire immediately - no snapshot should be expired because each // snapshot's next snapshot is still within the time window + expire.setCurrentTimeMillis(() -> 100L); expire.config(builder.build()).expire(); for (int i = 1; i <= 5; i++) { @@ -577,15 +587,14 @@ public void testExpireWithTimeProtectsEachSnapshot() throws Exception { assertSnapshot(i, allData, snapshotPositions); } - // wait for snapshotTimeRetain to pass - Thread.sleep(5500); - // create one more snapshot so snapshot 5 has a "next" commit(1, allData, snapshotPositions); + rewriteSnapshotTime(6, 6000); // expire again - now snapshots 1-4 can be expired (their next snapshots // are older than 5000ms), but snapshot 5 is still protected because its // next snapshot (6) was just created + expire.setCurrentTimeMillis(() -> 6500L); expire.config(builder.build()).expire(); for (int i = 1; i <= 4; i++) { @@ -790,6 +799,34 @@ private TestFileStore createStore() { .build(); } + private void rewriteSnapshotTime(long snapshotId, long newTimeMillis) throws IOException { + Snapshot old = snapshotManager.snapshot(snapshotId); + Snapshot updated = + new Snapshot( + old.id(), + old.schemaId(), + old.baseManifestList(), + old.baseManifestListSize(), + old.deltaManifestList(), + old.deltaManifestListSize(), + old.changelogManifestList(), + old.changelogManifestListSize(), + old.indexManifest(), + old.commitUser(), + old.commitIdentifier(), + old.commitKind(), + newTimeMillis, + old.totalRecordCount(), + old.deltaRecordCount(), + old.changelogRecordCount(), + old.watermark(), + old.statistics(), + old.properties(), + old.nextRowId()); + fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId), updated.toJson()); + snapshotManager.invalidateCache(); + } + protected void commit(int numCommits, List allData, List snapshotPositions) throws Exception { for (int i = 0; i < numCommits; i++) { From 1e31960802293984eeb918d6b6542cb8dc585f07 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 7 May 2026 21:29:14 +0800 Subject: [PATCH 4/6] fix tests --- .../procedure/ExpireSnapshotsProcedureITCase.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java index 3eb3335d6605..0c590510ab5c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java @@ -72,12 +72,12 @@ public void testExpireSnapshotsProcedure() throws Exception { + "', retain_min => 3)"); checkSnapshots(snapshotManager, 4, 6); - // older_than => timestamp of snapshot 6, expected snapshots (5, 6) + // older_than => timestamp + 1 of snapshot 6, expected snapshots (6) sql( "CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '" - + ts6.toString() + + new Timestamp(ts6.getTime() + 1) + "')"); - checkSnapshots(snapshotManager, 5, 6); + checkSnapshots(snapshotManager, 6, 6); } @ParameterizedTest @@ -157,7 +157,7 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti .run(); checkSnapshots(snapshotManager, 4, 6); - // older_than => timestamp of snapshot 6, expected snapshots (5, 6) + // older_than => timestamp + 1 of snapshot 6, expected snapshots (6) createAction( ExpireSnapshotsAction.class, "expire_snapshots", @@ -168,12 +168,12 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti "--table", "word_count", "--older_than", - ts6.toString(), + new Timestamp(ts6.getTime() + 1).toString(), "--force_start_flink_job", Boolean.toString(forceStartFlinkJob)) .withStreamExecutionEnvironment(env) .run(); - checkSnapshots(snapshotManager, 5, 6); + checkSnapshots(snapshotManager, 6, 6); } @Test From 6610e570917b910996c174dfe7062da2197f854c Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Fri, 8 May 2026 11:23:09 +0800 Subject: [PATCH 5/6] Update paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java Co-authored-by: tsreaper --- .../paimon/operation/ExpireSnapshotsTest.java | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 800f286691b0..57a15e773f06 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -800,30 +800,11 @@ private TestFileStore createStore() { } private void rewriteSnapshotTime(long snapshotId, long newTimeMillis) throws IOException { - Snapshot old = snapshotManager.snapshot(snapshotId); - Snapshot updated = - new Snapshot( - old.id(), - old.schemaId(), - old.baseManifestList(), - old.baseManifestListSize(), - old.deltaManifestList(), - old.deltaManifestListSize(), - old.changelogManifestList(), - old.changelogManifestListSize(), - old.indexManifest(), - old.commitUser(), - old.commitIdentifier(), - old.commitKind(), - newTimeMillis, - old.totalRecordCount(), - old.deltaRecordCount(), - old.changelogRecordCount(), - old.watermark(), - old.statistics(), - old.properties(), - old.nextRowId()); - fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId), updated.toJson()); + String oldJson = fileIO.readFileUtf8(snapshotManager.snapshotPath(snapshotId)); + ObjectNode node = (ObjectNode) JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(oldJson); + node.put(Snapshot.FIELD_TIME_MILLIS, newTimeMillis); + String newJson = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.writeValueAsString(node); + fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId), newJson); snapshotManager.invalidateCache(); } From 4076708f67a4986ddbecdc30250c9b21b6dec414 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 8 May 2026 11:25:40 +0800 Subject: [PATCH 6/6] Fix compile --- .../org/apache/paimon/operation/ExpireSnapshotsTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 57a15e773f06..0b2b0031c859 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -44,10 +44,13 @@ import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; @@ -802,7 +805,7 @@ private TestFileStore createStore() { private void rewriteSnapshotTime(long snapshotId, long newTimeMillis) throws IOException { String oldJson = fileIO.readFileUtf8(snapshotManager.snapshotPath(snapshotId)); ObjectNode node = (ObjectNode) JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(oldJson); - node.put(Snapshot.FIELD_TIME_MILLIS, newTimeMillis); + node.put("timeMillis", newTimeMillis); String newJson = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.writeValueAsString(node); fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId), newJson); snapshotManager.invalidateCache();