diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
index d982d486ad..c84a26b70f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
@@ -23,6 +23,8 @@
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.server.SequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
 import org.apache.fluss.utils.CloseableRegistry;
 import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.FlussPaths;
@@ -30,6 +32,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
@@ -55,6 +58,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
 
     private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;
 
+    private final ZooKeeperClient zooKeeperClient;
+
     private final RocksIncrementalSnapshot rocksIncrementalSnapshot;
     private final FsPath remoteKvTabletDir;
     private final FsPath remoteSnapshotSharedDir;
@@ -82,6 +87,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
     KvTabletSnapshotTarget(
             TableBucket tableBucket,
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+            ZooKeeperClient zooKeeperClient,
             RocksIncrementalSnapshot rocksIncrementalSnapshot,
             FsPath remoteKvTabletDir,
             Executor ioExecutor,
@@ -97,6 +103,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
         this(
                 tableBucket,
                 completedKvSnapshotCommitter,
+                zooKeeperClient,
                 rocksIncrementalSnapshot,
                 remoteKvTabletDir,
                 (int) ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE.defaultValue().getBytes(),
@@ -114,6 +121,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
     public KvTabletSnapshotTarget(
             TableBucket tableBucket,
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+            @Nonnull ZooKeeperClient zooKeeperClient,
            RocksIncrementalSnapshot rocksIncrementalSnapshot,
            FsPath remoteKvTabletDir,
            int snapshotWriteBufferSize,
@@ -129,6 +137,7 @@ public KvTabletSnapshotTarget(
             throws IOException {
        this.tableBucket = tableBucket;
        this.completedKvSnapshotCommitter = completedKvSnapshotCommitter;
+       this.zooKeeperClient = zooKeeperClient;
        this.rocksIncrementalSnapshot = rocksIncrementalSnapshot;
        this.remoteKvTabletDir = remoteKvTabletDir;
        this.remoteSnapshotSharedDir = FlussPaths.remoteKvSharedDir(remoteKvTabletDir);
@@ -211,18 +220,13 @@ public void handleSnapshotResult(
             // commit the completed snapshot
             completedKvSnapshotCommitter.commitKvSnapshot(
                     completedSnapshot, coordinatorEpoch, bucketLeaderEpoch);
-            // notify the snapshot complete
-            rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
-            logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
-            snapshotSize = snapshotResult.getSnapshotSize();
-            // update LogTablet to notify the lowest offset that should be retained
-            updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+            // update local state after successful commit
+            updateStateOnCommitSuccess(snapshotId, snapshotResult);
         } catch (Exception e) {
             Throwable t = ExceptionUtils.stripExecutionException(e);
-            snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor);
-            handleSnapshotFailure(snapshotId, snapshotLocation, t);
-            // throw the exception to make PeriodicSnapshotManager can catch the exception
-            throw t;
+            // handle the exception with an idempotent check
+            handleSnapshotCommitException(
+                    snapshotId, snapshotResult, completedSnapshot, snapshotLocation, t);
         }
     }
 
@@ -249,6 +253,87 @@ protected RocksIncrementalSnapshot getRocksIncrementalSnapshot() {
         return rocksIncrementalSnapshot;
     }
 
+    /**
+     * Updates local state after a successful snapshot commit. This includes notifying RocksDB
+     * about completion, updating the latest snapshot offset/size, and notifying the LogTablet
+     * about the minimum offset to retain.
+     */
+    private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult snapshotResult) {
+        // notify the snapshot complete
+        rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
+        logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
+        snapshotSize = snapshotResult.getSnapshotSize();
+        // update LogTablet to notify the lowest offset that should be retained
+        updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+    }
+
+    /**
+     * Handles a snapshot commit exception with an idempotent check. This method implements the
+     * fix for issue #1304 by double-checking ZooKeeper to verify whether the snapshot actually
+     * exists before cleaning up.
+     */
+    private void handleSnapshotCommitException(
+            long snapshotId,
+            SnapshotResult snapshotResult,
+            CompletedSnapshot completedSnapshot,
+            SnapshotLocation snapshotLocation,
+            Throwable t)
+            throws Throwable {
+
+        // Fix for issue: https://github.com/apache/fluss/issues/1304
+        // The tablet server tries to commit a kv snapshot to the coordinator server, the
+        // coordinator server commits the kv snapshot to ZK and then fails over. The tablet
+        // server gets an exception from the coordinator server and mistakes it for a failed
+        // commit even though the coordinator server has already committed to ZK, and then
+        // discards the committed kv snapshot.
+        //
+        // Idempotent check: double-check ZK to verify whether the snapshot actually exists
+        // before cleaning up.
+        try {
+            Optional<BucketSnapshot> zkSnapshot =
+                    zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId);
+            if (zkSnapshot.isPresent()) {
+                // The snapshot exists in ZK, so the commit actually succeeded and only the
+                // response was lost
+                LOG.warn(
+                        "Snapshot {} for TableBucket {} already exists in ZK. "
+                                + "The commit was successful but the response was lost due to coordinator failover. "
+                                + "Skipping cleanup and treating as successful.",
+                        snapshotId,
+                        tableBucket);
+
+                // Update local state as if the commit was successful
+                updateStateOnCommitSuccess(snapshotId, snapshotResult);
+                return; // Snapshot commit succeeded, return directly
+            } else {
+                // The snapshot does not exist in ZK, so the commit truly failed
+                LOG.warn(
+                        "Snapshot {} for TableBucket {} does not exist in ZK. "
+                                + "The commit truly failed, proceeding with cleanup.",
+                        snapshotId,
+                        tableBucket);
+                snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor);
+                handleSnapshotFailure(snapshotId, snapshotLocation, t);
+            }
+        } catch (Exception zkException) {
+            LOG.warn(
+                    "Failed to query ZK for snapshot {} of TableBucket {}. "
+                            + "Cannot determine actual snapshot status, keeping snapshot in current state "
+                            + "to avoid potential data loss.",
+                    snapshotId,
+                    tableBucket,
+                    zkException);
+            // When the ZK query fails, we cannot determine the actual status: the snapshot
+            // might have succeeded or failed on the ZK side. Therefore, we must neither clean
+            // up the snapshot files nor update local state. This avoids the risk of discarding
+            // a successfully committed snapshot that connectors may already be reading, which
+            // would cause data loss or job failure.
+        }
+
+        // rethrow the exception so that PeriodicSnapshotManager can catch it
+        throw t;
+    }
+
     private SnapshotRunner createSnapshotRunner(CloseableRegistry cancelStreamRegistry) {
         return new SnapshotRunner(rocksIncrementalSnapshot, cancelStreamRegistry);
     }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
index 0723fd4888..fbfa42e1f7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
@@ -341,7 +341,8 @@ protected static class PreviousSnapshot {
 
         @Nonnull private final Map confirmedSstFiles;
 
-        private PreviousSnapshot(@Nullable Collection confirmedSstFiles) {
+        protected PreviousSnapshot(
+                @Nullable Collection confirmedSstFiles) {
             this.confirmedSstFiles =
                     confirmedSstFiles != null
                             ? confirmedSstFiles.stream()
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e4e25de80f..3db0be3cc6 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -808,6 +808,7 @@ private void startPeriodicKvSnapshot(@Nullable CompletedSnapshot completedSnapsh
                         new KvTabletSnapshotTarget(
                                 tableBucket,
                                 completedKvSnapshotCommitter,
+                                snapshotContext.getZooKeeperClient(),
                                 rocksIncrementalSnapshot,
                                 remoteKvTabletDir,
                                 snapshotContext.getSnapshotFsWriteBufferSize(),
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 9b5a3b161a..9f9ea078da 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -29,9 +29,12 @@
 import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.testutils.KvTestUtils;
 import org.apache.fluss.server.utils.ResourceGuard;
+import org.apache.fluss.server.zk.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
+import org.apache.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
 import org.apache.fluss.utils.CloseableRegistry;
@@ -55,15 +58,19 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
+import static org.apache.fluss.server.zk.ZooKeeperUtils.startZookeeperClient;
+import static org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.Builder;
+import static org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.builder;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -297,6 +304,150 @@ void testAddToSnapshotToStoreFail(@TempDir Path kvTabletDir) throws Exception {
         assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L);
     }
 
+    @Test
+    void testIdempotentCheckWhenSnapshotExistsInZK(@TempDir Path kvTabletDir) throws Exception {
+        // Test case: the coordinator commits to ZK successfully but the response is lost due to
+        // failover. The tablet server should detect that the snapshot exists in ZK and skip cleanup
+        CompletedSnapshotHandleStore completedSnapshotHandleStore =
+                new ZooKeeperCompletedSnapshotHandleStore(zooKeeperClient);
+        FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+        // Create a committer that commits to ZK first, then throws an exception.
+        // This simulates coordinator failover after the ZK commit but before the response
+        CompletedKvSnapshotCommitter failingAfterZkCommitCommitter =
+                (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+                    // Always commit to ZK first
+                    CompletedSnapshotHandle handle =
+                            new CompletedSnapshotHandle(
+                                    snapshot.getSnapshotID(),
+                                    snapshot.getSnapshotLocation(),
+                                    snapshot.getLogOffset());
+                    completedSnapshotHandleStore.add(
+                            snapshot.getTableBucket(), snapshot.getSnapshotID(), handle);
+
+                    // Then throw an exception - simulating coordinator failover after the ZK commit
+                    throw new FlussException("Coordinator failover after ZK commit");
+                };
+
+        KvTabletSnapshotTarget kvTabletSnapshotTarget =
+                createSnapshotTargetWithCustomCommitter(
+                        remoteKvTabletDir, failingAfterZkCommitCommitter);
+
+        periodicSnapshotManager = createSnapshotManager(kvTabletSnapshotTarget);
+        periodicSnapshotManager.start();
+
+        RocksDB rocksDB = rocksDBExtension.getRocksDb();
+        rocksDB.put("key1".getBytes(), "val1".getBytes());
+
+        // Trigger a snapshot - it will commit to ZK but throw an exception
+        periodicSnapshotManager.triggerSnapshot();
+        long snapshotId1 = 1;
+
+        TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+                (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+        // The snapshot should be treated as successful due to the idempotent check.
+        // Even though the commit threw an exception, the idempotent check should find it in ZK
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        assertThat(rocksIncrementalSnapshot.completedSnapshots)
+                                .contains(snapshotId1));
+
+        // Verify the snapshot was not cleaned up and state was updated correctly
+        FsPath snapshotPath1 = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+        assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue();
+        assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L);
+    }
+
+    @Test
+    void testIdempotentCheckWhenSnapshotNotExistsInZK(@TempDir Path kvTabletDir) throws Exception {
+        // Test case: genuine commit failure - the snapshot should not exist in ZK and cleanup
+        // should occur
+        FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+        // Create a committer that always fails - simulating a genuine coordinator failure
+        CompletedKvSnapshotCommitter alwaysFailingCommitter =
+                (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+                    throw new FlussException(
+                            "Genuine coordinator failure - snapshot not committed to ZK");
+                };
+
+        KvTabletSnapshotTarget kvTabletSnapshotTarget =
+                createSnapshotTargetWithCustomCommitter(remoteKvTabletDir, alwaysFailingCommitter);
+
+        periodicSnapshotManager = createSnapshotManager(kvTabletSnapshotTarget);
+        periodicSnapshotManager.start();
+
+        RocksDB rocksDB = rocksDBExtension.getRocksDb();
+        rocksDB.put("key1".getBytes(), "val1".getBytes());
+        periodicSnapshotManager.triggerSnapshot();
+
+        long snapshotId1 = 1;
+        TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+                (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+        // The snapshot should be aborted since it genuinely failed
+        retry(
+                Duration.ofMinutes(1),
+                () -> assertThat(rocksIncrementalSnapshot.abortedSnapshots).contains(snapshotId1));
+
+        // Verify cleanup occurred
+        FsPath snapshotPath1 = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+        assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isFalse();
+        assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE);
+    }
+
+    @Test
+    void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws Exception {
+        // Test case: the ZK query fails - keep the snapshot in its current state to avoid data loss
+        FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile());
+
+        // Create a failing ZK client that throws an exception to simulate a ZK query failure
+        ZooKeeperClient failingZkClient = createFailingZooKeeperClient();
+
+        CompletedKvSnapshotCommitter failingCommitter =
+                (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> {
+                    throw new FlussException("Commit failed");
+                };
+
+        KvTabletSnapshotTarget kvTabletSnapshotTarget =
+                createSnapshotTargetWithCustomZkAndCommitter(
+                        remoteKvTabletDir, failingZkClient, failingCommitter);
+
+        periodicSnapshotManager = createSnapshotManager(kvTabletSnapshotTarget);
+        periodicSnapshotManager.start();
+
+        RocksDB rocksDB = rocksDBExtension.getRocksDb();
+        rocksDB.put("key1".getBytes(), "val1".getBytes());
+        periodicSnapshotManager.triggerSnapshot();
+
+        long snapshotId1 = 1;
+        TestRocksIncrementalSnapshot rocksIncrementalSnapshot =
+                (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot();
+
+        // Wait for snapshot processing to complete.
+        // The snapshot should be created, but the commit will fail and then the ZK query will
+        // fail. In this case, the new logic should preserve the snapshot files (no cleanup)
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    // Verify that snapshot creation happened but neither completion nor abortion
+                    // occurred. Since both the commit and the ZK query failed, the snapshot
+                    // should remain in a limbo state
                    FsPath snapshotPath1 =
+                            FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1);
+                    assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue();
+                    assertThat(rocksIncrementalSnapshot.abortedSnapshots)
+                            .doesNotContain(snapshotId1);
+                    assertThat(rocksIncrementalSnapshot.completedSnapshots)
+                            .doesNotContain(snapshotId1);
+                });
+
+        // Verify local state was not updated (remains unchanged)
+        assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE);
+    }
+
     private PeriodicSnapshotManager createSnapshotManager(
             PeriodicSnapshotManager.SnapshotTarget target) {
         return new PeriodicSnapshotManager(
@@ -343,6 +494,45 @@ private KvTabletSnapshotTarget createSnapshotTarget(
         return new KvTabletSnapshotTarget(
                 tableBucket,
                 new TestingStoreCompletedKvSnapshotCommitter(completedSnapshotStore),
+                zooKeeperClient,
+                rocksIncrementalSnapshot,
+                remoteKvTabletDir,
+                executor,
+                cancelStreamRegistry,
+                testingSnapshotIdCounter,
+                logOffsetGenerator::get,
+                updateMinRetainOffsetConsumer::set,
+                bucketLeaderEpochSupplier,
+                coordinatorEpochSupplier,
+                0,
+                0L);
+    }
+
+    private KvTabletSnapshotTarget createSnapshotTargetWithCustomCommitter(
+            FsPath remoteKvTabletDir, CompletedKvSnapshotCommitter customCommitter)
+            throws IOException {
+        return createSnapshotTargetWithCustomZkAndCommitter(
+                remoteKvTabletDir, zooKeeperClient, customCommitter);
+    }
+
+    private KvTabletSnapshotTarget createSnapshotTargetWithCustomZkAndCommitter(
+            FsPath remoteKvTabletDir,
+            ZooKeeperClient zkClient,
+            CompletedKvSnapshotCommitter customCommitter)
+            throws IOException {
+        TableBucket tableBucket = new TableBucket(1, 1);
+        Executor executor = Executors.directExecutor();
+        RocksIncrementalSnapshot rocksIncrementalSnapshot =
+                createIncrementalSnapshot(SnapshotFailType.NONE);
+        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+        TestingSnapshotIDCounter testingSnapshotIdCounter = new TestingSnapshotIDCounter();
+        Supplier<Integer> bucketLeaderEpochSupplier = () -> 0;
+        Supplier<Integer> coordinatorEpochSupplier = () -> 0;
+
+        return new KvTabletSnapshotTarget(
+                tableBucket,
+                customCommitter,
+                zkClient,
                 rocksIncrementalSnapshot,
                 remoteKvTabletDir,
                 executor,
@@ -380,10 +570,41 @@ private RocksIncrementalSnapshot createIncrementalSnapshot(SnapshotFailType snap
                 snapshotFailType);
     }
 
+    private ZooKeeperClient createFailingZooKeeperClient() {
+        // Create a ZooKeeperClient that throws an exception on getTableBucketSnapshot
+        return new FailingZooKeeperClient();
+    }
+
+    private static class FailingZooKeeperClient extends ZooKeeperClient {
+
+        public FailingZooKeeperClient() {
+            // Create a new ZooKeeperClient using ZooKeeperUtils.startZookeeperClient
+            super(createCuratorFrameworkWrapper(), new Configuration());
+        }
+
+        private static CuratorFrameworkWithUnhandledErrorListener createCuratorFrameworkWrapper() {
+            Builder builder =
+                    builder()
+                            .connectString(
+                                    ZOO_KEEPER_EXTENSION_WRAPPER
+                                            .getCustomExtension()
+                                            .getConnectString())
+                            .retryPolicy(new ExponentialBackoffRetry(1000, 3));
+
+            return startZookeeperClient(builder, NOPErrorHandler.INSTANCE);
+        }
+
+        @Override
+        public Optional<BucketSnapshot> getTableBucketSnapshot(
+                TableBucket tableBucket, long snapshotId) throws Exception {
+            throw new Exception("ZK query failed");
+        }
+    }
+
     private static final class TestRocksIncrementalSnapshot extends RocksIncrementalSnapshot {
 
-        private final Set<Long> abortedSnapshots = new HashSet<>();
-        private final Set<Long> completedSnapshots = new HashSet<>();
+        private final Set<Long> abortedSnapshots = ConcurrentHashMap.newKeySet();
+        private final Set<Long> completedSnapshots = ConcurrentHashMap.newKeySet();
         private final SnapshotFailType snapshotFailType;
 
         public TestRocksIncrementalSnapshot(