KAFKA-15489: resign leadership when no fetch or fetch snapshot from majority voters (apache#14428)

In KIP-595 we expect to piggy-back on the `quorum.fetch.timeout.ms` config: if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it should resign and begin a new election, so that a network partition within the quorum can be resolved. This behavior was missing from the current KRaft implementation; this PR adds it.

This commit includes:
1. Added a timer with a timeout configuration to `LeaderState`, and a check for expiration each time the leader is polled. If the timer has expired, the leader resigns and starts a new election.

2. Added a `fetchedVoters` set to `LeaderState`, updated each time a FETCH or FETCH_SNAPSHOT request is received from a voter. Once requests have been received from a majority of the quorum minus one (the leader counts toward the majority itself), the set is cleared and the timer is reset. A minimal sketch of this bookkeeping is shown after this list.
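For illustration, here is a minimal, self-contained sketch of the check-quorum bookkeeping described above. It uses plain JDK types instead of Kafka's internal `Time`/`Timer` utilities, and the class and method names are hypothetical; it only mirrors the shape of the real `LeaderState` changes in this commit.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified version of the check-quorum bookkeeping added to LeaderState.
// It tracks which remote voters fetched since the last reset and expires if a majority
// (excluding the leader) has not fetched within checkQuorumTimeoutMs.
class CheckQuorumTracker {
    private final int leaderId;
    private final Set<Integer> voters;        // all voter ids, including the leader
    private final long checkQuorumTimeoutMs;  // e.g. 1.5 * quorum.fetch.timeout.ms
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long deadlineMs;

    CheckQuorumTracker(int leaderId, Set<Integer> voters, long checkQuorumTimeoutMs, long nowMs) {
        this.leaderId = leaderId;
        this.voters = voters;
        this.checkQuorumTimeoutMs = checkQuorumTimeoutMs;
        this.deadlineMs = nowMs + checkQuorumTimeoutMs;
    }

    // Called when a FETCH or FETCH_SNAPSHOT request arrives from replicaId.
    void recordFetch(int replicaId, long nowMs) {
        if (replicaId == leaderId || !voters.contains(replicaId)) {
            return; // ignore the leader itself and observers
        }
        fetchedVoters.add(replicaId);
        // Majority excluding the leader, e.g. 1 remote voter for a 3-voter quorum.
        int majorityExcludingLeader = voters.size() / 2;
        if (fetchedVoters.size() >= majorityExcludingLeader) {
            fetchedVoters.clear();
            deadlineMs = nowMs + checkQuorumTimeoutMs;
        }
    }

    // Called from the leader's poll loop; a zero return means the leader should resign.
    long timeUntilExpiresMs(long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }
}
```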

Reviewers: José Armando García Sancio <jsancio@apache.org>
showuon authored and clolov committed Apr 5, 2024
1 parent 04b9fd6 commit 5abff99
Showing 9 changed files with 273 additions and 8 deletions.
@@ -61,6 +61,7 @@ public FetchSnapshotRequestData data() {
*/
public static FetchSnapshotRequestData singleton(
String clusterId,
int replicaId,
TopicPartition topicPartition,
UnaryOperator<FetchSnapshotRequestData.PartitionSnapshot> operator
) {
@@ -70,6 +71,7 @@ public static FetchSnapshotRequestData singleton(

return new FetchSnapshotRequestData()
.setClusterId(clusterId)
.setReplicaId(replicaId)
.setTopics(
Collections.singletonList(
new FetchSnapshotRequestData.TopicSnapshot()
14 changes: 10 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1257,7 +1257,8 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
* - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range
*/
private FetchSnapshotResponseData handleFetchSnapshotRequest(
RaftRequest.Inbound requestMetadata
RaftRequest.Inbound requestMetadata,
long currentTimeMs
) {
FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data;

@@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(

UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));

LeaderState<T> state = quorum.leaderStateOrThrow();
state.updateCheckQuorumForFollowingVoter(data.replicaId(), currentTimeMs);

return FetchSnapshotResponse.singleton(
log.topicPartition(),
responsePartitionSnapshot -> {
@@ -1701,7 +1705,7 @@ private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
break;

case FETCH_SNAPSHOT:
responseFuture = completedFuture(handleFetchSnapshotRequest(request));
responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
break;

default:
@@ -1875,6 +1879,7 @@ private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapsh

FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
clusterId,
quorum().localIdOrSentinel(),
log.topicPartition(),
snapshotPartition -> {
return snapshotPartition
@@ -1990,7 +1995,8 @@ private long pollLeader(long currentTimeMs) {
LeaderState<T> state = quorum.leaderStateOrThrow();
maybeFireLeaderChange(state);

if (shutdown.get() != null || state.isResignRequested()) {
long timeUntilCheckQuorumExpires = state.timeUntilCheckQuorumExpires(currentTimeMs);
if (shutdown.get() != null || state.isResignRequested() || timeUntilCheckQuorumExpires == 0) {
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
return 0L;
}
@@ -2006,7 +2012,7 @@ private long pollLeader(long currentTimeMs) {
this::buildBeginQuorumEpochRequest
);

return Math.min(timeUntilFlush, timeUntilSend);
return Math.min(timeUntilFlush, Math.min(timeUntilSend, timeUntilCheckQuorumExpires));
}

private long maybeSendVoteRequests(
59 changes: 59 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -22,6 +22,8 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.slf4j.Logger;

@@ -44,6 +46,7 @@
*/
public class LeaderState<T> implements EpochState {
static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;

private final int localId;
private final int epoch;
@@ -55,17 +58,23 @@ public class LeaderState<T> implements EpochState {
private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
private final Logger log;
private final BatchAccumulator<T> accumulator;
// The set of follower voters that have sent a FETCH or FETCH_SNAPSHOT request during the current checkQuorumTimer interval.
private final Set<Integer> fetchedVoters = new HashSet<>();
private final Timer checkQuorumTimer;
private final int checkQuorumTimeoutMs;

// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;

protected LeaderState(
Time time,
int localId,
int epoch,
long epochStartOffset,
Set<Integer> voters,
Set<Integer> grantingVoters,
BatchAccumulator<T> accumulator,
int fetchTimeoutMs,
LogContext logContext
) {
this.localId = localId;
@@ -79,6 +88,55 @@ protected LeaderState(
this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
// Use 1.5x the fetch timeout to tolerate some network transit time or other I/O time.
this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);
}

/**
* Get the remaining time in milliseconds until the checkQuorumTimer expires.
* The timer expires if we have not received a valid FETCH or FETCH_SNAPSHOT request from a majority of the voters within checkQuorumTimeoutMs.
*
* @param currentTimeMs the current timestamp in milliseconds
* @return the remaining time in milliseconds before the checkQuorumTimer expires
*/
public long timeUntilCheckQuorumExpires(long currentTimeMs) {
checkQuorumTimer.update(currentTimeMs);
long remainingMs = checkQuorumTimer.remainingMs();
if (remainingMs == 0) {
log.info(
"Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
checkQuorumTimeoutMs,
fetchedVoters);
}
return remainingMs;
}

/**
* Reset the checkQuorumTimer if we have received a FETCH or FETCH_SNAPSHOT request from a majority of the voters
*
* @param id the node id
* @param currentTimeMs the current timestamp in milliseconds
*/
public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) {
updateFetchedVoters(id);
// The majority of the voters excluding the leader. E.g. with 3 voters, the value is 1.
int majority = voterStates.size() / 2;
if (fetchedVoters.size() >= majority) {
fetchedVoters.clear();
checkQuorumTimer.update(currentTimeMs);
checkQuorumTimer.reset(checkQuorumTimeoutMs);
}
}

private void updateFetchedVoters(int id) {
if (id == localId) {
throw new IllegalArgumentException("Received a FETCH/FETCH_SNAPSHOT request from the leader itself.");
}

if (isVoter(id)) {
fetchedVoters.add(id);
}
}

public BatchAccumulator<T> accumulator() {
@@ -287,6 +345,7 @@ public boolean updateReplicaState(
fetchOffsetMetadata,
leaderEndOffsetOpt
);
updateCheckQuorumForFollowingVoter(replicaId, currentTimeMs);

return isVoter(state.nodeId) && maybeUpdateHighWatermark();
}
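A brief aside on the majority arithmetic in `updateCheckQuorumForFollowingVoter` above: because the leader itself always counts toward the majority, only `voterStates.size() / 2` remote voters need to fetch before the timer is reset. A tiny illustration (not part of the patch; names are made up):

```java
// Illustration only: how many remote voters must fetch before the check-quorum
// timer is reset, for common quorum sizes. The leader always counts toward the
// majority, so the threshold is floor(voterCount / 2).
public class CheckQuorumMajorityExample {
    public static void main(String[] args) {
        int[] quorumSizes = {3, 5, 7};
        for (int voters : quorumSizes) {
            int remoteFetchersNeeded = voters / 2;
            System.out.printf(
                "%d voters -> reset after %d remote voter(s) fetch within the timeout%n",
                voters, remoteFetchersNeeded);
        }
    }
}
```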
2 changes: 2 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -453,12 +453,14 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
// we typically expect the state machine to be caught up anyway.

LeaderState<T> state = new LeaderState<>(
time,
localIdOrThrow(),
epoch(),
epochStartOffset,
voters,
candidateState.grantingVoters(),
accumulator,
fetchTimeoutMs,
logContext
);
durableTransitionTo(state);
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
@@ -68,8 +68,8 @@ public class RaftConfig {

public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "fetch.timeout.ms";
public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
"the current leader before becoming a candidate and triggering an election for voters; Maximum time without " +
"receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader.";
"the current leader before becoming a candidate and triggering an election for voters; Maximum time " +
"a leader can go without receiving valid fetch or fetchSnapshot request from a majority of the quorum before resigning.";
public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 2_000;

public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG = QUORUM_PREFIX + "election.backoff.max.ms";
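To make the timing concrete: the check-quorum timeout is `quorum.fetch.timeout.ms` multiplied by the 1.5 factor in `LeaderState`, so with the 2,000 ms default the leader resigns after roughly 3,000 ms without fetches from a majority. A small sketch of that derivation; the full property key is assumed from `RaftConfig`'s `controller.quorum.` prefix:

```java
import java.util.Properties;

// Illustration only (not part of the patch). The check-quorum timeout is derived
// from quorum.fetch.timeout.ms, so raising the fetch timeout also gives the leader
// more slack before resigning. With the 2_000 ms default and the 1.5x factor, the
// leader resigns after ~3_000 ms without fetches from a majority of the voters.
public class CheckQuorumTimeoutExample {
    private static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed full key: RaftConfig.QUORUM_PREFIX ("controller.quorum.") + "fetch.timeout.ms"
        props.setProperty("controller.quorum.fetch.timeout.ms", "2000");

        long fetchTimeoutMs = Long.parseLong(
            props.getProperty("controller.quorum.fetch.timeout.ms"));
        long checkQuorumTimeoutMs = (long) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
        System.out.println("check-quorum timeout = " + checkQuorumTimeoutMs + " ms"); // 3000 ms
    }
}
```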
@@ -51,6 +51,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

@@ -701,6 +702,97 @@ public void testFetchSnapshotRequestAsLeader() throws Exception {
assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
}

@Test
public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters() throws Exception {
int localId = 0;
int voter1 = 1;
int voter2 = 2;
int observerId3 = 3;
Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.build();

int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
context.becomeLeader();
int epoch = context.currentEpoch();

FetchSnapshotRequestData voter1FetchSnapshotRequest = fetchSnapshotRequest(
context.clusterId.toString(),
voter1,
context.metadataPartition,
epoch,
snapshotId,
Integer.MAX_VALUE,
0
);

FetchSnapshotRequestData voter2FetchSnapshotRequest = fetchSnapshotRequest(
context.clusterId.toString(),
voter2,
context.metadataPartition,
epoch,
snapshotId,
Integer.MAX_VALUE,
0
);

FetchSnapshotRequestData observerFetchSnapshotRequest = fetchSnapshotRequest(
context.clusterId.toString(),
observerId3,
context.metadataPartition,
epoch,
snapshotId,
Integer.MAX_VALUE,
0
);

context.advanceLocalLeaderHighWatermarkToLogEndOffset();
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
assertEquals(snapshotId, snapshot.snapshotId());
snapshot.append(records);
snapshot.freeze();
}

// The fetch timeout has not expired, so the leader should not resign
context.time.sleep(resignLeadershipTimeout / 2);
context.client.poll();
assertFalse(context.client.quorum().isResigned());

// voter1 sends fetchSnapshotRequest, the fetch timer should be reset
context.deliverRequest(voter1FetchSnapshotRequest);
context.client.poll();
context.assertSentFetchSnapshotResponse(context.metadataPartition);

// Since the fetch timer was reset, the leader should not resign
context.time.sleep(resignLeadershipTimeout / 2);
context.client.poll();
assertFalse(context.client.quorum().isResigned());

// voter2 sends fetchSnapshotRequest, the fetch timer should be reset
context.deliverRequest(voter2FetchSnapshotRequest);
context.client.poll();
context.assertSentFetchSnapshotResponse(context.metadataPartition);

// Since the fetch timer was reset, the leader should not resign
context.time.sleep(resignLeadershipTimeout / 2);
context.client.poll();
assertFalse(context.client.quorum().isResigned());

// An observer sends fetchSnapshotRequest, but the fetch timer should not be reset.
context.deliverRequest(observerFetchSnapshotRequest);
context.client.poll();
context.assertSentFetchSnapshotResponse(context.metadataPartition);

// After this sleep, the fetch timeout should expire since we did not receive fetch requests from a majority of the voters within fetchTimeoutMs
context.time.sleep(resignLeadershipTimeout / 2);
context.client.poll();
assertTrue(context.client.quorum().isResigned());
}

@Test
public void testPartialFetchSnapshotRequestAsLeader() throws Exception {
int localId = 0;
@@ -1589,6 +1681,7 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.deliverRequest(
fetchSnapshotRequest(
context.clusterId.toString(),
otherNodeId,
context.metadataPartition,
epoch,
new OffsetAndEpoch(0, 0),
@@ -1603,6 +1696,7 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.deliverRequest(
fetchSnapshotRequest(
null,
otherNodeId,
context.metadataPartition,
epoch,
new OffsetAndEpoch(0, 0),
@@ -1617,6 +1711,7 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.deliverRequest(
fetchSnapshotRequest(
"",
otherNodeId,
context.metadataPartition,
epoch,
new OffsetAndEpoch(0, 0),
@@ -1631,6 +1726,7 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.deliverRequest(
fetchSnapshotRequest(
"invalid-uuid",
otherNodeId,
context.metadataPartition,
epoch,
new OffsetAndEpoch(0, 0),
@@ -1756,11 +1852,12 @@ private static FetchSnapshotRequestData fetchSnapshotRequest(
int maxBytes,
long position
) {
return fetchSnapshotRequest(null, topicPartition, epoch, offsetAndEpoch, maxBytes, position);
return fetchSnapshotRequest(null, -1, topicPartition, epoch, offsetAndEpoch, maxBytes, position);
}

private static FetchSnapshotRequestData fetchSnapshotRequest(
String clusterId,
int replicaId,
TopicPartition topicPartition,
int epoch,
OffsetAndEpoch offsetAndEpoch,
@@ -1773,6 +1870,7 @@ private static FetchSnapshotRequestData fetchSnapshotRequest(

FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
clusterId,
replicaId,
topicPartition,
snapshotPartition -> {
return snapshotPartition
