From f3efeb882f9999ca182c482803801afe7659f688 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 2 Oct 2023 21:02:29 +0800 Subject: [PATCH] KAFKA-15489: refactor --- .../apache/kafka/raft/KafkaRaftClient.java | 3 -- .../org/apache/kafka/raft/LeaderState.java | 17 +++++------ .../org/apache/kafka/raft/QuorumState.java | 6 ++-- .../kafka/raft/KafkaRaftClientTest.java | 10 +++++++ .../apache/kafka/raft/LeaderStateTest.java | 28 +++++++++---------- 5 files changed, 36 insertions(+), 28 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index da65ff91afc7..73dd3c1e3be8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -966,9 +966,6 @@ private CompletableFuture handleFetchRequest( int replicaId = FetchRequest.replicaId(request); - Optional> state = quorum.maybeLeaderState(); - state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimeout(replicaId, currentTimeMs)); - FetchResponseData response = tryCompleteFetchRequest(replicaId, fetchPartition, currentTimeMs); FetchResponseData.PartitionData partitionResponse = response.responses().get(0).partitions().get(0); diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index bb37ef64d2c0..143af8182b11 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -60,22 +60,20 @@ public class LeaderState implements EpochState { private final Set fetchedVoters = new HashSet<>(); private final Timer fetchTimer; private final int fetchTimeoutMs; - // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 - private final int majority; // This is volatile because resignation can be requested from an external thread. private volatile boolean resignRequested = false; protected LeaderState( + Time time, int localId, int epoch, long epochStartOffset, Set voters, Set grantingVoters, BatchAccumulator accumulator, - LogContext logContext, - Time time, - int fetchTimeoutMs + int fetchTimeoutMs, + LogContext logContext ) { this.localId = localId; this.epoch = epoch; @@ -85,7 +83,6 @@ protected LeaderState( boolean hasAcknowledgedLeader = voterId == localId; this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader)); } - this.majority = voters.size() / 2; this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); @@ -93,7 +90,7 @@ protected LeaderState( this.fetchTimer = time.timer(fetchTimeoutMs); } - public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) { + public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { fetchTimer.update(currentTimeMs); boolean isExpired = fetchTimer.isExpired(); if (isExpired) { @@ -103,8 +100,11 @@ public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) { return isExpired; } - public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) { + // visible for testing + void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) { updateFetchedVoters(id); + // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 + int majority = voterStates.size() / 2; if (fetchedVoters.size() >= majority) { fetchedVoters.clear(); fetchTimer.update(currentTimeMs); @@ -324,6 +324,7 @@ public boolean updateReplicaState( fetchOffsetMetadata, leaderEndOffsetOpt ); + maybeResetMajorityFollowerFetchTimer(replicaId, currentTimeMs); return isVoter(state.nodeId) && maybeUpdateHighWatermark(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index bc68b7ea548f..4cd44be8c8d4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -453,15 +453,15 @@ public LeaderState transitionToLeader(long epochStartOffset, BatchAccumul // we typically expect the state machine to be caught up anyway. LeaderState state = new LeaderState<>( + time, localIdOrThrow(), epoch(), epochStartOffset, voters, candidateState.grantingVoters(), accumulator, - logContext, - time, - fetchTimeoutMs + fetchTimeoutMs, + logContext ); durableTransitionTo(state); return state; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 65344b7d82cd..fc82a8e3503b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -515,6 +515,16 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVote assertFalse(context.client.quorum().isResigned()); + // Received fetch request from another voter, the fetch timer should be reset. + context.deliverRequest(context.fetchRequest(epoch, remoteId2, 0, 0, 0)); + context.pollUntilRequest(); + + // Since the fetch timer is reset, the leader should not get resigned + context.time.sleep(context.fetchTimeoutMs / 2); + context.client.poll(); + + assertFalse(context.client.quorum().isResigned()); + // Received fetch request from an observer, but the fetch timer should not be reset. context.deliverRequest(context.fetchRequest(epoch, observerId3, 0, 0, 0)); context.pollUntilRequest(); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index b72014e37bce..eaeb349580d3 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -54,30 +54,30 @@ private LeaderState newLeaderState( long epochStartOffset ) { return new LeaderState<>( + time, localId, epoch, epochStartOffset, voters, voters, accumulator, - logContext, - time, - fetchTimeoutMs + fetchTimeoutMs, + logContext ); } @Test public void testRequireNonNullAccumulator() { assertThrows(NullPointerException.class, () -> new LeaderState<>( + new MockTime(), localId, epoch, 0, Collections.emptySet(), Collections.emptySet(), null, - logContext, - new MockTime(), - fetchTimeoutMs + fetchTimeoutMs, + logContext )); } @@ -460,27 +460,27 @@ public void testMajorityFollowerFetchTimeoutExpiration() { int node4 = 4; int observer5 = 5; LeaderState state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L); - assertFalse(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds())); + assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds())); // fetch timeout not exceeded, should not expire the timer time.sleep(fetchTimeoutMs / 2); - assertFalse(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds())); + assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds())); // received fetch requests from 2 voter nodes, the timer should be reset - state.maybeResetMajorityFollowerFetchTimeout(node1, time.milliseconds()); - state.maybeResetMajorityFollowerFetchTimeout(node2, time.milliseconds()); + state.maybeResetMajorityFollowerFetchTimer(node1, time.milliseconds()); + state.maybeResetMajorityFollowerFetchTimer(node2, time.milliseconds()); // Since the timer was reset, it won't expire this time. time.sleep(fetchTimeoutMs / 2); - assertFalse(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds())); + assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds())); // received fetch requests from 1 voter and 1 observer nodes, the timer should not be reset. - state.maybeResetMajorityFollowerFetchTimeout(node3, time.milliseconds()); - state.maybeResetMajorityFollowerFetchTimeout(observer5, time.milliseconds()); + state.maybeResetMajorityFollowerFetchTimer(node3, time.milliseconds()); + state.maybeResetMajorityFollowerFetchTimer(observer5, time.milliseconds()); // This time, the fetch timer will be expired time.sleep(fetchTimeoutMs / 2); - assertTrue(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds())); + assertTrue(state.hasMajorityFollowerFetchExpired(time.milliseconds())); } @Test