diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9a50f52fb44ce..9cb509f10cd98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4684,6 +4684,10 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition .map(this::translateReplicaState) .collect(Collectors.toList()); + List committedVoters = partition.committedVoters().stream() + .map(this::translateReplicaState) + .collect(Collectors.toList()); + List observers = partition.observers().stream() .map(this::translateReplicaState) .collect(Collectors.toList()); @@ -4701,6 +4705,7 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition partition.leaderEpoch(), partition.highWatermark(), voters, + committedVoters, observers, nodes ); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java index 5264b6f6aae7a..3a898737c252e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java @@ -31,6 +31,7 @@ public class QuorumInfo { private final long leaderEpoch; private final long highWatermark; private final List voters; + private final List committedVoters; private final List observers; private final Map nodes; @@ -39,6 +40,7 @@ public class QuorumInfo { long leaderEpoch, long highWatermark, List voters, + List committedVoters, List observers, Map nodes ) { @@ -46,6 +48,7 @@ public class QuorumInfo { this.leaderEpoch = leaderEpoch; this.highWatermark = highWatermark; this.voters = voters; + this.committedVoters = committedVoters; this.observers = observers; this.nodes = nodes; } @@ -66,6 +69,10 @@ public List voters() { return voters; } + 
public List committedVoters() { + return committedVoters; + } + public List observers() { return observers; } diff --git a/clients/src/main/resources/common/message/DescribeQuorumRequest.json b/clients/src/main/resources/common/message/DescribeQuorumRequest.json index 7b9ee5a2328ec..787273f83a3a1 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json +++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json @@ -20,7 +20,8 @@ "name": "DescribeQuorumRequest", // Version 1 adds additional fields in the response. The request is unchanged (KIP-836). // Version 2 adds additional fields in the response. The request is unchanged (KIP-853). - "validVersions": "0-2", + // Version 3 adds additional fields in the response. The request is unchanged (KIP-853). + "validVersions": "0-3", "flexibleVersions": "0+", "latestVersionUnstable": false, "fields": [ diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json index b5b51d1a728a2..f8cd08a4e84dc 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json +++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json @@ -18,8 +18,9 @@ "type": "response", "name": "DescribeQuorumResponse", // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836). - // Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData, ReplicaDirectoryId in ReplicaState (KIP-853). - "validVersions": "0-2", + // Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData, ReplicaDirectoryId in ReplicaState (KIP-853). 
+ // Version 3 adds CommittedVoters in PartitionData + "validVersions": "0-3", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", @@ -46,6 +47,8 @@ "about": "The high water mark."}, { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+", "about": "The current voters of the partition."}, + { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", "ignorable": true, + "about": "The voters that have been committed."}, { "name": "Observers", "type": "[]ReplicaState", "versions": "0+", "about": "The observers of the partition."} ]} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 55b4119f0ce48..6a06adb141de5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -805,7 +805,12 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures } private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) { - return new QuorumInfo(1, 1, 1L, + return new QuorumInfo(1, 1L, 1L, + singletonList(new QuorumInfo.ReplicaState(1, + emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID, + 100, + emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000), + emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))), singletonList(new QuorumInfo.ReplicaState(1, emptyOptionals ? 
Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID, 100, diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index d6623bca8ab0d..b999dc0753f0e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1893,6 +1893,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( leaderState.epoch(), leaderState.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L), leaderState.voterStates().values(), + leaderState.committedVoterStates().values(), leaderState.observerStates(currentTimeMs).values(), currentTimeMs ); diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 23a74f33ae4b1..34b85512e5b22 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -32,6 +32,7 @@ import org.apache.kafka.raft.errors.NotLeaderException; import org.apache.kafka.raft.internals.AddVoterHandlerState; import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.KRaftVersionUpgrade; import org.apache.kafka.raft.internals.KafkaRaftMetrics; import org.apache.kafka.raft.internals.RemoveVoterHandlerState; @@ -72,9 +73,11 @@ public class LeaderState implements EpochState { // This field is non-empty if the voter set at epoch start came from a snapshot or log segment private final OptionalLong offsetOfVotersAtEpochStart; private final KRaftVersion kraftVersionAtEpochStart; + private final KRaftControlRecordStateMachine partitionState; private Optional highWatermark = Optional.empty(); private Map voterStates = new HashMap<>(); + private Map committedVoterStates = new HashMap<>(); private Optional addVoterHandlerState = Optional.empty(); private Optional 
removeVoterHandlerState = Optional.empty(); @@ -120,7 +123,8 @@ protected LeaderState( BatchAccumulator accumulator, int fetchTimeoutMs, LogContext logContext, - KafkaRaftMetrics kafkaRaftMetrics + KafkaRaftMetrics kafkaRaftMetrics, + KRaftControlRecordStateMachine partitionState ) { if (localVoterNode.voterKey().directoryId().isEmpty()) { throw new IllegalArgumentException( @@ -139,14 +143,6 @@ protected LeaderState( this.localVoterNode = localVoterNode; this.epoch = epoch; this.epochStartOffset = epochStartOffset; - - for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { - boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey()); - this.voterStates.put( - voterNode.voterKey().id(), - new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()) - ); - } this.grantingVoters = Set.copyOf(grantingVoters); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); @@ -158,6 +154,18 @@ protected LeaderState( this.voterSetAtEpochStart = voterSetAtEpochStart; this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart; this.kraftVersionAtEpochStart = kraftVersionAtEpochStart; + this.partitionState = partitionState; + + // When offset is empty or -1, it means the cluster is starting up. 
+ boolean isInitializing = offsetOfVotersAtEpochStart.isEmpty() || offsetOfVotersAtEpochStart.getAsLong() == -1; + for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { + boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey()); + ReplicaState state = new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()); + this.voterStates.put(voterNode.voterKey().id(), state); + if (isInitializing) { + this.committedVoterStates.put(voterNode.voterKey().id(), state); + } + } kafkaRaftMetrics.addLeaderMetrics(); this.kafkaRaftMetrics = kafkaRaftMetrics; @@ -704,6 +712,14 @@ Map voterStates() { return voterStates; } + Map committedVoterStates() { + if (highWatermark.isEmpty()) { + return Map.of(); + } + + return committedVoterStates; + } + Map observerStates(final long currentTimeMs) { clearInactiveObservers(currentTimeMs); return observerStates; @@ -752,6 +768,7 @@ private boolean maybeUpdateHighWatermark() { !highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; + updateCommittedVoter(currentHighWatermarkMetadata); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -772,6 +789,7 @@ private boolean maybeUpdateHighWatermark() { } else { Optional oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; + updateCommittedVoter(highWatermarkUpdateMetadata); logHighWatermarkUpdate( oldHighWatermark, highWatermarkUpdateMetadata, @@ -821,7 +839,7 @@ public boolean updateLocalState( LogOffsetMetadata endOffsetMetadata, VoterSet lastVoterSet ) { - ReplicaState state = getOrCreateReplicaState(localVoterNode.voterKey()); + ReplicaState state = getOrCreateObserverReplicaState(localVoterNode.voterKey()); state.endOffset.ifPresent(currentEndOffset -> { if (currentEndOffset.offset() > endOffsetMetadata.offset()) { throw new IllegalStateException("Detected non-monotonic 
update of local " + @@ -858,7 +876,7 @@ public boolean updateReplicaState( ); } - ReplicaState state = getOrCreateReplicaState(replicaKey); + ReplicaState state = getOrCreateObserverReplicaState(replicaKey); state.endOffset.ifPresent(currentEndOffset -> { if (currentEndOffset.offset() > fetchOffsetMetadata.offset()) { @@ -867,7 +885,7 @@ public boolean updateReplicaState( } }); - Optional leaderEndOffsetOpt = getOrCreateReplicaState(localVoterNode.voterKey()).endOffset; + Optional leaderEndOffsetOpt = getOrCreateObserverReplicaState(localVoterNode.voterKey()).endOffset; state.updateFollowerState( currentTimeMs, @@ -886,6 +904,25 @@ public List nonLeaderVotersByDescendingFetchOffset() { .collect(Collectors.toList()); } + private void updateCommittedVoter(LogOffsetMetadata highWatermark) { + Map newCommittedVoterStates = new HashMap<>(); + + // Try to retrieve the voter set at the given high watermark offset; + // if unavailable, fall back to the static voter set. + // Note: highWatermark is the offset that will be written, so it is exclusive. 
+ partitionState.voterSetAtOffset(highWatermark.offset() - 1).ifPresentOrElse(voterSet -> { + for (VoterSet.VoterNode voterNode : voterSet.voterNodes()) { + newCommittedVoterStates.put(voterNode.voterKey().id(), getOrCreateVoterReplicaState(voterNode)); + } + }, () -> { + for (VoterSet.VoterNode voterNode : partitionState.staticVoterSet().voterNodes()) { + newCommittedVoterStates.put(voterNode.voterKey().id(), getOrCreateVoterReplicaState(voterNode)); + } + }); + log.debug("Updating committed voter at high watermark={} and committedVoter={}", highWatermark, newCommittedVoterStates); + committedVoterStates = newCommittedVoterStates; + } + private Stream followersByDescendingFetchOffset() { return voterStates .values() @@ -910,7 +947,7 @@ public long epochStartOffset() { return epochStartOffset; } - private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { + private ReplicaState getOrCreateObserverReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty())); @@ -920,6 +957,10 @@ private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { return state; } + public ReplicaState getOrCreateVoterReplicaState(VoterSet.VoterNode voterNode) { + return getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners())); + } + public Optional getReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { @@ -1128,12 +1169,13 @@ public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolea @Override public String toString() { return String.format( - "Leader(localVoterNode=%s, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s)", + "Leader(localVoterNode=%s, epoch=%d, epochStartOffset=%d, highWatermark=%s, 
voterStates=%s, committedVoterState=%s)", localVoterNode, epoch, epochStartOffset, highWatermark, - voterStates + voterStates, + committedVoterStates ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index 4ae24c84eaf99..065e84e11620f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -719,7 +719,8 @@ public LeaderState transitionToLeader(long epochStartOffset, BatchAccumul accumulator, fetchTimeoutMs, logContext, - kafkaRaftMetrics + kafkaRaftMetrics, + partitionState ); durableTransitionTo(state); diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index d3a4f7609687f..523926eb4dfb9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -490,6 +490,7 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse( int leaderEpoch, long highWatermark, Collection voters, + Collection committedVoters, Collection observers, long currentTimeMs ) { @@ -507,6 +508,7 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse( .setLeaderEpoch(leaderEpoch) .setHighWatermark(highWatermark) .setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs)) + .setCommittedVoters(toReplicaStates(apiVersion, leaderId, committedVoters, currentTimeMs)) .setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs)))))); if (apiVersion >= 2) { DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size()); diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index d9cd47a3a2790..2259ff8c7e0dd 100644 --- 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -202,6 +202,10 @@ public Optional voterSetAtOffset(long offset) { } } + public VoterSet staticVoterSet() { + return staticVoterSet; + } + /** * Returns the finalized kraft version at a given offset. * diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index ecc83aa0ad2c5..02020d17a2950 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -3313,7 +3313,7 @@ public void testDescribeQuorumWithOnlyStaticVoters(boolean withKip853Rpc) throws .setLogEndOffset(-1L) .setLastFetchTimestamp(-1) .setLastCaughtUpTimestamp(-1)); - context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, expectedVoterStates, List.of()); } @ParameterizedTest @@ -3370,7 +3370,7 @@ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withB .setLogEndOffset(-1L) .setLastFetchTimestamp(-1) .setLastCaughtUpTimestamp(-1)); - context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, expectedVoterStates, List.of()); context.time.sleep(100); long fetchOffset = withBootstrapSnapshot ? 
3L : 1L; @@ -3398,7 +3398,7 @@ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withB .setLogEndOffset(fetchOffset) .setLastFetchTimestamp(followerFetchTime1) .setLastCaughtUpTimestamp(followerFetchTime1); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // After follower2 catches up to leader context.time.sleep(100); @@ -3419,13 +3419,13 @@ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withB .setLogEndOffset(nextFetchOffset) .setLastFetchTimestamp(followerFetchTime2) .setLastCaughtUpTimestamp(followerFetchTime2); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // Describe quorum returns error if leader loses leadership context.time.sleep(context.checkQuorumTimeoutMs); context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 0, List.of(), List.of()); + context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 0, List.of(), List.of(), List.of()); } @ParameterizedTest @@ -3498,7 +3498,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLogEndOffset(0L) .setLastFetchTimestamp(observerFetchTime) .setLastCaughtUpTimestamp(-1L)); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, expectedObserverStates); // Update observer fetch state context.time.sleep(100); @@ -3518,7 +3518,7 @@ public void 
testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLogEndOffset(fetchOffset) .setLastFetchTimestamp(observerFetchTime) .setLastCaughtUpTimestamp(observerFetchTime); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, expectedObserverStates); // Observer falls behind context.time.sleep(100); @@ -3534,7 +3534,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLogEndOffset(fetchOffset + records.size()) .setLastFetchTimestamp(context.time.milliseconds()) .setLastCaughtUpTimestamp(context.time.milliseconds()); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, expectedObserverStates); // Observer is removed due to inactivity long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS; @@ -3556,7 +3556,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo .setLastCaughtUpTimestamp(context.time.milliseconds()); expectedVoterStates.get(1) .setLastFetchTimestamp(followerFetchTime); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // No-op for negative node id context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0)); @@ -3568,7 +3568,7 @@ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBo expectedVoterStates.get(0) .setLastFetchTimestamp(context.time.milliseconds()) .setLastCaughtUpTimestamp(context.time.milliseconds()); - context.assertSentDescribeQuorumResponse(localId, 
epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); } @ParameterizedTest @@ -3625,7 +3625,7 @@ public void testDescribeQuorumNonMonotonicFollowerFetch(boolean withKip853Rpc, b .setLogEndOffset(fetchOffset) .setLastFetchTimestamp(followerFetchTime) .setLastCaughtUpTimestamp(followerFetchTime)); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state. // The leader will report an error in the logs, but will not let the high watermark rewind @@ -3644,7 +3644,7 @@ public void testDescribeQuorumNonMonotonicFollowerFetch(boolean withKip853Rpc, b expectedVoterStates.get(1) .setLogEndOffset(fetchOffset - batch.size()) .setLastFetchTimestamp(followerFetchTime); - context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedVoterStates, List.of()); } @ParameterizedTest @@ -3688,7 +3688,7 @@ public void testStaticVotersIgnoredWithBootstrapSnapshot(boolean withKip853Rpc) .setLogEndOffset(-1L) .setLastFetchTimestamp(-1) .setLastCaughtUpTimestamp(-1)); - context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, List.of()); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, expectedVoterStates, List.of()); } diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index d66fb31399c4a..523e0e72170e4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.raft.errors.NotLeaderException; import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.KRaftVersionUpgrade; import org.apache.kafka.raft.internals.KafkaRaftMetrics; import org.apache.kafka.server.common.KRaftVersion; @@ -46,6 +47,8 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; public class LeaderStateTest { private final VoterSet.VoterNode localVoterNode = VoterSetTest.voterNode(ReplicaKey.of(0, Uuid.randomUuid())); @@ -55,6 +58,7 @@ public class LeaderStateTest { private final int fetchTimeoutMs = 2000; private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR); private final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2; + private final KRaftControlRecordStateMachine partitionState = mock(KRaftControlRecordStateMachine.class); private LeaderState newLeaderState( VoterSet voters, @@ -65,7 +69,7 @@ private LeaderState newLeaderState( voters, epochStartOffset, kraftVersion, - Mockito.mock(BatchAccumulator.class) + mock(BatchAccumulator.class) ); } @@ -75,6 +79,12 @@ private LeaderState newLeaderState( KRaftVersion kraftVersion, BatchAccumulator accumulator ) { + Mockito + .when(partitionState.voterSetAtOffset(anyLong())) + .thenReturn(Optional.of(voters)); + Mockito + .when(partitionState.staticVoterSet()) + .thenReturn(voters); return new LeaderState<>( time, localVoterNode, @@ -87,7 +97,8 @@ private LeaderState newLeaderState( accumulator, fetchTimeoutMs, logContext, - new KafkaRaftMetrics(new Metrics(), "raft") + new 
KafkaRaftMetrics(new Metrics(), "raft"), + partitionState ); } @@ -138,7 +149,8 @@ public void testRequireNonNullAccumulator() { null, fetchTimeoutMs, logContext, - new KafkaRaftMetrics(new Metrics(), "raft") + new KafkaRaftMetrics(new Metrics(), "raft"), + partitionState ) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 76d942d2bd98b..42ddb81e5a437 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; public class QuorumStateTest { private final int localId = 0; @@ -74,6 +75,12 @@ private QuorumState buildQuorumState( Mockito .when(mockPartitionState.lastVoterSetOffset()) .thenReturn(kraftVersion.isReconfigSupported() ? 
OptionalLong.of(0) : OptionalLong.empty()); + Mockito + .when(mockPartitionState.voterSetAtOffset(anyLong())) + .thenReturn(Optional.of(voterSet)); + Mockito + .when(mockPartitionState.lastVoterSet()) + .thenReturn(voterSet); Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 42fdde0fa0e7e..161176c0e36c7 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -758,10 +758,12 @@ void assertSentDescribeQuorumResponse( int leaderId, int leaderEpoch, long highWatermark, - List voterStates, + List currentVoterStates, + List committedVoterStates, List observerStates ) { - assertSentDescribeQuorumResponse(Errors.NONE, leaderId, leaderEpoch, highWatermark, voterStates, observerStates); + assertSentDescribeQuorumResponse(Errors.NONE, leaderId, leaderEpoch, highWatermark, + currentVoterStates, committedVoterStates, observerStates); } void assertSentDescribeQuorumResponse( @@ -769,7 +771,8 @@ void assertSentDescribeQuorumResponse( int leaderId, int leaderEpoch, long highWatermark, - List voterStates, + List currentVoterStates, + List committedVoterStates, List observerStates ) { DescribeQuorumResponseData response = collectDescribeQuorumResponse(); @@ -779,7 +782,8 @@ void assertSentDescribeQuorumResponse( .setLeaderId(leaderId) .setLeaderEpoch(leaderEpoch) .setHighWatermark(highWatermark) - .setCurrentVoters(voterStates) + .setCurrentVoters(currentVoterStates) + .setCommittedVoters(committedVoterStates) .setObservers(observerStates); if (!error.equals(Errors.NONE)) { @@ -788,8 +792,8 @@ void assertSentDescribeQuorumResponse( DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(0); if (describeQuorumRpcVersion() >= 2) { - nodes = new 
DescribeQuorumResponseData.NodeCollection(voterStates.size()); - for (ReplicaState voterState : voterStates) { + nodes = new DescribeQuorumResponseData.NodeCollection(currentVoterStates.size()); + for (ReplicaState voterState : currentVoterStates) { nodes.add(new DescribeQuorumResponseData.Node() .setNodeId(voterState.replicaId()) .setListeners(startingVoters.listeners(voterState.replicaId()).toDescribeQuorumResponseListeners())); @@ -802,7 +806,7 @@ void assertSentDescribeQuorumResponse( nodes ); - List sortedVoters = response + List sortedCuttentVoters = response .topics() .get(0) .partitions() @@ -811,7 +815,17 @@ void assertSentDescribeQuorumResponse( .stream() .sorted(Comparator.comparingInt(ReplicaState::replicaId)) .collect(Collectors.toList()); - response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters); + List sortedCommittedVoters = response + .topics() + .get(0) + .partitions() + .get(0) + .committedVoters() + .stream() + .sorted(Comparator.comparingInt(ReplicaState::replicaId)) + .collect(Collectors.toList()); + response.topics().get(0).partitions().get(0).setCurrentVoters(sortedCuttentVoters); + response.topics().get(0).partitions().get(0).setCommittedVoters(sortedCommittedVoters); response.nodes().sort(Comparator.comparingInt(DescribeQuorumResponseData.Node::nodeId)); assertEquals(expectedResponse, response); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 5aacab01df687..84281296f5660 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -625,6 +625,7 @@ public void testSingletonDescribeQuorumResponseForAllVersion(final short version highWatermark, List.of(replicaState), List.of(replicaState), + List.of(replicaState), 0 ); JsonNode json = DescribeQuorumResponseDataJsonConverter.write(describeQuorumResponseData, version); diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index c6a53742f7c0d..40e57f5e8d04f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -38,6 +38,7 @@ import org.mockito.Mockito; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; @@ -45,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.anyLong; public class KafkaRaftMetricsTest { @@ -77,6 +79,12 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio Mockito .when(mockPartitionState.lastVoterSetOffset()) .thenReturn(kraftVersion.isReconfigSupported() ? OptionalLong.of(0) : OptionalLong.empty()); + Mockito + .when(mockPartitionState.voterSetAtOffset(anyLong())) + .thenReturn(Optional.of(voterSet)); + Mockito + .when(mockPartitionState.staticVoterSet()) + .thenReturn(voterSet); Mockito .when(mockPartitionState.lastKraftVersion()) .thenReturn(kraftVersion); diff --git a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java index ad30708d7c5db..8f67b30e838c7 100644 --- a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java @@ -346,4 +346,4 @@ public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception { } } } -} \ No newline at end of file +} diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java 
index e42dcbb0e09c0..717c2c2639bad 100644 --- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java @@ -273,6 +273,7 @@ else if (leader.lastCaughtUpTimestamp().isPresent() && maxLagFollower.lastCaught "\nMaxFollowerLag: " + maxFollowerLag + "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs + "\nCurrentVoters: " + printVoterState(quorumInfo) + + "\nCommittedVoters: " + printCommittedVoterState(quorumInfo) + "\nCurrentObservers: " + printObserverState(quorumInfo) ); } @@ -283,7 +284,14 @@ private static String printVoterState(QuorumInfo quorumInfo) { return printReplicaState(quorumInfo, quorumInfo.voters()); } + // Constructs the CommittedVoters string + // CommittedVoters: [{"id": 0, "directoryId": "UUID1", "endpoints": ["C://controller-0:1234"]}] + private static String printCommittedVoterState(QuorumInfo quorumInfo) { + return printReplicaState(quorumInfo, quorumInfo.committedVoters()); + } + // Constructs the CurrentObservers string + // CurrentObservers: [{"id": 0, "directoryId": "UUID1", "endpoints": ["C://controller-0:1234"]}] private static String printObserverState(QuorumInfo quorumInfo) { return printReplicaState(quorumInfo, quorumInfo.observers()); } diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java index 621943f695ab9..8b8e92a46f5cb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java @@ -119,13 +119,18 @@ private void testDescribeQuorumStatusSuccessful(ClusterInstance cluster, boolean "\"endpoints\":\\s+\\[\"\\S+://\\[?\\S+]?:\\d+\",?.*]"), describeOutput ); + assertTrue( + outputs[7].matches("CommittedVoters:\\s+\\[\\{\"id\":\\s+\\d+,\\s+" + + "\"endpoints\":\\s+\\[\"\\S+://\\[?\\S+]?:\\d+\",?.*]"), + describeOutput + ); // There are no 
observers if we have fewer brokers than controllers if (cluster.type() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers()) { - assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]"), describeOutput); + assertTrue(outputs[8].matches("CurrentObservers:\\s+\\[]"), describeOutput); } else { assertTrue( - outputs[7].matches("CurrentObservers:\\s+\\[\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+}" + + outputs[8].matches("CurrentObservers:\\s+\\[\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+}" + "(,\\s+\\{\"id\":\\s+\\d+,\\s+\"directoryId\":\\s+\\S+})*]"), describeOutput );