Skip to content

Commit

Permalink
KAFKA-15078; KRaft leader replys with snapshot for offset 0 (#13845)
Browse files Browse the repository at this point in the history
If the follower has an empty log, fetches with offset 0, it is more
efficient for the leader to reply with a snapshot id (redirect to
FETCH_SNAPSHOT) than for the follower to continue fetching from the log
segments.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
  • Loading branch information
jsancio committed Jun 28, 2023
1 parent 2f71708 commit 3a246b1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
11 changes: 10 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Expand Up @@ -1017,7 +1017,16 @@ private FetchResponseData tryCompleteFetchRequest(
long fetchOffset = request.fetchOffset();
int lastFetchedEpoch = request.lastFetchedEpoch();
LeaderState<T> state = quorum.leaderStateOrThrow();
ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);

Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
final ValidOffsetAndEpoch validOffsetAndEpoch;
if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
// If the follower has an empty log and a snapshot exist, it is always more efficient
// to reply with a snapshot id (FETCH_SNAPSHOT) instead of fetching from the log segments.
validOffsetAndEpoch = ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
} else {
validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
}

final Records records;
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
Expand Down
Expand Up @@ -303,7 +303,53 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception {
context.client.poll();

// Send Fetch request less than start offset
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, epoch, 0));
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, snapshotId.offset() - 2, snapshotId.epoch(), 0));
context.pollUntilResponse();
FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
assertEquals(localId, partitionResponse.currentLeader().leaderId());
assertEquals(snapshotId.epoch(), partitionResponse.snapshotId().epoch());
assertEquals(snapshotId.offset(), partitionResponse.snapshotId().endOffset());
}

@Test
public void testFetchRequestOffsetAtZero() throws Exception {
// When the follower sends a FETCH request at offset 0, reply with snapshot id if it exists
int localId = 0;
int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withAppendLingerMs(1)
.build();

context.becomeLeader();
int epoch = context.currentEpoch();

List<String> appendRecords = Arrays.asList("a", "b", "c");
context.client.scheduleAppend(epoch, appendRecords);
context.time.sleep(context.appendLingerMs());
context.client.poll();

long localLogEndOffset = context.log.endOffset().offset;
assertTrue(
appendRecords.size() <= localLogEndOffset,
String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset)
);

// Advance the highWatermark
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

// Generate a snapshot at the LEO
OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
assertEquals(snapshotId, snapshot.snapshotId());
snapshot.freeze();
}

// Send Fetch request for offset 0
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 0, 0));
context.pollUntilResponse();
FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
Expand Down

0 comments on commit 3a246b1

Please sign in to comment.