-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state #19223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… fetch request and fetch state JIRA: KAFKA-18991
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky : Thanks for the PR. The code LGTM. Could we add a test case?
Hi @junrao |
val fetchPartitionData = sessionPartitions.get(topicPartition) | ||
if (fetchPartitionData != null && | ||
fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && | ||
fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the line#368 can be streamlined to currentFetchState.currentLeaderEpoch
due to this new condition.
val logAppendInfoOpt = processPartitionData(
topicPartition,
currentFetchState.fetchOffset,
fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch), // this line
partitionData
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky : Thanks for the updated PR. A couple of more comments.
fetcher.mockLeader.setLeaderState(partition, leaderState) | ||
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState) | ||
|
||
val partitionData = Map(partition -> new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, Optional.of(0), Optional.of(0))).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use initEpoch instead of 0?
val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, new SimpleRecord("a".getBytes)) | ||
val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, highWatermark = 1L) | ||
fetcher.mockLeader.setLeaderState(partition, leaderState) | ||
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These three lines seem unneeded?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think only L1173 is redundant.
Technically, these lines don't affect the test result if initEpoch != newEpoch
.
But since these lines implement the mock mechanism of fetch, these mocks will return a non-empty FetchResponse
as essential safeguards:
- If
initEpoch
is accidentally set equal tonewEpoch
, the response will be handled, and the leader epoch will be bumped as the leader epoch of fetchResponsethe test to fail. - If
AbstractFetcherThread#L334
is accidentally removed, the response will be handled, and the leader epoch will be bumped as the leader epoch of fetchResponsethe test to fail.
Given that, we should keep these lines. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky : Thanks for the updated PR. Just one minor comment.
val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, highWatermark = 1L) | ||
fetcher.mockLeader.setLeaderState(partition, leaderState) | ||
|
||
val partitionData = Map(partition -> new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, Optional.of(initEpoch), Optional.of(0))).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use initEpoch
for lastFetchedEpoch
too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky : Thanks for the updated PR. LGTM
@frankvicky : Could you provide a PR for the 3.9 and 4.0 branch as well? |
should we backport KAFKA-18723 first? |
The fixed version for KAFKA-18723 already includes 3.9.1 and 4.0.1? |
I grep the git history , and KAFKA-18723 is not in 4.0/3.9.
|
…equest and fetch state (apache#19223) This PR fixes a potential issue where the `FetchResponse` returns `divergingEndOffsets` with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss. In detail: `processFetchRequest` gets the requested leader epoch of partition data by `topicPartition` and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored. Reviewers: Jun Rao <junrao@gmail.com>
…equest and fetch state (apache#19223) This PR fixes a potential issue where the `FetchResponse` returns `divergingEndOffsets` with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss. In detail: `processFetchRequest` gets the requested leader epoch of partition data by `topicPartition` and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored. Reviewers: Jun Rao <junrao@gmail.com>
…equest and fetch state (apache#19223) This PR fixes a potential issue where the `FetchResponse` returns `divergingEndOffsets` with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss. In detail: `processFetchRequest` gets the requested leader epoch of partition data by `topicPartition` and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored. Reviewers: Jun Rao <junrao@gmail.com>
…equest and fetch state (#19223) This PR fixes a potential issue where the `FetchResponse` returns `divergingEndOffsets` with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss. In detail: `processFetchRequest` gets the requested leader epoch of partition data by `topicPartition` and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored. Reviewers: Jun Rao <junrao@gmail.com>
…equest and fetch state (#19223) This PR fixes a potential issue where the `FetchResponse` returns `divergingEndOffsets` with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss. In detail: `processFetchRequest` gets the requested leader epoch of partition data by `topicPartition` and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored. Reviewers: Jun Rao <junrao@gmail.com>
JIRA: KAFKA-18991
This PR fixes a potential issue where the
FetchResponse
returnsdivergingEndOffsets
with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss.In detail:
processFetchRequest
gets the requested leader epoch of partition data bytopicPartition
and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored.