diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 305a4269aa..a342d3c07a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -745,33 +745,37 @@ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) { } private Long triggerSnapshot(TableBucket tableBucket) { - Long snapshotId = null; - Long nextSnapshotId = null; for (TabletServer ts : tabletServers.values()) { ReplicaManager.HostedReplica replica = ts.getReplicaManager().getReplica(tableBucket); if (replica instanceof ReplicaManager.OnlineReplica) { Replica r = ((ReplicaManager.OnlineReplica) replica).getReplica(); PeriodicSnapshotManager kvSnapshotManager = r.getKvSnapshotManager(); if (r.isLeader() && kvSnapshotManager != null) { - snapshotId = kvSnapshotManager.currentSnapshotId(); + long snapshotId = kvSnapshotManager.currentSnapshotId(); kvSnapshotManager.triggerSnapshot(); - nextSnapshotId = kvSnapshotManager.currentSnapshotId(); - break; + // Poll until the snapshot ID increments, confirming the async trigger was + // processed. triggerSnapshot() submits work to a guardedExecutor + // asynchronously, so the counter may not have incremented yet on return. + // If the ID does not increment within the timeout, the snapshot was + // legitimately skipped (e.g., no new data since last snapshot). + long deadline = System.currentTimeMillis() + 1_000; + while (kvSnapshotManager.currentSnapshotId() <= snapshotId) { + if (System.currentTimeMillis() > deadline) { + return null; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return snapshotId; } } } - - if (snapshotId != null) { - if (nextSnapshotId > snapshotId) { - // only there is a new snapshot triggered, we return the snapshot id - return snapshotId; - } else { - return null; - } - } else { - fail("No KV snapshot manager found for table bucket " + tableBucket); - return null; - } + fail("No KV snapshot manager found for table bucket " + tableBucket); + return null; } private CompletedSnapshot waitUntilSnapshotFinished(TableBucket tableBucket, long snapshotId) {