From a9b7f7ef8beb2ea5cebcfa22a03b20c42addbe22 Mon Sep 17 00:00:00 2001 From: wattt3 Date: Sun, 31 May 2026 18:47:11 +0900 Subject: [PATCH] [test] fix flaky testRebalanceWithRemoteLog --- .../rebalance/RebalanceManagerITCase.java | 2 +- .../testutils/FlussClusterExtension.java | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java index 16d06492cf..e6dbcd535a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java @@ -311,7 +311,7 @@ private void produceRecordsAndWaitRemoteLogCopy( 0, baseOffset + i * 10L); } - FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote( + FLUSS_CLUSTER_EXTENSION.waitUntilAllLogSegmentsCopyToRemote( new TableBucket(tb.getTableId(), 0)); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 48777a6f2e..a6e19f390c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -715,6 +715,32 @@ public void waitUntilSomeLogSegmentsCopyToRemote(TableBucket tableBucket) { }); } + /** + * Wait until all log segments (non-active) copy to remote. This method waits until the remote + * log end offset reaches the base offset of the active segment. + */ + public void waitUntilAllLogSegmentsCopyToRemote(TableBucket tableBucket) { + retry( + Duration.ofMinutes(2), + () -> { + int leader = waitAndGetLeader(tableBucket); + TabletServer tabletServer = getTabletServerById(leader); + ReplicaManager.HostedReplica hostedReplica = + tabletServer.getReplicaManager().getReplica(tableBucket); + assertThat(hostedReplica).isInstanceOf(ReplicaManager.OnlineReplica.class); + Replica replica = ((ReplicaManager.OnlineReplica) hostedReplica).getReplica(); + long activeSegmentBaseOffset = + replica.getLogTablet().activeLogSegment().getBaseOffset(); + assertThat( + tabletServer + .getReplicaManager() + .getRemoteLogManager() + .remoteLogTablet(tableBucket) + .getRemoteLogEndOffset()) + .hasValue(activeSegmentBaseOffset); + }); + } + public void triggerAndWaitSnapshot(TablePath tablePath) throws Exception { Optional table = zooKeeperClient.getTable(tablePath); //noinspection SimplifyOptionalCallChains (Java 8 compatibility)