Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -745,33 +745,37 @@ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) {
}

private Long triggerSnapshot(TableBucket tableBucket) {
Long snapshotId = null;
Long nextSnapshotId = null;
for (TabletServer ts : tabletServers.values()) {
ReplicaManager.HostedReplica replica = ts.getReplicaManager().getReplica(tableBucket);
if (replica instanceof ReplicaManager.OnlineReplica) {
Replica r = ((ReplicaManager.OnlineReplica) replica).getReplica();
PeriodicSnapshotManager kvSnapshotManager = r.getKvSnapshotManager();
if (r.isLeader() && kvSnapshotManager != null) {
snapshotId = kvSnapshotManager.currentSnapshotId();
long snapshotId = kvSnapshotManager.currentSnapshotId();
kvSnapshotManager.triggerSnapshot();
nextSnapshotId = kvSnapshotManager.currentSnapshotId();
break;
// Poll until the snapshot ID increments, confirming the async trigger was
// processed. triggerSnapshot() submits work to a guardedExecutor
// asynchronously, so the counter may not have incremented yet on return.
// If the ID does not increment within the timeout, the snapshot was
// legitimately skipped (e.g., no new data since last snapshot).
long deadline = System.currentTimeMillis() + 1_000;
while (kvSnapshotManager.currentSnapshotId() <= snapshotId) {
if (System.currentTimeMillis() > deadline) {
return null;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
return snapshotId;
}
}
}

if (snapshotId != null) {
if (nextSnapshotId > snapshotId) {
// only there is a new snapshot triggered, we return the snapshot id
return snapshotId;
} else {
return null;
}
} else {
fail("No KV snapshot manager found for table bucket " + tableBucket);
return null;
}
fail("No KV snapshot manager found for table bucket " + tableBucket);
return null;
}

private CompletedSnapshot waitUntilSnapshotFinished(TableBucket tableBucket, long snapshotId) {
Expand Down