Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.SequenceIDCounter;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketSnapshot;
import org.apache.fluss.utils.CloseableRegistry;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.FlussPaths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
Expand All @@ -55,6 +58,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT

private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;

private final ZooKeeperClient zooKeeperClient;

private final RocksIncrementalSnapshot rocksIncrementalSnapshot;
private final FsPath remoteKvTabletDir;
private final FsPath remoteSnapshotSharedDir;
Expand Down Expand Up @@ -82,6 +87,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
KvTabletSnapshotTarget(
TableBucket tableBucket,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
ZooKeeperClient zooKeeperClient,
RocksIncrementalSnapshot rocksIncrementalSnapshot,
FsPath remoteKvTabletDir,
Executor ioExecutor,
Expand All @@ -97,6 +103,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
this(
tableBucket,
completedKvSnapshotCommitter,
zooKeeperClient,
rocksIncrementalSnapshot,
remoteKvTabletDir,
(int) ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE.defaultValue().getBytes(),
Expand All @@ -114,6 +121,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
public KvTabletSnapshotTarget(
TableBucket tableBucket,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
@Nonnull ZooKeeperClient zooKeeperClient,
RocksIncrementalSnapshot rocksIncrementalSnapshot,
FsPath remoteKvTabletDir,
int snapshotWriteBufferSize,
Expand All @@ -129,6 +137,7 @@ public KvTabletSnapshotTarget(
throws IOException {
this.tableBucket = tableBucket;
this.completedKvSnapshotCommitter = completedKvSnapshotCommitter;
this.zooKeeperClient = zooKeeperClient;
this.rocksIncrementalSnapshot = rocksIncrementalSnapshot;
this.remoteKvTabletDir = remoteKvTabletDir;
this.remoteSnapshotSharedDir = FlussPaths.remoteKvSharedDir(remoteKvTabletDir);
Expand Down Expand Up @@ -211,18 +220,13 @@ public void handleSnapshotResult(
// commit the completed snapshot
completedKvSnapshotCommitter.commitKvSnapshot(
completedSnapshot, coordinatorEpoch, bucketLeaderEpoch);
// notify the snapshot complete
rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
snapshotSize = snapshotResult.getSnapshotSize();
// update LogTablet to notify the lowest offset that should be retained
updateMinRetainOffset.accept(snapshotResult.getLogOffset());
// update local state after successful commit
updateStateOnCommitSuccess(snapshotId, snapshotResult);
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor);
handleSnapshotFailure(snapshotId, snapshotLocation, t);
// throw the exception to make PeriodicSnapshotManager can catch the exception
throw t;
// handle the exception with idempotent check
handleSnapshotCommitException(
snapshotId, snapshotResult, completedSnapshot, snapshotLocation, t);
}
}

Expand All @@ -249,6 +253,87 @@ protected RocksIncrementalSnapshot getRocksIncrementalSnapshot() {
return rocksIncrementalSnapshot;
}

/**
* Update local state after successful snapshot completion. This includes notifying RocksDB
* about completion, updating latest snapshot offset/size, and notifying LogTablet about the
* minimum offset to retain.
*/
private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult snapshotResult) {
// notify the snapshot complete
rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
snapshotSize = snapshotResult.getSnapshotSize();
// update LogTablet to notify the lowest offset that should be retained
updateMinRetainOffset.accept(snapshotResult.getLogOffset());
}

/**
* Handle snapshot commit exception with idempotent check. This method implements the fix for
* issue #1304 by double-checking ZooKeeper to verify if the snapshot actually exists before
* cleanup.
*/
private void handleSnapshotCommitException(
long snapshotId,
SnapshotResult snapshotResult,
CompletedSnapshot completedSnapshot,
SnapshotLocation snapshotLocation,
Throwable t)
throws Throwable {

// Fix for issue: https://github.com/apache/fluss/issues/1304
// Tablet server try to commit kv snapshot to coordinator server,
// coordinator server commit the kv snapshot to zk, then failover.
// Tablet server will got exception from coordinator server, but mistake it as a fail
// commit although coordinator server has committed to zk, then discard the commited kv
// snapshot.
//
// Idempotent check: Double check ZK to verify if the snapshot actually exists before
// cleanup
try {
Optional<BucketSnapshot> zkSnapshot =
zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId);
if (zkSnapshot.isPresent()) {
// Snapshot exists in ZK, indicating the commit was actually successful,
// just response was lost
LOG.warn(
"Snapshot {} for TableBucket {} already exists in ZK. "
+ "The commit was successful but response was lost due to coordinator failover. "
+ "Skipping cleanup and treating as successful.",
snapshotId,
tableBucket);

// Update local state as if the commit was successful
updateStateOnCommitSuccess(snapshotId, snapshotResult);
return; // Snapshot commit succeeded, return directly
} else {
// Snapshot does not exist in ZK, indicating the commit truly failed
LOG.warn(
"Snapshot {} for TableBucket {} does not exist in ZK. "
+ "The commit truly failed, proceeding with cleanup.",
snapshotId,
tableBucket);
snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor);
handleSnapshotFailure(snapshotId, snapshotLocation, t);
}
} catch (Exception zkException) {
LOG.warn(
"Failed to query ZK for snapshot {} of TableBucket {}. "
+ "Cannot determine actual snapshot status, keeping snapshot in current state "
+ "to avoid potential data loss.",
snapshotId,
tableBucket,
zkException);
// When ZK query fails, we cannot determine the actual status.
// The snapshot might have succeeded or failed on the ZK side.
// Therefore, we must not clean up the snapshot files and not update local state.
// This avoids the risk of discarding a successfully committed snapshot that
// connectors may already be reading, which would cause data loss or job failure.
}

// throw the exception to make PeriodicSnapshotManager can catch the exception
throw t;
}

private SnapshotRunner createSnapshotRunner(CloseableRegistry cancelStreamRegistry) {
return new SnapshotRunner(rocksIncrementalSnapshot, cancelStreamRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ protected static class PreviousSnapshot {

@Nonnull private final Map<String, KvFileHandle> confirmedSstFiles;

private PreviousSnapshot(@Nullable Collection<KvFileHandleAndLocalPath> confirmedSstFiles) {
protected PreviousSnapshot(
@Nullable Collection<KvFileHandleAndLocalPath> confirmedSstFiles) {
this.confirmedSstFiles =
confirmedSstFiles != null
? confirmedSstFiles.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ private void startPeriodicKvSnapshot(@Nullable CompletedSnapshot completedSnapsh
new KvTabletSnapshotTarget(
tableBucket,
completedKvSnapshotCommitter,
snapshotContext.getZooKeeperClient(),
rocksIncrementalSnapshot,
remoteKvTabletDir,
snapshotContext.getSnapshotFsWriteBufferSize(),
Expand Down
Loading