From 420d3317f01f7e6fc4e17ae84ac0762995043c84 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 15 Oct 2024 01:24:01 +0800 Subject: [PATCH 01/33] Add new field `CommittedVoters` in DescribeQuorumResponse.json --- .../main/resources/common/message/DescribeQuorumResponse.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json index e0be61781f5fc..53295c89a4361 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json +++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json @@ -19,7 +19,8 @@ "name": "DescribeQuorumResponse", // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836). // Version 2 adds ErrorMessage, Nodes, ErrorMessage in ParitionData, ReplicaDirectoryId in ReplicaState (KIP-853). - "validVersions": "0-2", + // Version 3 adds CommittedVoters in PartitionData + "validVersions": "0-3", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", @@ -43,6 +44,7 @@ "about": "The latest known leader epoch"}, { "name": "HighWatermark", "type": "int64", "versions": "0+"}, { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" }, + { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", "nullableVersions": "3+", "ignorable": true }, { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" } ]} ]}, From 218bf77945a664e4edc9a7ab89f5f7cc3bdec11f Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 15 Oct 2024 01:26:34 +0800 Subject: [PATCH 02/33] feat(client/admin): Add committed voters to QuorumInfo - Introduce committedVoters field in QuorumInfo class - Update KafkaAdminClient to populate committedVoters from partition info - Add getter method for committedVoters in QuorumInfo --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 5 +++++ .../java/org/apache/kafka/clients/admin/QuorumInfo.java | 9 ++++++++- .../org/apache/kafka/tools/MetadataQuorumCommand.java | 8 ++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 269ffd1099b32..d88b7e31f872e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4618,6 +4618,10 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition .map(this::translateReplicaState) .collect(Collectors.toList()); + List committedVoters = partition.committedVoters().stream() + .map(this::translateReplicaState) + .collect(Collectors.toList()); + List observers = partition.observers().stream() .map(this::translateReplicaState) .collect(Collectors.toList()); @@ -4635,6 +4639,7 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition partition.leaderEpoch(), partition.highWatermark(), voters, + committedVoters, observers, nodes ); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java index 5264b6f6aae7a..74b835f4b242c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java @@ -31,6 +31,7 @@ public class QuorumInfo { private final long leaderEpoch; private final long highWatermark; private final List voters; + private final List committedVoters; private final List observers; private final Map nodes; @@ -39,13 +40,15 @@ public class QuorumInfo { long leaderEpoch, long highWatermark, List voters, + List committedVoters, List observers, Map nodes ) { this.leaderId = leaderId; - this.leaderEpoch = leaderEpoch; + this.leaderEpoch = leaderEpoch; this.highWatermark = highWatermark; this.voters = voters; + this.committedVoters = committedVoters; this.observers = observers; this.nodes = nodes; } @@ -66,6 +69,10 @@ public List voters() { return voters; } + public List committedVoters() { + return committedVoters; + } + public List observers() { return observers; } diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java index b2f6d9f847044..da1f95233f4d8 100644 --- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java @@ -271,6 +271,7 @@ else if (leader.lastCaughtUpTimestamp().isPresent() && maxLagFollower.lastCaught "\nMaxFollowerLag: " + maxFollowerLag + "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs + "\nCurrentVoters: " + printVoterState(quorumInfo) + + "\nCommittedVoters: " + printCommittedVoterState(quorumInfo) + "\nCurrentObservers: " + printObserverState(quorumInfo) ); } @@ -281,7 +282,14 @@ private static String printVoterState(QuorumInfo quorumInfo) { return printReplicaState(quorumInfo, quorumInfo.voters()); } + // Constructs the CommittedVoters string + // CommittedVoters: [{"id": 0, "directoryId": "UUID1", "endpoints": ["C://controller-0:1234"]}] + private static String printCommittedVoterState(QuorumInfo quorumInfo) { + return printReplicaState(quorumInfo, quorumInfo.committedVoters()); + } + // Constructs the CurrentObservers string + // CurrentObservers: [{"id": 0, "directoryId": "UUID1", "endpoints": ["C://controller-0:1234"]}] private static String printObserverState(QuorumInfo quorumInfo) { return printReplicaState(quorumInfo, quorumInfo.observers()); } From c0cc6a38d100c3fcf9946dbd93a2abdd46cec84b Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 15 Oct 2024 01:32:52 +0800 Subject: [PATCH 03/33] refactor(raft): Rename voterStates to currentVoterStates in LeaderState - Rename `voterStates` to `currentVoterStates` for clarity - Update all references to use the new name - Add `currentVoterStates()` getter method - Remove unused `voterStates()` method --- .../org/apache/kafka/raft/LeaderState.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index c09282c87c9ea..7ae8a5caadce2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -69,7 +69,7 @@ public class LeaderState implements EpochState { private final KRaftVersion kraftVersionAtEpochStart; private Optional highWatermark = Optional.empty(); - private Map voterStates = new HashMap<>(); + private Optional currentVoterOffset = Optional.empty(); private Optional addVoterHandlerState = Optional.empty(); private Optional removeVoterHandlerState = Optional.empty(); @@ -107,7 +107,7 @@ protected LeaderState( for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey); - this.voterStates.put( + this.currentVoterStates.put( voterNode.voterKey().id(), new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) ); @@ -146,7 +146,7 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) { */ public long timeUntilCheckQuorumExpires(long currentTimeMs) { // if there's only 1 voter, it should never get expired. - if (voterStates.size() == 1) { + if (currentVoterStates.size() == 1) { return Long.MAX_VALUE; } checkQuorumTimer.update(currentTimeMs); @@ -157,7 +157,7 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { "Current fetched voters are {}, and voters are {}", checkQuorumTimeoutMs, fetchedVoters, - voterStates.values().stream().map(voter -> voter.replicaKey) + currentVoterStates.values().stream().map(voter -> voter.replicaKey) ); } return remainingMs; @@ -172,12 +172,12 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { public void updateCheckQuorumForFollowingVoter(ReplicaKey replicaKey, long currentTimeMs) { updateFetchedVoters(replicaKey); // The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc. - int majority = (voterStates.size() / 2) + 1; + int majority = (currentVoterStates.size() / 2) + 1; // If the leader is in the voter set, it should be implicitly counted as part of the // majority, but the leader will never be a member of the fetchedVoters. // If the leader is not in the voter set, it is not in the majority. Then, the // majority can only be composed of fetched voters. - if (voterStates.containsKey(localReplicaKey.id())) { + if (currentVoterStates.containsKey(localReplicaKey.id())) { majority = majority - 1; } @@ -193,7 +193,7 @@ private void updateFetchedVoters(ReplicaKey replicaKey) { throw new IllegalArgumentException("Received a FETCH/FETCH_SNAPSHOT request from the leader itself."); } - ReplicaState state = voterStates.get(replicaKey.id()); + ReplicaState state = currentVoterStates.get(replicaKey.id()); if (state != null && state.matchesKey(replicaKey)) { fetchedVoters.add(replicaKey.id()); } @@ -296,7 +296,7 @@ public void appendStartOfEpochControlRecords(VoterSet.VoterNode localVoterNode, ); } - List voters = convertToVoters(voterStates.keySet()); + List voters = convertToVoters(currentVoterStates.keySet()); List grantingVoters = convertToVoters(this.grantingVoters()); LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() @@ -407,7 +407,7 @@ public Optional highWatermark() { @Override public ElectionState election() { - return ElectionState.withElectedLeader(epoch, localReplicaKey.id(), voterStates.keySet()); + return ElectionState.withElectedLeader(epoch, localReplicaKey.id(), currentVoterStates.keySet()); } @Override @@ -420,8 +420,9 @@ public Endpoints leaderEndpoints() { return localListeners; } - Map voterStates() { - return voterStates; + Map currentVoterStates() { + return currentVoterStates; + } } Map observerStates(final long currentTimeMs) { @@ -436,7 +437,7 @@ public Set grantingVoters() { // visible for testing Set nonAcknowledgingVoters() { Set nonAcknowledging = new HashSet<>(); - for (ReplicaState state : voterStates.values()) { + for (ReplicaState state : currentVoterStates.values()) { if (!state.hasAcknowledgedLeader) { nonAcknowledging.add(state.replicaKey); } @@ -449,7 +450,7 @@ private boolean maybeUpdateHighWatermark() { ArrayList followersByDescendingFetchOffset = followersByDescendingFetchOffset() .collect(Collectors.toCollection(ArrayList::new)); - int indexOfHw = voterStates.size() / 2; + int indexOfHw = currentVoterStates.size() / 2; Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset; if (highWatermarkUpdateOpt.isPresent()) { @@ -484,7 +485,7 @@ private boolean maybeUpdateHighWatermark() { "value {}, which should only happen when voter set membership changes. If the voter " + "set has not changed this suggests that one of the voters has lost committed data. " + "Full voter replication state: {}", highWatermarkUpdateOffset, - currentHighWatermarkMetadata.offset(), voterStates.values()); + currentHighWatermarkMetadata.offset(), currentVoterStates.values()); return false; } else { return false; @@ -607,7 +608,7 @@ public List nonLeaderVotersByDescendingFetchOffset() { } private Stream followersByDescendingFetchOffset() { - return voterStates + return currentVoterStates .values() .stream() .sorted(); @@ -631,7 +632,7 @@ public long epochStartOffset() { } private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { - ReplicaState state = voterStates.get(replicaKey.id()); + ReplicaState state = currentVoterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty())); return observerStates.get(replicaKey); @@ -640,7 +641,7 @@ private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { } public Optional getReplicaState(ReplicaKey replicaKey) { - ReplicaState state = voterStates.get(replicaKey.id()); + ReplicaState state = currentVoterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { state = observerStates.get(replicaKey); } @@ -659,7 +660,7 @@ private void clearInactiveObservers(final long currentTimeMs) { } private boolean isVoter(ReplicaKey remoteReplicaKey) { - ReplicaState state = voterStates.get(remoteReplicaKey.id()); + ReplicaState state = currentVoterStates.get(remoteReplicaKey.id()); return state != null && state.matchesKey(remoteReplicaKey); } @@ -683,7 +684,7 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) { state.updateListeners(voterNode.listeners()); newVoterStates.put(state.replicaKey.id(), state); } - voterStates = newVoterStates; + currentVoterStates = newVoterStates; // Move any of the remaining old voters to observerStates for (ReplicaState replicaStateEntry : oldVoterStates.values()) { @@ -849,7 +850,7 @@ public String toString() { epoch, epochStartOffset, highWatermark, - voterStates + currentVoterStates ); } From aa605683765f9aa5f99a14bda6dc80c04fb3e495 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 15 Oct 2024 01:33:26 +0800 Subject: [PATCH 04/33] feat(raft): Add lastVoterSetWithOffset method to KRaftControlRecordStateMachine - Introduce new method `lastVoterSetWithOffset()` in KRaftControlRecordStateMachine - Return a Map.Entry containing the last VoterSet and its corresponding offset - Synchronize access to voterSetHistory for thread safety --- .../internals/KRaftControlRecordStateMachine.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index c1d4a0b2f2dbb..266f677e6f1d2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; @@ -142,6 +143,17 @@ public VoterSet lastVoterSet() { } } + /** + * Returns the last voter set with its offset. + */ + public Map.Entry lastVoterSetWithOffset() { + synchronized (voterSetHistory) { + VoterSet voters = voterSetHistory.lastValue(); + Long offset = voterSetHistory.lastVoterSetOffset().orElse(0L); + return Map.entry(voters, offset); + } + } + /** * Return the latest entry for the set of voters. */ From 33ed01b329b28e4861dc421a074619ce82890822 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 15 Oct 2024 01:34:01 +0800 Subject: [PATCH 05/33] feat(raft): implement committed voters in LeaderState and DescribeQuorum API - Add committedVoterStates to LeaderState to track committed voters - Update LeaderState to manage both current and committed voter states - Modify KafkaRaftClient to use new lastVoterSetWithOffset method - Extend RaftUtil.buildDescribeQuorumResponse to include committed voters - Update DescribeQuorum API to support committed voters (version 3) --- .../apache/kafka/raft/KafkaRaftClient.java | 9 ++++-- .../org/apache/kafka/raft/LeaderState.java | 28 +++++++++++++++---- .../java/org/apache/kafka/raft/RaftUtil.java | 6 ++++ 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 51aa5e59f2f4e..2930b5d6b74b1 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -352,7 +352,11 @@ private void updateLeaderEndOffsetAndTimestamp( ) { final LogOffsetMetadata endOffsetMetadata = log.endOffset(); - if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet())) { + Map.Entry lastVoterSetWithOffset = partitionState.lastVoterSetWithOffset(); + VoterSet lastVoterSet = lastVoterSetWithOffset.getKey(); + Long lastVoterSetOffset = lastVoterSetWithOffset.getValue(); + + if (state.updateLocalState(endOffsetMetadata, lastVoterSet, lastVoterSetOffset)) { onUpdateLeaderHighWatermark(state, currentTimeMs); } @@ -1751,7 +1755,8 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( quorum.localIdOrThrow(), leaderState.epoch(), leaderState.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L), - leaderState.voterStates().values(), + leaderState.currentVoterStates().values(), + leaderState.committedVoterStates().values(), leaderState.observerStates(currentTimeMs).values(), currentTimeMs ); diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 7ae8a5caadce2..d7fba838b13a3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -70,6 +70,8 @@ public class LeaderState implements EpochState { private Optional highWatermark = Optional.empty(); private Optional currentVoterOffset = Optional.empty(); + private Map currentVoterStates = new HashMap<>(); + private Map committedVoterStates = new HashMap<>(); private Optional addVoterHandlerState = Optional.empty(); private Optional removeVoterHandlerState = Optional.empty(); @@ -111,6 +113,10 @@ protected LeaderState( voterNode.voterKey().id(), new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) ); + this.committedVoterStates.put( + voterNode.voterKey().id(), + new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) + ); } this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); @@ -423,6 +429,9 @@ public Endpoints leaderEndpoints() { Map currentVoterStates() { return currentVoterStates; } + + Map committedVoterStates() { + return committedVoterStates; } Map observerStates(final long currentTimeMs) { @@ -473,6 +482,13 @@ private boolean maybeUpdateHighWatermark() { !highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; + + // If the new high watermark is greater than the current voter offset, + // then the current voter set is committed. + if (currentVoterOffset.isPresent() && currentVoterOffset.get() <= highWatermarkUpdateOffset) { + committedVoterStates = currentVoterStates; + } + logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -540,7 +556,8 @@ private void logHighWatermarkUpdate( */ public boolean updateLocalState( LogOffsetMetadata endOffsetMetadata, - VoterSet lastVoterSet + VoterSet lastVoterSet, + long lastVoterSetOffset ) { ReplicaState state = getOrCreateReplicaState(localReplicaKey); state.endOffset.ifPresent(currentEndOffset -> { @@ -551,7 +568,7 @@ public boolean updateLocalState( }); state.updateLeaderEndOffset(endOffsetMetadata); - updateVoterAndObserverStates(lastVoterSet); + updateVoterAndObserverStates(lastVoterSet, lastVoterSetOffset); return maybeUpdateHighWatermark(); } @@ -620,7 +637,7 @@ public void addAcknowledgementFrom(int remoteNodeId) { } private ReplicaState ensureValidVoter(int remoteNodeId) { - ReplicaState state = voterStates.get(remoteNodeId); + ReplicaState state = currentVoterStates.get(remoteNodeId); if (state == null) { throw new IllegalArgumentException("Unexpected acknowledgement from non-voter " + remoteNodeId); } @@ -664,9 +681,9 @@ private boolean isVoter(ReplicaKey remoteReplicaKey) { return state != null && state.matchesKey(remoteReplicaKey); } - private void updateVoterAndObserverStates(VoterSet lastVoterSet) { + private void updateVoterAndObserverStates(VoterSet lastVoterSet, Long lastVoterSetOffset) { Map newVoterStates = new HashMap<>(); - Map oldVoterStates = new HashMap<>(voterStates); + Map oldVoterStates = new HashMap<>(currentVoterStates); // Compute the new voter states map for (VoterSet.VoterNode voterNode : lastVoterSet.voterNodes()) { @@ -685,6 +702,7 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) { newVoterStates.put(state.replicaKey.id(), state); } currentVoterStates = newVoterStates; + currentVoterOffset = Optional.ofNullable(lastVoterSetOffset); // Move any of the remaining old voters to observerStates for (ReplicaState replicaStateEntry : oldVoterStates.values()) { diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 018bec0d632ad..3a268f6079dbd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -493,9 +493,14 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse( int leaderEpoch, long highWatermark, Collection voters, + Collection committedVoters, Collection observers, long currentTimeMs ) { + if (apiVersion < 3) { + committedVoters = null; + } + DescribeQuorumResponseData response = new DescribeQuorumResponseData() .setTopics( Collections.singletonList( @@ -510,6 +515,7 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse( .setLeaderEpoch(leaderEpoch) .setHighWatermark(highWatermark) .setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs)) + .setCommittedVoters(toReplicaStates(apiVersion, leaderId, committedVoters, currentTimeMs)) .setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs)))))); if (apiVersion >= 2) { DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size()); From e975174137b2048a510edde7592b0ef1389e18a4 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 12 Aug 2025 15:40:41 +0800 Subject: [PATCH 06/33] fix build --- .../main/resources/common/message/DescribeQuorumResponse.json | 3 ++- raft/src/main/java/org/apache/kafka/raft/RaftUtil.java | 4 ---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json index 75ce8e52024ce..e97a84d162daa 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json +++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json @@ -47,7 +47,8 @@ "about": "The high water mark."}, { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+", "about": "The current voters of the partition."}, - { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", "nullableVersions": "3+", "ignorable": true, + { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", + "nullableVersions": "3+", "ignorable": true, "default" : "null", "about": "The voters has been committed."}, { "name": "Observers", "type": "[]ReplicaState", "versions": "0+", "about": "The observers of the partition."} diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index e4cd6ccb94046..2f1e22bfd370a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -492,10 +492,6 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse( Collection observers, long currentTimeMs ) { - if (apiVersion < 3) { - committedVoters = null; - } - DescribeQuorumResponseData response = new DescribeQuorumResponseData() .setTopics( List.of( From 488d27dd2385f11f4352526c9a46490a90728db4 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 12 Aug 2025 16:26:30 +0800 Subject: [PATCH 07/33] fix test --- .../kafka/clients/admin/QuorumInfo.java | 2 +- .../message/DescribeQuorumResponse.json | 3 +- .../kafka/raft/KafkaRaftClientTest.java | 26 ++++++++-------- .../kafka/raft/RaftClientTestContext.java | 30 ++++++++++++++----- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java index 74b835f4b242c..3a898737c252e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java @@ -45,7 +45,7 @@ public class QuorumInfo { Map nodes ) { this.leaderId = leaderId; - this.leaderEpoch = leaderEpoch; + this.leaderEpoch = leaderEpoch; this.highWatermark = highWatermark; this.voters = voters; this.committedVoters = committedVoters; diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json index e97a84d162daa..f8cd08a4e84dc 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json +++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json @@ -47,8 +47,7 @@ "about": "The high water mark."}, { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+", "about": "The current voters of the partition."}, - { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", - "nullableVersions": "3+", "ignorable": true, "default" : "null", + { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", "ignorable": true, "about": "The voters has been committed."}, { "name": "Observers", "type": "[]ReplicaState", "versions": "0+", "about": "The observers of the partition."} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 4687fd3d90376..16523fccaf40b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -3265,7 +3265,7 @@ public void testDescribeQuorumWithOnlyStaticVoters(boolean withKip853Rpc) throws .setLogEndOffset(-1L) .setLastFetchTimestamp(-1) .setLastCaughtUpTimestamp(-1)); - context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, expectedVoterStates, List.of()); } @ParameterizedTest @@ -3322,7 +3322,7 @@ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withB .setLogEndOffset(-1L) .setLastFetchTimestamp(-1) .setLastCaughtUpTimestamp(-1)); - context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, expectedVoterStates, List.of()); context.time.sleep(100); long fetchOffset = withBootstrapSnapshot ? 3L : 1L; @@ -3350,7 +3350,7 @@ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withB .setLogEndOffset(fetchOffset) .setLastFetchTimestamp(followerFetchTime1) .setLastCaughtUpTimestamp(followerFetchTime1); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // After follower2 catches up to leader context.time.sleep(100); @@ -3371,13 +3371,13 @@ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withB .setLogEndOffset(nextFetchOffset) .setLastFetchTimestamp(followerFetchTime2) .setLastCaughtUpTimestamp(followerFetchTime2); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // Describe quorum returns error if leader loses leadership context.time.sleep(context.checkQuorumTimeoutMs); context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 0, List.of(), List.of()); + context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 0, List.of(), List.of(), List.of()); } @ParameterizedTest @@ -3450,7 +3450,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLogEndOffset(0L) .setLastFetchTimestamp(observerFetchTime) .setLastCaughtUpTimestamp(-1L)); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, expectedObserverStates); // Update observer fetch state context.time.sleep(100); @@ -3470,7 +3470,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLogEndOffset(fetchOffset) .setLastFetchTimestamp(observerFetchTime) .setLastCaughtUpTimestamp(observerFetchTime); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, expectedObserverStates); // Observer falls behind context.time.sleep(100); @@ -3486,7 +3486,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLogEndOffset(fetchOffset + records.size()) .setLastFetchTimestamp(context.time.milliseconds()) .setLastCaughtUpTimestamp(context.time.milliseconds()); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, expectedObserverStates); // Observer is removed due to inactivity long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS; @@ -3508,7 +3508,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLastCaughtUpTimestamp(context.time.milliseconds()); expectedVoterStates.get(1) .setLastFetchTimestamp(followerFetchTime); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // No-op for negative node id context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0)); @@ -3520,7 +3520,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo expectedVoterStates.get(0) .setLastFetchTimestamp(context.time.milliseconds()) .setLastCaughtUpTimestamp(context.time.milliseconds()); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); } @ParameterizedTest @@ -3577,7 +3577,7 @@ public void testDescribeQuorumNonMonotonicFollowerFetch(boolean withKip853Rpc, b .setLogEndOffset(fetchOffset) .setLastFetchTimestamp(followerFetchTime) .setLastCaughtUpTimestamp(followerFetchTime)); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state. // The leader will report an error in the logs, but will not let the high watermark rewind @@ -3596,7 +3596,7 @@ public void testDescribeQuorumNonMonotonicFollowerFetch(boolean withKip853Rpc, b expectedVoterStates.get(1) .setLogEndOffset(fetchOffset - batch.size()) .setLastFetchTimestamp(followerFetchTime); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); } @ParameterizedTest @@ -3640,7 +3640,7 @@ public void testStaticVotersIgnoredWithBootstrapSnapshot(boolean withKip853Rpc) .setLogEndOffset(-1L) .setLastFetchTimestamp(-1) .setLastCaughtUpTimestamp(-1)); - context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, expectedVoterStates, List.of()); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 970f442c0040b..d081cef126550 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -748,10 +748,12 @@ void assertSentDescribeQuorumResponse( int leaderId, int leaderEpoch, long highWatermark, - List voterStates, + List currentVoterStates, + List committedVoterStates, List observerStates ) { - assertSentDescribeQuorumResponse(Errors.NONE, leaderId, leaderEpoch, highWatermark, voterStates, observerStates); + assertSentDescribeQuorumResponse(Errors.NONE, leaderId, leaderEpoch, highWatermark, + currentVoterStates, committedVoterStates, observerStates); } void assertSentDescribeQuorumResponse( @@ -759,7 +761,8 @@ void assertSentDescribeQuorumResponse( int leaderId, int leaderEpoch, long highWatermark, - List voterStates, + List currentVoterStates, + List committedVoterStates, List observerStates ) { DescribeQuorumResponseData response = collectDescribeQuorumResponse(); @@ -769,7 +772,8 @@ void assertSentDescribeQuorumResponse( .setLeaderId(leaderId) .setLeaderEpoch(leaderEpoch) .setHighWatermark(highWatermark) - .setCurrentVoters(voterStates) + .setCurrentVoters(currentVoterStates) + .setCommittedVoters(committedVoterStates) .setObservers(observerStates); if (!error.equals(Errors.NONE)) { @@ -778,8 +782,8 @@ void assertSentDescribeQuorumResponse( DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(0); if (describeQuorumRpcVersion() >= 2) { - nodes = new DescribeQuorumResponseData.NodeCollection(voterStates.size()); - for (ReplicaState voterState : voterStates) { + nodes = new DescribeQuorumResponseData.NodeCollection(currentVoterStates.size()); + for (ReplicaState voterState : currentVoterStates) { nodes.add(new DescribeQuorumResponseData.Node() .setNodeId(voterState.replicaId()) .setListeners(startingVoters.listeners(voterState.replicaId()).toDescribeQuorumResponseListeners())); @@ -792,7 +796,7 @@ void assertSentDescribeQuorumResponse( nodes ); - List sortedVoters = response + List sortedCuttentVoters = response .topics() .get(0) .partitions() @@ -801,7 +805,17 @@ void assertSentDescribeQuorumResponse( .stream() .sorted(Comparator.comparingInt(ReplicaState::replicaId)) .collect(Collectors.toList()); - response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters); + List sortedCommittedVoters = response + .topics() + .get(0) + .partitions() + .get(0) + .committedVoters() + .stream() + .sorted(Comparator.comparingInt(ReplicaState::replicaId)) + .collect(Collectors.toList()); + response.topics().get(0).partitions().get(0).setCurrentVoters(sortedCuttentVoters); + response.topics().get(0).partitions().get(0).setCommittedVoters(sortedCommittedVoters); response.nodes().sort(Comparator.comparingInt(DescribeQuorumResponseData.Node::nodeId)); assertEquals(expectedResponse, response); From cf8f2324f21735b856e835cd5e658dd8cbf6aa81 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 13 Aug 2025 09:42:35 +0800 Subject: [PATCH 08/33] fix some issues --- .../org/apache/kafka/raft/LeaderState.java | 19 +++++++++++++------ .../KRaftControlRecordStateMachine.java | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 07d9f86f434e6..ccf2262d2558d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -740,12 +740,7 @@ private boolean maybeUpdateHighWatermark() { !highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; - - // If the new high watermark is greater than the current voter offset, - // then the current voter set is committed. - if (currentVoterOffset.isPresent() && currentVoterOffset.getAsLong() <= highWatermarkUpdateOffset) { - committedVoterStates = currentVoterStates; - } + maybeUpdateCommittedVoters(highWatermarkUpdateOffset); logHighWatermarkUpdate( oldHighWatermark, @@ -767,6 +762,7 @@ private boolean maybeUpdateHighWatermark() { } else { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; + maybeUpdateCommittedVoters(highWatermarkUpdateOffset); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -883,6 +879,17 @@ public List nonLeaderVotersByDescendingFetchOffset() { .collect(Collectors.toList()); } + /** + * If the new high watermark is greater than the current voter offset, + * then the current voter set is committed. + * @param highWatermarkUpdateOffset the update-to-day high watermark offset + */ + private void maybeUpdateCommittedVoters(long highWatermarkUpdateOffset) { + if (currentVoterOffset.isPresent() && currentVoterOffset.getAsLong() <= highWatermarkUpdateOffset) { + committedVoterStates = currentVoterStates; + } + } + private Stream followersByDescendingFetchOffset() { return currentVoterStates .values() diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index c0dfb90a18301..326df274e763c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -167,7 +167,7 @@ public VoterSet lastVoterSet() { public Map.Entry lastVoterSetWithOffset() { synchronized (voterSetHistory) { VoterSet voters = voterSetHistory.lastValue(); - Long offset = voterSetHistory.lastVoterSetOffset().orElse(0L); + Long offset = voterSetHistory.lastVoterSetOffset().orElse(SMALLEST_LOG_OFFSET); return Map.entry(voters, offset); } } From 55dabac9d6748571323e56481eb0e007388d9852 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 13 Aug 2025 13:41:26 +0800 Subject: [PATCH 09/33] fix /MetadataQuorumCommandTest --- .../apache/kafka/tools/MetadataQuorumCommandTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java index 621943f695ab9..8b8e92a46f5cb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java @@ -119,13 +119,18 @@ private void testDescribeQuorumStatusSuccessful(ClusterInstance cluster, boolean "\"endpoints\":\\s+\\[\"\\S+://\\[?\\S+]?:\\d+\",?.*]"), describeOutput ); + assertTrue( + outputs[7].matches("CommittedVoters:\\s+\\[\\{\"id\":\\s+\\d+,\\s+" + + "\"endpoints\":\\s+\\[\"\\S+://\\[?\\S+]?:\\d+\",?.*]"), + describeOutput + ); // There are no observers if we have fewer brokers than controllers if (cluster.type() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers()) { - assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]"), describeOutput); + assertTrue(outputs[8].matches("CurrentObservers:\\s+\\[]"), describeOutput); } else { assertTrue( - outputs[7].matches("CurrentObservers:\\s+\\[\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+}" + + outputs[8].matches("CurrentObservers:\\s+\\[\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+}" + "(,\\s+\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+})*]"), describeOutput ); From 3f9fb3135e1ef169f654960cc914e9d53a4b02d1 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 14 Aug 2025 09:39:52 +0800 Subject: [PATCH 10/33] bump DescribeQuorumRequest version --- .../main/resources/common/message/DescribeQuorumRequest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/resources/common/message/DescribeQuorumRequest.json b/clients/src/main/resources/common/message/DescribeQuorumRequest.json index 7b9ee5a2328ec..fa4e641fe229a 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json +++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json @@ -20,7 +20,7 @@ "name": "DescribeQuorumRequest", // Version 1 adds additional fields in the response. The request is unchanged (KIP-836). // Version 2 adds additional fields in the response. The request is unchanged (KIP-853). - "validVersions": "0-2", + "validVersions": "0-3", "flexibleVersions": "0+", "latestVersionUnstable": false, "fields": [ From 91da1a17c6e5ce47ed4e7abfdcbf83347f4334d9 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 14 Aug 2025 13:28:17 +0800 Subject: [PATCH 11/33] fix majority --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index ccf2262d2558d..4fc77cffdb57c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -717,7 +717,7 @@ private boolean maybeUpdateHighWatermark() { ArrayList followersByDescendingFetchOffset = followersByDescendingFetchOffset() .collect(Collectors.toCollection(ArrayList::new)); - int indexOfHw = currentVoterStates.size() / 2; + int indexOfHw = (currentVoterStates.size() / 2) + 1; Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset; if (highWatermarkUpdateOpt.isPresent()) { From 406d562b59b31be7cda1c9d8bac30321567186df Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 15 Aug 2025 13:09:30 +0800 Subject: [PATCH 12/33] Revert "fix majority" This reverts commit 91da1a17c6e5ce47ed4e7abfdcbf83347f4334d9. --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 4fc77cffdb57c..ccf2262d2558d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -717,7 +717,7 @@ private boolean maybeUpdateHighWatermark() { ArrayList followersByDescendingFetchOffset = followersByDescendingFetchOffset() .collect(Collectors.toCollection(ArrayList::new)); - int indexOfHw = (currentVoterStates.size() / 2) + 1; + int indexOfHw = currentVoterStates.size() / 2; Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset; if (highWatermarkUpdateOpt.isPresent()) { From 4096c900690336e90f8e5e5e6d2d20825621597e Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 14 Oct 2025 18:07:01 +0800 Subject: [PATCH 13/33] finish --- .../org/apache/kafka/raft/LeaderState.java | 33 +++++++++++++++++-- .../KRaftControlRecordStateMachine.java | 26 +++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 7c379275a0e50..2b1571b3ba26d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -32,6 +32,7 @@ import org.apache.kafka.raft.errors.NotLeaderException; import org.apache.kafka.raft.internals.AddVoterHandlerState; import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.KRaftVersionUpgrade; import org.apache.kafka.raft.internals.KafkaRaftMetrics; import org.apache.kafka.raft.internals.RemoveVoterHandlerState; @@ -72,9 +73,11 @@ public class LeaderState implements EpochState { // This field is non-empty if the voter set at epoch start came from a snapshot or log segment private final OptionalLong offsetOfVotersAtEpochStart; private final KRaftVersion kraftVersionAtEpochStart; + private final KRaftControlRecordStateMachine partitionState; private Optional highWatermark = Optional.empty(); private Map voterStates = new HashMap<>(); + private Map committedVoterStates = new HashMap<>(); private Optional addVoterHandlerState = Optional.empty(); private Optional removeVoterHandlerState = Optional.empty(); @@ -120,7 +123,8 @@ protected LeaderState( BatchAccumulator accumulator, int fetchTimeoutMs, LogContext logContext, - KafkaRaftMetrics kafkaRaftMetrics + KafkaRaftMetrics kafkaRaftMetrics, + KRaftControlRecordStateMachine partitionState ) { if (localVoterNode.voterKey().directoryId().isEmpty()) { throw new IllegalArgumentException( @@ -158,6 +162,8 @@ protected LeaderState( this.voterSetAtEpochStart = voterSetAtEpochStart; this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart; this.kraftVersionAtEpochStart = kraftVersionAtEpochStart; + this.partitionState = partitionState; + offsetOfVotersAtEpochStart.ifPresent(this::updateCommittedVoter); kafkaRaftMetrics.addLeaderMetrics(); this.kafkaRaftMetrics = kafkaRaftMetrics; @@ -735,6 +741,7 @@ private boolean maybeUpdateHighWatermark() { indexOfHw, followersByDescendingFetchOffset ); + updateCommittedVoter(highWatermark.get().offset()); return true; } else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset()) { log.info("The latest computed high watermark {} is smaller than the current " + @@ -755,6 +762,7 @@ private boolean maybeUpdateHighWatermark() { indexOfHw, followersByDescendingFetchOffset ); + highWatermark.ifPresent(logOffsetMetadata -> updateCommittedVoter(logOffsetMetadata.offset())); return true; } } @@ -863,6 +871,24 @@ public List nonLeaderVotersByDescendingFetchOffset() { .collect(Collectors.toList()); } + private void updateCommittedVoter(long highWatermark) { + Optional voters = partitionState.voterSetAtOffset(highWatermark); + if (voters.isPresent()) { + // if voters are present in partitionState, we read it from memory + for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { + ReplicaKey key = voterNode.voterKey(); + committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(key)); + } + } else { + log.info("Read committed voter with start offset={} from log", highWatermark - 1); + VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark - 1); + for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { + ReplicaKey key = voterNode.voterKey(); + committedVoterStates.put(key.id(), getOrCreateReplicaState(key)); + } + } + } + private Stream followersByDescendingFetchOffset() { return voterStates .values() @@ -1105,12 +1131,13 @@ public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolea @Override public String toString() { return String.format( - "Leader(localVoterNode=%s, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s)", + "Leader(localVoterNode=%s, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s, committedVoterState=%s)", localVoterNode, epoch, epochStartOffset, highWatermark, - voterStates + voterStates, + committedVoterStates ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index d9cd47a3a2790..2d70c19e8e595 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.Batch; @@ -35,6 +36,7 @@ import org.slf4j.Logger; +import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -114,6 +116,7 @@ public KRaftControlRecordStateMachine( * Must be called whenever the {@code log} has changed. */ public void updateState() { + maybeLoadSnapshot(); maybeLoadLog(); } @@ -202,6 +205,29 @@ public Optional voterSetAtOffset(long offset) { } } + public VoterSet committedVoterSetFromLog(long offset) { + LogFetchInfo info = log.read(offset, Isolation.COMMITTED); + try (RecordsIterator iterator = new RecordsIterator<>( + info.records, + serde, + bufferSupplier, + maxBatchSizeBytes, + true, // Validate batch CRC + logContext + )) { + while (iterator.hasNext()) { + Batch batch = iterator.next(); + for (ControlRecord controlRecord : batch.controlRecords()) { + // Skip the rest of the control records + if (Objects.requireNonNull(controlRecord.type()) == ControlRecordType.KRAFT_VOTERS) { + return VoterSet.fromVotersRecord((VotersRecord) controlRecord.message()); + } + } + } + } + return staticVoterSet; + } + /** * Returns the finalized kraft version at a given offset. * From 0359cc750f7bc8e7be3b34eee196d3406fa4a23d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 14 Oct 2025 19:47:02 +0800 Subject: [PATCH 14/33] resolve conflict again --- .../apache/kafka/raft/KafkaRaftClient.java | 6 +- .../org/apache/kafka/raft/LeaderState.java | 12 ++-- .../apache/kafka/raft/LeaderStateTest.java | 60 +++++++++---------- .../raft/internals/KafkaRaftMetricsTest.java | 2 +- 4 files changed, 36 insertions(+), 44 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 64936ba26adf5..e43bd02771022 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -359,11 +359,7 @@ private void updateLeaderEndOffsetAndTimestamp( ) { final LogOffsetMetadata endOffsetMetadata = log.endOffset(); - Map.Entry lastVoterSetWithOffset = partitionState.lastVoterSetWithOffset(); - VoterSet lastVoterSet = lastVoterSetWithOffset.getKey(); - Long lastVoterSetOffset = lastVoterSetWithOffset.getValue(); - - if (state.updateLocalState(endOffsetMetadata, lastVoterSet, lastVoterSetOffset)) { + if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet())) { onUpdateLeaderHighWatermark(state, currentTimeMs); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 879fbcb4a7ffa..b0ed9962875c3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -674,8 +674,7 @@ public Optional highWatermark() { @Override public ElectionState election() { - return ElectionState.withElectedLeader( - epoch, localVoterNode.voterKey().id(), Optional.empty(), voterStates.keySet()); + return ElectionState.withElectedLeader(epoch, localVoterNode.voterKey().id(), Optional.empty(), voterStates.keySet()); } @Override @@ -745,7 +744,6 @@ private boolean maybeUpdateHighWatermark() { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; updateCommittedVoter(highWatermark.get().offset()); - logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -810,13 +808,11 @@ private void logHighWatermarkUpdate( * * @param endOffsetMetadata updated log end offset of local replica * @param lastVoterSet the up-to-date voter set - * @param lastVoterSetOffset the offset of up-to-date voter set * @return true if the high watermark is updated as a result of this call */ public boolean updateLocalState( LogOffsetMetadata endOffsetMetadata, - VoterSet lastVoterSet, - long lastVoterSetOffset + VoterSet lastVoterSet ) { ReplicaState state = getOrCreateReplicaState(localVoterNode.voterKey()); state.endOffset.ifPresent(currentEndOffset -> { @@ -827,7 +823,7 @@ public boolean updateLocalState( }); state.updateLeaderEndOffset(endOffsetMetadata); - updateVoterAndObserverStates(lastVoterSet, lastVoterSetOffset); + updateVoterAndObserverStates(lastVoterSet); return maybeUpdateHighWatermark(); } @@ -960,7 +956,7 @@ private boolean isVoter(ReplicaKey remoteReplicaKey) { return state != null && state.matchesKey(remoteReplicaKey); } - private void updateVoterAndObserverStates(VoterSet lastVoterSet, long lastVoterSetOffset) { + private void updateVoterAndObserverStates(VoterSet lastVoterSet) { Map newVoterStates = new HashMap<>(); Map oldVoterStates = new HashMap<>(voterStates); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index c13d2d47afcad..1cce3a7950b3c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -186,12 +186,12 @@ public void testUpdateHighWatermarkQuorumSizeOne() { ); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voters)); assertEquals(Set.of(), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voters, 0)); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voters)); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(20), voters, 0)); + assertTrue(state.updateLocalState(new LogOffsetMetadata(20), voters)); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } @@ -205,11 +205,11 @@ public void testNonMonotonicLocalEndOffsetUpdate() { ); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voters, 0)); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voters)); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); assertThrows( IllegalStateException.class, - () -> state.updateLocalState(new LogOffsetMetadata(15L), voters, 0) + () -> state.updateLocalState(new LogOffsetMetadata(15L), voters) ); } @@ -222,8 +222,8 @@ public void testIdempotentEndOffsetUpdate() { KRaftVersion.KRAFT_VERSION_1 ); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voters, 0)); - assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voters, 0)); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voters)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voters)); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); } @@ -238,11 +238,11 @@ public void testUpdateHighWatermarkMetadata() { assertEquals(Optional.empty(), state.highWatermark()); LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); - assertTrue(state.updateLocalState(initialHw, voters, 0)); + assertTrue(state.updateLocalState(initialHw, voters)); assertEquals(Optional.of(initialHw), state.highWatermark()); LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); - assertTrue(state.updateLocalState(updateHw, voters, 0)); + assertTrue(state.updateLocalState(updateHw, voters)); assertEquals(Optional.of(updateHw), state.highWatermark()); } @@ -258,7 +258,7 @@ public void testUpdateHighWatermarkQuorumSizeTwo(boolean withDirectoryId) { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), voters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), voters)); assertEquals(Set.of(otherNodeKey), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); assertFalse(state.updateReplicaState(otherNodeKey, 0, new LogOffsetMetadata(10L))); @@ -283,7 +283,7 @@ public void testUpdateHighWatermarkQuorumSizeThree(boolean withDirectoryId) { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voters)); assertEquals(Set.of(nodeKey1, nodeKey2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); assertFalse(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(10L))); @@ -294,7 +294,7 @@ public void testUpdateHighWatermarkQuorumSizeThree(boolean withDirectoryId) { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voters)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); @@ -314,7 +314,7 @@ public void testHighWatermarkDoesIncreaseFromNewVoter() { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters)); assertTrue(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(10L))); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); @@ -324,11 +324,11 @@ public void testHighWatermarkDoesIncreaseFromNewVoter() { // adding node2 to voterSet will cause HW to increase to 15L VoterSet votersWithNode2 = originalVoters.addVoter(VoterSetTest.voterNode(nodeKey2)).get(); - assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), votersWithNode2, 0)); + assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), votersWithNode2)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW will not update to 16L until a majority reaches it - assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), votersWithNode2, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), votersWithNode2)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(16L))); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); @@ -348,7 +348,7 @@ public void testHighWatermarkDoesNotDecreaseFromNewVoter() { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters)); assertTrue(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(10L))); @@ -358,7 +358,7 @@ public void testHighWatermarkDoesNotDecreaseFromNewVoter() { // adding node3 to voterSet should not cause HW to decrease even if majority is < HW VoterSet votersWithNode3 = originalVoters.addVoter(VoterSetTest.voterNode(nodeKey3)).get(); - assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), votersWithNode3, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), votersWithNode3)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW will not decrease if calculated HW is anything lower than the last HW @@ -386,20 +386,20 @@ public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters)); assertTrue(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(15L))); assertFalse(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(10L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // removing node1 should not decrement HW to 10L VoterSet votersWithoutNode1 = originalVoters.removeVoter(nodeKey1).get(); - assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), votersWithoutNode1, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), votersWithoutNode1)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW cannot change until after node2 catches up to last HW assertFalse(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(14L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), votersWithoutNode1, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), votersWithoutNode1)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(18L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); @@ -423,20 +423,20 @@ public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoters)); assertTrue(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(15L))); assertFalse(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(10L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // removing leader should not decrement HW to 10L VoterSet votersWithoutLeader = originalVoters.removeVoter(localVoterNode.voterKey()).get(); - assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), votersWithoutLeader, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), votersWithoutLeader)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW cannot change until node2 catches up to last HW assertFalse(state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(16L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), votersWithoutLeader, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), votersWithoutLeader)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(14L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); @@ -461,7 +461,7 @@ public void testNonMonotonicHighWatermarkUpdate(boolean withDirectoryId) { KRaftVersion.KRAFT_VERSION_1 ); - state.updateLocalState(new LogOffsetMetadata(10L), voters, 0); + state.updateLocalState(new LogOffsetMetadata(10L), voters); state.updateReplicaState(nodeKey1, time.milliseconds(), new LogOffsetMetadata(10L)); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); @@ -486,7 +486,7 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending(boolean withDirecto KRaftVersion.KRAFT_VERSION_1 ); - state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voters, 0); + state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voters); assertEquals(Optional.empty(), state.highWatermark()); state.updateReplicaState(nodeKey1, 0, new LogOffsetMetadata(leaderStartOffset)); state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(leaderEndOffset)); @@ -568,7 +568,7 @@ public void testCheckQuorumAfterVoterSetChanges() { // Adding 1 new voter to the voter set VoterSet votersWithNode3 = originalVoters.addVoter(VoterSetTest.voterNode(nodeKey3)).get(); - state.updateLocalState(new LogOffsetMetadata(1L), votersWithNode3, 0); + state.updateLocalState(new LogOffsetMetadata(1L), votersWithNode3); time.sleep(checkQuorumTimeoutMs / 2); // received fetch request from 1 voter node, the timer should not be reset because the majority should be 3 @@ -581,7 +581,7 @@ public void testCheckQuorumAfterVoterSetChanges() { // removing leader from the voter set VoterSet votersWithoutLeader = votersWithNode3.removeVoter(localVoterNode.voterKey()).get(); - state.updateLocalState(new LogOffsetMetadata(1L), votersWithoutLeader, 0); + state.updateLocalState(new LogOffsetMetadata(1L), votersWithoutLeader); time.sleep(checkQuorumTimeoutMs / 2); // received fetch request from 1 voter, the timer should not be reset. @@ -648,13 +648,13 @@ public void testUpdateVotersFromNoDirectoryIdToDirectoryId() { KRaftVersion.KRAFT_VERSION_1 ); - assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), votersBeforeUpgrade, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), votersBeforeUpgrade)); assertTrue(state.updateReplicaState(nodeKey1, 0L, new LogOffsetMetadata(10L))); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); VoterSet votersAfterUpgrade = localWithRemoteVoterSet(Stream.of(nodeKey1, nodeKey2), true); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), votersAfterUpgrade, 0)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), votersAfterUpgrade)); assertTrue(state.updateReplicaState(nodeKey2, 0L, new LogOffsetMetadata(13L))); assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); } @@ -822,7 +822,7 @@ public void testMaybeAppendUpgradedKRaftVersion() { int follower1 = 1; int follower2 = 2; long epochStartOffset = 10L; - BatchAccumulator accumulator = mock(BatchAccumulator.class); + BatchAccumulator accumulator = Mockito.mock(BatchAccumulator.class); VoterSet persistedVoters = localWithRemoteVoterSet(IntStream.of(follower1, follower2), false); LeaderState state = newLeaderState( diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 1ac868973496d..c6a53742f7c0d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -223,7 +223,7 @@ public void shouldRecordVoterQuorumState(KRaftVersion kraftVersion) { assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); // todo, bug fix // leader with updated HW - state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(10L), voters, 0); + state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(10L), voters); state.leaderStateOrThrow().updateReplicaState( voterMap.get(1).voterKey(), 0, From 01a2b1873cb7a8a966acb0965dd6178fc9a66d08 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 14 Oct 2025 21:50:19 +0800 Subject: [PATCH 15/33] fix fail test --- .../main/resources/common/message/DescribeQuorumRequest.json | 1 + raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/src/main/resources/common/message/DescribeQuorumRequest.json b/clients/src/main/resources/common/message/DescribeQuorumRequest.json index fa4e641fe229a..787273f83a3a1 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json +++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json @@ -20,6 +20,7 @@ "name": "DescribeQuorumRequest", // Version 1 adds additional fields in the response. The request is unchanged (KIP-836). // Version 2 adds additional fields in the response. The request is unchanged (KIP-853). + // Version 3 adds additional fields in the response. The request is unchanged (KIP-853). "validVersions": "0-3", "flexibleVersions": "0+", "latestVersionUnstable": false, diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index b0ed9962875c3..9ebc6926d0bf0 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -743,7 +743,7 @@ private boolean maybeUpdateHighWatermark() { !highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; - updateCommittedVoter(highWatermark.get().offset()); + highWatermark.ifPresent(highWatermark -> updateCommittedVoter(highWatermark.offset())); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -880,7 +880,8 @@ public List nonLeaderVotersByDescendingFetchOffset() { } private void updateCommittedVoter(long highWatermark) { - Optional voters = partitionState.voterSetAtOffset(highWatermark); + // high watermark is the offset will be written so we need to minus 1 + Optional voters = partitionState.voterSetAtOffset(highWatermark - 1); if (voters.isPresent()) { // if voters are present in partitionState, we read it from memory for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { From cd0a1709d25784220941293811ee2144356456e1 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 08:22:47 +0800 Subject: [PATCH 16/33] edge case logic --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9ebc6926d0bf0..8fc3ee0cc394b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -764,7 +764,7 @@ private boolean maybeUpdateHighWatermark() { } else { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; - highWatermark.ifPresent(logOffsetMetadata -> updateCommittedVoter(logOffsetMetadata.offset())); + highWatermark.ifPresent(highWatermark -> updateCommittedVoter(highWatermark.offset())); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -889,6 +889,7 @@ private void updateCommittedVoter(long highWatermark) { committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(key)); } } else { + highWatermark = Math.min(1, highWatermark); log.info("Read committed voter with start offset={} from log", highWatermark - 1); VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark - 1); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { From 3853c2c71eec670e14efee50139491879ba5b42d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 08:50:32 +0800 Subject: [PATCH 17/33] fix fail test --- .../org/apache/kafka/raft/LeaderState.java | 21 +++++++++++++------ .../KRaftControlRecordStateMachine.java | 1 + 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 8fc3ee0cc394b..160949c09d6f5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -883,18 +883,16 @@ private void updateCommittedVoter(long highWatermark) { // high watermark is the offset will be written so we need to minus 1 Optional voters = partitionState.voterSetAtOffset(highWatermark - 1); if (voters.isPresent()) { + log.debug("Read committed voter with start offset={} from memory", highWatermark - 1); // if voters are present in partitionState, we read it from memory for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { - ReplicaKey key = voterNode.voterKey(); - committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(key)); + committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(voterNode)); } } else { - highWatermark = Math.min(1, highWatermark); - log.info("Read committed voter with start offset={} from log", highWatermark - 1); + log.debug("Read committed voter with start offset={} from log", highWatermark); VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark - 1); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { - ReplicaKey key = voterNode.voterKey(); - committedVoterStates.put(key.id(), getOrCreateReplicaState(key)); + committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(voterNode)); } } } @@ -923,6 +921,17 @@ public long epochStartOffset() { return epochStartOffset; } + private ReplicaState getOrCreateReplicaState(VoterSet.VoterNode voterNode) { + ReplicaKey replicaKey = voterNode.voterKey(); + ReplicaState voterState = voterStates.get(replicaKey.id()); + if (voterState != null) return voterState; + + ReplicaState observerState = observerStates.get(replicaKey.id()); + if (observerState != null) return observerState; + + return new ReplicaState(replicaKey, false, voterNode.listeners()); + } + private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 3a7e71b91ac1e..382906ae0a6b7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -218,6 +218,7 @@ public Optional voterSetAtOffset(long offset) { } public VoterSet committedVoterSetFromLog(long offset) { + offset = Math.max(0, offset); LogFetchInfo info = log.read(offset, Isolation.COMMITTED); try (RecordsIterator iterator = new RecordsIterator<>( info.records, From 5915adcea2dcb04ac77537ddb7a71596e68e6b28 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 10:46:36 +0800 Subject: [PATCH 18/33] replace old committed voter with new one --- .../org/apache/kafka/raft/LeaderState.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 160949c09d6f5..b35262fd61fde 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -882,19 +882,22 @@ public List nonLeaderVotersByDescendingFetchOffset() { private void updateCommittedVoter(long highWatermark) { // high watermark is the offset will be written so we need to minus 1 Optional voters = partitionState.voterSetAtOffset(highWatermark - 1); + Map newCommittedVoterStates = new HashMap<>(); if (voters.isPresent()) { log.debug("Read committed voter with start offset={} from memory", highWatermark - 1); // if voters are present in partitionState, we read it from memory for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { - committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(voterNode)); - } + newCommittedVoterStates.put(voterNode.voterKey().id(), + getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); } } else { log.debug("Read committed voter with start offset={} from log", highWatermark); VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark - 1); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { - committedVoterStates.put(voterNode.voterKey().id(), getOrCreateReplicaState(voterNode)); + newCommittedVoterStates.put(voterNode.voterKey().id(), + getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); } } + committedVoterStates = newCommittedVoterStates; } private Stream followersByDescendingFetchOffset() { @@ -921,17 +924,6 @@ public long epochStartOffset() { return epochStartOffset; } - private ReplicaState getOrCreateReplicaState(VoterSet.VoterNode voterNode) { - ReplicaKey replicaKey = voterNode.voterKey(); - ReplicaState voterState = voterStates.get(replicaKey.id()); - if (voterState != null) return voterState; - - ReplicaState observerState = observerStates.get(replicaKey.id()); - if (observerState != null) return observerState; - - return new ReplicaState(replicaKey, false, voterNode.listeners()); - } - private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { From f0ba7e4d3f87845bc7efc20837264a97f1e1e7d7 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 10:51:10 +0800 Subject: [PATCH 19/33] move offset check to leaderState from stateMachine --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 4 +++- .../kafka/raft/internals/KRaftControlRecordStateMachine.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index b35262fd61fde..5e36ec03713a4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -890,7 +890,9 @@ private void updateCommittedVoter(long highWatermark) { newCommittedVoterStates.put(voterNode.voterKey().id(), getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); } } else { - log.debug("Read committed voter with start offset={} from log", highWatermark); + // The log starting offset must great or equals zero so we need to set minimize as one. + highWatermark = Math.max(1, highWatermark); + log.debug("Read committed voter with start offset={} from log", highWatermark - 1); VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark - 1); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 382906ae0a6b7..3a7e71b91ac1e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -218,7 +218,6 @@ public Optional voterSetAtOffset(long offset) { } public VoterSet committedVoterSetFromLog(long offset) { - offset = Math.max(0, offset); LogFetchInfo info = log.read(offset, Isolation.COMMITTED); try (RecordsIterator iterator = new RecordsIterator<>( info.records, From 17b0cdf4af0c1f56be1749976198d3b24e6490c2 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 21:48:48 +0800 Subject: [PATCH 20/33] fix NPE --- .../src/test/java/org/apache/kafka/raft/LeaderStateTest.java | 4 ++++ .../src/test/java/org/apache/kafka/raft/QuorumStateTest.java | 4 ++++ .../apache/kafka/raft/internals/KafkaRaftMetricsTest.java | 5 +++++ 3 files changed, 13 insertions(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 1cce3a7950b3c..e8ae09ee9f037 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -47,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; public class LeaderStateTest { @@ -78,6 +79,9 @@ private LeaderState newLeaderState( KRaftVersion kraftVersion, BatchAccumulator accumulator ) { + Mockito + .when(partitionState.voterSetAtOffset(anyLong())) + .thenReturn(Optional.of(voters)); return new LeaderState<>( time, localVoterNode, diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 76d942d2bd98b..db71088c6892f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; public class QuorumStateTest { private final int localId = 0; @@ -77,6 +78,9 @@ private QuorumState buildQuorumState( Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); + Mockito + .when(mockPartitionState.voterSetAtOffset(anyLong())) + .thenReturn(Optional.of(voterSet)); return new QuorumState( localId, diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index c6a53742f7c0d..282251526d9fe 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -38,6 +38,7 @@ import org.mockito.Mockito; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; @@ -45,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.anyLong; public class KafkaRaftMetricsTest { @@ -77,6 +79,9 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio Mockito .when(mockPartitionState.lastVoterSetOffset()) .thenReturn(kraftVersion.isReconfigSupported() ? OptionalLong.of(0) : OptionalLong.empty()); + Mockito + .when(mockPartitionState.voterSetAtOffset(anyLong())) + .thenReturn(Optional.of(voterSet)); Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); From 771ea5fdeaa6cef2e583438822aef3cd2ee9f9fe Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 23:04:22 +0800 Subject: [PATCH 21/33] fix highwatermark issue --- .../org/apache/kafka/raft/LeaderState.java | 10 ++++----- .../KRaftControlRecordStateMachine.java | 22 ++++++------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 5e36ec03713a4..d54f7dca36b00 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -881,19 +881,17 @@ public List nonLeaderVotersByDescendingFetchOffset() { private void updateCommittedVoter(long highWatermark) { // high watermark is the offset will be written so we need to minus 1 - Optional voters = partitionState.voterSetAtOffset(highWatermark - 1); + Optional voters = partitionState.voterSetAtOffsetUnchecked(highWatermark); Map newCommittedVoterStates = new HashMap<>(); if (voters.isPresent()) { - log.debug("Read committed voter with start offset={} from memory", highWatermark - 1); + log.debug("Read committed voter with start offset={} from memory", highWatermark); // if voters are present in partitionState, we read it from memory for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); } } else { - // The log starting offset must great or equals zero so we need to set minimize as one. - highWatermark = Math.max(1, highWatermark); - log.debug("Read committed voter with start offset={} from log", highWatermark - 1); - VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark - 1); + log.debug("Read committed voter with start offset={} from log", highWatermark); + VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 3a7e71b91ac1e..f23d440114e33 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -164,17 +163,6 @@ public VoterSet lastVoterSet() { } } - /** - * Returns the last voter set with its offset. - */ - public Map.Entry lastVoterSetWithOffset() { - synchronized (voterSetHistory) { - VoterSet voters = voterSetHistory.lastValue(); - Long offset = voterSetHistory.lastVoterSetOffset().orElse(SMALLEST_LOG_OFFSET); - return Map.entry(voters, offset); - } - } - /** * Return the latest entry for the set of voters. */ @@ -212,12 +200,17 @@ public KRaftVersion lastKraftVersion() { public Optional voterSetAtOffset(long offset) { checkOffsetIsValid(offset); + return voterSetAtOffsetUnchecked(offset); + } + + public Optional voterSetAtOffsetUnchecked(long offset) { synchronized (voterSetHistory) { return voterSetHistory.valueAtOrBefore(offset); } } public VoterSet committedVoterSetFromLog(long offset) { + VoterSet voterSet = staticVoterSet; LogFetchInfo info = log.read(offset, Isolation.COMMITTED); try (RecordsIterator iterator = new RecordsIterator<>( info.records, @@ -230,14 +223,13 @@ public VoterSet committedVoterSetFromLog(long offset) { while (iterator.hasNext()) { Batch batch = iterator.next(); for (ControlRecord controlRecord : batch.controlRecords()) { - // Skip the rest of the control records if (Objects.requireNonNull(controlRecord.type()) == ControlRecordType.KRAFT_VOTERS) { - return VoterSet.fromVotersRecord((VotersRecord) controlRecord.message()); + voterSet = VoterSet.fromVotersRecord((VotersRecord) controlRecord.message()); } } } } - return staticVoterSet; + return voterSet; } /** From d5a6c9ad144d16f49e5194932d561d96ffcf76a3 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 15 Oct 2025 23:55:45 +0800 Subject: [PATCH 22/33] fix initial kraft bug --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index d54f7dca36b00..6f3697ecdff68 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -152,7 +152,7 @@ protected LeaderState( ); this.committedVoterStates.put( voterNode.voterKey().id(), - new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) + getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners())) ); } this.grantingVoters = Set.copyOf(grantingVoters); From b04a03e1cb1e7b1cec9180dc66bed86d9173bb25 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 16 Oct 2025 05:42:56 +0800 Subject: [PATCH 23/33] fix test and refactor --- .../java/org/apache/kafka/raft/LeaderState.java | 16 +++++++++------- .../org/apache/kafka/raft/LeaderStateTest.java | 3 +++ .../org/apache/kafka/raft/QuorumStateTest.java | 9 ++++++--- .../raft/internals/KafkaRaftMetricsTest.java | 3 +++ 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 6f3697ecdff68..9e88c160a4257 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -151,8 +151,7 @@ protected LeaderState( new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) ); this.committedVoterStates.put( - voterNode.voterKey().id(), - getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners())) + voterNode.voterKey().id(), getOrBuildReplicaState(voterNode) ); } this.grantingVoters = Set.copyOf(grantingVoters); @@ -880,23 +879,22 @@ public List nonLeaderVotersByDescendingFetchOffset() { } private void updateCommittedVoter(long highWatermark) { - // high watermark is the offset will be written so we need to minus 1 Optional voters = partitionState.voterSetAtOffsetUnchecked(highWatermark); Map newCommittedVoterStates = new HashMap<>(); if (voters.isPresent()) { log.debug("Read committed voter with start offset={} from memory", highWatermark); // if voters are present in partitionState, we read it from memory for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { - newCommittedVoterStates.put(voterNode.voterKey().id(), - getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); } + newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); + } } else { log.debug("Read committed voter with start offset={} from log", highWatermark); VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { - newCommittedVoterStates.put(voterNode.voterKey().id(), - getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()))); + newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } } + committedVoterStates.clear(); committedVoterStates = newCommittedVoterStates; } @@ -934,6 +932,10 @@ private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { return state; } + public ReplicaState getOrBuildReplicaState(VoterSet.VoterNode voterNode) { + return getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners())); + } + public Optional getReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index e8ae09ee9f037..423a6a616b105 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -82,6 +82,9 @@ private LeaderState newLeaderState( Mockito .when(partitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voters)); + Mockito + .when(partitionState.committedVoterSetFromLog(anyLong())) + .thenReturn(voters); return new LeaderState<>( time, localVoterNode, diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index db71088c6892f..6409d151d57ac 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -75,12 +75,15 @@ private QuorumState buildQuorumState( Mockito .when(mockPartitionState.lastVoterSetOffset()) .thenReturn(kraftVersion.isReconfigSupported() ? OptionalLong.of(0) : OptionalLong.empty()); - Mockito - .when(mockPartitionState.lastKraftVersion()) - .thenReturn(kraftVersion); Mockito .when(mockPartitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voterSet)); + Mockito + .when(mockPartitionState.committedVoterSetFromLog(anyLong())) + .thenReturn(voterSet); + Mockito + .when(mockPartitionState.lastKraftVersion()) + .thenReturn(kraftVersion); return new QuorumState( localId, diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 282251526d9fe..1d1e6ee30547f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -82,6 +82,9 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio Mockito .when(mockPartitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voterSet)); + Mockito + .when(mockPartitionState.committedVoterSetFromLog(anyLong())) + .thenReturn(voterSet); Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); From 94a3bbdaec3f50489ae6ba5103fc28b811c0494f Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 16 Oct 2025 12:36:14 +0800 Subject: [PATCH 24/33] fix fail test --- .../src/main/java/org/apache/kafka/raft/LeaderState.java | 9 ++++++--- .../raft/internals/KRaftControlRecordStateMachine.java | 7 +++++-- .../test/java/org/apache/kafka/raft/QuorumStateTest.java | 4 ++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9e88c160a4257..b2b4fc146f5d6 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -150,9 +150,6 @@ protected LeaderState( voterNode.voterKey().id(), new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) ); - this.committedVoterStates.put( - voterNode.voterKey().id(), getOrBuildReplicaState(voterNode) - ); } this.grantingVoters = Set.copyOf(grantingVoters); this.log = logContext.logger(LeaderState.class); @@ -171,6 +168,12 @@ protected LeaderState( kafkaRaftMetrics.addLeaderMetrics(); this.kafkaRaftMetrics = kafkaRaftMetrics; + if (highWatermark.isPresent()) { + this.updateCommittedVoter(highWatermark.get().offset()); + } else { + this.updateCommittedVoter(-1); + } + if (!kraftVersionAtEpochStart.isReconfigSupported()) { var updatedVoters = voterSetAtEpochStart .updateVoterIgnoringDirectoryId(localVoterNode) diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index f23d440114e33..09d4626cb7065 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -116,7 +116,6 @@ public KRaftControlRecordStateMachine( * Must be called whenever the {@code log} has changed. */ public void updateState() { - maybeLoadSnapshot(); maybeLoadLog(); } @@ -199,7 +198,6 @@ public KRaftVersion lastKraftVersion() { */ public Optional voterSetAtOffset(long offset) { checkOffsetIsValid(offset); - return voterSetAtOffsetUnchecked(offset); } @@ -211,6 +209,11 @@ public Optional voterSetAtOffsetUnchecked(long offset) { public VoterSet committedVoterSetFromLog(long offset) { VoterSet voterSet = staticVoterSet; + + if (offset < 0) { + return voterSet; + } + LogFetchInfo info = log.read(offset, Isolation.COMMITTED); try (RecordsIterator iterator = new RecordsIterator<>( info.records, diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 6409d151d57ac..260ecf32ce435 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -79,8 +79,8 @@ private QuorumState buildQuorumState( .when(mockPartitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voterSet)); Mockito - .when(mockPartitionState.committedVoterSetFromLog(anyLong())) - .thenReturn(voterSet); + .when(mockPartitionState.committedVoterSetFromLog(anyLong())) + .thenReturn(voterSet); Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); From 7660e2aa9d8c4ac0ccd4c7b265e3ac75c26d28d1 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 16 Oct 2025 12:47:23 +0800 Subject: [PATCH 25/33] add comment --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 1 + 1 file changed, 1 insertion(+) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index b2b4fc146f5d6..a8c6d28547940 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -882,6 +882,7 @@ public List nonLeaderVotersByDescendingFetchOffset() { } private void updateCommittedVoter(long highWatermark) { + // The Latest set of voters can appear at offset -1 so we need to relex the check Optional voters = partitionState.voterSetAtOffsetUnchecked(highWatermark); Map newCommittedVoterStates = new HashMap<>(); if (voters.isPresent()) { From 874b231ac54dda9f2c0e2741f8a3ffd66fb938ae Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 16 Oct 2025 14:29:12 +0800 Subject: [PATCH 26/33] remove read from log and using static voter directly --- .../org/apache/kafka/raft/LeaderState.java | 8 ++--- .../KRaftControlRecordStateMachine.java | 30 ++----------------- .../apache/kafka/raft/LeaderStateTest.java | 4 +-- .../apache/kafka/raft/QuorumStateTest.java | 2 +- .../raft/internals/KafkaRaftMetricsTest.java | 4 +-- 5 files changed, 11 insertions(+), 37 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index a8c6d28547940..93ea92d34730d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -882,18 +882,18 @@ public List nonLeaderVotersByDescendingFetchOffset() { } private void updateCommittedVoter(long highWatermark) { - // The Latest set of voters can appear at offset -1 so we need to relex the check + // Relaxing the check and we can use high watermark to get voters + // if there is not this change, it will throw IllegalArgumentException Optional voters = partitionState.voterSetAtOffsetUnchecked(highWatermark); Map newCommittedVoterStates = new HashMap<>(); if (voters.isPresent()) { log.debug("Read committed voter with start offset={} from memory", highWatermark); - // if voters are present in partitionState, we read it from memory for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } } else { - log.debug("Read committed voter with start offset={} from log", highWatermark); - VoterSet committedvoterSet = partitionState.committedVoterSetFromLog(highWatermark); + // Once there are no voters, it means we use static voter when initializing. + VoterSet committedvoterSet = partitionState.staticVoterSet(); for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 09d4626cb7065..aa73794bbef99 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.VotersRecord; -import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.Batch; @@ -36,7 +35,6 @@ import org.slf4j.Logger; -import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -207,32 +205,8 @@ public Optional voterSetAtOffsetUnchecked(long offset) { } } - public VoterSet committedVoterSetFromLog(long offset) { - VoterSet voterSet = staticVoterSet; - - if (offset < 0) { - return voterSet; - } - - LogFetchInfo info = log.read(offset, Isolation.COMMITTED); - try (RecordsIterator iterator = new RecordsIterator<>( - info.records, - serde, - bufferSupplier, - maxBatchSizeBytes, - true, // Validate batch CRC - logContext - )) { - while (iterator.hasNext()) { - Batch batch = iterator.next(); - for (ControlRecord controlRecord : batch.controlRecords()) { - if (Objects.requireNonNull(controlRecord.type()) == ControlRecordType.KRAFT_VOTERS) { - voterSet = VoterSet.fromVotersRecord((VotersRecord) controlRecord.message()); - } - } - } - } - return voterSet; + public VoterSet staticVoterSet() { + return staticVoterSet; } /** diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 423a6a616b105..523e0e72170e4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -83,8 +83,8 @@ private LeaderState newLeaderState( .when(partitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voters)); Mockito - .when(partitionState.committedVoterSetFromLog(anyLong())) - .thenReturn(voters); + .when(partitionState.staticVoterSet()) + .thenReturn(voters); return new LeaderState<>( time, localVoterNode, diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 260ecf32ce435..42ddb81e5a437 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -79,7 +79,7 @@ private QuorumState buildQuorumState( .when(mockPartitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voterSet)); Mockito - .when(mockPartitionState.committedVoterSetFromLog(anyLong())) + .when(mockPartitionState.lastVoterSet()) .thenReturn(voterSet); Mockito .when(mockPartitionState.lastKraftVersion()) diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 1d1e6ee30547f..40e57f5e8d04f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -83,8 +83,8 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio .when(mockPartitionState.voterSetAtOffset(anyLong())) .thenReturn(Optional.of(voterSet)); Mockito - .when(mockPartitionState.committedVoterSetFromLog(anyLong())) - .thenReturn(voterSet); + .when(mockPartitionState.staticVoterSet()) + .thenReturn(voterSet); Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); From e1d8db3877f6d6b9f3d6676211ae2c60a1e3b392 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 16 Oct 2025 16:52:40 +0800 Subject: [PATCH 27/33] fix java 25 build --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 93ea92d34730d..c0d802cfa864e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -163,17 +163,10 @@ protected LeaderState( this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart; this.kraftVersionAtEpochStart = kraftVersionAtEpochStart; this.partitionState = partitionState; - offsetOfVotersAtEpochStart.ifPresent(this::updateCommittedVoter); kafkaRaftMetrics.addLeaderMetrics(); this.kafkaRaftMetrics = kafkaRaftMetrics; - if (highWatermark.isPresent()) { - this.updateCommittedVoter(highWatermark.get().offset()); - } else { - this.updateCommittedVoter(-1); - } - if (!kraftVersionAtEpochStart.isReconfigSupported()) { var updatedVoters = voterSetAtEpochStart .updateVoterIgnoringDirectoryId(localVoterNode) From 11a3812747fee3fe605917d650239c00aca45c36 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 16 Oct 2025 23:55:14 +0800 Subject: [PATCH 28/33] fix all fail test --- .../org/apache/kafka/raft/LeaderState.java | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index c0d802cfa864e..eb8edc6be77a5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -143,14 +143,6 @@ protected LeaderState( this.localVoterNode = localVoterNode; this.epoch = epoch; this.epochStartOffset = epochStartOffset; - - for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { - boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey()); - this.voterStates.put( - voterNode.voterKey().id(), - new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) - ); - } this.grantingVoters = Set.copyOf(grantingVoters); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); @@ -164,6 +156,17 @@ protected LeaderState( this.kraftVersionAtEpochStart = kraftVersionAtEpochStart; this.partitionState = partitionState; + // When offset is empty or -1, it means the cluster is starting up. + boolean isInitializing = offsetOfVotersAtEpochStart.isEmpty() || offsetOfVotersAtEpochStart.getAsLong() == -1; + for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { + boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey()); + ReplicaState state = new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()); + this.voterStates.put(voterNode.voterKey().id(), state); + if (isInitializing) { + this.committedVoterStates.put(voterNode.voterKey().id(), state); + } + } + kafkaRaftMetrics.addLeaderMetrics(); this.kafkaRaftMetrics = kafkaRaftMetrics; @@ -717,6 +720,7 @@ private boolean maybeUpdateHighWatermark() { int indexOfHw = voterStates.size() / 2; Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset; + highWatermark.ifPresent(this::updateCommittedVoter); if (highWatermarkUpdateOpt.isPresent()) { @@ -738,7 +742,7 @@ private boolean maybeUpdateHighWatermark() { !highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; - highWatermark.ifPresent(highWatermark -> updateCommittedVoter(highWatermark.offset())); + updateCommittedVoter(currentHighWatermarkMetadata); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -759,7 +763,7 @@ private boolean maybeUpdateHighWatermark() { } else { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; - highWatermark.ifPresent(highWatermark -> updateCommittedVoter(highWatermark.offset())); + updateCommittedVoter(highWatermarkUpdateMetadata); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -874,24 +878,22 @@ public List nonLeaderVotersByDescendingFetchOffset() { .collect(Collectors.toList()); } - private void updateCommittedVoter(long highWatermark) { - // Relaxing the check and we can use high watermark to get voters - // if there is not this change, it will throw IllegalArgumentException - Optional voters = partitionState.voterSetAtOffsetUnchecked(highWatermark); + private void updateCommittedVoter(LogOffsetMetadata highWatermark) { + log.debug("Updating committed voter states based on high watermark={} ", highWatermark); Map newCommittedVoterStates = new HashMap<>(); - if (voters.isPresent()) { - log.debug("Read committed voter with start offset={} from memory", highWatermark); - for (VoterSet.VoterNode voterNode : voters.get().voterNodes()) { + + // Try to retrieve the voter set at the given high watermark offset; + // if unavailable, fall back to the static voter set. + partitionState.voterSetAtOffsetUnchecked(highWatermark.offset()).ifPresentOrElse(voterSet -> { + for (VoterSet.VoterNode voterNode : voterSet.voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } - } else { - // Once there are no voters, it means we use static voter when initializing. - VoterSet committedvoterSet = partitionState.staticVoterSet(); - for (VoterSet.VoterNode voterNode : committedvoterSet.voterNodes()) { + }, () -> { + for (VoterSet.VoterNode voterNode : partitionState.staticVoterSet().voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } - } - committedVoterStates.clear(); + }); + committedVoterStates = newCommittedVoterStates; } From ae588d037b9f08758f17839d5cfa263e73fedcb7 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 17 Oct 2025 00:02:52 +0800 Subject: [PATCH 29/33] improve debug info --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index eb8edc6be77a5..89cf0e92bca7c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -879,7 +879,6 @@ public List nonLeaderVotersByDescendingFetchOffset() { } private void updateCommittedVoter(LogOffsetMetadata highWatermark) { - log.debug("Updating committed voter states based on high watermark={} ", highWatermark); Map newCommittedVoterStates = new HashMap<>(); // Try to retrieve the voter set at the given high watermark offset; @@ -893,7 +892,7 @@ private void updateCommittedVoter(LogOffsetMetadata highWatermark) { newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } }); - + log.debug("Updating committed voter at high watermark={} and committedVoter={}", highWatermark, newCommittedVoterStates); committedVoterStates = newCommittedVoterStates; } From 10727b7ed3a43640592f1f411e970eb63b63eb7c Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 17 Oct 2025 12:54:55 +0800 Subject: [PATCH 30/33] remove unnecessary update --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 1 - 1 file changed, 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 89cf0e92bca7c..23e4e34a23841 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -720,7 +720,6 @@ private boolean maybeUpdateHighWatermark() { int indexOfHw = voterStates.size() / 2; Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset; - highWatermark.ifPresent(this::updateCommittedVoter); if (highWatermarkUpdateOpt.isPresent()) { From f2e4c025b01581a68e9ab12f17340042a79e8015 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 17 Oct 2025 21:37:48 +0800 Subject: [PATCH 31/33] remove unnecessary public method --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 3 ++- .../kafka/raft/internals/KRaftControlRecordStateMachine.java | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 23e4e34a23841..36d201697b6c3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -882,7 +882,8 @@ private void updateCommittedVoter(LogOffsetMetadata highWatermark) { // Try to retrieve the voter set at the given high watermark offset; // if unavailable, fall back to the static voter set. - partitionState.voterSetAtOffsetUnchecked(highWatermark.offset()).ifPresentOrElse(voterSet -> { + // Note: highWatermark is the offset will be written so it is exclusive. + partitionState.voterSetAtOffset(highWatermark.offset() - 1).ifPresentOrElse(voterSet -> { for (VoterSet.VoterNode voterNode : voterSet.voterNodes()) { newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index aa73794bbef99..2259ff8c7e0dd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -196,10 +196,7 @@ public KRaftVersion lastKraftVersion() { */ public Optional voterSetAtOffset(long offset) { checkOffsetIsValid(offset); - return voterSetAtOffsetUnchecked(offset); - } - public Optional voterSetAtOffsetUnchecked(long offset) { synchronized (voterSetHistory) { return voterSetHistory.valueAtOrBefore(offset); } From de0dad92fc6c4681c2d453b0f6a25a83ed7f4d4e Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Mon, 10 Nov 2025 20:58:12 +0800 Subject: [PATCH 32/33] rename --- .../java/org/apache/kafka/raft/LeaderState.java | 14 +++++++------- .../ReconfigurableQuorumIntegrationTest.java | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9f51569f0ee21..f826a1ac71195 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -835,7 +835,7 @@ public boolean updateLocalState( LogOffsetMetadata endOffsetMetadata, VoterSet lastVoterSet ) { - ReplicaState state = getOrCreateReplicaState(localVoterNode.voterKey()); + ReplicaState state = getOrCreateObserverReplicaState(localVoterNode.voterKey()); state.endOffset.ifPresent(currentEndOffset -> { if (currentEndOffset.offset() > endOffsetMetadata.offset()) { throw new IllegalStateException("Detected non-monotonic update of local " + @@ -872,7 +872,7 @@ public boolean updateReplicaState( ); } - ReplicaState state = getOrCreateReplicaState(replicaKey); + ReplicaState state = getOrCreateObserverReplicaState(replicaKey); state.endOffset.ifPresent(currentEndOffset -> { if (currentEndOffset.offset() > fetchOffsetMetadata.offset()) { @@ -881,7 +881,7 @@ public boolean updateReplicaState( } }); - Optional leaderEndOffsetOpt = getOrCreateReplicaState(localVoterNode.voterKey()).endOffset; + Optional leaderEndOffsetOpt = getOrCreateObserverReplicaState(localVoterNode.voterKey()).endOffset; state.updateFollowerState( currentTimeMs, @@ -908,11 +908,11 @@ private void updateCommittedVoter(LogOffsetMetadata highWatermark) { // Note: highWatermark is the offset will be written so it is exclusive. partitionState.voterSetAtOffset(highWatermark.offset() - 1).ifPresentOrElse(voterSet -> { for (VoterSet.VoterNode voterNode : voterSet.voterNodes()) { - newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); + newCommittedVoterStates.put(voterNode.voterKey().id(), getOrCreateVoterReplicaState(voterNode)); } }, () -> { for (VoterSet.VoterNode voterNode : partitionState.staticVoterSet().voterNodes()) { - newCommittedVoterStates.put(voterNode.voterKey().id(), getOrBuildReplicaState(voterNode)); + newCommittedVoterStates.put(voterNode.voterKey().id(), getOrCreateVoterReplicaState(voterNode)); } }); log.debug("Updating committed voter at high watermark={} and committedVoter={}", highWatermark, newCommittedVoterStates); @@ -943,7 +943,7 @@ public long epochStartOffset() { return epochStartOffset; } - private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { + private ReplicaState getOrCreateObserverReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty())); @@ -953,7 +953,7 @@ private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { return state; } - public ReplicaState getOrBuildReplicaState(VoterSet.VoterNode voterNode) { + public ReplicaState getOrCreateVoterReplicaState(VoterSet.VoterNode voterNode) { return getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners())); } diff --git a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java index ad30708d7c5db..8f67b30e838c7 100644 --- a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java @@ -346,4 +346,4 @@ public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception { } } } -} \ No newline at end of file +} From 241c047f2244af1a5b61d62782427ee914bed503 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 11 Nov 2025 04:53:27 +0800 Subject: [PATCH 33/33] if hightwart is missing return empty set --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index f826a1ac71195..34b85512e5b22 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -713,6 +713,10 @@ Map voterStates() { } Map committedVoterStates() { + if (highWatermark.isEmpty()) { + return Map.of(); + } + return committedVoterStates; }