KAFKA-9866: Avoid election for topics where preferred leader is not in ISR #8524
Thanks for the PR! Can we add a test?
After discussing online, we figured there isn't an easy way to test this scenario. There's significant work to be done to make KafkaController unit-testable. Good catch on the … I think this change looks good. Let's wait for @hachikuji or @junrao to take a look.
@leonardge : Thanks for the PR. Just one comment below.
@@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig,
       val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
         controllerContext.partitionsBeingReassigned.isEmpty &&
         !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-        controllerContext.allTopics.contains(tp.topic))
+        controllerContext.allTopics.contains(tp.topic) &&
+        controllerContext.partitionLeadershipInfo.get(tp).forall(l => l.leaderAndIsr.isr.contains(leaderBroker))
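One detail of the added condition may be worth spelling out: `partitionLeadershipInfo.get(tp)` yields an `Option`, and `Option.forall` is vacuously `true` on `None`, so a partition with no recorded leadership info would still pass the filter. The sketch below illustrates only that behavior. `LeaderAndIsr` here is a simplified, hypothetical stand-in and the partition is keyed by a plain string; neither is the controller's real type.

```scala
// Hypothetical stand-in for the controller's leadership record; not Kafka's class.
final case class LeaderAndIsr(leader: Int, isr: List[Int])

object IsrCheckSketch {
  // Mirrors the shape of the added condition. An absent entry passes the check,
  // because Option.forall returns true when the Option is None.
  def preferredLeaderInIsr(info: Map[String, LeaderAndIsr], tp: String, leaderBroker: Int): Boolean =
    info.get(tp).forall(l => l.isr.contains(leaderBroker))
}
```

With leadership info present, the check is `true` only when the preferred broker is in the ISR; with no entry at all, it is also `true`.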
The preferred leader election also checks for live brokers. So, perhaps we could just call PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection() here.
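To make the suggestion concrete, here is a minimal standalone sketch of what a preferred-replica election check of this kind does. It assumes the algorithm considers only the first replica in the assignment and accepts it when it is both live and in the ISR. The object and method names are illustrative, not the Kafka source.

```scala
object PreferredElectionSketch {
  // Returns the preferred replica (head of the assignment) only if it is
  // both live and in the ISR; otherwise no leader is chosen.
  def preferredReplicaLeader(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] =
    assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
}
```

Calling `.nonEmpty` on the result gives a single boolean that covers both the liveness check and the ISR check, which is why reusing the election algorithm can replace a hand-written ISR condition.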
Done!
@leonardge : Thanks for the updated PR. One more followup comment.
PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(
  controllerContext.partitionReplicaAssignment(tp),
  controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr,
  controllerContext.liveBrokerIds.toSet).nonEmpty
In Election.leaderForPreferredReplica(), liveReplicas is computed as follows, so we probably want to be consistent here.
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
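The two ways of computing live replicas can give different answers. The sketch below uses a toy model in which a replica counts as online only when its broker is live and the partition is not on one of that broker's offline log directories. That definition of `isReplicaOnline` is an assumption made for illustration, and `ToyContext` and its fields are hypothetical names rather than the controller's real state.

```scala
// Toy model of controller state; field and method names are illustrative only.
final case class ToyContext(liveBrokerIds: Set[Int], replicasOnOfflineDirs: Map[Int, Set[String]]) {
  // Assumption: online means the broker is live AND the partition is not
  // hosted on one of that broker's offline log directories.
  def isReplicaOnline(brokerId: Int, tp: String): Boolean =
    liveBrokerIds.contains(brokerId) &&
      !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[String]).contains(tp)
}

object LiveReplicaSketch {
  // Per-partition liveness, in the style of the quoted liveReplicas line.
  def liveReplicas(ctx: ToyContext, assignment: Seq[Int], tp: String): Seq[Int] =
    assignment.filter(replica => ctx.isReplicaOnline(replica, tp))
}
```

Under this model, a broker can appear in `liveBrokerIds` while one of its replicas is still excluded by `isReplicaOnline`, so filtering by `liveBrokerIds` alone could treat an unusable preferred replica as a valid candidate.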
Done! After switching to the suggested implementation, the code block became cluttered, so I extracted it into a helper method.
@leonardge : Thanks for the PR. Just a minor comment below.
        onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
      }
    }
  }

  private def isPreferredLeaderInSync(tp: TopicPartition): Boolean = {
Perhaps a more accurate name is canPreferredReplicaBeLeader()?
Done!
ok to test
@leonardge : Thanks for the latest update. LGTM. Waiting for the tests to pass.
        onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
      }
    }
  }

  private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = {
    val assignment = controllerContext.partitionReplicaAssignment(tp)
    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp))
We also do controllerContext.isReplicaOnline(leaderBroker, tp) in the caller. Do we also need it here?
Good point. We can get rid of the isReplicaOnline() check in the caller. @leonardge : Could you submit a followup minor PR?
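The redundancy can be seen in a small standalone sketch. It assumes that `leaderBroker` in the caller is the preferred replica (the head of the assignment) and that the helper filters the assignment by online status before checking the head. `isOnline` is a hypothetical stand-in for `controllerContext.isReplicaOnline(_, tp)`, and the rest of the helper body is a guess at its shape, not the merged code.

```scala
object RedundantCheckSketch {
  // isOnline stands in for controllerContext.isReplicaOnline(_, tp).
  def canPreferredReplicaBeLeader(assignment: Seq[Int], isr: Seq[Int], isOnline: Int => Boolean): Boolean = {
    // Keep only the replicas that are currently online.
    val liveReplicas = assignment.filter(isOnline).toSet
    // The preferred replica qualifies only if it is online and in the ISR.
    assignment.headOption.exists(id => liveReplicas.contains(id) && isr.contains(id))
  }
}
```

Whenever this helper returns `true`, the preferred replica has already passed the online check, so a separate `isReplicaOnline(leaderBroker, tp)` test in the caller adds nothing under these assumptions.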
Sure thing!
This is a minor follow-up PR to #8524. Reviewer: Jun Rao <junrao@gmail.com>
…t-for-generated-requests

* apache-github/trunk: (366 commits)
  MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters. (apache#7982)
  MINOR: document how to escape json parameters to ducktape tests (apache#8546)
  KAFKA-9885; Evict last members of a group when the maximum allowed is reached (apache#8525)
  KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (apache#8524)
  KAFKA-9839; Broker should accept control requests with newer broker epoch (apache#8509)
  KAKFA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)
  MINOR: Partition is under reassignment when adding and removing (apache#8364)
  MINOR: reduce allocations in log start and recovery checkpoints (apache#8467)
  MINOR: Remove unused foreign-key join class (apache#8547)
  HOTFIX: Fix broker bounce system tests (apache#8532)
  KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap. (apache#8224)
  KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol (apache#8326)
  MINOR: equals() should compare all fields for generated classes (apache#8539)
  KAFKA-9844; Fix race condition which allows more than maximum number of members(apache#8454)
  KAFKA-9823: Remember the sent generation for the coordinator request (apache#8445)
  KAFKA-9883: Add better error message when REST API forwards a request and leader is not known (apache#8536)
  KAFKA-9907: Switch default build to Scala 2.13 (apache#8537)
  MINOR: Some html fixes in Streams DSL documentation (apache#8503)
  MINOR: Enable fatal warnings with scala 2.13 (apache#8429)
  KAFKA-9852: Change the max duration that calls to the buffer pool can block from 2000ms to 10ms to reduce overall test runtime (apache#8464)
  ...
In this commit we made sure that automatic leader election only happens after the newly started broker is in the ISR. No accompanying tests are added because the change is to a private method and no public-facing change is made.