KAFKA-3096; Leader is not set to -1 when it is shutdown if followers are down #765

Closed
wants to merge 10 commits
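
At a high level, the change to OfflinePartitionLeaderSelector below makes the controller record leader = -1 (LeaderAndIsr.NoLeader) in ZooKeeper before failing the election when no broker in the ISR is alive and unclean leader election is disabled, so that a partition whose leader and followers are all down is visibly leaderless. A condensed sketch of the resulting flow, using only names that appear in the diff (uncleanLeaderElectionEnable stands in for the LogConfig check; retries, logging and empty-list handling are trimmed):

// Condensed sketch of OfflinePartitionLeaderSelector.selectLeader after this change.
val newLeaderAndIsr =
  if (liveBrokersInIsr.isEmpty) {
    if (!uncleanLeaderElectionEnable) {
      // New behaviour: persist leader = -1 so the partition is reported as leaderless ...
      ReplicationUtils.updateLeaderAndIsr(zkUtils, topicAndPartition.topic, topicAndPartition.partition,
        currentLeaderAndIsr.copy(leader = LeaderAndIsr.NoLeader), controllerContext.epoch, currentLeaderAndIsr.zkVersion)
      // ... and then fail the election, as the selector did before this patch.
      throw new NoReplicaOnlineException(s"No broker in ISR for partition $topicAndPartition is alive")
    }
    // Unclean election allowed: elect the first live assigned replica, accepting potential data loss.
    val newLeader = liveAssignedReplicas.head
    new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
  } else {
    // Clean path: elect a live assigned replica that is still in the ISR.
    val newLeader = liveAssignedReplicas.find(liveBrokersInIsr.contains).get
    new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
  }
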
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -137,7 +137,7 @@ object KafkaController extends Logging {
Json.parseFull(controllerInfoString) match {
case Some(m) =>
val controllerInfo = m.asInstanceOf[Map[String, Any]]
return controllerInfo.get("brokerid").get.asInstanceOf[Int]
controllerInfo.get("brokerid").get.asInstanceOf[Int]
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
@@ -146,7 +146,7 @@ object KafkaController extends Logging {
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
try {
return controllerInfoString.toInt
controllerInfoString.toInt
} catch {
case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
}
@@ -167,7 +167,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
// kafka server
private val autoRebalanceScheduler = new KafkaScheduler(1)
var deleteTopicManager: TopicDeletionManager = null
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(zkUtils, controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
@@ -471,7 +471,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
// filter out the replicas that belong to topics that are being deleted
var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
// handle dead replicas
replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
108 changes: 55 additions & 53 deletions core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -19,7 +19,7 @@ package kafka.controller
import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
import kafka.utils.Logging
import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.server.{ConfigType, KafkaConfig}

@@ -44,55 +44,61 @@ trait PartitionLeaderSelector {
* 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
* 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
* Replicas to receive LeaderAndIsr request = live assigned replicas
* Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
*/
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
class OfflinePartitionLeaderSelector(zkUtils: ZkUtils, controllerContext: ControllerContext, config: KafkaConfig)
extends PartitionLeaderSelector with Logging {
this.logIdent = "[OfflinePartitionLeaderSelector]: "

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokerIds = controllerContext.liveBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains)
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(liveBrokerIds.contains)
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
case true =>
val newLeaderAndIsr =
if (liveBrokersInIsr.isEmpty) {
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
var zkUpdateSucceeded = false
while (!zkUpdateSucceeded) {
Review comment (Contributor): Hmm, not sure why we added the while loop here since there is already a while loop in PartitionStateMachine.electLeaderForPartition().
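
A minimal sketch of the single-attempt alternative the comment points at, using only names from this hunk (conflict handling is assumed to be covered by the caller's existing retry loop):

        val (updateSucceeded, _) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topicAndPartition.topic,
          topicAndPartition.partition, currentLeaderAndIsr.copy(leader = LeaderAndIsr.NoLeader),
          controllerContext.epoch, currentLeaderAndIsr.zkVersion)
        // If updateSucceeded is false the zkVersion was stale; assuming the outer election loop in
        // PartitionStateMachine.electLeaderForPartition re-reads the state and calls selectLeader again,
        // no inner while loop is needed here.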

val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topicAndPartition.topic,
topicAndPartition.partition, currentLeaderAndIsr.copy(leader = LeaderAndIsr.NoLeader), controllerContext.epoch,
currentLeaderAndIsr.zkVersion)
zkUpdateSucceeded = updateSucceeded
}
throw new NoReplicaOnlineException(s"No broker in ISR for partition $topicAndPartition is alive." +
s" Live brokers are: [${liveBrokerIds.mkString(",")}], ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]")
}

debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
.format(topicAndPartition, liveAssignedReplicas.mkString(",")))
liveAssignedReplicas.isEmpty match {
case true =>
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
.format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
debug(s"No broker in ISR is alive for $topicAndPartition. Pick the leader from the live assigned replicas:" +
s" [${liveAssignedReplicas.mkString(",")}]")
liveAssignedReplicas.headOption.map { newLeader =>
ControllerStats.uncleanLeaderElectionRate.mark()
warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live brokers" +
s" [${liveAssignedReplicas.mkString(",")}]. There's potential data loss.")
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}.getOrElse {
throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live brokers" +
s" are: [${liveBrokerIds.mkString(",")}], Assigned replicas are: [${assignedReplicas.mkString(",")}]")
}
case false =>
val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
val newLeader = liveReplicasInIsr.head
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
}
else {
val newLeader = liveAssignedReplicas.find(liveBrokersInIsr.contains).getOrElse {
throw new IllegalStateException(s"Could not find any broker in both live assigned replicas" +
s" [${liveAssignedReplicas.mkString}] and ISR [${liveBrokersInIsr.mkString(",")}]")
}
debug(s"Some broker in ISR is alive for $topicAndPartition. Select $newLeader from ISR" +
s" [${liveBrokersInIsr.mkString(",")}] to be the leader.")
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
}
info(s"Selected new leader and ISR $newLeaderAndIsr for offline partition $topicAndPartition")
(newLeaderAndIsr, liveAssignedReplicas)
case None =>
throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
throw new NoReplicaOnlineException(s"Partition $topicAndPartition doesn't have replicas assigned to it")
}
}
}
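
For orientation, a hedged usage sketch of the selector above; the constructor arguments match the KafkaController change earlier in this diff, and topicAndPartition plus the current LeaderAndIsr are assumed to have been read from ZooKeeper already:

val offlineSelector = new OfflinePartitionLeaderSelector(zkUtils, controllerContext, config)
val (newLeaderAndIsr, replicasToReceiveRequest) =
  offlineSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
// newLeaderAndIsr carries a bumped leader epoch and zkVersion; replicasToReceiveRequest is the set of
// live assigned replicas that should receive the LeaderAndIsr request, per the scaladoc above.
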
@@ -114,18 +120,17 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
currentLeaderAndIsr.isr.contains(r))
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
aliveReassignedInSyncReplicas.headOption match {
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
case None =>
reassignedInSyncReplicas.size match {
case 0 =>
throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
throw new NoReplicaOnlineException("List of reassigned replicas for partition" +
s" $topicAndPartition is empty. Current leader and ISR: [$currentLeaderAndIsr]")
case _ =>
throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
throw new NoReplicaOnlineException("None of the reassigned replicas for partition" +
s" $topicAndPartition are in-sync with the leader. Current leader and ISR: [$currentLeaderAndIsr]")
}
}
}
@@ -146,18 +151,18 @@ with Logging {
// check if preferred replica is the current leader
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if (currentLeader == preferredReplica) {
throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
.format(preferredReplica, topicAndPartition))
throw new LeaderElectionNotNeededException(s"Preferred replica $preferredReplica is already the current leader" +
s" for partition $topicAndPartition")
} else {
info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
" Trigerring preferred replica leader election")
info(s"Current leader $currentLeader for partition $topicAndPartition is not the preferred replica." +
" Triggering preferred replica leader election")
// check if preferred replica is not the current leader and is alive and in the isr
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
} else {
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition $topicAndPartition" +
s" is either not alive or not in the ISR. Current leader and ISR: [$currentLeaderAndIsr]")
}
}
}
@@ -182,19 +187,16 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)

val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
val liveAssignedReplicas = assignedReplicas.filter(liveOrShuttingDownBrokerIds.contains)

val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
val newLeaderOpt = newIsr.headOption
newLeaderOpt match {
val newIsr = currentLeaderAndIsr.isr.filter(!controllerContext.shuttingDownBrokerIds.contains(_))
newIsr.headOption match {
case Some(newLeader) =>
debug("Partition %s : current leader = %d, new leader = %d"
.format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
debug(s"Partition $topicAndPartition : current leader = $currentLeader, new leader = $newLeader")
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
throw new StateChangeFailedException(s"No other replicas in ISR [${currentLeaderAndIsr.isr.mkString(",")}] for" +
s" $topicAndPartition besides shutting down brokers [${controllerContext.shuttingDownBrokerIds.mkString(",")}]")
}
}
}
17 changes: 5 additions & 12 deletions core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -26,7 +26,6 @@ import kafka.utils.{Logging, ReplicationUtils}
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.controller.Callbacks.CallbackBuilder
import kafka.utils.CoreUtils._

/**
@@ -117,8 +116,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
for((topicAndPartition, partitionState) <- partitionState
if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
(new CallbackBuilder).build)
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
} catch {
@@ -137,16 +135,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* @param targetState The state that the partitions should be moved to
*/
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
callbacks: Callbacks = (new CallbackBuilder).build) {
leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
try {
brokerRequestBatch.newBatch()
partitions.foreach { topicAndPartition =>
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
}catch {
} catch {
case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
// TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
}
@@ -176,8 +173,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* @param targetState The end state that the partition should be moved to
*/
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
leaderSelector: PartitionLeaderSelector,
callbacks: Callbacks) {
leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
if (!hasStarted.get)
throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
Expand All @@ -187,7 +183,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
try {
targetState match {
case NewPartition =>
// pre: partition did not exist before this
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
partitionState.put(topicAndPartition, NewPartition)
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
@@ -213,15 +208,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
// post: partition has a leader
case OfflinePartition =>
// pre: partition should be in New or Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
partitionState.put(topicAndPartition, OfflinePartition)
// post: partition has no alive leader
case NonExistentPartition =>
// pre: partition should be in Offline state
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
@@ -149,8 +149,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
*
* ReplicaDeletionSuccessful -> NonExistentReplica
* -- remove the replica from the in memory partition replica assignment cache


*
* @param partitionAndReplica The replica for which the state transition is invoked
* @param targetState The end state that the replica should be moved to
*/
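
The doc-comment fragment above describes the ReplicaDeletionSuccessful -> NonExistentReplica transition as removing the replica from the in-memory partition replica assignment cache. A rough sketch of that bookkeeping, assuming the cache is the mutable partitionReplicaAssignment map used elsewhere in this diff and that replicaId identifies the replica just deleted:

// Hypothetical sketch: drop the deleted replica from the in-memory assignment of its partition.
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition,
  currentAssignedReplicas.filterNot(_ == replicaId))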