Skip to content

Commit

Permalink
KAFKA-15489: address reviewer's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
showuon committed Nov 28, 2023
1 parent 09e892b commit 0689a69
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 39 deletions.
9 changes: 5 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Expand Up @@ -1341,8 +1341,8 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(

UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));

Optional<LeaderState<T>> state = quorum.maybeLeaderState();
state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs));
LeaderState<T> state = quorum.leaderStateOrThrow();
state.updateCheckQuorumForFollowingVoter(data.replicaId(), currentTimeMs);

return FetchSnapshotResponse.singleton(
log.topicPartition(),
Expand Down Expand Up @@ -1995,7 +1995,8 @@ private long pollLeader(long currentTimeMs) {
LeaderState<T> state = quorum.leaderStateOrThrow();
maybeFireLeaderChange(state);

if (shutdown.get() != null || state.isResignRequested() || state.hasMajorityFollowerFetchExpired(currentTimeMs)) {
long timeUntilCheckQuorumExpires = state.timeUntilCheckQuorumExpires(currentTimeMs);
if (shutdown.get() != null || state.isResignRequested() || timeUntilCheckQuorumExpires == 0) {
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
return 0L;
}
Expand All @@ -2011,7 +2012,7 @@ private long pollLeader(long currentTimeMs) {
this::buildBeginQuorumEpochRequest
);

return Math.min(timeUntilFlush, timeUntilSend);
return Math.min(timeUntilFlush, Math.min(timeUntilSend, timeUntilCheckQuorumExpires));
}

private long maybeSendVoteRequests(
Expand Down
60 changes: 39 additions & 21 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Expand Up @@ -46,6 +46,7 @@
*/
public class LeaderState<T> implements EpochState {
static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;

private final int localId;
private final int epoch;
Expand All @@ -57,9 +58,10 @@ public class LeaderState<T> implements EpochState {
private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
private final Logger log;
private final BatchAccumulator<T> accumulator;
// The set includes all of the followers voters that FETCH or FETCH_SNAPSHOT during the current checkQuorumTimer interval.
private final Set<Integer> fetchedVoters = new HashSet<>();
private final Timer fetchTimer;
private final int fetchTimeoutMs;
private final Timer checkQuorumTimer;
private final int checkQuorumTimeoutMs;

// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;
Expand All @@ -86,36 +88,52 @@ protected LeaderState(
this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
// use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
this.fetchTimer = time.timer(fetchTimeoutMs);
}

// Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
// the voters within fetch timeout.
public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
fetchTimer.update(currentTimeMs);
boolean isExpired = fetchTimer.isExpired();
if (isExpired) {
log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
fetchTimeoutMs, fetchedVoters);
// use the 1.5x of fetch timeout to tolerate some network transition time or other IO time.
this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);
}

/**
* Get the remaining time in milliseconds until the checkQuorumTimer expires.
* This will happen if we didn't receive a valid fetch/fetchSnapshot request from the majority of the voters within checkQuorumTimeoutMs.
*
* @param currentTimeMs the current timestamp in millisecond
* @return the remainingMs before the checkQuorumTimer expired
*/
public long timeUntilCheckQuorumExpires(long currentTimeMs) {
checkQuorumTimer.update(currentTimeMs);
long remainingMs = checkQuorumTimer.remainingMs();
if (remainingMs == 0) {
log.info(
"Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
checkQuorumTimeoutMs,
fetchedVoters);
}
return isExpired;
return remainingMs;
}

// Reset the fetch timer if we've received fetch/fetchSnapshot request from the majority of the voter
public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) {
/**
* Reset the checkQuorumTimer if we've received fetch/fetchSnapshot request from the majority of the voter
*
* @param id the node id
* @param currentTimeMs the current timestamp in millisecond
*/
public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) {
updateFetchedVoters(id);
// The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
int majority = voterStates.size() / 2;
if (fetchedVoters.size() >= majority) {
fetchedVoters.clear();
fetchTimer.update(currentTimeMs);
fetchTimer.reset(fetchTimeoutMs);
checkQuorumTimer.update(currentTimeMs);
checkQuorumTimer.reset(checkQuorumTimeoutMs);
}
}

private void updateFetchedVoters(int id) {
if (id == localId) {
throw new IllegalArgumentException("Received a FETCH/FETCH_SNAPSHOT request from the leader itself.");
}

if (isVoter(id)) {
fetchedVoters.add(id);
}
Expand Down Expand Up @@ -327,7 +345,7 @@ public boolean updateReplicaState(
fetchOffsetMetadata,
leaderEndOffsetOpt
);
maybeResetMajorityFollowerFetchTimer(replicaId, currentTimeMs);
updateCheckQuorumForFollowingVoter(replicaId, currentTimeMs);

return isVoter(state.nodeId) && maybeUpdateHighWatermark();
}
Expand Down
Expand Up @@ -716,7 +716,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajo
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.build();

int resignLeadershipTimeout = (int) (context.fetchTimeoutMs * 1.5);
int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
context.becomeLeader();
int epoch = context.currentEpoch();

Expand Down
Expand Up @@ -494,7 +494,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVote
Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
int resignLeadershipTimeout = (int) (context.fetchTimeoutMs * 1.5);
int resignLeadershipTimeout = context.checkQuorumTimeoutMs;

context.becomeLeader();
int epoch = context.currentEpoch();
Expand Down
29 changes: 17 additions & 12 deletions raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
Expand Up @@ -36,6 +36,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -48,6 +49,7 @@ public class LeaderStateTest {
private final BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class);
private final MockTime time = new MockTime();
private final int fetchTimeoutMs = 2000;
private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);

private LeaderState<?> newLeaderState(
Set<Integer> voters,
Expand Down Expand Up @@ -453,35 +455,38 @@ public void testDescribeQuorumWithObservers() {
}

@Test
public void testMajorityFollowerFetchTimeoutExpiration() {
public void testCheckQuorum() {
int node1 = 1;
int node2 = 2;
int node3 = 3;
int node4 = 4;
int observer5 = 5;
LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L);
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
int resignLeadershipTimeout = (int) (fetchTimeoutMs * 1.5);
assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds()));
int resignLeadershipTimeout = checkQuorumTimeoutMs;

// fetch timeout not exceeded, should not expire the timer
// checkQuorum timeout not exceeded, should not expire the timer
time.sleep(resignLeadershipTimeout / 2);
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
assertTrue(state.timeUntilCheckQuorumExpires(time.milliseconds()) > 0);

// received fetch requests from 2 voter nodes, the timer should be reset
state.maybeResetMajorityFollowerFetchTimer(node1, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimer(node2, time.milliseconds());
state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds());
state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds());
assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds()));

// Since the timer was reset, it won't expire this time.
time.sleep(resignLeadershipTimeout / 2);
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
long remainingMs = state.timeUntilCheckQuorumExpires(time.milliseconds());
assertTrue(remainingMs > 0);

// received fetch requests from 1 voter and 1 observer nodes, the timer should not be reset.
state.maybeResetMajorityFollowerFetchTimer(node3, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimer(observer5, time.milliseconds());
state.updateCheckQuorumForFollowingVoter(node3, time.milliseconds());
state.updateCheckQuorumForFollowingVoter(observer5, time.milliseconds());
assertEquals(remainingMs, state.timeUntilCheckQuorumExpires(time.milliseconds()));

// This time, the fetch timer will be expired
// This time, the checkQuorum timer will be expired
time.sleep(resignLeadershipTimeout / 2);
assertTrue(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds()));
}

@Test
Expand Down
Expand Up @@ -78,6 +78,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -91,6 +92,7 @@ public final class RaftClientTestContext {
final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;

private int electionTimeoutMs;
Expand Down

0 comments on commit 0689a69

Please sign in to comment.