KAFKA-15021; Skip leader epoch bump on ISR shrink #13765

Merged
merged 8 commits into apache:trunk from the kafka--15021-skip-leader-epoch-bump branch on Jun 7, 2023

Conversation

@jsancio (Member) commented May 25, 2023

When the KRaft controller removes a replica from the ISR because of a controlled shutdown, there is no need for it to increase the leader epoch. This holds as long as the topic partition leader doesn't add the removed replica back to the ISR.

This change also fixes a bug in the HWM computation: replicas that are caught up but not eligible to join the ISR should not be included in the computation. Otherwise, the HWM does not advance for up to replica.lag.time.max.ms because the shutting-down replica is no longer sending FETCH requests. Without this additional fix, PRODUCE requests would time out if the request timeout is greater than replica.lag.time.max.ms.

Because of the bug above, the KRaft controller needs to check the metadata version (MV) to guarantee that all brokers contain this bug fix before skipping the leader epoch bump.
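
To illustrate the high-watermark rule described above, here is a minimal, self-contained sketch. The names (ReplicaView, computeHighWatermark) are invented for illustration and do not match Partition.scala: a follower only holds back the HWM if it is in the maximal ISR, or if it is caught up and still eligible to rejoin the ISR.

// Minimal sketch of the HWM rule, using invented types; not Kafka's actual code.
final case class ReplicaView(
  brokerId: Int,
  logEndOffset: Long,
  caughtUp: Boolean,     // within replica.lag.time.max.ms of the leader
  isrEligible: Boolean   // not fenced and not shutting down (KIP-841)
)

def computeHighWatermark(
  leaderLogEndOffset: Long,
  maximalIsr: Set[Int],
  followers: Seq[ReplicaView]
): Long = {
  followers.foldLeft(leaderLogEndOffset) { (hwm, replica) =>
    // Wait for a follower only if it is in the maximal ISR, or caught up AND
    // eligible to rejoin the ISR. A caught-up but shutting-down replica is
    // ignored, so the HWM can advance instead of stalling for
    // replica.lag.time.max.ms.
    val shouldWait =
      maximalIsr.contains(replica.brokerId) || (replica.caughtUp && replica.isrEligible)
    if (shouldWait) math.min(hwm, replica.logEndOffset) else hwm
  }
}

With this rule, a shutting-down follower that is still inside the lag window but no longer fetching does not pin the HWM below the leader's log end offset.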

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jsancio jsancio requested review from dajac and hachikuji May 25, 2023 17:45
@jsancio jsancio added the kraft label May 25, 2023
@jsancio jsancio changed the title from "KAFKA-15021; Skip leader epoch bump" to "KAFKA-15021; Skip leader epoch bump on ISR shrink" May 25, 2023
@@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition,
// avoid unnecessary collection generation
val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
var newHighWatermark = leaderLogEndOffset
- remoteReplicasMap.values.foreach { replica =>
+ remoteReplicasMap.foreachEntry { (replicaId, replica) =>
Contributor

Can we use remoteReplicasMap.values here and use replica.brokerId, similar to the maximalIsr.contains call?

Contributor

Should we have a test in PartitionTest to assert that the HWM is incremented when there is a replica that is fenced but caught up?

Member Author

Done and done. I confirmed that the test I added fails for the "fenced" and "shutdown" variants against trunk.

@divijvaidya (Contributor)

Hey @jsancio
With this change, we are changing the semantics of what a leadership epoch means. Prior to this change, the leadership epoch is a version number representing the membership of the ISR: as soon as the membership changes, this version changes. After this change, the definition becomes "the leadership epoch is a version number that represents the membership of the ISR, in some cases". As you can see, the new definition adds ifs and buts to the simple definition above. Hence, I am not in favour of this change.

To achieve the objective you want, there is another way that doesn't change the definition, i.e. change how the components react when the version/epoch changes. We can choose not to restart the fetcher threads on each replica when an ISR shrink with a leadership epoch change arrives for processing.

Thoughts?

@dajac (Contributor) commented May 30, 2023

To achieve the objective you want, there is another way that doesn't change the definition, i.e. change how the components react when the version/epoch changes. We can choose not to restart the fetcher threads on each replica when an ISR shrink with a leadership epoch change arrives for processing.

@divijvaidya This does not help. Restarting the fetcher threads is just a means to provide them with the new leader epoch that they have to use. Until they get it, they can't replicate. This is the annoying part. If you don't restart the fetchers and update the leader epoch "live", you still have that period of time during which the followers don't have the correct leader epoch. Note that the bump also has an impact on producers/consumers, as they have to refresh their metadata. Overall, I think that the goal is to only bump the leader epoch on leadership changes to avoid all those disturbances.

@divijvaidya (Contributor)

Overall, I think that the goal is to only bump the leader epoch on leadership changes to avoid all those disturbances.

Yes, that is fair too. The definition of the leadership epoch in that case changes to: it represents the version of the leader after a re-election. In that case, we should also remove the epoch change during ISR expansion. My point is, let's keep the definition as either the state of the ISR (current) or the state of the leader (in which case we remove the epoch change for both expansion and shrink).

Aside from that, out of curiosity, is there any other version which represents the state of the ISR in Kafka? Does the replica epoch change on every change to the ISR?

@dajac (Contributor) commented May 30, 2023

This change also fixes a bug in the HWM computation: replicas that are caught up but not eligible to join the ISR should not be included in the computation. Otherwise, the HWM does not advance for up to replica.lag.time.max.ms because the shutting-down replica is no longer sending FETCH requests. Without this additional fix, PRODUCE requests would time out if the request timeout is greater than replica.lag.time.max.ms.

@jsancio I think that the real issue is in Partition.makeLeader. As you can see there, we only reset the followers' states when the leader epoch is bumped. I suppose that this is why you stumbled upon this issue with shutting-down replicas holding back the advancement of the HWM. The issue is that the shutting-down replica's state is not reset, so it remains caught up for replica.lag.time.max.ms. I think that we need to update Partition.makeLeader to always update the followers' states. Obviously, we also need your changes to not consider fenced and shutting-down replicas in the HWM computation.

Because of the bug above, the KRaft controller needs to check the metadata version (MV) to guarantee that all brokers contain this bug fix before skipping the leader epoch bump.

I wonder if we really need this if we change Partition.makeLeader as explained. It seems to me that the change in Partition.makeLeader and Partition.maybeIncrementLeaderHW together should be backward compatible. What do you think?
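
For readers following along, here is a toy model of the behaviour being discussed (invented names; not the real Partition.makeLeader): follower state is reset only when the leader epoch is bumped, which is why a shutting-down follower can keep looking caught up once the bump is skipped.

// Toy model only; field and method names are illustrative.
final class FollowerState(var lastCaughtUpTimeMs: Long, var logEndOffset: Long)

final class ToyPartition(followers: Map[Int, FollowerState], var leaderEpoch: Int) {
  def makeLeader(newLeaderEpoch: Int, currentTimeMs: Long): Unit = {
    val epochBumped = newLeaderEpoch > leaderEpoch
    leaderEpoch = newLeaderEpoch
    // Follower replication state is reset only on an epoch bump. If the bump is
    // skipped on ISR shrink, a shutting-down follower keeps stale state and can
    // look "caught up" for replica.lag.time.max.ms, holding back the HWM.
    if (epochBumped) {
      followers.values.foreach { state =>
        state.lastCaughtUpTimeMs = currentTimeMs
        state.logEndOffset = -1L
      }
    }
    // The suggestion above amounts to performing this reset unconditionally.
  }
}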

@dajac (Contributor) commented May 30, 2023

Yes, that is fair too. The definition of the leadership epoch in that case changes to: it represents the version of the leader after a re-election. In that case, we should also remove the epoch change during ISR expansion. My point is, let's keep the definition as either the state of the ISR (current) or the state of the leader (in which case we remove the epoch change for both expansion and shrink).

Yeah, I agree that we need to do both in order to remain consistent.

Aside from that, out of curiosity, is there any other version which represents the state of the ISR in Kafka? Does the replica epoch change on every change to the ISR?

There is the partition epoch, which is incremented whenever the partition is updated. This includes ISR changes.

@jsancio (Member, Author) commented Jun 1, 2023

Thanks for your feedback @divijvaidya @dajac. I am replying to both your comments in this message.

With this change, we are changing the semantics of what a leadership epoch means. Prior to this change, the leadership epoch is a version number representing the membership of the ISR: as soon as the membership changes, this version changes.

@divijvaidya, the old code was increasing the leader epoch when the ISR shrinks but not when it expands. My understanding is that we were doing this because the old replica manager used the leader epoch bump to invalidate old fetchers. During shutdown, the fetchers needed to be invalidated to avoid having them rejoin the ISR. With KIP-841, this is no longer necessary, as we can prevent brokers that are shutting down from joining the ISR and modifying the HWM.

Part of the code for doing this already exists; what was missing, and what part of this PR fixes, is considering this state when advancing the HWM. The partition leader should not include shutting-down replicas that are not in the ISR when determining the HWM.

After this change, the definition becomes "the leadership epoch is a version number that represents the membership of the ISR, in some cases". As you can see, the new definition adds ifs and buts to the simple definition above. Hence, I am not in favour of this change.

@divijvaidya, for correctness, the main requirement is that the leader epoch is increased whenever the leader changes. This is needed for log truncation and reconciliation. For log consistency, log truncation and reconciliation assume that the (offset, epoch) tuples are unique per topic partition and that if an (offset, epoch) tuple matches in two replicas then their logs up to that offset also match. In my opinion, Kafka's correctness doesn't require that the leader epoch is increased when the ISR changes.
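
As a concrete (and heavily simplified) illustration of why truncation only needs the (offset, epoch) uniqueness property rather than ISR tracking: a follower can locate its divergence point by comparing per-epoch end offsets with the leader. The types and function below are invented for this sketch and are not Kafka's OffsetsForLeaderEpoch implementation.

// Each entry records a leader epoch and the last offset written under it.
final case class EpochEndOffset(epoch: Int, endOffset: Long)

// Walk the follower's epochs from newest to oldest until an epoch the leader also
// has is found; the logs match up to the smaller end offset for that epoch, so
// that is where the follower truncates to. Illustrative only.
def truncationOffset(
  followerEpochs: List[EpochEndOffset],
  leaderEpochs: List[EpochEndOffset]
): Long = {
  val leaderEndOffsets = leaderEpochs.map(e => e.epoch -> e.endOffset).toMap
  followerEpochs
    .sortBy(e => -e.epoch)
    .collectFirst {
      case EpochEndOffset(epoch, endOffset) if leaderEndOffsets.contains(epoch) =>
        math.min(endOffset, leaderEndOffsets(epoch))
    }
    .getOrElse(0L)
}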

As you can see there, we only reset the followers' states when the leader epoch is bumped. I suppose that this is why you stumbled upon this issue with shutting-down replicas holding back the advancement of the HWM. The issue is that the shutting-down replica's state is not reset, so it remains caught up for replica.lag.time.max.ms. I think that we need to update Partition.makeLeader to always update the followers' states. Obviously, we also need your changes to not consider fenced and shutting-down replicas in the HWM computation.

@dajac, yes, I thought about this when I was implementing this feature. I decided against it because the follower (the shutting-down replica) is technically "caught up" to the leader; we simply don't want the leader to wait for that replica when computing the HWM, since we know it will soon shut down its fetchers.

I wonder if we really need this if we change Partition.makeLeader as explained. It seems to me that the change in Partition.makeLeader and Partition.maybeIncrementLeaderHW together should be backward compatible. What do you think?

@dajac, we need the MV check in the controller even with your suggestion. The question is: "When is it beneficial for the controller to not increase the leader epoch when a replica is removed from the ISR because of shutdown?" This is only the case when the controller knows that the brokers have the replica manager fixes in this PR. That is guaranteed to be the case if the MV is greater than or equal to the MV introduced in this PR.

If the brokers don't contain the fixes in this PR and the controller doesn't bump the leader epoch, PRODUCE requests will time out because the HWM increase will be delayed.

@splett2 (Contributor) left a comment

Actually, one thing I was wondering about this change, since I am not that familiar with what the metadata version gates: does the change in the PR allow leader epochs to go backwards?

Consider a sequence like the following:

  1. Initial partition state: leader epoch 0, ISR [0, 1, 2], and metadata version 3.5 on broker and controller. I suppose this is a PartitionRecord.
  2. Shrink the ISR to [0, 1]; the leader epoch is bumped to 1. This results in a PartitionChangeRecord.
  3. Shrink the ISR to [0]; the leader epoch is bumped to 2. This results in a PartitionChangeRecord.
  4. Publish a message to the leader [0]; the leader assigns (epoch 2, offset 0).
  5. Update the metadata version to 3.6 and restart [0]. When 0 replays the PartitionChangeRecords from steps 2 and 3, the controller will end up with a leader epoch of 0 unless a PartitionRecord snapshot is generated before the restart.
  6. Publish a message to leader [0]; the leader assigns (epoch 0, offset 1), and we get a backwards epoch.

The same thing applies to controller restarts, etc., after the MV bump.

If what I described is an issue, then the PartitionChangeRecord version may need to be updated so that the controller quorum (or the broker's metadata log replayer) knows whether a PartitionChangeRecord was persisted with implicit leader epoch bumps on ISR shrink, so that on record replay the controller can rebuild the correct leader epoch.

Disclaimer: I'm not familiar with KRaft internals, so this is a sort of handwavy guess at how things may go wrong.

@jsancio (Member, Author) commented Jun 2, 2023

Actually, one thing I was wondering about this change, since I am not that familiar with what the metadata version gates: does the change in the PR allow leader epochs to go backwards?

@splett2, the important observation is that this PR doesn't change the semantics of replaying PartitionRecord and PartitionChangeRecord with respect to the leader epoch bump. When replaying a PartitionChangeRecord, the state machines (controller and broker) will increase the leader epoch if the Leader field is set. This holds true both before and after this PR. What this PR changes is when the controller sets the Leader field in the PartitionChangeRecord.

I should also point out that the MV check is not required for correctness. It is there for performance, so that PRODUCE requests don't time out and the Kafka producer doesn't have to retry them.
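
A schematic model of the replay rule described above (invented case classes; not the real controller/broker replay code): the epoch is bumped only when the record carries a Leader field, so replay is deterministic regardless of which MV the controller was running when it wrote the record.

// Sketch only; field names loosely follow PartitionChangeRecord.
final case class PartitionState(leader: Int, leaderEpoch: Int, isr: List[Int])
final case class PartitionChange(leader: Option[Int], isr: Option[List[Int]])

def replay(state: PartitionState, change: PartitionChange): PartitionState =
  change.leader match {
    // The Leader field is set: the record itself encodes a leader (epoch) change,
    // so the epoch is bumped no matter which controller version wrote the record.
    case Some(newLeader) =>
      state.copy(
        leader = newLeader,
        leaderEpoch = state.leaderEpoch + 1,
        isr = change.isr.getOrElse(state.isr)
      )
    // No Leader field: only the ISR (or other fields) changed; the epoch is untouched.
    case None =>
      state.copy(isr = change.isr.getOrElse(state.isr))
  }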

@splett2 (Contributor) commented Jun 2, 2023

@jsancio
That makes sense. Sounds good to me in that case.

@divijvaidya (Contributor)

the old code was increasing the leader epoch when the ISR shrinks but not when it expands

Thank you, I didn't realise that.

Next,

  1. We change the leader epoch when a replica is removed and the ISR shrinks (without a leader re-election). Is that correct? If yes, then should we also remove the logic that increments the epoch in such situations, to keep the definition of the leader epoch consistent?

  2. Is a similar change required for the ZK code path?

@jsancio (Member, Author) commented Jun 2, 2023

  1. We change the leader epoch when a replica is removed and the ISR shrinks (without a leader re-election). Is that correct? If yes, then should we also remove the logic that increments the epoch in such situations, to keep the definition of the leader epoch consistent?

Yes. That is what the change to PartitionChangeBuilder does. Can you point me to exactly which code you are referring to?

  2. Is a similar change required for the ZK code path?

We need to keep the old behavior in ZK deployments because the ZK controller doesn't implement KIP-841, which is required for this fix to work.

@dajac (Contributor) left a comment

@jsancio Thanks for the clarification. That makes sense to me. I left a few minor comments for consideration.

Comment on lines 1094 to 1095
((replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) &&
isReplicaIsrEligible(replica.brokerId)) ||
Contributor

Is it worth extracting this condition into a helper method (e.g. isIsrEligibleAndCaughtUp)? That would simplify the condition.

@jsancio (Member, Author) commented Jun 5, 2023

I agree. I called it shouldWaitForReplicaToJoinIsr since I think this is what the leader is trying to do.
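
For reference, the extracted helper presumably packages the condition shown in the diff above, roughly along these lines (a guess at its shape; the actual signature in Partition.scala may differ):

// Hypothetical standalone rendering of the extracted condition.
def shouldWaitForReplicaToJoinIsr(
  isInMaximalIsr: Boolean,
  isCaughtUp: Boolean,
  isIsrEligible: Boolean
): Boolean =
  // The leader waits for a follower when computing the HWM if the follower is
  // already in the (maximal) ISR, or if it is caught up and eligible to rejoin it.
  isInMaximalIsr || (isCaughtUp && isIsrEligible)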

@@ -357,6 +363,51 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSendToPartitionWithFollowerShutdown(quorum: String): Unit = {
Contributor

nit: *ShouldNotTimeout? It would be great to capture the issue in the test name or to add a comment about it.

Member Author

Done and done.

@@ -1456,6 +1456,105 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(alterPartitionListener.failures.get, 1)
}

@ParameterizedTest
@ValueSource(strings = Array("fenced", "shutdown", "unfenced"))
def testHWMIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = {
Contributor

nit: s/HWM/HighWatermark?

Member Author

Done and I added a comment to the last check in the test.

*
* In MVs before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
* that required the leader epoch to be bumped whenever the ISR shrank. In MV 3.6 this
* leader epoch bump is not required when the ISR shrinks.
*/
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
Contributor

For my understanding, do we bump the leader epoch when the ISR is expanded? My understanding is that we don't.

Member Author

Correct. The Replicas.contains check is subtle: it returns true if the second list is a subset of the first list. I added a comment about this.
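
For illustration, here is a self-contained sketch of how a subset-style contains check (true when the second list is a subset of the first) can distinguish a pure ISR expansion from a shrink, and how the MV gate fits in. The names are invented and this is not the actual triggerLeaderEpochBumpIfNeeded code.

// Illustrative only.
def containsAll(first: Set[Int], second: Set[Int]): Boolean = second.subsetOf(first)

def needsLeaderEpochBump(
  currentIsr: Set[Int],
  targetIsr: Set[Int],
  leaderChanged: Boolean,
  mvSkipsBumpOnIsrShrink: Boolean
): Boolean = {
  // True when no replica was removed, i.e. the change is a pure expansion (or a
  // no-op), which never required a bump even before this PR.
  val nothingRemoved = containsAll(targetIsr, currentIsr)
  if (leaderChanged) true                  // an actual leadership change always bumps
  else if (mvSkipsBumpOnIsrShrink) false   // new MV: skip the bump when the ISR merely shrinks
  else !nothingRemoved                     // older MV: bump whenever a replica leaves the ISR
}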

@dajac (Contributor) left a comment

LGTM, thanks.

@jsancio jsancio merged commit 8ad0ed3 into apache:trunk Jun 7, 2023
1 check failed
@jsancio jsancio deleted the kafka--15021-skip-leader-epoch-bump branch June 7, 2023 14:20