Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set #16079

Merged
merged 10 commits into from
Jun 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private void updateLeaderEndOffsetAndTimestamp(
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();

if (state.updateLocalState(endOffsetMetadata)) {
if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet().voterIds())) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}

Expand Down
45 changes: 36 additions & 9 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -243,8 +244,9 @@ private boolean maybeUpdateHighWatermark() {
);
return true;
} else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) {
log.error("The latest computed high watermark {} is smaller than the current " +
"value {}, which suggests that one of the voters has lost committed data. " +
log.info("The latest computed high watermark {} is smaller than the current " +
"value {}, which should only happen when voter set membership changes. If the voter " +
"set has not changed this suggests that one of the voters has lost committed data. " +
"Full voter replication state: {}", highWatermarkUpdateOffset,
currentHighWatermarkMetadata.offset, voterStates.values());
return false;
Expand Down Expand Up @@ -296,10 +298,12 @@ private void logHighWatermarkUpdate(
* Update the local replica state.
*
* @param endOffsetMetadata updated log end offset of local replica
* @param lastVoterSet the up-to-date voter set
* @return true if the high watermark is updated as a result of this call
*/
public boolean updateLocalState(
LogOffsetMetadata endOffsetMetadata
LogOffsetMetadata endOffsetMetadata,
Set<Integer> lastVoterSet
) {
ReplicaState state = getOrCreateReplicaState(localId);
state.endOffset.ifPresent(currentEndOffset -> {
Expand All @@ -308,7 +312,8 @@ public boolean updateLocalState(
"end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
}
});
state.updateLeaderState(endOffsetMetadata);
state.updateLeaderEndOffset(endOffsetMetadata);
updateVoterAndObserverStates(lastVoterSet);
return maybeUpdateHighWatermark();
}

Expand Down Expand Up @@ -341,9 +346,7 @@ public boolean updateReplicaState(
state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
}
});

Optional<LogOffsetMetadata> leaderEndOffsetOpt =
voterStates.get(localId).endOffset;
Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset;

state.updateFollowerState(
currentTimeMs,
Expand Down Expand Up @@ -435,16 +438,40 @@ private DescribeQuorumResponseData.ReplicaState describeReplicaState(

}

/**
* Clear observer states that have not been active for a while and are not the leader.
*/
private void clearInactiveObservers(final long currentTimeMs) {
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
integerReplicaStateEntry.getKey() != localId
);
}

/**
 * Check whether the given replica is currently tracked as a voter.
 *
 * @param remoteNodeId the id of the replica to look up
 * @return true if {@code voterStates} contains an entry keyed by {@code remoteNodeId}
 */
private boolean isVoter(int remoteNodeId) {
return voterStates.containsKey(remoteNodeId);
}

private void updateVoterAndObserverStates(Set<Integer> lastVoterSet) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any unit tests for this. Could we add some to verify that the observerState/voterState are changed as expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can test this indirectly by making sure observers cannot influence the HW (e.g. if we have two voters and two observers, all observers being up-to-date with the leader will not cause the HW to increase) until they are added back to the voterSet. This ensures nodes are correctly removed from and added to the sets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, actually it seems we can test this pretty directly with describeQuorum. I'll add another test.

// Move any replica that is not in the last voter set from voterStates to observerStates
for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Integer, ReplicaState> replica = iter.next();
if (!lastVoterSet.contains(replica.getKey())) {
observerStates.put(replica.getKey(), replica.getValue());
iter.remove();
}
}

// Add replicas that are in the last voter set and not in voterStates to voterStates (from observerStates
// if they exist)
for (int voterId : lastVoterSet) {
if (!voterStates.containsKey(voterId)) {
Optional<ReplicaState> existingObserverState = Optional.ofNullable(observerStates.remove(voterId));
voterStates.put(voterId, existingObserverState.orElse(new ReplicaState(voterId, false)));
}
}
}

private static class ReplicaState implements Comparable<ReplicaState> {
final int nodeId;
Optional<LogOffsetMetadata> endOffset;
Expand All @@ -462,7 +489,7 @@ public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}

void updateLeaderState(
void updateLeaderEndOffset(
LogOffsetMetadata endOffsetMetadata
) {
// For the leader, we only update the end offset. The remaining fields
Expand Down
Loading