Skip to content

Commit

Permalink
KAFKA-6082; Fence zookeeper updates with controller epoch zkVersion
Browse files Browse the repository at this point in the history
This PR aims to enforce that the controller can only update zookeeper state after checking the controller epoch zkVersion. The check and the zookeeper state updates are wrapped in a zookeeper multi() operation to ensure that they are done atomically. This PR is necessary to resolve issues related to multiple controllers (i.e. an old controller updating zookeeper state before it resigns, which is possible during controller failover given the single-threaded event queue model we have).

This PR includes the following changes:
- Add MultiOp request and response in ZookeeperClient
- Ensure all zookeeper updates done by controller are protected by checking the current controller epoch zkVersion
- Modify test cases in KafkaZkClientTest to cover a mismatched controller epoch zkVersion

Tests Done:
- Unit tests (updated to cover a mismatched controller epoch zkVersion)
- Existing integration tests

Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Dong Lin <lindong28@gmail.com>, Manikumar Reddy O <manikumar.reddy@gmail.com>

Closes #5101 from hzxa21/KAFKA-6082
  • Loading branch information
hzxa21 authored and lindong28 committed Sep 7, 2018
1 parent 847780e commit 297fb39
Show file tree
Hide file tree
Showing 19 changed files with 680 additions and 298 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class Partition(val topic: String,
* the controller sends it a start replica command containing the leader for each partition that the broker hosts.
* In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "

private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class ControllerContext {
var controllerChannelManager: ControllerChannelManager = null

var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
var epoch: Int = KafkaController.InitialControllerEpoch - 1
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
var epoch: Int = KafkaController.InitialControllerEpoch
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
var allTopics: Set[String] = Set.empty
private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.yammer.metrics.core.Gauge
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.CoreUtils.inLock
import kafka.utils.ShutdownableThread
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.utils.Time

import scala.collection._
Expand All @@ -32,12 +33,14 @@ object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
}
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
eventProcessedListener: ControllerEvent => Unit,
controllerMovedListener: () => Unit) extends KafkaMetricsGroup {

@volatile private var _state: ControllerState = ControllerState.Idle
private val putLock = new ReentrantLock()
private val queue = new LinkedBlockingQueue[ControllerEvent]
private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
// Visible for test
private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
private val time = Time.SYSTEM

private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
Expand Down Expand Up @@ -86,6 +89,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
controllerEvent.process()
}
} catch {
case e: ControllerMovedException =>
info(s"Controller moved to another broker when processing $controllerEvent.", e)
controllerMovedListener()
case e: Throwable => error(s"Error processing event $controllerEvent", e)
}

Expand Down
161 changes: 71 additions & 90 deletions core/src/main/scala/kafka/controller/KafkaController.scala

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions core/src/main/scala/kafka/controller/PartitionStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code

Expand Down Expand Up @@ -132,6 +133,9 @@ class PartitionStateMachine(config: KafkaConfig,
doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
throw e
case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
}
}
Expand Down Expand Up @@ -250,8 +254,11 @@ class PartitionStateMachine(config: KafkaConfig,
partition -> leaderIsrAndControllerEpoch
}.toMap
val createResponses = try {
zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion)
} catch {
case e: ControllerMovedException =>
error("Controller moved to another broker when trying to create the topic partition state znode", e)
throw e
case e: Exception =>
partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
Seq.empty
Expand Down Expand Up @@ -361,7 +368,7 @@ class PartitionStateMachine(config: KafkaConfig,
val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.zookeeper.KeeperException.Code

import scala.collection.mutable
Expand Down Expand Up @@ -106,6 +107,9 @@ class ReplicaStateMachine(config: KafkaConfig,
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
throw e
case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
}
}
Expand Down Expand Up @@ -299,7 +303,7 @@ class ReplicaStateMachine(config: KafkaConfig,
leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TopicDeletionManager(controller: KafkaController,
} else {
// if delete topic is disabled clean the topic entries under /admin/delete_topics
info(s"Removing $initialTopicsToBeDeleted since delete topic is disabled")
zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
}
}

Expand Down Expand Up @@ -251,9 +251,9 @@ class TopicDeletionManager(controller: KafkaController,
controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
topicsToBeDeleted -= topic
topicsWithDeletionStarted -= topic
zkClient.deleteTopicZNode(topic)
zkClient.deleteTopicConfigs(Seq(topic))
zkClient.deleteTopicDeletions(Seq(topic))
zkClient.deleteTopicZNode(topic, controllerContext.epochZkVersion)
zkClient.deleteTopicConfigs(Seq(topic), controllerContext.epochZkVersion)
zkClient.deleteTopicDeletions(Seq(topic), controllerContext.epochZkVersion)
controllerContext.removeTopic(topic)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class ReplicaManager(val config: KafkaConfig,
}

/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
private val localBrokerId = config.brokerId
private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
new Partition(tp.topic, tp.partition, time, this)))
Expand Down
Loading

0 comments on commit 297fb39

Please sign in to comment.