From 2af47791905abc8ed10b7b7251be0780fb4f5ba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E9=B5=BA?= Date: Wed, 8 Apr 2026 07:39:25 +0800 Subject: [PATCH] [fluss-server] Restrict replicasOnOffline to fatal errors only Only fatal errors (STORAGE_EXCEPTION, LOG_STORAGE_EXCEPTION, KV_STORAGE_EXCEPTION, UNKNOWN_SERVER_ERROR) mark replicas offline. All other NotifyLeaderAndIsr errors are transient and do not affect leader election eligibility. Also fixes ReplicaManager to return LeaderNotAvailableException instead of StorageException when leaderId is null or negative. --- .../CoordinatorEventProcessor.java | 68 +++- .../fluss/server/replica/ReplicaManager.java | 10 +- .../CoordinatorEventProcessorTest.java | 357 ++++++++++++++++++ .../tablet/TestTabletServerGateway.java | 3 +- 4 files changed, 426 insertions(+), 12 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index ce6f7f3369..c9a83fcbc0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -59,6 +59,7 @@ import org.apache.fluss.rpc.messages.RebalanceResponse; import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; @@ -342,8 +343,14 @@ private void initCoordinatorContext() throws Exception { long start4loadTabletServer = System.currentTimeMillis(); Map tabletServerRegistrations = zooKeeperClient.getTabletServers(currentServers); + List skippedNullRegistration = new ArrayList<>(); + List 
skippedNoEndpoint = new ArrayList<>(); for (int server : currentServers) { TabletServerRegistration registration = tabletServerRegistrations.get(server); + if (registration == null) { + skippedNullRegistration.add(server); + continue; + } ServerInfo serverInfo = new ServerInfo( server, @@ -357,6 +364,7 @@ private void initCoordinatorContext() throws Exception { "Can not find endpoint for listener name {} for tablet server {}", internalListenerName, serverInfo); + skippedNoEndpoint.add(server); continue; } tabletServerInfos.add(serverInfo); @@ -370,8 +378,30 @@ private void initCoordinatorContext() throws Exception { coordinatorContext.setLiveTabletServers(tabletServerInfos); LOG.info( - "Load tablet servers success in {}ms when initializing coordinator context.", - System.currentTimeMillis() - start4loadTabletServer); + "Load tablet servers success in {}ms when initializing coordinator context. " + + "ZK returned {} servers, loaded {} into liveSet, " + + "skipped {} (null registration), skipped {} (no endpoint). 
" + + "Live server IDs: {}", + System.currentTimeMillis() - start4loadTabletServer, + currentServers.length, + tabletServerInfos.size(), + skippedNullRegistration.size(), + skippedNoEndpoint.size(), + tabletServerInfos.stream() + .map(s -> String.valueOf(s.id())) + .collect(Collectors.joining(","))); + if (!skippedNullRegistration.isEmpty()) { + LOG.warn( + "Skipped {} servers with null ZK registration: {}", + skippedNullRegistration.size(), + skippedNullRegistration); + } + if (!skippedNoEndpoint.isEmpty()) { + LOG.warn( + "Skipped {} servers with no internal endpoint: {}", + skippedNoEndpoint.size(), + skippedNoEndpoint); + } // init tablet server channels coordinatorChannelManager.startup(internalServerNodes); @@ -938,11 +968,25 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( notifyLeaderAndIsrResponseReceivedEvent.getNotifyLeaderAndIsrResultForBuckets(); for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket : notifyLeaderAndIsrResultForBuckets) { - // if the error code is not none, we will consider it as offline if (notifyLeaderAndIsrResultForBucket.failed()) { - offlineReplicas.add( - new TableBucketReplica( - notifyLeaderAndIsrResultForBucket.getTableBucket(), serverId)); + Errors error = notifyLeaderAndIsrResultForBucket.getError().error(); + TableBucket tableBucket = notifyLeaderAndIsrResultForBucket.getTableBucket(); + if (isFatalReplicaError(error)) { + LOG.warn( + "Fatal NotifyLeaderAndIsr error for bucket {} on server {}: {}. " + + "Marking replica offline.", + tableBucket, + serverId, + notifyLeaderAndIsrResultForBucket.getError()); + offlineReplicas.add(new TableBucketReplica(tableBucket, serverId)); + } else { + LOG.warn( + "Transient NotifyLeaderAndIsr error for bucket {} on server {}: {}. 
" + + "Replica remains online.", + tableBucket, + serverId, + notifyLeaderAndIsrResultForBucket.getError()); + } } } if (!offlineReplicas.isEmpty()) { @@ -951,6 +995,18 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( } } + /** + * Returns true if the error indicates a fatal replica failure (storage corruption, unknown + * internal error) that warrants excluding this replica from future leader elections. All other + * errors are considered transient and should NOT mark the replica offline. + */ + private static boolean isFatalReplicaError(Errors error) { + return error == Errors.STORAGE_EXCEPTION + || error == Errors.LOG_STORAGE_EXCEPTION + || error == Errors.KV_STORAGE_EXCEPTION + || error == Errors.UNKNOWN_SERVER_ERROR; + } + private void onReplicaBecomeOffline(Set offlineReplicas) { LOG.info("The replica {} become offline.", offlineReplicas); for (TableBucketReplica offlineReplica : offlineReplicas) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index c57f1c6fb7..6f54147379 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidColumnProjectionException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidRequiredAcksException; +import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -1188,17 +1189,16 @@ private void addFetcherForReplicas( Integer leaderId = replica.getLeaderId(); TableBucket tb = replica.getTableBucket(); LogTablet logTablet = replica.getLogTablet(); - if (leaderId == null) { + if 
(leaderId == null || leaderId < 0) { result.put( tb, new NotifyLeaderAndIsrResultForBucket( tb, ApiError.fromThrowable( - new StorageException( + new LeaderNotAvailableException( String.format( - "Could not find leader for follower replica %s while make " - + "follower for %s.", - serverId, tb))))); + "No leader available for follower replica %s on server %s.", + tb, serverId))))); } else { bucketAndStatus.put( tb, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 0d304df1e4..3082d0f542 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -42,18 +42,23 @@ import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; +import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.ApiKeys; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; +import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent; +import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.statemachine.BucketState; import org.apache.fluss.server.coordinator.statemachine.ReplicaState; import 
org.apache.fluss.server.entity.AdjustIsrResultForBucket; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.CommitRemoteLogManifestData; +import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; @@ -1039,6 +1044,358 @@ void testDoBucketReassignment() throws Exception { ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupPath(ZkData.ServerIdZNode.path(3)); } + /** + * Verifies that fatal errors (KvStorageException, UNKNOWN_SERVER_ERROR) in NotifyLeaderAndIsr + * responses correctly mark the replica offline, preventing it from being elected as leader. + */ + @Test + void testFatalErrorMarksReplicaOffline() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "fatal_error_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + int follower2 = followers.get(1); + + // follower1 returns KvStorageException — fatal, should be marked offline. 
+ eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .KvStorageException( + "kv storage error")))), + follower1)); + + // follower2 returns UNKNOWN_SERVER_ERROR — fatal, should be marked offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + new ApiError( + Errors.UNKNOWN_SERVER_ERROR, + "unexpected NPE"))), + follower2)); + + // Both followers should be offline. + retryVerifyContext( + ctx -> { + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should be offline after KvStorageException") + .isFalse(); + assertThat(ctx.isReplicaOnline(follower2, tb)) + .as("follower2 should be offline after UNKNOWN_SERVER_ERROR") + .isFalse(); + }); + + // Leader dies — no viable candidates (both followers offline). + eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(leader)); + + // Bucket should remain without a leader (or leader == -1). + retryVerifyContext( + ctx -> { + Optional lai = ctx.getBucketLeaderAndIsr(tb); + assertThat(lai).isPresent(); + assertThat(lai.get().leader()) + .as("No viable candidate — leader should be -1") + .isEqualTo(-1); + }); + } + + /** + * Verifies that transient errors (FencedLeaderEpochException, LeaderNotAvailableException) in + * NotifyLeaderAndIsr responses do NOT mark the replica offline. The replica remains eligible + * for leader election. + * + *

<p>LeaderNotAvailableException specifically validates the ReplicaManager fix: when no leader + * is available for a follower replica, the error is now correctly classified as transient + * (LeaderNotAvailableException) instead of fatal (StorageException). + * + *
<p>Scenario: ISR = {leader, follower1, follower2}. + * + *
<ol> + *
<li>follower1 returns FencedLeaderEpochException (transient) + *
<li>follower2 returns LeaderNotAvailableException (transient) + *
<li>leader dies + *
<li>Either follower1 or follower2 should be elected as new leader + *
</ol>
+ */ + @Test + void testTransientErrorDoesNotMarkReplicaOffline() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "transient_error_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + int follower2 = followers.get(1); + + // follower1 returns FencedLeaderEpochException — transient, should NOT be offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new FencedLeaderEpochException( + "stale epoch")))), + follower1)); + + // follower2 returns LeaderNotAvailableException — transient, should NOT be offline. + // This also validates the ReplicaManager fix: "no leader" now produces + // LeaderNotAvailableException (transient) instead of StorageException (fatal). + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .LeaderNotAvailableException( + "no leader yet")))), + follower2)); + + // Verify both followers remain online after transient errors. 
+ retryVerifyContext( + ctx -> { + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should remain online after FencedLeaderEpochException") + .isTrue(); + assertThat(ctx.isReplicaOnline(follower2, tb)) + .as("follower2 should remain online after LeaderNotAvailableException") + .isTrue(); + }); + + // Leader dies — both followers are viable candidates. + eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(leader)); + + // A new leader should be elected from the followers. + retryVerifyContext( + ctx -> { + Optional lai = ctx.getBucketLeaderAndIsr(tb); + assertThat(lai).isPresent(); + assertThat(lai.get().leader()) + .as("One of the followers should be elected as new leader") + .isIn(follower1, follower2); + }); + } + + /** + * Verifies the combined behavior: a follower with a transient error remains eligible for + * election, while a follower with a fatal error is excluded. + * + *

<p>Scenario: ISR = {leader, follower1, follower2}. + * + *
<ol> + *
<li>follower1 returns FencedLeaderEpochException (transient — remains online) + *
<li>follower2 returns StorageException (fatal — marked offline) + *
<li>leader dies + *
<li>follower1 is the ONLY viable candidate → elected as new leader + *
</ol>
+ */ + @Test + void testMixedErrorsOnlyFatalMarksOffline() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "mixed_error_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + int follower2 = followers.get(1); + + // follower1: transient error — remains online. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new FencedLeaderEpochException( + "stale epoch")))), + follower1)); + + // follower2: fatal error — marked offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .StorageException("disk error")))), + follower2)); + + // Wait for follower2 to be offline. + retryVerifyContext( + ctx -> + assertThat(ctx.isReplicaOnline(follower2, tb)) + .as("follower2 should be offline after StorageException") + .isFalse()); + + // Verify follower1 is still online. + retryVerifyContext( + ctx -> + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should remain online after transient error") + .isTrue()); + + // Leader dies — follower1 is the ONLY viable candidate. 
+ eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(leader)); + + // follower1 should be elected as new leader. + retryVerifyContext( + ctx -> { + Optional lai = ctx.getBucketLeaderAndIsr(tb); + assertThat(lai).isPresent(); + assertThat(lai.get().leader()) + .as( + "follower1 (server %d) should be elected as new leader " + + "after leader (server %d) died", + follower1, leader) + .isEqualTo(follower1); + }); + } + + /** + * Verifies that when a server with offline-marked replicas dies, the offline markers are + * cleaned up (via removeOfflineBucketInServer in processDeadTabletServer) and the server is + * removed from the live set. + * + *

Note: This test covers the "death" half of the restart cycle. The full restart cycle + * (death → re-registration → replica back online) depends on ZK watcher integration which is + * covered by testRestartTriggerReplicaToOffline. + */ + @Test + void testDeadServerClearsOfflineMarker() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "server_restart_test"); + long tableId = + createTable( + tablePath, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx(ctx -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + int leader = leaderAndIsr.leader(); + + List followers = + leaderAndIsr.isr().stream().filter(id -> id != leader).collect(Collectors.toList()); + int follower1 = followers.get(0); + + // follower1 gets a fatal error — marked offline. + eventProcessor + .getCoordinatorEventManager() + .put( + new NotifyLeaderAndIsrResponseReceivedEvent( + Collections.singletonList( + new NotifyLeaderAndIsrResultForBucket( + tb, + ApiError.fromThrowable( + new org.apache.fluss.exception + .StorageException("disk error")))), + follower1)); + + // Verify follower1 is offline. + retryVerifyContext( + ctx -> + assertThat(ctx.isReplicaOnline(follower1, tb)) + .as("follower1 should be offline after StorageException") + .isFalse()); + + // Simulate server death: this should clear offline markers for follower1. + eventProcessor.getCoordinatorEventManager().put(new DeadTabletServerEvent(follower1)); + + // After dead event, follower1 should be removed from live set + // and its offline markers should be cleared. + // Note: we check getOfflineBucketCount() instead of isReplicaOnline() because + // isReplicaOnline() requires the server to be in the live set, which is false + // after death. 
getOfflineBucketCount() directly verifies removeOfflineBucketInServer + // was called. + retryVerifyContext( + ctx -> { + assertThat(ctx.liveTabletServerSet()) + .as("follower1 should be removed from live set after death") + .doesNotContain(follower1); + assertThat(ctx.getOfflineBucketCount()) + .as( + "offline bucket count should be 0 after server death " + + "clears offline markers") + .isEqualTo(0); + }); + } + private void verifyIsr(TableBucket tb, int expectedLeader, List expectedIsr) throws Exception { LeaderAndIsr leaderAndIsr = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 317dee0a2e..5a85853f54 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -90,6 +90,7 @@ import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.messages.UpdateMetadataResponse; import org.apache.fluss.rpc.protocol.ApiKeys; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.utils.types.Tuple2; @@ -275,7 +276,7 @@ public CompletableFuture notifyLeaderAndIsr( .setTableBucket() .setTableId(pbNotifyLeaderForBucket.getTableBucket().getTableId()) .setBucketId(pbNotifyLeaderForBucket.getTableBucket().getBucketId()); - pbNotifyLeaderRespForBucket.setErrorCode(1); + pbNotifyLeaderRespForBucket.setErrorCode(Errors.STORAGE_EXCEPTION.code()); pbNotifyLeaderRespForBucket.setErrorMessage( "mock notifyLeaderAndIsr fail for test purpose."); bucketsResps.add(pbNotifyLeaderRespForBucket);