Skip to content

Commit

Permalink
KAFKA-16530: Fix high-watermark calculation to not assume the leader …
Browse files Browse the repository at this point in the history
…is in the voter set (#16079)

1. Changing log message from error to info - We may expect the HW calculation to give us a smaller result than the current HW in the case of quorum reconfiguration. We will continue to not allow the HW to actually decrease.
2. Logic for finding the updated LeaderEndOffset for updateReplicaState is changed as well. We do not assume the leader is in the voter set and check the observer states as well.
3. updateLocalState now accepts an additional "lastVoterSet" param which allows us to update the leader state with the last known voters. any nodes in this set but not in voterStates will be added to voterStates and removed from observerStates, any nodes not in this set but in voterStates will be removed from voterStates and added to observerStates

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@apache.org>
  • Loading branch information
ahuang98 authored and showuon committed Jun 6, 2024
1 parent 04f7ed4 commit 25ca963
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private void updateLeaderEndOffsetAndTimestamp(
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();

if (state.updateLocalState(endOffsetMetadata)) {
if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet().voterIds())) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}

Expand Down
45 changes: 36 additions & 9 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -243,8 +244,9 @@ private boolean maybeUpdateHighWatermark() {
);
return true;
} else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) {
log.error("The latest computed high watermark {} is smaller than the current " +
"value {}, which suggests that one of the voters has lost committed data. " +
log.info("The latest computed high watermark {} is smaller than the current " +
"value {}, which should only happen when voter set membership changes. If the voter " +
"set has not changed this suggests that one of the voters has lost committed data. " +
"Full voter replication state: {}", highWatermarkUpdateOffset,
currentHighWatermarkMetadata.offset, voterStates.values());
return false;
Expand Down Expand Up @@ -296,10 +298,12 @@ private void logHighWatermarkUpdate(
* Update the local replica state.
*
* @param endOffsetMetadata updated log end offset of local replica
* @param lastVoterSet the up-to-date voter set
* @return true if the high watermark is updated as a result of this call
*/
public boolean updateLocalState(
LogOffsetMetadata endOffsetMetadata
LogOffsetMetadata endOffsetMetadata,
Set<Integer> lastVoterSet
) {
ReplicaState state = getOrCreateReplicaState(localId);
state.endOffset.ifPresent(currentEndOffset -> {
Expand All @@ -308,7 +312,8 @@ public boolean updateLocalState(
"end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
}
});
state.updateLeaderState(endOffsetMetadata);
state.updateLeaderEndOffset(endOffsetMetadata);
updateVoterAndObserverStates(lastVoterSet);
return maybeUpdateHighWatermark();
}

Expand Down Expand Up @@ -341,9 +346,7 @@ public boolean updateReplicaState(
state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
}
});

Optional<LogOffsetMetadata> leaderEndOffsetOpt =
voterStates.get(localId).endOffset;
Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset;

state.updateFollowerState(
currentTimeMs,
Expand Down Expand Up @@ -435,16 +438,40 @@ private DescribeQuorumResponseData.ReplicaState describeReplicaState(

}

/**
* Clear observer states that have not been active for a while and are not the leader.
*/
private void clearInactiveObservers(final long currentTimeMs) {
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
integerReplicaStateEntry.getKey() != localId
);
}

private boolean isVoter(int remoteNodeId) {
return voterStates.containsKey(remoteNodeId);
}

private void updateVoterAndObserverStates(Set<Integer> lastVoterSet) {
// Move any replica that is not in the last voter set from voterStates to observerStates
for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Integer, ReplicaState> replica = iter.next();
if (!lastVoterSet.contains(replica.getKey())) {
observerStates.put(replica.getKey(), replica.getValue());
iter.remove();
}
}

// Add replicas that are in the last voter set and not in voterStates to voterStates (from observerStates
// if they exist)
for (int voterId : lastVoterSet) {
if (!voterStates.containsKey(voterId)) {
Optional<ReplicaState> existingObserverState = Optional.ofNullable(observerStates.remove(voterId));
voterStates.put(voterId, existingObserverState.orElse(new ReplicaState(voterId, false)));
}
}
}

private static class ReplicaState implements Comparable<ReplicaState> {
final int nodeId;
Optional<LogOffsetMetadata> endOffset;
Expand All @@ -462,7 +489,7 @@ public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}

void updateLeaderState(
void updateLeaderEndOffset(
LogOffsetMetadata endOffsetMetadata
) {
// For the leader, we only update the end offset. The remaining fields
Expand Down
Loading

0 comments on commit 25ca963

Please sign in to comment.