diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 9ad7b6ff7b63..82be66ae2cbe 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig, val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && - controllerContext.allTopics.contains(tp.topic)) + controllerContext.allTopics.contains(tp.topic) && + canPreferredReplicaBeLeader(tp) + ) onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered) } } } + private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = { + val assignment = controllerContext.partitionReplicaAssignment(tp) + val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp)) + val isr = controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr + PartitionLeaderElectionAlgorithms + .preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet) + .nonEmpty + } + private def processAutoPreferredReplicaLeaderElection(): Unit = { if (!isActive) return try { diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index c4b5f478d195..c7a1cd5b840f 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -433,8 +433,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") - servers(1).shutdown() - servers(1).awaitShutdown() + servers(otherBrokerId).shutdown() + servers(otherBrokerId).awaitShutdown() TestUtils.waitUntilTrue(() => { val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) leaderIsrAndControllerEpochMap.contains(tp) &&