-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state #19223
base: trunk
Are you sure you want to change the base?
Conversation
… fetch request and fetch state JIRA: KAFKA-18991
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky : Thanks for the PR. The code LGTM. Could we add a test case?
Hi @junrao |
val fetchPartitionData = sessionPartitions.get(topicPartition) | ||
if (fetchPartitionData != null && | ||
fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && | ||
fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the line#368 can be streamlined to currentFetchState.currentLeaderEpoch
due to this new condition.
val logAppendInfoOpt = processPartitionData(
topicPartition,
currentFetchState.fetchOffset,
fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch), // this line
partitionData
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky : Thanks for the updated PR. A couple of more comments.
fetcher.mockLeader.setLeaderState(partition, leaderState) | ||
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState) | ||
|
||
val partitionData = Map(partition -> new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, Optional.of(0), Optional.of(0))).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use initEpoch instead of 0?
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, new SimpleRecord("a".getBytes)) | ||
val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, highWatermark = 1L) | ||
fetcher.mockLeader.setLeaderState(partition, leaderState) | ||
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These three lines seem unneeded?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think only L1173 is redundant.
Technically, these lines don't affect the test result if initEpoch != newEpoch
.
But since these lines implement the mock mechanism of fetch, these mocks will return a non-empty FetchResponse
as essential safeguards:
- If
initEpoch
is accidentally set equal tonewEpoch
, the response will be handled, and the leader epoch will be bumped as the leader epoch of fetchResponsethe test to fail. - If
AbstractFetcherThread#L334
is accidentally removed, the response will be handled, and the leader epoch will be bumped as the leader epoch of fetchResponsethe test to fail.
Given that, we should keep these lines. WDYT?
JIRA: KAFKA-18991
This PR fixes a potential issue where the
FetchResponse
returnsdivergingEndOffsets
with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss.In detail:
processFetchRequest
gets the requested leader epoch of partition data bytopicPartition
and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored.