
KAFKA-15489: resign leadership when no fetch from majority voters #14428

Merged
merged 12 commits into from Nov 30, 2023

Conversation

@showuon showuon (Contributor) commented Sep 23, 2023

In KIP-595, we expect to piggy-back on the quorum.fetch.timeout.ms config: if the leader does not receive Fetch requests from a majority of the quorum for that amount of time, it should begin a new election, to resolve a network partition in the quorum. But we missed this implementation in the current KRaft code. This PR fixes it.

This PR does the following:

  1. Added a fetchTimer with fetchTimeout in LeaderState, and check whether it has expired each time the leader polls. If expired, the leader resigns and starts a new election.
  2. Added fetchedVoters in LeaderState, updated each time a fetch request is received; it is cleared and the fetchTimer reset once fetch requests from a majority have been received.
  3. Added tests.
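The two steps above can be sketched roughly as follows. This is a minimal, illustrative sketch only: the class and member names are hypothetical, not the actual LeaderState implementation, and the deadline field stands in for Kafka's Timer utility. It follows the final commit-message semantics, where the leader counts itself toward the majority.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of the fetch-timeout tracking described above; names are
// hypothetical and do not come from the actual LeaderState code.
class MajorityFetchTracker {
    private final Set<Integer> voters;     // ids of all voters, leader included
    private final long fetchTimeoutMs;
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long fetchTimerDeadlineMs;     // stands in for Kafka's Timer utility

    MajorityFetchTracker(Set<Integer> voters, long fetchTimeoutMs, long nowMs) {
        this.voters = voters;
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.fetchTimerDeadlineMs = nowMs + fetchTimeoutMs;
    }

    // Called for each FETCH (or FETCH_SNAPSHOT) request the leader handles.
    void recordFetch(int replicaId, long nowMs) {
        if (voters.contains(replicaId)) {
            fetchedVoters.add(replicaId);
        }
        // The leader counts itself toward the majority, so requests from
        // (majority - 1) remote voters are enough to reset the timer.
        int majority = voters.size() / 2 + 1;
        if (fetchedVoters.size() >= majority - 1) {
            fetchedVoters.clear();
            fetchTimerDeadlineMs = nowMs + fetchTimeoutMs;
        }
    }

    // Checked on each leader poll; true means the leader should resign
    // and start a new election.
    boolean hasFetchTimeoutExpired(long nowMs) {
        return nowMs >= fetchTimerDeadlineMs;
    }
}
```

For a 3-voter quorum, a single remote voter's fetch is enough to reset the timer, since the leader itself is the second member of the majority.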

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon showuon (Contributor, Author) commented Sep 23, 2023

@hachikuji @cmccabe @jsancio , call for review. Thanks.

@satishd satishd (Member) commented Sep 23, 2023

cc @mumrah

@jsancio jsancio added the kraft label Sep 25, 2023
@jsancio jsancio self-assigned this Sep 25, 2023
this.fetchTimer = time.timer(fetchTimeoutMs);
}

public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
Contributor:
nit: these are pretty lengthy and arguably unintuitive method names, could we add a short comment on what each method does?

Contributor Author:

Changed to hasMajorityFollowerFetchExpired. Let me know if you have any better suggestion. Thanks.

Contributor:

I was more suggesting that this method might benefit from a comment which describes behavior. I guess the info log explains it well enough

Contributor Author:

Fair enough. Added comments on the methods. Thanks.

@jsancio jsancio (Member) left a comment:

Thanks for the changes @showuon

@@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
context.listener.currentLeaderAndEpoch());
}

@Test
public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception {
Member:

Can we also test the opposite: that the leader doesn't resign if a majority of the replicas (including the leader) have fetched in the last fetchTimeoutMs?

Can we also add test(s) under KafkaRaftClientSnapshotTest that show that the leader also considers FETCH_SNAPSHOT requests for determining network connectivity?

Contributor Author:

Will add tests later.

@showuon showuon (Contributor, Author) commented Oct 3, 2023

Added a test in KafkaRaftClientSnapshotTest.

> Can we also test the opposite: that the leader doesn't resign if a majority of the replicas (including the leader) have fetched in the last fetchTimeoutMs?

I didn't quite follow, but here's what I've verified with a 3-controller cluster:

- 1/2 of the fetch timeout passes → leadership is not reassigned
- fetch from one voter
--- timer reset ---
- 1/2 of the fetch timeout passes → leadership is not reassigned
- fetch from another voter
--- timer reset ---

- 1/2 of the fetch timeout passes → leadership is not reassigned
- fetch from the observer
- 1/2 of the fetch timeout passes
--- expired ---
- leadership should be reassigned

I think I've verified what you wanted. Let me know if I need to add anything else. Thanks.

Member:

Got it. This test covers all of the cases I was thinking about.

Comment on lines 106 to 118
public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) {
updateFetchedVoters(id);
if (fetchedVoters.size() >= majority) {
fetchedVoters.clear();
fetchTimer.update(currentTimeMs);
fetchTimer.reset(fetchTimeoutMs);
}
}

private void updateFetchedVoters(int id) {
if (isVoter(id)) {
fetchedVoters.add(id);
}
Member:

I see.

I think the invariant we need to hold is that, over any time span of fetchTimeoutMs, a majority of the replicas have performed a successful FETCH or FETCH_SNAPSHOT. Note that ReplicaState already contains the lastFetchTimestamp.

The part that is not clear to me is when or how to wake up the leader for a poll. We need to update KafkaRaftClient::pollLeader so that the replicas' last fetch time is taken into account when blocking on the messageQueue.poll.

What do you think?

Contributor Author:

> Note that ReplicaState already contains the lastFetchTimestamp.

I tried to re-use the lastFetchTimestamp in ReplicaState, but found it won't work as expected since its default value is -1. That means when a node becomes the leader, all the follower nodes' lastFetchTimestamp values are -1. Using the current timer-based approach is more readable, IMO.

> The part that is not clear to me is when or how to wake up the leader for a poll. We need to update KafkaRaftClient::pollLeader so that the replicas' last fetch time is taken into account when blocking on the messageQueue.poll.

Good question. My thought is to add some buffer to tolerate the operation time. For example, when checking shrinkISR, we allow 1.5x of the timeout to make things easier, instead of calculating the exact timestamp. So I'm thinking we use fetchTimeout * 1.5. WDYT?
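The two ideas in this exchange (bounding the leader's blocking poll by the fetch-timer deadline, and adding a 1.5x cushion) could be sketched like this. These are purely hypothetical helper names, not code from the PR or from KafkaRaftClient:

```java
// Hypothetical sketch of the two ideas discussed above; none of these names
// come from the actual KafkaRaftClient code.
final class LeaderPollTimeout {
    private LeaderPollTimeout() {}

    // Bound the leader's blocking poll by the earliest pending deadline so the
    // fetch-timeout check fires promptly (never negative).
    static long pollTimeoutMs(long nowMs, long otherDeadlineMs, long fetchTimerDeadlineMs) {
        return Math.max(0L, Math.min(otherDeadlineMs, fetchTimerDeadlineMs) - nowMs);
    }

    // The 1.5x cushion: arm the timer with fetchTimeout * 1.5 so slow followers
    // get some slack before the leader resigns.
    static long bufferedFetchTimeoutMs(long fetchTimeoutMs) {
        return fetchTimeoutMs + fetchTimeoutMs / 2;
    }
}
```

With the cushion, an exact per-replica deadline calculation is unnecessary: the leader only needs to wake up often enough that the buffered timeout is checked before it drifts far past expiry.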

@showuon showuon (Contributor, Author) commented Oct 3, 2023

@jsancio @ahuang98 , I've addressed the comments. Please take a look again. Thanks.

@ahuang98 ahuang98 (Contributor) left a comment:

Thanks @showuon, I'm not sure I follow the thread here, so I don't feel comfortable approving just yet.
[Edit] I discussed with Jose offline and the comment makes sense to me now. I'm okay with the alternative of an added buffer time.

I also had two other concerns/requests.
Perhaps the 1.5x timeout you suggested would also help address this concern: I'm wondering if we may start causing leaders to resign when followers are slow or backlogged, making the situation worse. E.g., if multiple followers need to catch up via a large snapshot fetch, they may be unable to fetch again before the timeout expires, causing the current leader to resign. I don't believe this would be very disruptive, but I wanted to check that folks had considered this or a similar situation.

I think we can also modify QUORUM_FETCH_TIMEOUT_MS_DOC to be slightly more explicit (i.e. "Maximum time a leader can go without receiving a valid fetch or fetch-snapshot request from a majority of the quorum before resigning", or something slightly different if we choose to use 1.5x).

@showuon showuon (Contributor, Author) commented Oct 12, 2023

> I'm wondering if we may start causing leaders to resign when followers are slow or backlogged, making the situation worse. E.g., if multiple followers need to catch up via a large snapshot fetch, they may be unable to fetch again before the timeout expires, causing the current leader to resign. I don't believe this would be very disruptive, but I wanted to check that folks had considered this or a similar situation.

Yes, with 1.5x of the timeout, this issue should be resolved. Also, if one follower is slow for whatever reason and doesn't fetch again within the fetch timeout, it will also start a new election; that's already the current implementation.

> I think we can also modify QUORUM_FETCH_TIMEOUT_MS_DOC to be slightly more explicit too (i.e. "Maximum time a leader can go without receiving a valid fetch or fetch-snapshot request from a majority of the quorum before resigning").

Doc updated. I don't think we need to mention the 1.5x, because that's an implementation detail.

@showuon showuon (Contributor, Author) commented Nov 7, 2023

@jsancio @ahuang98 , do you have any other comments? I'd like to include this fix into v3.5.2 if possible. Thanks.

@jsancio jsancio (Member) commented Nov 27, 2023

Excuse the delays, @showuon. I'll review this today and through this week!

@jsancio jsancio (Member) left a comment:

Hi @showuon,

Thanks for the changes. They look good to me in general. One potential issue with this implementation is that the leader doesn't check that the fetching voters are making progress.

Just because the leader returned a successful response to FETCH and FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response correctly.

For example, imagine the case where the log end offset (LEO) is at 1000 and all of the followers are continuously fetching at offset 0 without ever increasing their fetch offset. This can happen if the followers encounter an error when processing the FETCH or FETCH_SNAPSHOT response.

In this scenario the leader will never be able to increase the HWM. I think this scenario is specific to KRaft and doesn't exist in Raft, because KRaft is pull-based whereas Raft is push-based.

What do you think? Do you agree? If so, should we address this issue in this PR, or create an issue for it and fix it in a future PR?
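The progress concern described above (later tracked as KAFKA-15911) could in principle be addressed by counting a voter's fetch toward liveness only when its fetch offset advances. The following is an illustrative sketch only; the names are hypothetical and not from the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a progress-aware liveness check: a voter's fetch
// counts only when its fetch offset moved forward, i.e. the follower
// successfully processed its previous FETCH/FETCH_SNAPSHOT response.
// Names are hypothetical, not from the PR.
class ProgressAwareFetchCheck {
    private final Map<Integer, Long> lastFetchOffset = new HashMap<>();

    // Returns true only when this voter's fetch offset advanced.
    boolean recordFetch(int voterId, long fetchOffset) {
        Long prev = lastFetchOffset.get(voterId);
        if (prev == null || fetchOffset > prev) {
            lastFetchOffset.put(voterId, fetchOffset);
            return true;
        }
        return false;  // stuck replica: would not reset the leader's fetch timer
    }
}
```

A replica stuck fetching at offset 0 would count once (its first fetch) and never again, so it could not keep an isolated leader alive indefinitely.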


@showuon showuon (Contributor, Author) commented Nov 28, 2023

> Thanks for the changes. They look good to me in general. One potential issue with this implementation is that the leader doesn't check that the fetching voters are making progress.
>
> Just because the leader returned a successful response to FETCH and FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response correctly.
>
> For example, imagine the case where the log end offset (LEO) is at 1000 and all of the followers are continuously fetching at offset 0 without ever increasing their fetch offset. This can happen if the followers encounter an error when processing the FETCH or FETCH_SNAPSHOT response.
>
> In this scenario the leader will never be able to increase the HWM. I think this scenario is specific to KRaft and doesn't exist in Raft, because KRaft is pull-based whereas Raft is push-based.
>
> What do you think? Do you agree? If so, should we address this issue in this PR, or create an issue for it and fix it in a future PR?

@jsancio, good catch! Yes, that's indeed a potential problem. This PR has been pending for a long time, so let's focus on the current issue in this PR first. I've filed KAFKA-15911 for the potential issue.
I've addressed all your comments. Please take a look again. Thanks.

@jsancio jsancio (Member) left a comment:

Just one minor suggestion.

I took a look at the build, and there seem to be a lot of failures. Can you confirm that they are not related to this change?

@showuon showuon (Contributor, Author) commented Nov 29, 2023

> I took a look at the build, and there seem to be a lot of failures. Can you confirm that they are not related to this change?

No, they don't look related. Let's check the latest build results later.

@jsancio jsancio (Member) left a comment:

LGTM. Thanks for the improvement. There are a lot of test failures but they seem unrelated. Do you agree @showuon ?

@jsancio jsancio merged commit 37416e1 into apache:trunk Nov 30, 2023
1 check failed
@showuon showuon (Contributor, Author) commented Dec 1, 2023

Yes, I agree. Thanks for helping merge it.

ex172000 pushed a commit to ex172000/kafka that referenced this pull request Dec 15, 2023
…ajority voters (apache#14428)

In KIP-595, we expect to piggy-back on the `quorum.fetch.timeout.ms` config, and if the leader did not receive Fetch requests from a majority of the quorum for that amount of time, it would begin a new election, to resolve the network partition in the quorum. But we missed this implementation in current KRaft. Fixed it in this PR.

The commit includes:
1. Added a timer with a timeout configuration in `LeaderState`, checked on each leader poll to see whether it has expired. If expired, the leader resigns and starts a new election.

2. Added `fetchedVoters` in `LeaderState`, updated each time a FETCH or FETCH_SNAPSHOT request is received; it is cleared and the timer reset once (majority - 1) of the remote voters have sent such requests.

Reviewers: José Armando García Sancio <jsancio@apache.org>
anurag-harness pushed a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
…ajority voters (apache#14428)
anurag-harness added a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
…ajority voters (apache#14428) (#315)
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
…ajority voters (apache#14428)
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
…ajority voters (apache#14428)
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
…ajority voters (apache#14428)