Skip to content

Commit

Permalink
KAFKA-15489: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
showuon committed Oct 2, 2023
1 parent eff7741 commit f3efeb8
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 28 deletions.
3 changes: 0 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Expand Up @@ -966,9 +966,6 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(

int replicaId = FetchRequest.replicaId(request);

Optional<LeaderState<T>> state = quorum.maybeLeaderState();
state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimeout(replicaId, currentTimeMs));

FetchResponseData response = tryCompleteFetchRequest(replicaId, fetchPartition, currentTimeMs);
FetchResponseData.PartitionData partitionResponse =
response.responses().get(0).partitions().get(0);
Expand Down
17 changes: 9 additions & 8 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Expand Up @@ -60,22 +60,20 @@ public class LeaderState<T> implements EpochState {
private final Set<Integer> fetchedVoters = new HashSet<>();
private final Timer fetchTimer;
private final int fetchTimeoutMs;
// The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
private final int majority;

// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;

protected LeaderState(
Time time,
int localId,
int epoch,
long epochStartOffset,
Set<Integer> voters,
Set<Integer> grantingVoters,
BatchAccumulator<T> accumulator,
LogContext logContext,
Time time,
int fetchTimeoutMs
int fetchTimeoutMs,
LogContext logContext
) {
this.localId = localId;
this.epoch = epoch;
Expand All @@ -85,15 +83,14 @@ protected LeaderState(
boolean hasAcknowledgedLeader = voterId == localId;
this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
}
this.majority = voters.size() / 2;
this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
this.fetchTimeoutMs = fetchTimeoutMs;
this.fetchTimer = time.timer(fetchTimeoutMs);
}

public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
fetchTimer.update(currentTimeMs);
boolean isExpired = fetchTimer.isExpired();
if (isExpired) {
Expand All @@ -103,8 +100,11 @@ public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
return isExpired;
}

public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) {
// visible for testing
void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) {
updateFetchedVoters(id);
// The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
int majority = voterStates.size() / 2;
if (fetchedVoters.size() >= majority) {
fetchedVoters.clear();
fetchTimer.update(currentTimeMs);
Expand Down Expand Up @@ -324,6 +324,7 @@ public boolean updateReplicaState(
fetchOffsetMetadata,
leaderEndOffsetOpt
);
maybeResetMajorityFollowerFetchTimer(replicaId, currentTimeMs);

return isVoter(state.nodeId) && maybeUpdateHighWatermark();
}
Expand Down
6 changes: 3 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Expand Up @@ -453,15 +453,15 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
// we typically expect the state machine to be caught up anyway.

LeaderState<T> state = new LeaderState<>(
time,
localIdOrThrow(),
epoch(),
epochStartOffset,
voters,
candidateState.grantingVoters(),
accumulator,
logContext,
time,
fetchTimeoutMs
fetchTimeoutMs,
logContext
);
durableTransitionTo(state);
return state;
Expand Down
10 changes: 10 additions & 0 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Expand Up @@ -515,6 +515,16 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVote

assertFalse(context.client.quorum().isResigned());

// Received fetch request from another voter, the fetch timer should be reset.
context.deliverRequest(context.fetchRequest(epoch, remoteId2, 0, 0, 0));
context.pollUntilRequest();

// Since the fetch timer is reset, the leader should not get resigned
context.time.sleep(context.fetchTimeoutMs / 2);
context.client.poll();

assertFalse(context.client.quorum().isResigned());

// Received fetch request from an observer, but the fetch timer should not be reset.
context.deliverRequest(context.fetchRequest(epoch, observerId3, 0, 0, 0));
context.pollUntilRequest();
Expand Down
28 changes: 14 additions & 14 deletions raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
Expand Up @@ -54,30 +54,30 @@ private LeaderState<?> newLeaderState(
long epochStartOffset
) {
return new LeaderState<>(
time,
localId,
epoch,
epochStartOffset,
voters,
voters,
accumulator,
logContext,
time,
fetchTimeoutMs
fetchTimeoutMs,
logContext
);
}

@Test
public void testRequireNonNullAccumulator() {
assertThrows(NullPointerException.class, () -> new LeaderState<>(
new MockTime(),
localId,
epoch,
0,
Collections.emptySet(),
Collections.emptySet(),
null,
logContext,
new MockTime(),
fetchTimeoutMs
fetchTimeoutMs,
logContext
));
}

Expand Down Expand Up @@ -460,27 +460,27 @@ public void testMajorityFollowerFetchTimeoutExpiration() {
int node4 = 4;
int observer5 = 5;
LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L);
assertFalse(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds()));
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));

// fetch timeout not exceeded, should not expire the timer
time.sleep(fetchTimeoutMs / 2);
assertFalse(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds()));
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));

// received fetch requests from 2 voter nodes, the timer should be reset
state.maybeResetMajorityFollowerFetchTimeout(node1, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimeout(node2, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimer(node1, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimer(node2, time.milliseconds());

// Since the timer was reset, it won't expire this time.
time.sleep(fetchTimeoutMs / 2);
assertFalse(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds()));
assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));

// received fetch requests from 1 voter and 1 observer nodes, the timer should not be reset.
state.maybeResetMajorityFollowerFetchTimeout(node3, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimeout(observer5, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimer(node3, time.milliseconds());
state.maybeResetMajorityFollowerFetchTimer(observer5, time.milliseconds());

// This time, the fetch timer will be expired
time.sleep(fetchTimeoutMs / 2);
assertTrue(state.hasMajorityFollowerFetchTimeoutExpired(time.milliseconds()));
assertTrue(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
}

@Test
Expand Down

0 comments on commit f3efeb8

Please sign in to comment.