15 changes: 6 additions & 9 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -358,7 +358,6 @@ private void updateLeaderEndOffsetAndTimestamp(
long currentTimeMs
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();

if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet())) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}
@@ -1512,6 +1511,7 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
FetchRequest.replicaId(request),
fetchPartition.replicaDirectoryId()
);

FetchResponseData response = tryCompleteFetchRequest(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
@@ -2933,7 +2933,7 @@ private long maybeSendRequests(
return minBackoffMs;
}

private long maybeSendRequest(
private long maybeSendRequests(
long currentTimeMs,
Set<ReplicaKey> remoteVoters,
Function<Integer, Node> destinationSupplier,
@@ -3120,13 +3120,10 @@ private long maybeSendBeginQuorumEpochRequests(
)
);

timeUntilNextBeginQuorumSend = maybeSendRequest(
Set<ReplicaKey> needToSendBeginQuorumRequests = state.needToSendBeginQuorumRequests(currentTimeMs);
timeUntilNextBeginQuorumSend = maybeSendRequests(
currentTimeMs,
voters
.voterKeys()
.stream()
.filter(key -> key.id() != quorum.localIdOrThrow())
.collect(Collectors.toSet()),
needToSendBeginQuorumRequests,
nodeSupplier,
this::buildBeginQuorumEpochRequest
);
@@ -3208,7 +3205,7 @@ private long maybeSendVoteRequests(
if (!state.epochElection().isVoteRejected()) {
VoterSet voters = partitionState.lastVoterSet();
boolean preVote = quorum.isProspective();
return maybeSendRequest(
return maybeSendRequests(
currentTimeMs,
state.epochElection().unrecordedVoters(),
voterId -> voters
23 changes: 23 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -188,6 +188,29 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) {
beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
}

/**
* Determines the set of replicas that should receive a {@code BeginQuorumEpoch} request
* based on the elapsed time since their last fetch.
* <p>
* For each remote voter (excluding the local node), if the time since the last
* fetch exceeds the configured {@code beginQuorumEpochTimeoutMs}, the replica
* is considered to need a new quorum epoch request.
*
* @param currentTimeMs the current system time in milliseconds
* @return an unmodifiable set of {@link ReplicaKey} objects representing replicas
* that need to receive a {@code BeginQuorumEpoch} request
*/
public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
return voterStates.values()
.stream()
.filter(
state -> state.replicaKey.id() != localVoterNode.voterKey().id() &&
currentTimeMs - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs
[Review thread on this line]

Member:
Okay. This algorithm works because the max fetch wait time is hardcoded to 500 ms, which is much smaller than fetch timeout / 2 for most configurations. https://github.com/apache/kafka/pull/20318/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R260

Collaborator (author):
Umm, but there is an issue: the value of beginQuorumEpochTimeoutMs can be set by users. If the value is too small, I think the condition is not met.

By the way, is there any reason we need to hardcode the max fetch wait time? In my mind, we can make it configurable and enforce users to set it bigger than QUORUM_FETCH_TIMEOUT_MS_CONFIG.

Member:
We already have that requirement today, meaning the fetch timeout should be greater than two times the hard-coded max wait time.

I think it is okay to keep it hard-coded for now since KRaft is only used for the cluster metadata partition. When we start using KRaft for more than one partition, like normal produce and consume, we need to fix this hardcoded fetch wait time.

Collaborator (author) @TaiJuWu, Oct 27, 2025:
> We already have that requirement today, meaning the fetch timeout should be greater than two times the hard-coded max wait time.

If nobody can handle this, may I have a chance to pick it up?

Member:
> Meaning the fetch timeout should be greater than two times the hard-coded max wait time.

Perhaps we should change atLeast(0) to atLeast(1000) to address that.

Member:
The discussion in #18998 (comment) is similar regarding adding a new lower bound to the quorum-related configs. @TaiJuWu, could you please open a JIRA and KIP for these?

(A hedged sketch of the atLeast(1000) idea appears below, after this file's diff.)
)
.map(ReplicaState::replicaKey)
.collect(Collectors.toUnmodifiableSet());
}

/**
* Get the remaining time in milliseconds until the checkQuorumTimer expires.
*
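The review thread above suggests raising the config's lower bound from atLeast(0) to atLeast(1000) so the fetch timeout always stays above twice the hard-coded 500 ms max fetch wait. Below is a minimal sketch of how such a bound could be expressed with Kafka's ConfigDef; the class name, config default, and doc string are illustrative assumptions, not the actual QuorumConfig definition.

import org.apache.kafka.common.config.ConfigDef;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;

// Sketch only: the real definition lives in QuorumConfig; default and doc text here are assumptions.
public class QuorumFetchTimeoutBoundSketch {
    static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = "controller.quorum.fetch.timeout.ms";

    static ConfigDef configDef() {
        return new ConfigDef().define(
            QUORUM_FETCH_TIMEOUT_MS_CONFIG,
            INT,
            2000,            // assumed default; 2000 ms > 2 * 500 ms hard-coded max fetch wait
            atLeast(1000),   // proposed lower bound instead of atLeast(0)
            HIGH,
            "Maximum time without a successful fetch from the leader before starting a new election."
        );
    }
}

With this validator, any attempt to configure the fetch timeout below 1000 ms fails at startup, which preserves the invariant the reviewers rely on without making the max fetch wait time itself configurable.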
@@ -39,6 +39,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.Test;
@@ -618,7 +619,7 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exception
int epoch = context.currentEpoch();
assertEquals(OptionalInt.of(localId), context.currentLeader());

// begin epoch requests should be sent out every beginQuorumEpochTimeoutMs
// begin epoch requests sent out every beginQuorumEpochTimeoutMs if replicas have not fetched
context.time.sleep(context.beginQuorumEpochTimeoutMs);
context.client.poll();
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
@@ -633,6 +634,53 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exception
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception {
ReplicaKey localKey = replicaKey(randomReplicaId(), true);
int remoteId1 = localKey.id() + 1;
int remoteId2 = localKey.id() + 2;
ReplicaKey replicaKey1 = replicaKey(remoteId1, withKip853Rpc);
ReplicaKey replicaKey2 = replicaKey(remoteId2, withKip853Rpc);

RaftClientTestContext context = new RaftClientTestContext.Builder(localKey.id(), localKey.directoryId().get())
.withKip853Rpc(withKip853Rpc)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(localKey, replicaKey1, replicaKey2)),
KRaftVersion.KRAFT_VERSION_1
)
.build();

context.unattachedToLeader();
int epoch = context.currentEpoch();
assertEquals(OptionalInt.of(localKey.id()), context.currentLeader());

// begin epoch requests sent out every beginQuorumEpochTimeoutMs if replicas have not fetched
context.time.sleep(context.beginQuorumEpochTimeoutMs);
context.client.poll();
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));

long partialDelay = context.beginQuorumEpochTimeoutMs / 3;
context.time.sleep(context.beginQuorumEpochTimeoutMs / 3);
context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0));
context.pollUntilResponse();

context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay);
context.client.poll();
// leader will not send BeginQuorumEpochRequest again for replica 1 since fetchRequest was received
// before beginQuorumEpochTimeoutMs time has elapsed
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2));

context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0));
context.pollUntilResponse();
context.time.sleep(context.beginQuorumEpochTimeoutMs);
context.client.poll();
// leader should send BeginQuorumEpochRequest to a node if beginQuorumEpochTimeoutMs time has elapsed
// without receiving a fetch request from that node
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean withKip853Rpc) throws Exception {