KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state #19223

Open · wants to merge 9 commits into trunk
Conversation

frankvicky (Contributor):

JIRA: KAFKA-18991

This PR fixes a potential issue where the FetchResponse returns divergingEndOffsets with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss.

In detail:
processFetchRequest looks up the requested partition data for the topicPartition and compares its leader epoch with the leader epoch of the current fetch state. If they don't match, the response is ignored.
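
For illustration only, here is a minimal, self-contained Scala sketch of the check described above. The types below (RequestedPartition, FetchState) are hypothetical stand-ins, not the real FetchRequest.PartitionData and PartitionFetchState classes; the actual change is shown in the diff excerpt later in this conversation.

    import java.util.Optional

    // Hypothetical, simplified stand-ins for the requested partition data and the
    // fetcher's per-partition state (not the real Kafka classes).
    object EpochGuardSketch extends App {
      final case class RequestedPartition(fetchOffset: Long, currentLeaderEpoch: Optional[Integer])
      final case class FetchState(fetchOffset: Long, currentLeaderEpoch: Int)

      // Only process a response if the offset matches and, when the request carried a
      // leader epoch, that epoch still matches the current fetch state.
      def shouldProcess(requested: RequestedPartition, state: FetchState): Boolean =
        requested.fetchOffset == state.fetchOffset &&
          requested.currentLeaderEpoch
            .map[Boolean](_ == state.currentLeaderEpoch)
            .orElse(true)

      // The request was built at epoch 5, but the fetch state has since moved to epoch 6,
      // so a (possibly diverging) response to that request must be ignored.
      assert(!shouldProcess(
        RequestedPartition(fetchOffset = 100L, currentLeaderEpoch = Optional.of(Integer.valueOf(5))),
        FetchState(fetchOffset = 100L, currentLeaderEpoch = 6)))
    }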

@github-actions github-actions bot added triage PRs from the community core Kafka Broker small Small PRs labels Mar 18, 2025
@frankvicky frankvicky marked this pull request as draft March 18, 2025 11:40
@frankvicky frankvicky marked this pull request as ready for review March 18, 2025 13:31
@junrao (Contributor) left a comment:

@frankvicky : Thanks for the PR. The code LGTM. Could we add a test case?

@github-actions github-actions bot removed the triage PRs from the community label Mar 20, 2025
@frankvicky (Contributor, Author):

Hi @junrao
Thanks for the review.
This is a tricky one to test.
I'm still working out how to construct a test scenario that exercises the condition this patch guards against.

val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null &&
fetchPartitionData.fetchOffset == currentFetchState.fetchOffset &&
fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) &&
A project member left a review comment:

Line #368 can be streamlined to currentFetchState.currentLeaderEpoch thanks to this new condition (a sketch of the simplification follows the snippet below).

                      val logAppendInfoOpt = processPartitionData(
                        topicPartition,
                        currentFetchState.fetchOffset,
                        fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch), // this line
                        partitionData
                      )
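
For concreteness, a sketch of the same call site with the suggested simplification applied (fragment only, mirroring the snippet above):

                      // With the new epoch-match guard in place, a request epoch (when present)
                      // always equals the fetch-state epoch, so it can be passed directly.
                      val logAppendInfoOpt = processPartitionData(
                        topicPartition,
                        currentFetchState.fetchOffset,
                        currentFetchState.currentLeaderEpoch, // previously: fetchPartitionData.currentLeaderEpoch.orElse(...)
                        partitionData
                      )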

@junrao (Contributor) left a comment:

@frankvicky : Thanks for the updated PR. A couple more comments.

fetcher.mockLeader.setLeaderState(partition, leaderState)
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)

val partitionData = Map(partition -> new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, Optional.of(0), Optional.of(0))).asJava
A contributor left a review comment:

Could we use initEpoch instead of 0?
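
A sketch of the line with the suggested change applied, assuming the epoch arguments are the hard-coded zeros the comment refers to (whether lastFetchedEpoch should also use initEpoch is a judgment call):

// Sketch: use the test's initEpoch for the epoch arguments instead of hard-coding 0.
val partitionData = Map(partition -> new FetchRequest.PartitionData(
  Uuid.randomUuid(), 0, 0, 1048576,
  Optional.of(initEpoch), Optional.of(initEpoch))).asJava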

val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0, new SimpleRecord("a".getBytes))
val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, highWatermark = 1L)
fetcher.mockLeader.setLeaderState(partition, leaderState)
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
A contributor left a review comment:

These three lines seem unneeded?

@frankvicky (Contributor, Author) replied on Mar 22, 2025:

I think only L1173 is redundant.

Technically, these lines don't affect the test result as long as initEpoch != newEpoch.
But since these lines implement the fetch mock mechanism, the mocks return a non-empty FetchResponse and serve as essential safeguards:

  • If initEpoch is accidentally set equal to newEpoch, the response will be handled and the leader epoch will be bumped to the fetchResponse's leader epoch, causing the test to fail.
  • If the check at AbstractFetcherThread#L334 is accidentally removed, the response will be handled and the leader epoch will be bumped to the fetchResponse's leader epoch, causing the test to fail.

Given that, we should keep these lines. WDYT?

Labels: ci-approved, core (Kafka Broker), small (Small PRs)
3 participants