From 6ca43d28cec8cbd15cb221dcbacf850e0cd52cc9 Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Mon, 22 Sep 2025 19:08:22 +0800 Subject: [PATCH 1/3] fix kv snapshot commit inconsistency --- .../kv/snapshot/KvTabletSnapshotTarget.java | 60 +++++- .../kv/snapshot/RocksIncrementalSnapshot.java | 3 +- .../apache/fluss/server/replica/Replica.java | 1 + .../snapshot/KvTabletSnapshotTargetTest.java | 172 ++++++++++++++++++ 4 files changed, 233 insertions(+), 3 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java index d982d486ad..6d907e4641 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java @@ -23,6 +23,8 @@ import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.server.SequenceIDCounter; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketSnapshot; import org.apache.fluss.utils.CloseableRegistry; import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.FlussPaths; @@ -55,6 +57,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter; + private final ZooKeeperClient zooKeeperClient; + private final RocksIncrementalSnapshot rocksIncrementalSnapshot; private final FsPath remoteKvTabletDir; private final FsPath remoteSnapshotSharedDir; @@ -82,6 +86,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT KvTabletSnapshotTarget( TableBucket tableBucket, CompletedKvSnapshotCommitter completedKvSnapshotCommitter, + ZooKeeperClient zooKeeperClient, RocksIncrementalSnapshot rocksIncrementalSnapshot, FsPath remoteKvTabletDir, Executor 
ioExecutor, @@ -97,6 +102,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT this( tableBucket, completedKvSnapshotCommitter, + zooKeeperClient, rocksIncrementalSnapshot, remoteKvTabletDir, (int) ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE.defaultValue().getBytes(), @@ -114,6 +120,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT public KvTabletSnapshotTarget( TableBucket tableBucket, CompletedKvSnapshotCommitter completedKvSnapshotCommitter, + ZooKeeperClient zooKeeperClient, RocksIncrementalSnapshot rocksIncrementalSnapshot, FsPath remoteKvTabletDir, int snapshotWriteBufferSize, @@ -129,6 +136,7 @@ public KvTabletSnapshotTarget( throws IOException { this.tableBucket = tableBucket; this.completedKvSnapshotCommitter = completedKvSnapshotCommitter; + this.zooKeeperClient = zooKeeperClient; this.rocksIncrementalSnapshot = rocksIncrementalSnapshot; this.remoteKvTabletDir = remoteKvTabletDir; this.remoteSnapshotSharedDir = FlussPaths.remoteKvSharedDir(remoteKvTabletDir); @@ -219,8 +227,56 @@ public void handleSnapshotResult( updateMinRetainOffset.accept(snapshotResult.getLogOffset()); } catch (Exception e) { Throwable t = ExceptionUtils.stripExecutionException(e); - snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); - handleSnapshotFailure(snapshotId, snapshotLocation, t); + + // Fix for issue: https://github.com/apache/fluss/issues/1304 + // Tablet server try to commit kv snapshot to coordinator server, + // coordinator server commit the kv snapshot to zk, then failover. + // Tablet server will got exception from coordinator server, but mistake it as a fail + // commit although coordinator server has committed to zk, then discard the commited kv + // snapshot. 
+ // + // Idempotent check: Double check ZK to verify if the snapshot actually exists before + // cleanup + boolean shouldCleanSnapshot = true; + try { + if (zooKeeperClient != null) { + Optional zkSnapshot = + zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId); + if (zkSnapshot.isPresent()) { + // Snapshot exists in ZK, indicating the commit was actually successful, + // just response was lost + LOG.warn( + "Snapshot {} for TableBucket {} already exists in ZK. " + + "The commit was successful but response was lost due to coordinator failover. " + + "Skipping cleanup and treating as successful.", + snapshotId, + tableBucket); + + // Update local state as if the commit was successful + rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId); + logOffsetOfLatestSnapshot = snapshotResult.getLogOffset(); + snapshotSize = snapshotResult.getSnapshotSize(); + updateMinRetainOffset.accept(snapshotResult.getLogOffset()); + + shouldCleanSnapshot = false; + return; // Snapshot commit succeeded, return directly + } + } + } catch (Exception zkException) { + LOG.warn( + "Failed to query ZK for snapshot {} of TableBucket {}. 
" + + "Assuming commit failed and proceeding with cleanup.", + snapshotId, + tableBucket, + zkException); + } + + // Only execute cleanup when the snapshot truly does not exist in ZK + if (shouldCleanSnapshot) { + snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); + handleSnapshotFailure(snapshotId, snapshotLocation, t); + } + // throw the exception to make PeriodicSnapshotManager can catch the exception throw t; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java index 0723fd4888..fbfa42e1f7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java @@ -341,7 +341,8 @@ protected static class PreviousSnapshot { @Nonnull private final Map confirmedSstFiles; - private PreviousSnapshot(@Nullable Collection confirmedSstFiles) { + protected PreviousSnapshot( + @Nullable Collection confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles != null ? 
confirmedSstFiles.stream() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index e4e25de80f..3db0be3cc6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -808,6 +808,7 @@ private void startPeriodicKvSnapshot(@Nullable CompletedSnapshot completedSnapsh new KvTabletSnapshotTarget( tableBucket, completedKvSnapshotCommitter, + snapshotContext.getZooKeeperClient(), rocksIncrementalSnapshot, remoteKvTabletDir, snapshotContext.getSnapshotFsWriteBufferSize(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 9b5a3b161a..318d0e754a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -297,6 +297,139 @@ void testAddToSnapshotToStoreFail(@TempDir Path kvTabletDir) throws Exception { assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L); } + @Test + void testIdempotentCheckWhenSnapshotExistsInZK(@TempDir Path kvTabletDir) throws Exception { + // Test case: coordinator commits to ZK successfully but response is lost due to failover + // The tablet server should detect the snapshot exists in ZK and skip cleanup + CompletedSnapshotHandleStore completedSnapshotHandleStore = + new ZooKeeperCompletedSnapshotHandleStore(zooKeeperClient); + FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile()); + + // Create a committer that commits to ZK first, then throws exception + // This simulates coordinator failover after ZK commit but before response + CompletedKvSnapshotCommitter failingAfterZkCommitCommitter = + (snapshot, 
coordinatorEpoch, bucketLeaderEpoch) -> { + // Always commit to ZK first + CompletedSnapshotHandle handle = + new CompletedSnapshotHandle( + snapshot.getSnapshotID(), + snapshot.getSnapshotLocation(), + snapshot.getLogOffset()); + completedSnapshotHandleStore.add( + snapshot.getTableBucket(), snapshot.getSnapshotID(), handle); + + // Then throw exception - simulating coordinator failover after ZK commit + throw new FlussException("Coordinator failover after ZK commit"); + }; + + KvTabletSnapshotTarget kvTabletSnapshotTarget = + createSnapshotTargetWithCustomCommitter( + remoteKvTabletDir, failingAfterZkCommitCommitter); + + periodicSnapshotManager = createSnapshotManager(kvTabletSnapshotTarget); + periodicSnapshotManager.start(); + + RocksDB rocksDB = rocksDBExtension.getRocksDb(); + rocksDB.put("key1".getBytes(), "val1".getBytes()); + + // Trigger snapshot - will commit to ZK but throw exception + periodicSnapshotManager.triggerSnapshot(); + long snapshotId1 = 1; + + TestRocksIncrementalSnapshot rocksIncrementalSnapshot = + (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot(); + + // The snapshot should be treated as successful due to idempotent check + // Even though commit threw exception, idempotent check should find it in ZK + retry( + Duration.ofMinutes(1), + () -> + assertThat(rocksIncrementalSnapshot.completedSnapshots) + .contains(snapshotId1)); + + // Verify snapshot was not cleaned up and state was updated correctly + FsPath snapshotPath1 = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1); + assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue(); + assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(1L); + } + + @Test + void testIdempotentCheckWhenSnapshotNotExistsInZK(@TempDir Path kvTabletDir) throws Exception { + // Test case: genuine commit failure - snapshot should not exist in ZK and cleanup should + // occur + FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile()); 
+ + // Create a committer that always fails - simulating genuine coordinator failure + CompletedKvSnapshotCommitter alwaysFailingCommitter = + (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> { + throw new FlussException( + "Genuine coordinator failure - snapshot not committed to ZK"); + }; + + KvTabletSnapshotTarget kvTabletSnapshotTarget = + createSnapshotTargetWithCustomCommitter(remoteKvTabletDir, alwaysFailingCommitter); + + periodicSnapshotManager = createSnapshotManager(kvTabletSnapshotTarget); + periodicSnapshotManager.start(); + + RocksDB rocksDB = rocksDBExtension.getRocksDb(); + rocksDB.put("key1".getBytes(), "val1".getBytes()); + periodicSnapshotManager.triggerSnapshot(); + + long snapshotId1 = 1; + TestRocksIncrementalSnapshot rocksIncrementalSnapshot = + (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot(); + + // The snapshot should be aborted since it genuinely failed + retry( + Duration.ofMinutes(1), + () -> assertThat(rocksIncrementalSnapshot.abortedSnapshots).contains(snapshotId1)); + + // Verify cleanup occurred + FsPath snapshotPath1 = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1); + assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isFalse(); + assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE); + } + + @Test + void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws Exception { + // Test case: ZK query fails - should fall back to cleanup (conservative approach) + FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile()); + + // Use null as ZK client to simulate ZK query failure + ZooKeeperClient failingZkClient = null; + + CompletedKvSnapshotCommitter failingCommitter = + (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> { + throw new FlussException("Commit failed"); + }; + + KvTabletSnapshotTarget kvTabletSnapshotTarget = + createSnapshotTargetWithCustomZkAndCommitter( + remoteKvTabletDir, failingZkClient, failingCommitter); 
+ + periodicSnapshotManager = createSnapshotManager(kvTabletSnapshotTarget); + periodicSnapshotManager.start(); + + RocksDB rocksDB = rocksDBExtension.getRocksDb(); + rocksDB.put("key1".getBytes(), "val1".getBytes()); + periodicSnapshotManager.triggerSnapshot(); + + long snapshotId1 = 1; + TestRocksIncrementalSnapshot rocksIncrementalSnapshot = + (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot(); + + // Since ZK query failed, should fall back to cleanup (conservative approach) + retry( + Duration.ofMinutes(1), + () -> assertThat(rocksIncrementalSnapshot.abortedSnapshots).contains(snapshotId1)); + + // Verify cleanup occurred + FsPath snapshotPath1 = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1); + assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isFalse(); + assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE); + } + private PeriodicSnapshotManager createSnapshotManager( PeriodicSnapshotManager.SnapshotTarget target) { return new PeriodicSnapshotManager( @@ -343,6 +476,45 @@ private KvTabletSnapshotTarget createSnapshotTarget( return new KvTabletSnapshotTarget( tableBucket, new TestingStoreCompletedKvSnapshotCommitter(completedSnapshotStore), + zooKeeperClient, + rocksIncrementalSnapshot, + remoteKvTabletDir, + executor, + cancelStreamRegistry, + testingSnapshotIdCounter, + logOffsetGenerator::get, + updateMinRetainOffsetConsumer::set, + bucketLeaderEpochSupplier, + coordinatorEpochSupplier, + 0, + 0L); + } + + private KvTabletSnapshotTarget createSnapshotTargetWithCustomCommitter( + FsPath remoteKvTabletDir, CompletedKvSnapshotCommitter customCommitter) + throws IOException { + return createSnapshotTargetWithCustomZkAndCommitter( + remoteKvTabletDir, zooKeeperClient, customCommitter); + } + + private KvTabletSnapshotTarget createSnapshotTargetWithCustomZkAndCommitter( + FsPath remoteKvTabletDir, + ZooKeeperClient zkClient, + CompletedKvSnapshotCommitter customCommitter) + throws 
IOException { + TableBucket tableBucket = new TableBucket(1, 1); + Executor executor = Executors.directExecutor(); + RocksIncrementalSnapshot rocksIncrementalSnapshot = + createIncrementalSnapshot(SnapshotFailType.NONE); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + TestingSnapshotIDCounter testingSnapshotIdCounter = new TestingSnapshotIDCounter(); + Supplier bucketLeaderEpochSupplier = () -> 0; + Supplier coordinatorEpochSupplier = () -> 0; + + return new KvTabletSnapshotTarget( + tableBucket, + customCommitter, + zkClient, rocksIncrementalSnapshot, remoteKvTabletDir, executor, From 41b691f72f167ea775366b050791dbe550cf4e10 Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Tue, 23 Sep 2025 19:22:25 +0800 Subject: [PATCH 2/3] refactor code --- .../kv/snapshot/KvTabletSnapshotTarget.java | 145 ++++++++++-------- .../snapshot/KvTabletSnapshotTargetTest.java | 48 +++++- 2 files changed, 127 insertions(+), 66 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java index 6d907e4641..8d58a8512b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; @@ -57,7 +58,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter; - private final ZooKeeperClient zooKeeperClient; + @Nonnull private final ZooKeeperClient zooKeeperClient; private final RocksIncrementalSnapshot rocksIncrementalSnapshot; private final FsPath remoteKvTabletDir; @@ -86,7 +87,7 @@ public class 
KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT KvTabletSnapshotTarget( TableBucket tableBucket, CompletedKvSnapshotCommitter completedKvSnapshotCommitter, - ZooKeeperClient zooKeeperClient, + @Nonnull ZooKeeperClient zooKeeperClient, RocksIncrementalSnapshot rocksIncrementalSnapshot, FsPath remoteKvTabletDir, Executor ioExecutor, @@ -120,7 +121,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT public KvTabletSnapshotTarget( TableBucket tableBucket, CompletedKvSnapshotCommitter completedKvSnapshotCommitter, - ZooKeeperClient zooKeeperClient, + @Nonnull ZooKeeperClient zooKeeperClient, RocksIncrementalSnapshot rocksIncrementalSnapshot, FsPath remoteKvTabletDir, int snapshotWriteBufferSize, @@ -219,66 +220,13 @@ public void handleSnapshotResult( // commit the completed snapshot completedKvSnapshotCommitter.commitKvSnapshot( completedSnapshot, coordinatorEpoch, bucketLeaderEpoch); - // notify the snapshot complete - rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId); - logOffsetOfLatestSnapshot = snapshotResult.getLogOffset(); - snapshotSize = snapshotResult.getSnapshotSize(); - // update LogTablet to notify the lowest offset that should be retained - updateMinRetainOffset.accept(snapshotResult.getLogOffset()); + // update local state after successful commit + updateStateOnCommitSuccess(snapshotId, snapshotResult); } catch (Exception e) { Throwable t = ExceptionUtils.stripExecutionException(e); - - // Fix for issue: https://github.com/apache/fluss/issues/1304 - // Tablet server try to commit kv snapshot to coordinator server, - // coordinator server commit the kv snapshot to zk, then failover. - // Tablet server will got exception from coordinator server, but mistake it as a fail - // commit although coordinator server has committed to zk, then discard the commited kv - // snapshot. 
- // - // Idempotent check: Double check ZK to verify if the snapshot actually exists before - // cleanup - boolean shouldCleanSnapshot = true; - try { - if (zooKeeperClient != null) { - Optional zkSnapshot = - zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId); - if (zkSnapshot.isPresent()) { - // Snapshot exists in ZK, indicating the commit was actually successful, - // just response was lost - LOG.warn( - "Snapshot {} for TableBucket {} already exists in ZK. " - + "The commit was successful but response was lost due to coordinator failover. " - + "Skipping cleanup and treating as successful.", - snapshotId, - tableBucket); - - // Update local state as if the commit was successful - rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId); - logOffsetOfLatestSnapshot = snapshotResult.getLogOffset(); - snapshotSize = snapshotResult.getSnapshotSize(); - updateMinRetainOffset.accept(snapshotResult.getLogOffset()); - - shouldCleanSnapshot = false; - return; // Snapshot commit succeeded, return directly - } - } - } catch (Exception zkException) { - LOG.warn( - "Failed to query ZK for snapshot {} of TableBucket {}. " - + "Assuming commit failed and proceeding with cleanup.", - snapshotId, - tableBucket, - zkException); - } - - // Only execute cleanup when the snapshot truly does not exist in ZK - if (shouldCleanSnapshot) { - snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); - handleSnapshotFailure(snapshotId, snapshotLocation, t); - } - - // throw the exception to make PeriodicSnapshotManager can catch the exception - throw t; + // handle the exception with idempotent check + handleSnapshotCommitException( + snapshotId, snapshotResult, completedSnapshot, snapshotLocation, t); } } @@ -305,6 +253,81 @@ protected RocksIncrementalSnapshot getRocksIncrementalSnapshot() { return rocksIncrementalSnapshot; } + /** + * Update local state after successful snapshot completion. 
This includes notifying RocksDB + * about completion, updating latest snapshot offset/size, and notifying LogTablet about the + * minimum offset to retain. + */ + private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult snapshotResult) { + // notify the snapshot complete + rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId); + logOffsetOfLatestSnapshot = snapshotResult.getLogOffset(); + snapshotSize = snapshotResult.getSnapshotSize(); + // update LogTablet to notify the lowest offset that should be retained + updateMinRetainOffset.accept(snapshotResult.getLogOffset()); + } + + /** + * Handle snapshot commit exception with idempotent check. This method implements the fix for + * issue #1304 by double-checking ZooKeeper to verify if the snapshot actually exists before + * cleanup. + */ + private void handleSnapshotCommitException( + long snapshotId, + SnapshotResult snapshotResult, + CompletedSnapshot completedSnapshot, + SnapshotLocation snapshotLocation, + Throwable t) + throws Throwable { + + // Fix for issue: https://github.com/apache/fluss/issues/1304 + // Tablet server try to commit kv snapshot to coordinator server, + // coordinator server commit the kv snapshot to zk, then failover. + // Tablet server will got exception from coordinator server, but mistake it as a fail + // commit although coordinator server has committed to zk, then discard the commited kv + // snapshot. + // + // Idempotent check: Double check ZK to verify if the snapshot actually exists before + // cleanup + boolean shouldCleanSnapshot = true; + try { + Optional zkSnapshot = + zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId); + if (zkSnapshot.isPresent()) { + // Snapshot exists in ZK, indicating the commit was actually successful, + // just response was lost + LOG.warn( + "Snapshot {} for TableBucket {} already exists in ZK. " + + "The commit was successful but response was lost due to coordinator failover. 
" + + "Skipping cleanup and treating as successful.", + snapshotId, + tableBucket); + + // Update local state as if the commit was successful + updateStateOnCommitSuccess(snapshotId, snapshotResult); + + shouldCleanSnapshot = false; + return; // Snapshot commit succeeded, return directly + } + } catch (Exception zkException) { + LOG.warn( + "Failed to query ZK for snapshot {} of TableBucket {}. " + + "Assuming commit failed and proceeding with cleanup.", + snapshotId, + tableBucket, + zkException); + } + + // Only execute cleanup when the snapshot truly does not exist in ZK + if (shouldCleanSnapshot) { + snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); + handleSnapshotFailure(snapshotId, snapshotLocation, t); + } + + // throw the exception to make PeriodicSnapshotManager can catch the exception + throw t; + } + private SnapshotRunner createSnapshotRunner(CloseableRegistry cancelStreamRegistry) { return new SnapshotRunner(rocksIncrementalSnapshot, cancelStreamRegistry); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 318d0e754a..fafe4d09ba 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -29,9 +29,12 @@ import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.testutils.KvTestUtils; import org.apache.fluss.server.utils.ResourceGuard; +import org.apache.fluss.server.zk.CuratorFrameworkWithUnhandledErrorListener; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.BucketSnapshot; +import 
org.apache.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService; import org.apache.fluss.utils.CloseableRegistry; @@ -55,15 +58,19 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static org.apache.fluss.server.zk.ZooKeeperUtils.startZookeeperClient; +import static org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.Builder; +import static org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory.builder; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; @@ -396,8 +403,8 @@ void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws Excep // Test case: ZK query fails - should fall back to cleanup (conservative approach) FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile()); - // Use null as ZK client to simulate ZK query failure - ZooKeeperClient failingZkClient = null; + // Create a failing ZK client that throws exception to simulate ZK query failure + ZooKeeperClient failingZkClient = createFailingZooKeeperClient(); CompletedKvSnapshotCommitter failingCommitter = (snapshot, coordinatorEpoch, bucketLeaderEpoch) -> { @@ -552,10 +559,41 @@ private RocksIncrementalSnapshot createIncrementalSnapshot(SnapshotFailType snap snapshotFailType); } + private ZooKeeperClient createFailingZooKeeperClient() { + // Create a ZooKeeperClient that 
throws exception on getTableBucketSnapshot + return new FailingZooKeeperClient(); + } + + private static class FailingZooKeeperClient extends ZooKeeperClient { + + public FailingZooKeeperClient() { + // Create a new ZooKeeperClient using ZooKeeperUtils.startZookeeperClient + super(createCuratorFrameworkWrapper(), new Configuration()); + } + + private static CuratorFrameworkWithUnhandledErrorListener createCuratorFrameworkWrapper() { + Builder builder = + builder() + .connectString( + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)); + + return startZookeeperClient(builder, NOPErrorHandler.INSTANCE); + } + + @Override + public Optional getTableBucketSnapshot( + TableBucket tableBucket, long snapshotId) throws Exception { + throw new Exception("ZK query failed"); + } + } + private static final class TestRocksIncrementalSnapshot extends RocksIncrementalSnapshot { - private final Set abortedSnapshots = new HashSet<>(); - private final Set completedSnapshots = new HashSet<>(); + private final Set abortedSnapshots = ConcurrentHashMap.newKeySet(); + private final Set completedSnapshots = ConcurrentHashMap.newKeySet(); private final SnapshotFailType snapshotFailType; public TestRocksIncrementalSnapshot( From acde05487aa207512bfb0faf5233cca9d50160de Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Sun, 28 Sep 2025 13:43:05 +0800 Subject: [PATCH 3/3] do not discard snapshot files when zk query failed --- .../kv/snapshot/KvTabletSnapshotTarget.java | 30 +++++++++++-------- .../snapshot/KvTabletSnapshotTargetTest.java | 23 ++++++++++---- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java index 8d58a8512b..c84a26b70f 100644 --- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java @@ -58,7 +58,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter; - @Nonnull private final ZooKeeperClient zooKeeperClient; + private final ZooKeeperClient zooKeeperClient; private final RocksIncrementalSnapshot rocksIncrementalSnapshot; private final FsPath remoteKvTabletDir; @@ -87,7 +87,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT KvTabletSnapshotTarget( TableBucket tableBucket, CompletedKvSnapshotCommitter completedKvSnapshotCommitter, - @Nonnull ZooKeeperClient zooKeeperClient, + ZooKeeperClient zooKeeperClient, RocksIncrementalSnapshot rocksIncrementalSnapshot, FsPath remoteKvTabletDir, Executor ioExecutor, @@ -289,7 +289,6 @@ private void handleSnapshotCommitException( // // Idempotent check: Double check ZK to verify if the snapshot actually exists before // cleanup - boolean shouldCleanSnapshot = true; try { Optional zkSnapshot = zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId); @@ -305,23 +304,30 @@ private void handleSnapshotCommitException( // Update local state as if the commit was successful updateStateOnCommitSuccess(snapshotId, snapshotResult); - - shouldCleanSnapshot = false; return; // Snapshot commit succeeded, return directly + } else { + // Snapshot does not exist in ZK, indicating the commit truly failed + LOG.warn( + "Snapshot {} for TableBucket {} does not exist in ZK. " + + "The commit truly failed, proceeding with cleanup.", + snapshotId, + tableBucket); + snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); + handleSnapshotFailure(snapshotId, snapshotLocation, t); } } catch (Exception zkException) { LOG.warn( "Failed to query ZK for snapshot {} of TableBucket {}. 
" - + "Assuming commit failed and proceeding with cleanup.", + + "Cannot determine actual snapshot status, keeping snapshot in current state " + + "to avoid potential data loss.", snapshotId, tableBucket, zkException); - } - - // Only execute cleanup when the snapshot truly does not exist in ZK - if (shouldCleanSnapshot) { - snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); - handleSnapshotFailure(snapshotId, snapshotLocation, t); + // When ZK query fails, we cannot determine the actual status. + // The snapshot might have succeeded or failed on the ZK side. + // Therefore, we must not clean up the snapshot files and not update local state. + // This avoids the risk of discarding a successfully committed snapshot that + // connectors may already be reading, which would cause data loss or job failure. } // throw the exception to make PeriodicSnapshotManager can catch the exception diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index fafe4d09ba..9f9ea078da 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -400,7 +400,7 @@ void testIdempotentCheckWhenSnapshotNotExistsInZK(@TempDir Path kvTabletDir) thr @Test void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws Exception { - // Test case: ZK query fails - should fall back to cleanup (conservative approach) + // Test case: ZK query fails - should keep snapshot in current state to avoid data loss FsPath remoteKvTabletDir = FsPath.fromLocalFile(kvTabletDir.toFile()); // Create a failing ZK client that throws exception to simulate ZK query failure @@ -426,14 +426,25 @@ void testIdempotentCheckWhenZKQueryFails(@TempDir Path kvTabletDir) throws Excep 
TestRocksIncrementalSnapshot rocksIncrementalSnapshot = (TestRocksIncrementalSnapshot) kvTabletSnapshotTarget.getRocksIncrementalSnapshot(); - // Since ZK query failed, should fall back to cleanup (conservative approach) + // Wait for snapshot processing to complete + // The snapshot should be created but commit will fail, then ZK query will fail + // In this case, the new logic should preserve the snapshot files (no cleanup) retry( Duration.ofMinutes(1), - () -> assertThat(rocksIncrementalSnapshot.abortedSnapshots).contains(snapshotId1)); + () -> { + // Verify that snapshot creation happened but neither completion nor abortion + // occurred + // Since both commit and ZK query failed, snapshot should remain in limbo state + FsPath snapshotPath1 = + FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1); + assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isTrue(); + assertThat(rocksIncrementalSnapshot.abortedSnapshots) + .doesNotContain(snapshotId1); + assertThat(rocksIncrementalSnapshot.completedSnapshots) + .doesNotContain(snapshotId1); + }); - // Verify cleanup occurred - FsPath snapshotPath1 = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1); - assertThat(snapshotPath1.getFileSystem().exists(snapshotPath1)).isFalse(); + // Verify local state was not updated (remain unchanged) assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(Long.MAX_VALUE); }