KAFKA-7164: Follower should truncate after every leader epoch change #5436
Currently, we skip the steps to make a replica a follower if the leader does not change, including truncating the follower log if necessary. This can cause problems if the follower has missed one or more leader updates. Change the logic to only skip the steps if the new epoch is the same or one greater than the old epoch. Tested with unit tests that verify the behavior of Partition.scala and that show log truncation when the follower's log is ahead of the leader's, the follower has missed an epoch update, and the follower receives a LeaderAndIsrRequest making it a follower.
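The condition described above can be sketched in isolation. This is a minimal illustration, not the actual `Partition.makeFollower` code: the object `FollowerTransition` and the function `canSkipFollowerSteps` are hypothetical names, and the leader id and epochs are passed in as plain values rather than read from partition state.

```scala
object FollowerTransition {
  /** Returns true when the follower steps (including truncation) may be skipped.
    * Hypothetical helper for illustration; it is not part of Kafka. */
  def canSkipFollowerSteps(currentLeaderId: Option[Int],
                           newLeaderId: Int,
                           oldLeaderEpoch: Int,
                           newLeaderEpoch: Int): Boolean = {
    // The leader must be known already and must be the same broker as before.
    val leaderUnchanged = currentLeaderId.contains(newLeaderId)
    // Same epoch or exactly one greater means no epoch update was missed.
    val noEpochMissed =
      newLeaderEpoch == oldLeaderEpoch || newLeaderEpoch == oldLeaderEpoch + 1
    leaderUnchanged && noEpochMissed
  }
}
```

Under these assumptions, a jump from epoch 5 to epoch 7 with an unchanged leader no longer skips the follower steps, whereas 5 to 5 or 5 to 6 still does.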
retest this please
@hachikuji @apovzner @ijuma kafka.api.MetricsTest.testMetrics passes consistently locally and isn't related to my change. If anyone's concerned about that test please let me know.
@@ -326,12 +326,14 @@ class Partition(val topic: String,

 /**
  * Make the local replica the follower by setting the new leader and ISR to empty
- * If the leader replica id does not change, return false to indicate the replica manager
+ * If the leader replica id does not change and the new epooch is equal or one
nit: "epooch" typo
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest0, (_, followers) => assertEquals(0, followers.head.partitionId))
assertTrue(countdownLatch.await(1000L, TimeUnit.MILLISECONDS))

// Make local partition a follower - because epoch increased by only 1 and leader did not change, truncation should not trigger
In my humble opinion (and from what I've read in TDD books), this should be two separate tests. Splitting and naming the tests more explicitly would make it clearer to the developer what has broken if a test fails.
So if testBecomeFollowerWhenLeaderIsUnchanged fails, you don't know exactly what's wrong. Whereas if testDoesNotTruncateLogOnUnchangedLeaderAndSameOrCloseEpoch or testTruncatesLogOnUnchangedLeaderAndFarOffEpoch fails, you immediately know what has caused the failure.
Another thing is that these explicit method names serve as good documentation. It's always great when you can open a test class and see what it is and is not testing simply by reading the method names.
I'd like to hear others' thoughts on this too.
Yeah, I tend to agree, though I'm mostly satisfied if the test cases exist at all. But yeah, more focused test cases tend to be smaller and easier to understand.
Thanks for the patch. Left a few minor comments.
@@ -343,7 +345,8 @@ class Partition(val topic: String,
 leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
 zkVersion = partitionStateInfo.basePartitionState.zkVersion

-if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
+// If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
+if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId && (leaderEpoch == oldLeaderEpoch || leaderEpoch == oldLeaderEpoch + 1)) {
nit: I think we can replace leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId with leaderReplicaIdOpt.exists(_ == newLeaderBrokerId).
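For reference, `Option.exists` takes a predicate, so the comparison has to be passed as a function, while `Option.contains` takes the value directly. The sketch below (the object and method names are hypothetical) shows that the verbose form and both shorter forms agree:

```scala
object OptionIdiom {
  // The form currently in the diff: explicit isDefined check followed by get.
  def verbose(leaderReplicaIdOpt: Option[Int], newLeaderBrokerId: Int): Boolean =
    leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId

  // exists applies the predicate to the value if present; None yields false.
  def withExists(leaderReplicaIdOpt: Option[Int], newLeaderBrokerId: Int): Boolean =
    leaderReplicaIdOpt.exists(_ == newLeaderBrokerId)

  // contains compares the wrapped value to the argument; None yields false.
  def withContains(leaderReplicaIdOpt: Option[Int], newLeaderBrokerId: Int): Boolean =
    leaderReplicaIdOpt.contains(newLeaderBrokerId)
}
```

Either short form avoids calling `get` on the option, which is the main readability gain here.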
*/
def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
val newLeaderBrokerId: Int = partitionStateInfo.basePartitionState.leader
val oldLeaderEpoch: Int = leaderEpoch
nit: we typically skip the type for local variables and rely on type inference. Could probably fix this above too.
@@ -343,7 +345,8 @@ class Partition(val topic: String,
 leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
 zkVersion = partitionStateInfo.basePartitionState.zkVersion

-if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
+// If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
I wonder if we can elaborate on this a little bit. I think the main point is that if we have somehow missed a leader epoch, then we have also missed a required truncation.
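To make the point concrete, here is a deliberately simplified model of the truncation step that a follower would skip if it missed an epoch. It is not Kafka's implementation: `Entry`, `truncate`, and the idea of passing the leader's end offset as a plain number are assumptions for illustration, and the end offset is assumed to be exclusive.

```scala
object TruncationSketch {
  // One log record, identified by its offset and the leader epoch it was written in.
  final case class Entry(offset: Long, epoch: Int)

  /** Keeps only the entries below the end offset that the leader reports for the
    * follower's latest epoch (assumed exclusive). Entries at or beyond that offset
    * are not in the leader's log and are dropped. */
  def truncate(followerLog: Vector[Entry], leaderEndOffsetForEpoch: Long): Vector[Entry] =
    followerLog.takeWhile(_.offset < leaderEndOffsetForEpoch)
}
```

In this model, a follower holding offsets 0 to 4 in epoch 1 whose leader reports an end offset of 3 for that epoch must drop offsets 3 and 4 before fetching again. If the follower steps are skipped, those two entries stay in its log even though the leader never had them.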
@@ -326,12 +326,14 @@ class Partition(val topic: String,

 /**
  * Make the local replica the follower by setting the new leader and ISR to empty
- * If the leader replica id does not change, return false to indicate the replica manager
+ * If the leader replica id does not change and the new epooch is equal or one
+ * greater (that is, no updates have been missed), return false to indicate the replica manager
To indicate what?! Don't leave us hanging.
producerStateManager = new ProducerStateManager(new TopicPartition(topic, 0), new File(new File(config.logDirs.head), s"$topic-0"), 30000),
logDirFailureChannel = mockLogDirFailureChannel) {

override def leaderEpochCache: LeaderEpochCache = new LeaderEpochCache {
Could we use a mock for the epoch cache as well?

// Make local partition a follower - because epoch increased by more than 1, truncation should trigger even though leader does not change
val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 1, 3, aliveBrokerIds.asJava, 0, aliveBrokerIds.asJava, false)).asJava,
May be a little easier to follow this if we create local variables for controller epoch, leaderId, etc.
Thanks for the feedback @stanislavkozlovski and @hachikuji. I've pushed the following changes to address your comments: Improved comments to explain the new behavior and reason for it
LGTM. Thanks for addressing the comments @bob-barrett!
Thanks for the updates. Left a few additional comments.
}

private def prepareReplicaManagerAndLogManager(
topicPartition: Int,
nit: start arg list on previous line?
@@ -343,7 +346,9 @@ class Partition(val topic: String,
 leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
 zkVersion = partitionStateInfo.basePartitionState.zkVersion

-if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
+// If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
+// Otherwise, we missed a leader epoch update, so we may need to truncate the log by
How about this:
Otherwise, we missed a leader epoch update, which means the leader's log may have been truncated prior to the current epoch.
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
partition.getOrCreateReplica(followerBrokerId)
partition.makeFollower(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderBrokerId, leaderEpoch, aliveBrokerIds.asJava, zkVersion, aliveBrokerIds.asJava, false), correlationId)
Just a minor suggestion, but a helper would make this a little easier to read. Most of the fields are constants for the purpose of these test cases. For example, something like this:
def leaderAndIsrPartitionState(leaderEpoch: Int): LeaderAndIsrRequest.PartitionState = {
  new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderBrokerId, leaderEpoch,
    aliveBrokerIds.asJava, zkVersion, aliveBrokerIds.asJava, false)
}
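A self-contained sketch of the same idea follows, using a hypothetical `PartitionState` case class as a stand-in for `LeaderAndIsrRequest.PartitionState` so that it runs without Kafka on the classpath. The field names and the constant values are made up for illustration; only the argument order mirrors the constructor calls in the test code under review.

```scala
object HelperSketch {
  // Stand-in for LeaderAndIsrRequest.PartitionState; this is NOT the Kafka class.
  final case class PartitionState(controllerEpoch: Int, leader: Int, leaderEpoch: Int,
                                  isr: Seq[Int], zkVersion: Int, replicas: Seq[Int],
                                  isNew: Boolean)

  // Values that stay fixed across the test cases (illustrative only).
  val controllerEpoch = 0
  val leaderBrokerId = 1
  val zkVersion = 0
  val aliveBrokerIds = Seq(0, 1)

  // Only the leader epoch varies between test cases, so it is the sole parameter.
  def leaderAndIsrPartitionState(leaderEpoch: Int): PartitionState =
    PartitionState(controllerEpoch, leaderBrokerId, leaderEpoch,
      aliveBrokerIds, zkVersion, aliveBrokerIds, isNew = false)
}
```

With a helper of this shape, each test case reads as `leaderAndIsrPartitionState(leaderEpoch + 2)` or similar, which puts the one value that matters in plain view.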

// Make local partition a follower - because epoch did not change, truncation should not trigger
val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, controllerEpoch,
collection.immutable.Map(new TopicPartition(topic, topicPartition) -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderBrokerId, leaderEpoch, aliveBrokerIds.asJava, zkVersion, aliveBrokerIds.asJava, false)).asJava,
nit: some of these lines are pretty long. I think 120 characters is the recommended limit.
Thanks @hachikuji! I've pushed fixes for your comments.
LGTM. Thanks for the patch!
…change (#5436) Currently, we skip the steps to make a replica a follower if the leader does not change, including truncating the follower log if necessary. This can cause problems if the follower has missed one or more leader updates. Change the logic to only skip the steps if the new epoch is the same or one greater than the old epoch. Tested with unit tests that verify the behavior of `Partition` and that show log truncation when the follower's log is ahead of the leader's, the follower has missed an epoch update, and the follower receives a `LeaderAndIsrRequest` making it a follower. Reviewers: Stanislav Kozlovski <familyguyuser192@windowslive.com>, Jason Gustafson <jason@confluent.io>