Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
420d331
Add new field `CommittedVoters` in DescribeQuorumResponse.json
peterxcli Oct 14, 2024
218bf77
feat(client/admin): Add committed voters to QuorumInfo
peterxcli Oct 14, 2024
c0cc6a3
refactor(raft): Rename voterStates to currentVoterStates in LeaderState
peterxcli Oct 14, 2024
aa60568
feat(raft): Add lastVoterSetWithOffset method to KRaftControlRecordSt…
peterxcli Oct 14, 2024
33ed01b
feat(raft): implement committed voters in LeaderState and DescribeQuo…
peterxcli Oct 14, 2024
de929d3
Merge branch 'trunk' into KAFKA-17243
TaiJuWu Aug 12, 2025
e975174
fix build
TaiJuWu Aug 12, 2025
488d27d
fix test
TaiJuWu Aug 12, 2025
5111c58
Merge branch 'trunk' into KAFKA-17243
TaiJuWu Aug 12, 2025
cf8f232
fix some issues
TaiJuWu Aug 13, 2025
55dabac
fix /MetadataQuorumCommandTest
TaiJuWu Aug 13, 2025
3f9fb31
bump DescribeQuorumRequest version
TaiJuWu Aug 14, 2025
91da1a1
fix majority
TaiJuWu Aug 14, 2025
406d562
Revert "fix majority"
TaiJuWu Aug 15, 2025
4096c90
finish
TaiJuWu Oct 14, 2025
238f528
Merge branch 'KAFKA-17243' into committedVoter
TaiJuWu Oct 14, 2025
0359cc7
resolve conflict again
TaiJuWu Oct 14, 2025
01a2b18
fix fail test
TaiJuWu Oct 14, 2025
cd0a170
edge case logic
TaiJuWu Oct 15, 2025
3853c2c
fix fail test
TaiJuWu Oct 15, 2025
5915adc
replace old committed voter with new one
TaiJuWu Oct 15, 2025
f0ba7e4
move offset check to leaderState from stateMachine
TaiJuWu Oct 15, 2025
17b0cdf
fix NPE
TaiJuWu Oct 15, 2025
771ea5f
fix highwatermark issue
TaiJuWu Oct 15, 2025
d5a6c9a
fix initial kraft bug
TaiJuWu Oct 15, 2025
b04a03e
fix test and refactor
TaiJuWu Oct 15, 2025
2373fd5
Merge branch 'trunk' into committedVoter
TaiJuWu Oct 15, 2025
94a3bbd
fix fail test
TaiJuWu Oct 16, 2025
7660e2a
add comment
TaiJuWu Oct 16, 2025
874b231
remove read from log and using static voter directly
TaiJuWu Oct 16, 2025
e1d8db3
fix java 25 build
TaiJuWu Oct 16, 2025
11a3812
fix all fail test
TaiJuWu Oct 16, 2025
ae588d0
improve debug info
TaiJuWu Oct 16, 2025
10727b7
remove unnecessary update
TaiJuWu Oct 17, 2025
f2e4c02
remove unnecessary public method
TaiJuWu Oct 17, 2025
399c550
Merge branch 'trunk' into committedVoter
TaiJuWu Nov 10, 2025
de0dad9
rename
TaiJuWu Nov 10, 2025
241c047
if hightwart is missing return empty set
TaiJuWu Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4684,6 +4684,10 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition
.map(this::translateReplicaState)
.collect(Collectors.toList());

List<QuorumInfo.ReplicaState> committedVoters = partition.committedVoters().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());

List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());
Expand All @@ -4701,6 +4705,7 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition
partition.leaderEpoch(),
partition.highWatermark(),
voters,
committedVoters,
observers,
nodes
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class QuorumInfo {
private final long leaderEpoch;
private final long highWatermark;
private final List<ReplicaState> voters;
private final List<ReplicaState> committedVoters;
private final List<ReplicaState> observers;
private final Map<Integer, Node> nodes;

Expand All @@ -39,13 +40,15 @@ public class QuorumInfo {
long leaderEpoch,
long highWatermark,
List<ReplicaState> voters,
List<ReplicaState> committedVoters,
List<ReplicaState> observers,
Map<Integer, Node> nodes
) {
this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
this.highWatermark = highWatermark;
this.voters = voters;
this.committedVoters = committedVoters;
this.observers = observers;
this.nodes = nodes;
}
Expand All @@ -66,6 +69,10 @@ public List<ReplicaState> voters() {
return voters;
}

public List<ReplicaState> committedVoters() {
return committedVoters;
}

public List<ReplicaState> observers() {
return observers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"name": "DescribeQuorumRequest",
// Version 1 adds additional fields in the response. The request is unchanged (KIP-836).
// Version 2 adds additional fields in the response. The request is unchanged (KIP-853).
"validVersions": "0-2",
// Version 3 adds additional fields in the response. The request is unchanged (KIP-853).
"validVersions": "0-3",
"flexibleVersions": "0+",
"latestVersionUnstable": false,
"fields": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
"type": "response",
"name": "DescribeQuorumResponse",
// Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
// Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData, ReplicaDirectoryId in ReplicaState (KIP-853).
"validVersions": "0-2",
// Version 2 adds ErrorMessage, Nodes, ErrorMessage in ParitionData, ReplicaDirectoryId in ReplicaState (KIP-853).
// Version 3 adds CommittedVoters in PartitionData
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
Expand All @@ -46,6 +47,8 @@
"about": "The high water mark."},
{ "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+",
"about": "The current voters of the partition."},
{ "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", "ignorable": true,
"about": "The voters has been committed."},
{ "name": "Observers", "type": "[]ReplicaState", "versions": "0+",
"about": "The observers of the partition."}
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,12 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
}

private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) {
return new QuorumInfo(1, 1, 1L,
return new QuorumInfo(1, 1L, 1L,
singletonList(new QuorumInfo.ReplicaState(1,
emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID,
100,
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000),
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))),
singletonList(new QuorumInfo.ReplicaState(1,
emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID,
100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1893,6 +1893,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
leaderState.epoch(),
leaderState.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L),
leaderState.voterStates().values(),
leaderState.committedVoterStates().values(),
leaderState.observerStates(currentTimeMs).values(),
currentTimeMs
);
Expand Down
72 changes: 57 additions & 15 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.AddVoterHandlerState;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.KRaftVersionUpgrade;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.RemoveVoterHandlerState;
Expand Down Expand Up @@ -72,9 +73,11 @@ public class LeaderState<T> implements EpochState {
// This field is non-empty if the voter set at epoch start came from a snapshot or log segment
private final OptionalLong offsetOfVotersAtEpochStart;
private final KRaftVersion kraftVersionAtEpochStart;
private final KRaftControlRecordStateMachine partitionState;

private Optional<LogOffsetMetadata> highWatermark = Optional.empty();
private Map<Integer, ReplicaState> voterStates = new HashMap<>();
private Map<Integer, ReplicaState> committedVoterStates = new HashMap<>();
private Optional<AddVoterHandlerState> addVoterHandlerState = Optional.empty();
private Optional<RemoveVoterHandlerState> removeVoterHandlerState = Optional.empty();

Expand Down Expand Up @@ -120,7 +123,8 @@ protected LeaderState(
BatchAccumulator<T> accumulator,
int fetchTimeoutMs,
LogContext logContext,
KafkaRaftMetrics kafkaRaftMetrics
KafkaRaftMetrics kafkaRaftMetrics,
KRaftControlRecordStateMachine partitionState
) {
if (localVoterNode.voterKey().directoryId().isEmpty()) {
throw new IllegalArgumentException(
Expand All @@ -139,14 +143,6 @@ protected LeaderState(
this.localVoterNode = localVoterNode;
this.epoch = epoch;
this.epochStartOffset = epochStartOffset;

for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {
boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey());
this.voterStates.put(
voterNode.voterKey().id(),
new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners())
);
}
this.grantingVoters = Set.copyOf(grantingVoters);
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
Expand All @@ -158,6 +154,18 @@ protected LeaderState(
this.voterSetAtEpochStart = voterSetAtEpochStart;
this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart;
this.kraftVersionAtEpochStart = kraftVersionAtEpochStart;
this.partitionState = partitionState;

// When offset is empty or -1, it means the cluster is starting up.
boolean isInitializing = offsetOfVotersAtEpochStart.isEmpty() || offsetOfVotersAtEpochStart.getAsLong() == -1;
for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {
boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey());
ReplicaState state = new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners());
this.voterStates.put(voterNode.voterKey().id(), state);
if (isInitializing) {
this.committedVoterStates.put(voterNode.voterKey().id(), state);
}
}

kafkaRaftMetrics.addLeaderMetrics();
this.kafkaRaftMetrics = kafkaRaftMetrics;
Expand Down Expand Up @@ -704,6 +712,14 @@ Map<Integer, ReplicaState> voterStates() {
return voterStates;
}

Map<Integer, ReplicaState> committedVoterStates() {
if (highWatermark.isEmpty()) {
return Map.of();
}

return committedVoterStates;
}

Map<ReplicaKey, ReplicaState> observerStates(final long currentTimeMs) {
clearInactiveObservers(currentTimeMs);
return observerStates;
Expand Down Expand Up @@ -752,6 +768,7 @@ private boolean maybeUpdateHighWatermark() {
!highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) {
Optional<LogOffsetMetadata> oldHighWatermark = highWatermark;
highWatermark = highWatermarkUpdateOpt;
updateCommittedVoter(currentHighWatermarkMetadata);
logHighWatermarkUpdate(
oldHighWatermark,
highWatermarkUpdateMetadata,
Expand All @@ -772,6 +789,7 @@ private boolean maybeUpdateHighWatermark() {
} else {
Optional<LogOffsetMetadata> oldHighWatermark = highWatermark;
highWatermark = highWatermarkUpdateOpt;
updateCommittedVoter(highWatermarkUpdateMetadata);
logHighWatermarkUpdate(
oldHighWatermark,
highWatermarkUpdateMetadata,
Expand Down Expand Up @@ -821,7 +839,7 @@ public boolean updateLocalState(
LogOffsetMetadata endOffsetMetadata,
VoterSet lastVoterSet
) {
ReplicaState state = getOrCreateReplicaState(localVoterNode.voterKey());
ReplicaState state = getOrCreateObserverReplicaState(localVoterNode.voterKey());
state.endOffset.ifPresent(currentEndOffset -> {
if (currentEndOffset.offset() > endOffsetMetadata.offset()) {
throw new IllegalStateException("Detected non-monotonic update of local " +
Expand Down Expand Up @@ -858,7 +876,7 @@ public boolean updateReplicaState(
);
}

ReplicaState state = getOrCreateReplicaState(replicaKey);
ReplicaState state = getOrCreateObserverReplicaState(replicaKey);

state.endOffset.ifPresent(currentEndOffset -> {
if (currentEndOffset.offset() > fetchOffsetMetadata.offset()) {
Expand All @@ -867,7 +885,7 @@ public boolean updateReplicaState(
}
});

Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateReplicaState(localVoterNode.voterKey()).endOffset;
Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateObserverReplicaState(localVoterNode.voterKey()).endOffset;

state.updateFollowerState(
currentTimeMs,
Expand All @@ -886,6 +904,25 @@ public List<ReplicaKey> nonLeaderVotersByDescendingFetchOffset() {
.collect(Collectors.toList());
}

private void updateCommittedVoter(LogOffsetMetadata highWatermark) {
Map<Integer, ReplicaState> newCommittedVoterStates = new HashMap<>();

// Try to retrieve the voter set at the given high watermark offset;
// if unavailable, fall back to the static voter set.
// Note: highWatermark is the offset will be written so it is exclusive.
partitionState.voterSetAtOffset(highWatermark.offset() - 1).ifPresentOrElse(voterSet -> {
for (VoterSet.VoterNode voterNode : voterSet.voterNodes()) {
newCommittedVoterStates.put(voterNode.voterKey().id(), getOrCreateVoterReplicaState(voterNode));
}
}, () -> {
for (VoterSet.VoterNode voterNode : partitionState.staticVoterSet().voterNodes()) {
newCommittedVoterStates.put(voterNode.voterKey().id(), getOrCreateVoterReplicaState(voterNode));
}
});
log.debug("Updating committed voter at high watermark={} and committedVoter={}", highWatermark, newCommittedVoterStates);
committedVoterStates = newCommittedVoterStates;
}

private Stream<ReplicaState> followersByDescendingFetchOffset() {
return voterStates
.values()
Expand All @@ -910,7 +947,7 @@ public long epochStartOffset() {
return epochStartOffset;
}

private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
private ReplicaState getOrCreateObserverReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
Expand All @@ -920,6 +957,10 @@ private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
return state;
}

public ReplicaState getOrCreateVoterReplicaState(VoterSet.VoterNode voterNode) {
return getReplicaState(voterNode.voterKey()).orElse(new ReplicaState(voterNode.voterKey(), false, voterNode.listeners()));
}

public Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
Expand Down Expand Up @@ -1128,12 +1169,13 @@ public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolea
@Override
public String toString() {
return String.format(
"Leader(localVoterNode=%s, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s)",
"Leader(localVoterNode=%s, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s, committedVoterState=%s)",
localVoterNode,
epoch,
epochStartOffset,
highWatermark,
voterStates
voterStates,
committedVoterStates
);
}

Expand Down
3 changes: 2 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,8 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
accumulator,
fetchTimeoutMs,
logContext,
kafkaRaftMetrics
kafkaRaftMetrics,
partitionState
);

durableTransitionTo(state);
Expand Down
2 changes: 2 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
int leaderEpoch,
long highWatermark,
Collection<LeaderState.ReplicaState> voters,
Collection<LeaderState.ReplicaState> committedVoters,
Collection<LeaderState.ReplicaState> observers,
long currentTimeMs
) {
Expand All @@ -507,6 +508,7 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
.setLeaderEpoch(leaderEpoch)
.setHighWatermark(highWatermark)
.setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs))
.setCommittedVoters(toReplicaStates(apiVersion, leaderId, committedVoters, currentTimeMs))
.setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs))))));
if (apiVersion >= 2) {
DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ public Optional<VoterSet> voterSetAtOffset(long offset) {
}
}

public VoterSet staticVoterSet() {
return staticVoterSet;
}

/**
* Returns the finalized kraft version at a given offset.
*
Expand Down
Loading
Loading