Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot;
Expand All @@ -63,6 +64,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
private final TagManager tagManager;

private ExpireConfig expireConfig;
private Supplier<Long> currentTimeMillis = System::currentTimeMillis;

public ExpireSnapshotsImpl(
SnapshotManager snapshotManager,
Expand All @@ -82,6 +84,11 @@ public ExpireSnapshotsImpl(
this.fileExecutor = snapshotDeletion.fileExecutor();
}

@VisibleForTesting
public void setCurrentTimeMillis(Supplier<Long> currentTimeMillis) {
this.currentTimeMillis = currentTimeMillis;
}

@Override
public ExpireSnapshots config(ExpireConfig expireConfig) {
this.expireConfig = expireConfig;
Expand All @@ -95,7 +102,7 @@ public int expire() {
int retainMin = expireConfig.getSnapshotRetainMin();
int maxDeletes = expireConfig.getSnapshotMaxDeletes();
long olderThanMills =
System.currentTimeMillis() - expireConfig.getSnapshotTimeRetain().toMillis();
currentTimeMillis.get() - expireConfig.getSnapshotTimeRetain().toMillis();

Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
Expand Down Expand Up @@ -136,10 +143,11 @@ public int expire() {

for (long id = min; id < maxExclusive; id++) {
// Early exit the loop for 'snapshot.time-retained'
// (the maximum time of snapshots to retain)
// A snapshot can only be expired if its next snapshot has been alive
// longer than snapshotTimeRetain, providing stronger protection
try {
Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
if (olderThanMills <= snapshot.timeMillis()) {
Snapshot nextSnapshot = snapshotManager.tryGetSnapshot(id + 1);
if (olderThanMills <= nextSnapshot.timeMillis()) {
return expireUntil(earliest, id);
}
} catch (FileNotFoundException e) {
Expand All @@ -162,7 +170,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {

private int innerExpireUntil(long earliestId, long endExclusiveId)
throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
long startTime = currentTimeMillis.get();

if (endExclusiveId <= earliestId) {
// No expire happens:
Expand Down Expand Up @@ -268,7 +276,7 @@ private int innerExpireUntil(long earliestId, long endExclusiveId)
}

writeEarliestHint(endExclusiveId);
long duration = System.currentTimeMillis() - startTime;
long duration = currentTimeMillis.get() - startTime;
LOG.info(
"Finished expire snapshots, duration {} ms, range is [{}, {})",
duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.ExpireSnapshotsImpl;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
Expand Down Expand Up @@ -524,31 +527,90 @@ public void testExpireWithTime() throws Exception {
builder.snapshotRetainMin(1)
.snapshotRetainMax(Integer.MAX_VALUE)
.snapshotTimeRetain(Duration.ofMillis(1000));
ExpireSnapshots expire = store.newExpire(builder.build());
ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(builder.build());

List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
commit(5, allData, snapshotPositions);
Thread.sleep(1500);
for (int i = 1; i <= 5; i++) {
rewriteSnapshotTime(i, 0);
}
commit(5, allData, snapshotPositions);
long expireMillis = System.currentTimeMillis();
// expire twice to check for idempotence
for (int i = 6; i <= 10; i++) {
rewriteSnapshotTime(i, 2000);
}

// expire at time 2500, olderThanMills = 1500
expire.setCurrentTimeMillis(() -> 2500L);
// expire twice to check for idempotence
expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire();
expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire();

int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue();
for (int i = 1; i <= latestSnapshotId; i++) {
if (snapshotManager.snapshotExists(i)) {
assertThat(snapshotManager.snapshot(i).timeMillis())
.isBetween(expireMillis - 1000, expireMillis);
assertSnapshot(i, allData, snapshotPositions);
}
// snapshots 1-4 should be expired, snapshot 5 is retained because its next
// snapshot (6) is within the time window
for (int i = 1; i <= 4; i++) {
assertThat(snapshotManager.snapshotExists(i)).isFalse();
}
for (int i = 5; i <= latestSnapshotId; i++) {
assertThat(snapshotManager.snapshotExists(i)).isTrue();
assertSnapshot(i, allData, snapshotPositions);
}

store.assertCleaned();
}

@Test
public void testExpireWithTimeProtectsEachSnapshot() throws Exception {
// Even with a small retainMin, each snapshot should be protected by
// snapshotTimeRetain: a snapshot can only be expired when its next
// snapshot has been alive longer than snapshotTimeRetain.
ExpireConfig.Builder builder = ExpireConfig.builder();
builder.snapshotRetainMin(1)
.snapshotRetainMax(Integer.MAX_VALUE)
.snapshotTimeRetain(Duration.ofMillis(5000));
ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(builder.build());

List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();

// create 5 snapshots quickly
commit(5, allData, snapshotPositions);
for (int i = 1; i <= 5; i++) {
rewriteSnapshotTime(i, 0);
}

// expire immediately - no snapshot should be expired because each
// snapshot's next snapshot is still within the time window
expire.setCurrentTimeMillis(() -> 100L);
expire.config(builder.build()).expire();

for (int i = 1; i <= 5; i++) {
assertThat(snapshotManager.snapshotExists(i)).isTrue();
assertSnapshot(i, allData, snapshotPositions);
}

// create one more snapshot so snapshot 5 has a "next"
commit(1, allData, snapshotPositions);
rewriteSnapshotTime(6, 6000);

// expire again - now snapshots 1-4 can be expired (their next snapshots
// are older than 5000ms), but snapshot 5 is still protected because its
// next snapshot (6) was just created
expire.setCurrentTimeMillis(() -> 6500L);
expire.config(builder.build()).expire();

for (int i = 1; i <= 4; i++) {
assertThat(snapshotManager.snapshotExists(i)).isFalse();
}
assertThat(snapshotManager.snapshotExists(5)).isTrue();
assertThat(snapshotManager.snapshotExists(6)).isTrue();
assertSnapshot(5, allData, snapshotPositions);
assertSnapshot(6, allData, snapshotPositions);

store.assertCleaned();
}

@Test
public void testExpireWithUpgradedFile() throws Exception {
// write & commit data
Expand Down Expand Up @@ -740,6 +802,15 @@ private TestFileStore createStore() {
.build();
}

private void rewriteSnapshotTime(long snapshotId, long newTimeMillis) throws IOException {
String oldJson = fileIO.readFileUtf8(snapshotManager.snapshotPath(snapshotId));
ObjectNode node = (ObjectNode) JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(oldJson);
node.put("timeMillis", newTimeMillis);
String newJson = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.writeValueAsString(node);
fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId), newJson);
snapshotManager.invalidateCache();
}

protected void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
throws Exception {
for (int i = 0; i < numCommits; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ public void testExpireSnapshotsProcedure() throws Exception {
+ "', retain_min => 3)");
checkSnapshots(snapshotManager, 4, 6);

// older_than => timestamp of snapshot 6, expected snapshots (6)
// older_than => timestamp + 1 of snapshot 6, expected snapshots (6)
sql(
"CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '"
+ ts6.toString()
+ new Timestamp(ts6.getTime() + 1)
+ "')");
checkSnapshots(snapshotManager, 6, 6);
}
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti
.run();
checkSnapshots(snapshotManager, 4, 6);

// older_than => timestamp of snapshot 6, expected snapshots (6)
// older_than => timestamp + 1 of snapshot 6, expected snapshots (6)
createAction(
ExpireSnapshotsAction.class,
"expire_snapshots",
Expand All @@ -168,7 +168,7 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti
"--table",
"word_count",
"--older_than",
ts6.toString(),
new Timestamp(ts6.getTime() + 1).toString(),
"--force_start_flink_job",
Boolean.toString(forceStartFlinkJob))
.withStreamExecutionEnvironment(env)
Expand Down
Loading