KAFKA-6082: Fence zookeeper updates with controller epoch zkVersion #5101

Closed
wants to merge 25 commits into from

Contributor

@hzxa21 hzxa21 commented May 30, 2018

This PR aims to enforce that the controller can only update zookeeper states after checking the controller epoch zkVersion. The check and the zookeeper state updates are wrapped in zookeeper multi() operations so that they are applied atomically. This is necessary to resolve issues caused by multiple controllers (i.e. an old controller updating zookeeper states before it resigns, which is possible during controller failover given the single-threaded event queue model we have).
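As a rough illustration of the fencing idea described above, here is a minimal in-memory sketch in Scala. It is not the ZooKeeper client: `FakeZk`, `Check`, `SetData` and the `/state` path are hypothetical names made up for this example, and the real multi() reports per-operation results rather than a single `Either`. The only behaviour it models is that a version check and a write either both take effect or neither does.

```scala
// Toy model of versioned znodes; all names here are illustrative assumptions.
object FencedUpdateSketch {
  final case class Znode(data: String, version: Int)

  sealed trait Op
  final case class Check(path: String, expectedVersion: Int) extends Op
  final case class SetData(path: String, data: String) extends Op

  final class FakeZk(initial: Map[String, Znode]) {
    private var nodes: Map[String, Znode] = initial

    def get(path: String): Option[Znode] = synchronized(nodes.get(path))

    // Applies every op or none of them, mimicking the all-or-nothing nature of multi().
    def multi(ops: Seq[Op]): Either[String, Unit] = synchronized {
      val result = ops.foldLeft[Either[String, Map[String, Znode]]](Right(nodes)) {
        case (Right(state), Check(path, expected)) =>
          state.get(path) match {
            case Some(node) if node.version == expected => Right(state)
            case Some(_) => Left(s"BADVERSION on $path")
            case None => Left(s"NONODE on $path")
          }
        case (Right(state), SetData(path, data)) =>
          state.get(path) match {
            case Some(node) => Right(state.updated(path, Znode(data, node.version + 1)))
            case None => Left(s"NONODE on $path")
          }
        case (failure, _) => failure
      }
      // Commit the new state only if every op succeeded.
      result.map { newState => nodes = newState }
    }
  }
}
```

In this model a stale controller that cached an old version of `/controller_epoch` gets a `Left` back and the partition state is left untouched, while the current controller's write goes through and bumps the znode version.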

This PR includes the following changes:

  • Add MultiOp request and response in ZookeeperClient
  • Ensure all zookeeper updates done by controller are protected by checking the current controller epoch zkVersion
  • Modify test cases in KafkaZkClientTest to test a mismatched controller epoch zkVersion

Tests Done:

  • Unit tests (with updated tests for a mismatched controller epoch zkVersion)
  • Existing integration tests + newly added test cases in ControllerIntegrationTest
  • Perf testing: KAFKA-6082: Fence zookeeper updates with controller epoch zkVersion #5101 (comment). Test results show that there is no performance overhead before and after this patch for common controller operations including controller failover, preferred replica leader election, and broker shutdown/startup.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ijuma ijuma requested a review from junrao May 30, 2018 23:59
Member

@lindong28 lindong28 left a comment

Thanks for the patch.

In addition to making sure that a stale controller cannot update zookeeper, would it be useful to have a way for the controller to skip non-controller-election-related events once it notices that it cannot update zookeeper due to a zkVersion mismatch? This logic can probably be put in ControllerEventThread.

@@ -42,7 +42,7 @@ import scala.util.Try

object KafkaController extends Logging {
  val InitialControllerEpoch = 1
- val InitialControllerEpochZkVersion = 1
+ val InitialControllerEpochZkVersion = 0
Member

Why do we need to change it to 0? Do we expect to initialize ControllerContext.epochZkVersion to -1?

Contributor Author

I think the InitialControllerEpochZkVersion represents the initial value when the controller epoch znode first gets created so it should be set to 0 for consistency just like InitialControllerEpoch.

Also, during cluster initialization (i.e. the controller epoch znode does not exist), we explicitly set ControllerContext.epochZkVersion to InitialControllerEpochZkVersion, which was 1 before.
(https://github.com/apache/kafka/pull/5101/files/dcf94b0e78208ec1fc11ebe538f743063880a798#diff-ed90e8ecc5439a5ede5e362255d11be1L634)
This will cause problems for the first controller to update zk states after this patch because the actual znode version is 0. This was not a problem before because we didn't use ControllerContext.epochZkVersion to fence zk state updates and, after controller failover, we update it by reading the controller epoch znode.

This change is only for readability. Actually, we can just remove the line that sets ControllerContext.epochZkVersion to InitialControllerEpochZkVersion and keep InitialControllerEpochZkVersion at 1.
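To make the version mismatch described above concrete, here is a small sketch. It relies on the ZooKeeper behaviour that a freshly created znode has data version 0 and each write increments it by one; `Znode` and `checkPasses` are hypothetical helpers for illustration, not Kafka code.

```scala
object InitialZkVersionSketch {
  // Toy model: a znode's data version starts at 0 on creation and grows by 1 per write.
  final case class Znode(data: Int, version: Int = 0) {
    def write(newData: Int): Znode = Znode(newData, version + 1)
  }

  // A fenced update is only allowed when the cached zkVersion equals the real one.
  def checkPasses(cachedZkVersion: Int, epochZnode: Znode): Boolean =
    cachedZkVersion == epochZnode.version
}
```

Under this model, a first controller that caches 1 as the zkVersion fails the check against a freshly created controller epoch znode, whereas caching 0 passes.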

Member

Cool. This makes sense.

successfulUpdates.put(partition, updatedLeaderAndIsr)
case Code.BADVERSION => updatesToRetry += partition
case _ => failed.put(partition, setDataResponse.resultException.get)
case Code.BADVERSION => {
Member

nit: we probably don't need the { here, so that the code style stays consistent with the existing code.

Contributor Author

Thanks for pointing that out. Will fix it.

- def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
- val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+ def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch], controllerEpochVersion: Int): Seq[MultiOpResponse] = {
+ val multiOpRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
Member

Here SetDataRequest/SetDataResponse is replaced with MultiOpRequest/MultiOpResponse. This may cause problems for existing code (e.g. ZookeeperClient.send()) whose logic relies on the type of the request.

Instead of adding a new subclass of AsyncRequest, would it be better to modify the existing request (maybe the AsyncRequest) to include the expected controller epoch version so that, when the expected controller epoch exists, zookeeperClient.send() will take care of the version check?

Contributor Author

That is a good point. The reason why I add the MultiOp subclass is that we can extend ZookeeperClient to handle arbitrary multi() operations, not specifically for checking controller epoch version and updating zk states. Also, I think ZookeeperClient should only be aware of zookeeper related context not the kafka related context (e.g. controller epoch version) for cleanness. Instead of modifying AsyncRequest, do you think it is better to add some helper functions in KafkaZkClient to wrap around the check and set/update/create logic?

Also, I am a little confused about what problems ZookeeperClient.send() would run into if we use zookeeper.multi() for MultiOpRequest. Can you give me more context on that?

Member

@lindong28 lindong28 May 31, 2018

I think it is reasonable not to have Kafka-specific things in ZookeeperClient. On the other hand, we don't have to: we can provide another zk path and an expected version as parameters to these APIs, and the API should return a proper error without executing the original request if the version of that path differs from the expected version. This solution is probably not Kafka-specific.

I am not sure we will need arbitrary multi() operations in the near future. Currently we only need to check the zk version of another path when the controller changes zookeeper state.

After checking the code, it seems that there is no current problem caused by using MultiOpRequest. But some information (e.g. CreateResponse.name and SetDataResponse.stat) is discarded in the response, which may potentially be a problem in the future. In general it seems more flexible to be able to use a different case class for each kind of request, so that we can have different parameters (as is the case now) and apply different processing logic to each case class. Just my opinion. If you like the current solution, maybe you can keep it and other committers can comment on this.

Contributor Author

I see what you mean. That makes sense to me. I will update the PR to differentiate Set/Update/Create with check parameters provided in ZookeeperClient to expose different information. Thanks for the explanation.

Contributor Author

hzxa21 commented Jun 6, 2018

@lindong28 Dong, I have updated the PR to address the comments. Could you take a second look? Thanks!

@lindong28 lindong28 self-assigned this Jun 10, 2018
Member

@lindong28 lindong28 left a comment

Thanks for the update! Left some minor comments.

@@ -108,29 +111,32 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
/**
* Sets topic partition states for the given partitions.
* @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
* @param controllerEpochVersion expected controller epoch zkVersion to check.
Member

Should the comment be expected controller epoch zkVersion?

Contributor Author

Yes. Will fix it.

}

def metadata: ResponseMetadata
}

case class ZkCheckResult(path: String, resultCode: Code) {
Member

Do we need the path field here? Can we remove this class and replace it with e.g. ZkVersionCheckResultCode: resultCode?

Contributor Author

The path is needed to generate keeper exception with path information without assuming it to be controller epoch path. If we are going to generate the ControllerEpochZkVersionMismatch exception in KafkaZkClient instead of KeeperException in ZookeeperClient, I think we can remove the path and just keep the resultCode.

if (checkSucceed)
updatesToRetry += partition
else
failed.put(partition, setDataResponse.resultException.get)
Member

In general we want the exception returned to the caller to uniquely identify the problem. But here we can output the BadVersionException if either the controller epoch zkVersion is bad or the data znode zkVersion is bad. It may be confusing.

It is probably simpler to create a new exception, e.g. ControllerEpochZkVersionMismatchException and do the following before checking setDataResponse.resultCode:

if (setDataResponse.zkVersionCheckResultCode != Code.OK)
  failed.put(partition, new ControllerEpochZkVersionMismatchException())

Contributor Author

Thanks for the suggestion. It is simpler and cleaner this way.

Btw, I think we can just use ControllerMovedException in this case.

Also, related to your comment on my last commit: instead of putting the exception in some data structure, can we simply throw the exception when the check fails? In this case, we can skip processing unnecessary controller events until we hit the ControllerChange event. To optimize it further, we can also catch ControllerMovedException explicitly in the ControllerEventThread and let the controller resign immediately.
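The flow discussed in this thread could be sketched roughly as follows. This is a self-contained toy, not the patch itself: `Code`, `SetDataResponse`, `handle` and `processAll` are simplified stand-ins invented for illustration, and `ControllerMovedException` is redefined locally so the snippet compiles without Kafka on the classpath.

```scala
object ControllerMovedSketch {
  sealed trait Code
  case object Ok extends Code
  case object BadVersion extends Code

  // Local stand-in for Kafka's ControllerMovedException (assumption for this sketch).
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  final case class SetDataResponse(zkVersionCheckResultCode: Code, resultCode: Code)

  sealed trait Outcome
  case object Succeeded extends Outcome
  case object Retry extends Outcome

  // Look at the controller epoch check first, so a BadVersion on the data znode
  // is never confused with a failed controller epoch zkVersion check.
  def handle(response: SetDataResponse): Outcome = {
    if (response.zkVersionCheckResultCode != Ok)
      throw new ControllerMovedException("controller epoch zkVersion check failed")
    response.resultCode match {
      case Ok => Succeeded
      case BadVersion => Retry
    }
  }

  // Runs events in order, stops at the first ControllerMovedException and
  // invokes the listener. Returns how many events completed.
  def processAll(events: Seq[() => Unit], onControllerMoved: () => Unit): Int = {
    var processed = 0
    try {
      events.foreach { event => event(); processed += 1 }
    } catch {
      case _: ControllerMovedException => onControllerMoved()
    }
    processed
  }
}
```

Throwing instead of collecting the failure means the remaining queued work is skipped as soon as the controller learns it has been superseded.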

def checkInfo: Option[ZkCheckInfo]
}

case class ZkCheckInfo(checkPath: String, expectedZkVersion: Int) {
Member

Can we name it case class ZkVersionCheck(path: String, zkVersion: Int)?

Contributor Author

Sure. Will do.

val path = request.path
val resultCode = Code.get(rc)

val checkReuslt = checkOpResult match {
Member

Typo

Contributor Author

Thanks. Will fix.

*/
- def deleteLogDirEventNotifications(): Unit = {
+ def deleteLogDirEventNotifications(controllerEpochVersion: Int): Unit = {
Member

We can name it to be controllerEpochZkVersion to be consistent with ControllerContext.epochZkVersion? Same for other uses of controllerEpochVersion.

Contributor Author

Sure.

@@ -1370,16 +1405,26 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* @return true if path gets deleted successfully, false if root path doesn't exist
* @throws KeeperException if there is an error while deleting the znodes
*/
- def deleteRecursive(path: String): Boolean = {
+ def deleteRecursive(path: String, controllerEpochVersion: Int = -1): Boolean = {
Member

The current patch checks resultCode, and based on its value, adds additional logic to check checkResult.

Similar to the comment for updateLeaderAndIsr, could we simplify the logic here by checking deleteResponse.checkResult before the existing logic of checking the resultCode?

Contributor Author

Will fix it. Thanks for pointing that out.


/** Return None if the result code is OK and KeeperException otherwise. */
def resultException: Option[KeeperException] =
if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
def resultException: Option[KeeperException] = {
Member

It seems that we will return BadVersionException for two different scenarios. Can we output a unique exception if the controller znode has different version from what is expected?

Contributor Author

Done.

else
zooKeeper.multi(Seq(checkInfo.get.checkOp, Op.create(path, data, acl.asJava, createMode)).asJava, new MultiCallback {
override def processResult(rc: Int, path: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit =
callback(generateAsyncResponseWithCheckResult(request, rc, ctx, opResults, responseMetadata(sendTimeMs)))
Member

generateAsyncResponseWithCheckResult() is called for CreateRequest, SetDataRequest and DeleteRequest. However, most of the code (or logic) in generateAsyncResponseWithCheckResult() is different for these three requests anyway. Would it be more intuitive and simpler to remove the method generateAsyncResponseWithCheckResult() and move its request-specific logic into send()? We can put the logic that is common to all requests, e.g. the first part of generateAsyncResponseWithCheckResult(), in a method if needed.

Contributor Author

Done.

@ijuma ijuma added this to the 2.1.0 milestone Jun 15, 2018
@lindong28
Member

Hey @hzxa21, is this patch ready for review?

Contributor Author

hzxa21 commented Jul 20, 2018

@lindong28 Yes. Can you help take a look?

@lindong28
Member

@hzxa21 Sure. Can you rebase the patch to resolve the conflict?

Contributor Author

hzxa21 commented Jul 20, 2018

@lindong28 Rebased onto trunk. Thanks Dong!

Member

@lindong28 lindong28 left a comment

Thanks for the update. The patch looks much better. Left some comments.

@@ -32,7 +33,8 @@ object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
}
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
eventProcessedListener: ControllerEvent => Unit,
controllerMoveListener: () => Unit) extends KafkaMetricsGroup {
Member

Maybe rename controllerMoveListener to controllerMovedListener so that it is more consistent with the existing name eventProcessedListener.

Contributor Author

Done.

@@ -70,7 +70,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti

// visible for testing
private[controller] val eventManager = new ControllerEventManager(config.brokerId,
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => emptyEventQueueAndReelct())
Member

typo: should be emptyEventQueueAndReelect. Also, it seems the name clearEventQueueAndReelect is more consistent with the existing method names.

Contributor Author

Done.

@@ -86,6 +88,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
controllerEvent.process()
}
} catch {
case cme: ControllerMovedException =>
Member

nits: we typically just use e: ControllerMovedException here for simplicity. cme does not provide much information since most users would still need to read the actual type to understand what it is. If it makes sense, can you rename it here and in other places of the patch?

Contributor Author

Done.

type Response = GetDataResponse
}

case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None, checkInfo: Option[ZkVersionCheck] = None) extends AsyncRequest {
Member

In general the code may be more consistent and readable if we name the variable after this type. And zkVersionCheck seems more informative than the checkInfo. Can you rename the checkInfo here and in other places of the patch (including local variable)?

Contributor Author

Done.

* @return SetDataResponse
*/
def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.NoVersion)
def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]], controllerEpochZkVersion: Int): SetDataResponse = {
Member

Would it be better to rename controllerEpochZkVersion to expectedControllerEpochZkVersion? The current method signature seems to suggest that the controllerEpochZkVersion will be written to the znode. If it makes sense, can you rename the variable here and in other places of the patch?

Contributor Author

Done.

@@ -592,6 +592,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(tp, reassignedPartitionContext)
} catch {
case cme: ControllerMovedException => throw cme
Member

Just in case this patch causes any issue, it may be useful if we still print the message such as Error completing reassignment of partition .... If it makes sense, can you add the additional log based on the existing log (if exists) here and in other places where the ControllerMovedException is caught and thrown?

Contributor Author

Done.

@@ -1581,4 +1613,18 @@ object KafkaZkClient {
time, metricGroup, metricType)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}


def controllerEpochVersionCheck(version: Int) = {
Member

Can we specify zkVersion in the name, e.g. controllerZkVersionCheck? Also, can you add the return type to the method signature?

Contributor Author

Done.

val checkResultCode = checkOpResult match {
case _: CheckResult => Some(Code.OK)
case e: ErrorResult => Some(Code.get(e.getErr))
case _ => None
Member

It seems that checkOpResult should be either CheckResult or ErrorResult. Maybe we should throw IllegalStateException otherwise? By doing so we could simplify the signature of getMultiOpResults to (Code, OpResult). And we can also simplify the signature of e.g. CreateResponse such that zkVersionCheckResultCode is of type Code.
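A minimal sketch of that suggestion, under stated assumptions: the `OpResult` hierarchy below is a local stand-in that only mirrors the shape of ZooKeeper's result classes, result codes are plain integers (0 for OK, -103 for BADVERSION, matching ZooKeeper's numeric codes), and `checkResultCode` is a hypothetical helper name.

```scala
object CheckResultSketch {
  // Simplified stand-ins for ZooKeeper's OpResult subclasses (illustration only).
  sealed trait OpResult
  case object CheckResult extends OpResult
  final case class ErrorResult(err: Int) extends OpResult
  final case class SetDataResult(version: Int) extends OpResult

  val Ok = 0
  val BadVersion = -103

  // The result of a check op is expected to be either a CheckResult or an
  // ErrorResult; anything else indicates a programming error, so fail loudly
  // instead of returning an Option.
  def checkResultCode(checkOpResult: OpResult): Int = checkOpResult match {
    case CheckResult => Ok
    case ErrorResult(err) => err
    case other =>
      throw new IllegalStateException(s"Unexpected result for check op: $other")
  }
}
```

Returning a plain code rather than an `Option` lets callers compare against OK directly, which is the signature simplification the comment asks for.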

Contributor Author

Done.

@@ -442,45 +493,51 @@ sealed trait AsyncRequest {
type Response <: AsyncResponse
def path: String
def ctx: Option[Any]
def checkInfo: Option[ZkVersionCheck]
Member

Would it be simpler to name it zkVersionCheck?

Contributor Author

Yes. Done.

}

case class ZkVersionCheck(checkPath: String, expectedZkVersion: Int) {
def checkOp = Op.check(checkPath, expectedZkVersion)
Member

Can you add the return type to the signature of checkOp()?

Contributor Author

Done.

Contributor Author

hzxa21 commented Aug 14, 2018

@lindong28 I have updated the PR to add more logging and address the naming issues. Could you take a look again? Thanks!

Contributor Author

hzxa21 commented Aug 21, 2018

@lindong28 Thanks for your comments. I have rebased the patch. Can you take a look?

Member

@lindong28 lindong28 left a comment

Thanks for the update @hzxa21. Left some more comments. Also, since this is a medium sized patch which changes the behavior of controller, can we have some tests for this patch?

@@ -86,6 +88,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
controllerEvent.process()
}
} catch {
case e: ControllerMovedException =>
info(s"Controller moved to another broker when processing $controllerEvent. Trigger controller move listener immediately", e)
controllerMovedListener
Member

Not sure if this line invokes the callback. Maybe we should change it to controllerMovedListener.apply(). If the existing version does not actually execute this callback, it means that none of the existing tests catch this issue, so it may be worthwhile adding a test.
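The concern can be reproduced in isolation. In Scala, naming a value of type `() => Unit` in statement position only evaluates to the function object and discards it; the compiler typically emits a "pure expression does nothing" warning rather than an error. The object and method names below are made up for this sketch.

```scala
object ListenerInvocationSketch {
  var invocations = 0
  val controllerMovedListener: () => Unit = () => invocations += 1

  def referenceOnly(): Unit = {
    controllerMovedListener // evaluates to the function value and discards it; nothing runs
    ()
  }

  def invoke(): Unit =
    controllerMovedListener.apply() // equivalent to controllerMovedListener()
}
```

Calling `referenceOnly()` leaves `invocations` at 0, while `invoke()` increments it, which is why an explicit `apply()` (or `()`) is needed at the call site.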

Contributor Author

I have changed it to controllerMovedListener.apply(). Will add a test in future commits.

@@ -598,6 +598,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(tp, reassignedPartitionContext)
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when " +
Member

nit: It seems a bit confusing that we print more information (i.e. newReplicas) for ControllerMovedException than for any other exception. It may be better to make the log information consistent and still print error(s"Error completing reassignment of partition $tp", e).

Contributor Author

Done.

@@ -618,6 +622,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
try {
partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
} catch {
case e: ControllerMovedException =>
error("Controller moved to another broker during preferred replica leader election")
Member

Can we still print error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e) for consistency?

Contributor Author

Done.

@@ -650,7 +657,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
case _ =>
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
}
- info(s"Epoch incremented to ${controllerContext.epoch}")
+ info(s"Epoch incremented to ${controllerContext.epoch} and epoch version is now ${controllerContext.epochZkVersion}")
Member

nits: Can we replace epoch version is now with epoch zk version is now

Contributor Author

Done.

@@ -1519,6 +1530,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
val curControllerEpoch = zkClient.getControllerEpoch
Member

Since KafkaController.incrementControllerEpoch() will always print controllerContext.epochZkVersion, the only extra information we are seeking here is the current zkVersion of the controller epoch znode. It seems that we only need this information when the broker observes ControllerMovedException while it thinks it is the controller. Since Reelect is triggered in every broker every time there is controller movement, it may not be very intuitive or necessary to print the extra log here in Reelect.process().

Another thing to note is that we would like to know the zkVersion of the controller epoch znode that causes the ControllerMovedException, but this zkVersion may have changed after the controller observes ControllerMovedException but before the controller processes the Reelect event. So it is better to read the zkVersion earlier (e.g. in ControllerEventThread.doWork()) rather than later. The best solution is probably to include the expected zkVersion in the message of the ControllerMovedException thrown by maybeThrowControllerMoveException().

Contributor Author

Makes sense. I have removed the extra logs and included the expected zkVersion in the message of ControllerMovedException.

Some(ZkVersionCheck(ControllerEpochZNode.path, version))
}

def maybeThrowControllerMoveException(response: AsyncResponse): Unit = {
Member

Given that there may be zookeeper operations other than controllerZkVersionCheck which also check the zkVersion of some znode, some responses may also show zkVersionCheckResultCode != Code.OK and cause maybeThrowControllerMoveException to throw ControllerMovedException even though the failure is not from the controllerZkVersionCheck. Will this be a problem?

Also, any chance we can also include the expected zkversion in the message of ControllerMovedException?

Contributor Author

Yes, I think it is better to include the expected zkVersion in the zookeeper response and extract the information from the response when throwing ControllerMovedException. I have added zkVersionCheckResult: Option[ZkVersionCheckResult] to achieve this.
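One way the idea in this reply could look, as a hedged sketch only: the response carries an optional check result, the exception is thrown only when the failed check was against the controller epoch path, and the expected zkVersion appears in the message. The field layout of `ZkVersionCheckResult`, the `Response` type and the method name are assumptions for illustration; only the general shape comes from the discussion.

```scala
object MaybeThrowSketch {
  val ControllerEpochPath = "/controller_epoch"

  // Local stand-in for Kafka's ControllerMovedException (assumption for this sketch).
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  final case class ZkVersionCheck(checkPath: String, expectedZkVersion: Int)
  // Hypothetical result shape: the original check plus whether it passed.
  final case class ZkVersionCheckResult(zkVersionCheck: ZkVersionCheck, ok: Boolean)
  final case class Response(path: String, zkVersionCheckResult: Option[ZkVersionCheckResult])

  def maybeThrowControllerMovedException(response: Response): Unit =
    response.zkVersionCheckResult.foreach { result =>
      val check = result.zkVersionCheck
      // Only a failed check against the controller epoch znode means the controller moved.
      if (check.checkPath == ControllerEpochPath && !result.ok)
        throw new ControllerMovedException(
          s"Controller epoch zkVersion check failed. Expected zkVersion = ${check.expectedZkVersion}")
    }
}
```

Matching on the checked path keeps failed version checks against unrelated znodes from being reported as a controller move.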

}, ctx.orNull)
case GetAclRequest(path, ctx) =>
case CreateRequest(path, data, acl, createMode, ctx, zkVersionCheck) =>
if (zkVersionCheck.isEmpty)
Member

For code style consistency, can you change the code to use one of the following styles:

if (zkVersionCheck.isEmpty) {
  ...
} else {
  ...
}

or

if (zkVersionCheck.isEmpty)
  // one line
else
  // oneline

Contributor Author

Done.

}}, ctx.orNull)
case SetAclRequest(path, acl, version, ctx) =>
}
}, ctx.orNull)
Member

nit: I am not sure what the expected code style is here. But if there is no clear standard and it is not very obvious, it is probably simpler to keep the existing style so that we avoid back-and-forth changes in the open source community.

@hzxa21
Contributor Author

hzxa21 commented Aug 22, 2018

Thanks Dong for the review. I have addressed the comments and will add more tests for the controller behavior.

- Embed multi op logics in ZookeeperClient
- Ensure all zookeeper updates done by controller are protected by checking the current controller epoch zkVersion
- Modify test cases in KafkaZkClientTest to test mismatch controller epoch zkVersion
- Stop processing events when the controller moves and trigger reelect immediately

Add more logging and improve variable naming
@hzxa21
Contributor Author

hzxa21 commented Aug 22, 2018

Rebased.

Member

@lindong28 left a comment

Thanks for the update @hzxa21. Looks pretty good now. Left some minor comments. It will be good to add tests before committing this patch.

@@ -1406,15 +1433,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* @return true if path gets deleted successfully, false if root path doesn't exist
* @throws KeeperException if there is an error while deleting the znodes
*/
def deleteRecursive(path: String): Boolean = {
def deleteRecursive(path: String, expectedControllerEpochZkVersion: Int = -1): Boolean = {
Member

nits: can we also replace -1 with ZkVersion.MatchAnyVersion?

response.zkVersionCheckResult match {
case Some(zkVersionCheckResult) =>
val zkVersionCheck = zkVersionCheckResult.zkVersionCheck
if (zkVersionCheck.checkPath.equals(ControllerEpochZNode.path))
Member

It seems that we need extra indentation for the body of the if statement.

Contributor Author

Fixed.

import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
Member

ZooDefs seems to be unused.

Contributor Author

Removed.

import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CheckResult, ErrorResult}
Member

CheckResult seems to be unused.

Contributor Author

Removed.

import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult}
Member

CheckResult and ErrorResult seem to be unused.

Contributor Author

Removed.

@lindong28
Member

Hey @junrao, I have finished reviewing this patch except tests. I will wait for @hzxa21 to add tests. Would you like to review this patch as well?

Contributor

@junrao left a comment

@hzxa21 : Thanks for the patch. There are 2 cases where more than one broker may act as a controller at the same time. The first case is when a controller's ZK session has expired. This is no longer an issue after KAFKA-5642 since the expired controller won't be able to write to ZK in the old ZK session. The second case is when the controller path in ZK is explicitly removed (e.g. by admin to force a controller change). This patch will provide better fencing in the second case, which is useful.

Made a pass of non-testing files. Looks good overall. Left a few comments.

if (zkVersionCheck.checkPath.equals(ControllerEpochZNode.path))
zkVersionCheckResult.opResult match {
case errorResult: ErrorResult =>
if (errorResult.getErr != Code.OK.intValue())
Contributor

Hmm, technically, only when the error code is BADVERSION, it's an indication that the controller has moved. For other errors, we probably just want to propagate as they are to the caller.

Contributor Author

That is a good point. Fixed.

@@ -86,6 +88,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
controllerEvent.process()
}
} catch {
case e: ControllerMovedException =>
info(s"Controller moved to another broker when processing $controllerEvent. Trigger controller move listener immediately", e)
controllerMovedListener.apply()
Contributor

Could this just be controllerMovedListener()?

Contributor Author

Yes. Fixed.

@@ -239,9 +239,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

info("Deleting log dir event notifications")
zkClient.deleteLogDirEventNotifications()
zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
Contributor

We probably need to be a bit careful about bumping up the controller epoch at the beginning of onControllerFailover(). Currently, the reading and the incrementing of the controller epoch is done independently after the controller path has been created successfully. This can create the following problem. Broker A creates the controller path and is about to call onControllerFailover(). Admin deletes the controller path and broker B creates the controller path, reads the controller epoch and updates it to 1. Broker A reads the controller epoch and updates it to 2. Now broker B is the controller, but its controller epoch is outdated.

One way to address this issue is to use multi() when creating the controller path. To elect a new controller, a broker first reads the current controller epoch from ZK and then does a multi() to (1) write the controller path and (2) do a conditional update to the controller epoch. Not sure if this is the best way though.

Contributor Author

Thanks for pointing this out. This is indeed a very dangerous race condition. If it happens, the current controller (Broker B) cannot update any zookeeper state due to the controller epoch zkVersion mismatch, and no other broker can become the controller because the current controller (Broker B) does not release the "lock" on the /controller znode.

Wrapping /controller creation and the /controller_epoch update in a zookeeper transaction can prevent this race condition, and I think it is a safe option. I will make the change and see whether there is any performance overhead in the perf testing.

Member

Hey @hzxa21, it seems that what you and Jun suggested is to have a single multi-op that 1) updates the controller path, 2) reads the controller epoch and 3) updates the controller epoch. An alternative approach is to have a single multi-op that 1) updates the controller path and 2) reads the controller epoch with its zkVersion. Then the controller can update the controller epoch with the additional zkVersion check.

Do you think the alternative approach would avoid the race condition and ensure correctness? If so, I am wondering if the alternative would be easier to reason about. I find it a bit easier because it follows the idea that all zookeeper write operations by the controller are based on the controller epoch zkVersion check, except for the controller znode write operation, which by design cannot rely on the controller epoch zkVersion check. And a multi-op that does one write and one read seems simpler than a multi-op that does write-read-write.

Contributor Author

@lindong28 From a design and code readability perspective, I agree with what you have proposed (first atomically read /controller_epoch and create /controller, then update /controller_epoch). From the implementation perspective, zookeeper does not have a read Op, meaning that we cannot perform a read operation within a multi (see http://people.apache.org/~larsgeorge/zookeeper-1215258/build/docs/dev-api/org/apache/zookeeper/Op.html).

Basically, we use the time when a broker succeeds in incrementing the controller epoch as the "commit" point of the controller election and the time when a broker succeeds in creating the /controller znode as the "prepare" point. So for the correctness of the controller election "commit", we need to ensure /controller_epoch doesn't change between "prepare" and "commit". To achieve this, we can implement the logic using zk multi with the following steps:

  1. Read /controller_epoch to get the current controller epoch e1 with zkVersion v1
  2. Create /controller if the /controller_epoch zkVersion matches v1 (use zk multi)
  3. Update /controller_epoch to be e1+1 if its zkVersion matches v1 (zk conditional set)
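
To make the fencing effect of these steps concrete, here is a minimal sketch against a hypothetical in-memory store. `FakeZk`, `Znode` and `electAtomically` are illustrative names only, not part of Kafka or the ZooKeeper client, and the sketch assumes the create and the conditional epoch update are applied together as one all-or-nothing batch. Two brokers read the same epoch version; the one that commits second is rejected because its version is stale.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper; not the real client API.
object ElectionRaceSketch {
  final case class Znode(data: String, version: Int)

  class FakeZk {
    private val nodes = mutable.Map("/controller_epoch" -> Znode("0", 0))

    def read(path: String): Option[Znode] = nodes.get(path)

    def delete(path: String): Unit = { nodes.remove(path); () }

    // All-or-nothing batch: create /controller and bump /controller_epoch
    // only if /controller_epoch still has the version the caller read.
    def electAtomically(brokerId: Int, epoch: Int, expectedVersion: Int): Either[String, Znode] = {
      val current = nodes("/controller_epoch")
      if (current.version != expectedVersion) Left("BADVERSION")
      else if (nodes.contains("/controller")) Left("NODEEXISTS")
      else {
        nodes("/controller") = Znode(brokerId.toString, 0)
        val updated = Znode((epoch + 1).toString, current.version + 1)
        nodes("/controller_epoch") = updated
        Right(updated)
      }
    }
  }

  // Returns (result for broker B, result for broker A).
  def run(): (Either[String, Znode], Either[String, Znode]) = {
    val zk = new FakeZk
    // Step 1 on both brokers: read epoch e1 with zkVersion v1.
    val seenByA = zk.read("/controller_epoch").get
    val seenByB = zk.read("/controller_epoch").get
    // Broker B commits first.
    val b = zk.electAtomically(2, seenByB.data.toInt, seenByB.version)
    // An admin deletes /controller; broker A then tries with its stale v1.
    zk.delete("/controller")
    val a = zk.electAtomically(1, seenByA.data.toInt, seenByA.version)
    (b, a)
  }
}
```

In this sketch broker A fails on the version check rather than on the existence of /controller, which is the property the epoch zkVersion check is meant to provide.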

Contributor Author

PR updated to address this issue.

@@ -132,6 +133,9 @@ class PartitionStateMachine(config: KafkaConfig,
doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
Contributor

We log e here but not in line 260. It would be useful to be consistent.

Contributor Author

Done.

@@ -1599,4 +1629,28 @@ object KafkaZkClient {
time, metricGroup, metricType)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}


def controllerZkVersionCheck(version: Int): Option[ZkVersionCheck] = {
Contributor

Could this be private?

Contributor Author

Yes. Done.

Some(ZkVersionCheck(ControllerEpochZNode.path, version))
}

def maybeThrowControllerMoveException(response: AsyncResponse): Unit = {
Contributor

Could this be private?

Contributor Author

Yes. Done.

Member

@lindong28 left a comment

Thanks for the update. Left some comments.

@@ -70,7 +75,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti

// visible for testing
private[controller] val eventManager = new ControllerEventManager(config.brokerId,
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => markInactiveAndResign())
Member

Would the logic be more intuitive if we just treat the ControllerMovedException as a ControllerChange event and do maybeResign()? The code would be simpler since this approach doesn't need markInactiveAndResign.

Contributor Author

Agree. Fixed.

@@ -86,6 +89,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
controllerEvent.process()
}
} catch {
case e: ControllerMovedException =>
info(s"Controller moved to another broker when processing $controllerEvent. Trigger controller move listener immediately", e)
Member

nits: "controller move listener" -> "controllerMovedListener".

Also, it seems simpler to just remove "Trigger controller move listener immediately", as we typically do not log which method is executed next other than logging the event itself. Developers are expected to look into the code and understand what happens next after this event.

Contributor Author

Done.

@@ -52,6 +52,11 @@ object KafkaController extends Logging {
override def process(): Unit = ()
}

private[controller] case class AwaitOnLatch(latch: CountDownLatch) extends ControllerEvent {
Member

Can you remove AwaitOnLatch if it is not used?

Contributor Author

AwaitOnLatch is used in ControllerIntegrationTest

Member

Got it. Could you add a comment above the case class `AwaitOnLatch` that says "Used only by test"? This is similar to e.g. `ReplicaManager.markPartitionOffline(...)`.

maybeCreateControllerEpochZNode()

// Read /controller_epoch to get the current controller epoch and zkVersion
val (curEpoch, curEpochStat) = getControllerEpoch.get
Member

This line throws NoSuchElementException if controller epoch does not exist. It seems better to do getControllerEpoch.getOrElse(throw new IllegalStateException("...")).
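
A small sketch of the suggested pattern, for illustration only. `requireEpoch` is a hypothetical helper and the `(Int, Int)` pair stands in for the epoch and zkVersion that `getControllerEpoch` would return in the patch. The point is that `getOrElse` with a `throw` yields a descriptive `IllegalStateException` instead of the bare `NoSuchElementException` from `Option.get`.

```scala
object EpochReadSketch {
  // Hypothetical helper: `read` models an optional (epoch, zkVersion) pair.
  def requireEpoch(read: Option[(Int, Int)]): (Int, Int) =
    read.getOrElse(
      throw new IllegalStateException("/controller_epoch is missing although it should exist"))
}
```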

Contributor Author

Done.

error(s"Error while creating ephemeral at ${ControllerZNode.path}, node already exists and owner " +
s"'${getDataResponse.stat.getEphemeralOwner}' does not match current session '${zooKeeperClient.sessionId}'")
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
case code@Code.OK =>
Member

Would it be more consistent with the other code in this method to do case Code.OK =>?

Contributor Author

Yes. Done.

onControllerFailover()
} catch {
case e: ControllerMovedException =>
error(s"Error while electing or becoming controller on broker ${config.brokerId} " +
Member

When an SRE deletes the controller znode, multiple brokers may be doing elect() concurrently, and all but one broker will find that the controller znode already exists.

Prior to this patch, these brokers will log debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}") if controller znode exists and the controller id is not this broker.

After this patch, these brokers will log error(s"Error while creating ephemeral at ${ControllerZNode.path}, node already exists and owner ${getDataResponse.stat.getEphemeralOwner} does not match current session ${zooKeeperClient.sessionId}") and error(s"Error while electing or becoming controller on broker ${config.brokerId} because controller moved to another broker", e) if controller znode exists and the controller id is not this broker.

Since we expect most brokers to find znode to be created by another broker during elect(), we probably want to keep the old behavior instead of having error level logs.

Contributor Author

Agree. Fixed.

throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
case code@Code.OK =>
getControllerEpoch match {
case Some((epoch, stat)) if epoch == newControllerEpoch =>
Member

@lindong28 Sep 1, 2018

It seems that we can enter this state only if broker executes registerControllerAndIncrementControllerEpoch() and finds that the controller znode has already been created by itself. The question is, is this possible?

Previously, if a broker tried to create the controller znode and the node already existed, the broker would simply read the controller id from the controller znode and move on. This patch adds quite a bit of new logic in controllerNodeExistsHandler(), e.g. it uses the zk session id to detect whether the controller znode was created by this broker and handles the scenario where the controller znode was created by this broker. So the new code is more complicated than the previous version. Can you explain a bit why we need this new logic?

Contributor Author

In short, the purpose of controllerNodeExistsHandler is to mimic checkedEphemeralCreate, which was previously used to create the /controller ephemeral node. In CheckedEphemeral, we need to double check the owner of the node if we see Code.NODEEXISTS.

I think the purpose of the check and the additional logic is to handle transient network connection loss while creating the ephemeral node. Let's say our client sends a create request to zookeeper to create an ephemeral znode, and zookeeper receives this request and successfully creates the znode but fails to send the response back to our client because of a transient network issue. In retryUntilConnected, our client resends the request and gets Code.NODEEXISTS. In this case, our client has actually created the znode successfully and owns it.
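
The ownership check described above can be sketched as a pure function. This is an illustration under stated assumptions, not the actual `checkedEphemeralCreate` code: `Code`, `Ephemeral` and `resolveCreate` are hypothetical names, and the session ids are plain `Long` values.

```scala
object CheckedCreateSketch {
  // Hypothetical mirror of the two result codes that matter here.
  sealed trait Code
  case object Ok extends Code
  case object NodeExists extends Code

  // Hypothetical view of an existing ephemeral znode: the session that owns it.
  final case class Ephemeral(ownerSessionId: Long)

  // After a retried create returns NodeExists, treat the create as successful
  // only if the existing znode is owned by our own session.
  def resolveCreate(createResult: Code, existing: Option[Ephemeral], mySessionId: Long): Code =
    (createResult, existing) match {
      case (NodeExists, Some(node)) if node.ownerSessionId == mySessionId => Ok
      case (other, _) => other
    }
}
```

A retried create that lost its first response maps to `Ok`, while a znode owned by a different session stays `NodeExists`.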

Member

The creation of the ephemeral znode /controller is probably a bit different from the creation of other ephemeral znode. The broker which creates the ephemeral znode /controller is explicitly specified in the znode data. Thus the old approach, which reads the broker id from the controller znode after seeing Code.NODEEXISTS, seems OK. And that old approach seems to handle the network connection loss scenario described here. I am wondering if we can use the old approach since its logic looks simpler. What do you think?

Member

@lindong28 Sep 2, 2018

BTW, for the case getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId, we know this case may happen and the code will automatically recover from this. In this case it is probably better to log at warning level instead of error level. Here is a good explanation for how to choose log level. https://stackoverflow.com/questions/2031163/when-to-use-the-different-log-levels.

Contributor Author

I am a little bit confused about what you mean by the old approach. Are you referring to checkedEphemeralCreate? The logic in controllerNodeExistsHandler is essentially the same as checkedEphemeralCreate when the node already exists, except that controllerNodeExistsHandler will read /controller_epoch to get back the epoch zkVersion when the owner of /controller is the current broker.

Member

@lindong28 Sep 4, 2018

My bad. I missed the fact that the controller znode was created using KafkaZkClient.checkedEphemeralCreate() which has the logic similar to what you are doing here.

It seems that the most important logic in the KafkaZkClient.checkedEphemeralCreate is to translate Code.NODEEXISTS to Code.OK for the znode creation operation if getDataResponse.stat.getEphemeralOwner == zooKeeperClient.sessionId. This logic was added in #3765 by Onur. My understanding is that, in case of connection issue between broker and zookeeper, it is possible for controller znode to be successfully created and yet the return code is Code.NODEEXISTS. KafkaZkClient.checkedEphemeralCreate will handle this scenario properly. It will be good for @onurkaraman to clarify whether this understanding is correct so that we can decide whether we should keep this logic.

Here is another question. With the current patch, if the controller znode creation fails with a znode-exists exception and the broker then finds that getDataResponse.stat.getEphemeralOwner == zooKeeperClient.sessionId, it seems registerControllerAndIncrementControllerEpoch() can return (newControllerEpoch, stat.getVersion) if epoch == newControllerEpoch. But is the controller epoch incremented in this case? If not, then it seems something is wrong?

Contributor Author

I thought again about your previous suggestion of checking only the payload of /controller, and I think it will work. I will check with Onur offline to understand more about checkedEphemeralCreate and confirm.

In terms of your second concern, if we already see getDataResponse.stat.getEphemeralOwner == zooKeeperClient.sessionId, that means /controller has been created successfully. Since the only code path to create /controller is within a zookeeper transaction along with the /controller_epoch update, we can infer that the controller epoch must get incremented in this case.

Contributor Author

Discussed with Onur offline: the purpose of getAfterNodeExists in CheckedEphemeral is indeed to handle the case when a zk connection loss happens. After digging around both the zookeeper and kafka code, we think it is safe to remove the extra complexity of controllerNodeExistsHandler in this PR once we make /controller creation and the /controller_epoch update atomic.

So the logic will be:
1). Try to create /controller_epoch if not exists
2). Read /controller_epoch from zk
3). Atomically create /controller and update /controller_epoch
4). If 3) throws NodeExistsException, read /controller and if controller id in zk equals the current broker id and if controller epoch in zk equals the expected epoch, successfully finish controller election; Otherwise, throw ControllerMovedException.
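
Step 4 boils down to a two-part comparison. A minimal sketch, assuming the values read back from /controller and /controller_epoch are modelled as `Option[Int]` (`afterNodeExists` and the `Outcome` type are hypothetical names, not identifiers from the patch):

```scala
object ElectionOutcomeSketch {
  sealed trait Outcome
  case object Elected extends Outcome
  case object ControllerMoved extends Outcome

  // After a NodeExistsException in step 3, compare what is stored in ZooKeeper
  // with what this broker expected to have written.
  def afterNodeExists(controllerIdInZk: Option[Int],
                      epochInZk: Option[Int],
                      brokerId: Int,
                      expectedEpoch: Int): Outcome =
    if (controllerIdInZk.contains(brokerId) && epochInZk.contains(expectedEpoch)) Elected
    else ControllerMoved
}
```

Both conditions must hold: a matching controller id with a different epoch still counts as a moved controller in this sketch.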

if (errorCode == Code.BADVERSION)
// Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails
throw new ControllerMovedException(s"Controller epoch zkVersion check fails. Expected zkVersion = ${zkVersionCheck.expectedZkVersion}")
else if (errorCode != Code.OK)
Member

@lindong28 Sep 1, 2018

Is it possible for error code to be Code.OK while zkVersionCheckResult.opResult is of type ErrorResult?

Contributor Author

Yes. For example, if we wrap check + create in a zookeeper multi and the multi fails because the create fails, the result of the check will be of type ErrorResult with Code.OK as its error code.
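
The three cases for the check result can be sketched as follows. This is an illustrative model only: `Code`, `Decision` and `classify` are hypothetical names that mirror the branches discussed in this thread, not the ZooKeeper or Kafka types.

```scala
object VersionCheckSketch {
  // Hypothetical mirror of a few result codes relevant to the version check.
  sealed trait Code
  case object Ok extends Code
  case object BadVersion extends Code
  case object NoNode extends Code

  sealed trait Decision
  case object NotMoved extends Decision
  case object ControllerMoved extends Decision
  final case class Propagate(code: Code) extends Decision

  // Classify the result of the controller epoch version check inside a failed multi.
  def classify(checkCode: Code): Decision = checkCode match {
    case BadVersion => ControllerMoved  // epoch znode changed: another controller took over
    case Ok         => NotMoved         // check passed; a different op in the multi failed
    case other      => Propagate(other) // unrelated error: hand it back to the caller
  }
}
```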

Member

Cool. I see.

response.zkVersionCheckResult match {
case Some(zkVersionCheckResult) =>
val zkVersionCheck = zkVersionCheckResult.zkVersionCheck
if (zkVersionCheck.checkPath.equals(ControllerEpochZNode.path))
Member

Is it possible for ControllerEpochZNode.path to be different from zkVersionCheck.checkPath?

Contributor Author

Currently no. The check here is for general purpose and safety because the zkVersionCheck can apply on any znode if needed.

Member

Got it.

@hzxa21
Contributor Author

hzxa21 commented Sep 2, 2018

@lindong28 Thanks for the review. I have updated the PR to address your comments. I would appreciate it if you could take another look when available.

@hzxa21
Contributor Author

hzxa21 commented Sep 4, 2018

Perf testing has finished. Overall, there is no significant overhead after fencing zookeeper updates for common controller events (ControllerFailOver, ControlledShutdown, BrokerStartUp, PreferredReplicaLeaderElection).

The environment:

  • 5 node zookeeper and 5 broker kafka cluster with brokers on different racks
  • 2,000 topics each with 50 partitions and RF = 1
  • 10k single partition topics with RF=1 + 10k 3 partitions topics with RF=2

Here are the results:

1. Controller fails over
Trunk (bf0675) - run 1

[2018-09-01 00:47:20,564] INFO [Controller id=1495] 1495 successfully elected as the controller (kafka.controller.KafkaController)
[2018-09-01 00:47:27,938] INFO [Controller id=1495] Starting the controller scheduler (kafka.controller.KafkaController)

Trunk (bf0675) - run 2

[2018-09-01 00:54:44,615] INFO [Controller id=1496] 1496 successfully elected as the controller (kafka.controller.KafkaController)
[2018-09-01 00:54:59,529] INFO [Controller id=1496] Starting the controller scheduler (kafka.controller.KafkaController)

KAFKA-6082 - run 1

[2018-09-01 00:23:45,949] INFO [Controller id=1495] 1495 successfully elected as the controller. Epoch incremented to 9 and epoch zk version is now 9 (kafka.controller.KafkaController)
[2018-09-01 00:23:53,251] INFO [Controller id=1495] Starting the controller scheduler (kafka.controller.KafkaController)

KAFKA-6082 - run 2

[2018-09-01 00:29:08,524] INFO [Controller id=1494] 1494 successfully elected as the controller. Epoch incremented to 10 and epoch zk version is now 10 (kafka.controller.KafkaController)
[2018-09-01 00:29:19,121] INFO [Controller id=1494] Starting the controller scheduler (kafka.controller.KafkaController)

Trunk avg = ~11s
KAFKA-6082 avg = ~9s
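
For reference, the averages above follow from the gap between the "successfully elected" and "Starting the controller scheduler" timestamps in each run. A small sketch of that arithmetic (the object and helper names are illustrative; only the timestamps come from the logs above):

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object FailoverTimingSketch {
  // Matches the log timestamp layout, e.g. "2018-09-01 00:47:20,564".
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")

  def millisBetween(start: String, end: String): Long =
    Duration.between(LocalDateTime.parse(start, fmt), LocalDateTime.parse(end, fmt)).toMillis

  // Failover durations in milliseconds, two runs each.
  val trunk: Seq[Long] = Seq(
    millisBetween("2018-09-01 00:47:20,564", "2018-09-01 00:47:27,938"),
    millisBetween("2018-09-01 00:54:44,615", "2018-09-01 00:54:59,529"))

  val patched: Seq[Long] = Seq(
    millisBetween("2018-09-01 00:23:45,949", "2018-09-01 00:23:53,251"),
    millisBetween("2018-09-01 00:29:08,524", "2018-09-01 00:29:19,121"))

  def avgSeconds(ms: Seq[Long]): Double = ms.sum.toDouble / ms.size / 1000.0
}
```

The per-run durations are 7.374s and 14.914s on trunk and 7.302s and 10.597s with the patch, so with only two runs per side the spread between runs is larger than the difference between the averages.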

2. Preferred replica leader election
Trunk (bf0675) - ~2.4k leadership movements

[2018-09-03 23:25:12,201] INFO [Controller id=1497] Starting preferred replica leader election for partitions
[2018-09-03 23:25:12,692] INFO [Controller id=1497] Partition 10-1149-0 completed preferred replica leader election. New leader is 1496 (kafka.controller.KafkaController)

KAFKA-6082 - ~2.4k leadership movements

[2018-09-04 04:35:25,592] INFO [Controller id=1497] Starting preferred replica leader election for partitions
...
[2018-09-04 04:35:26,136] INFO [Controller id=1497] Partition 7-1801-0 completed preferred replica leader election. New leader is 1495 (kafka.controller.KafkaController

Trunk = 491ms
KAFKA-6082 = 544ms

Trunk (bf0675) - ~4.8k leadership movements

[2018-09-03 23:25:09,378] INFO [Controller id=1497] Starting preferred replica leader election for partitions ...
...
[2018-09-03 23:25:10,482] INFO [Controller id=1497] Partition 8-1915-0 completed preferred replica leader election. New leader is 1497 (kafka.controller.KafkaController)

KAFKA-6082 - broker 1497 (~4.8k leader)

[2018-09-04 04:35:22,313] TRACE [Controller id=1497] Starting preferred replica leader election for partitions
...
[2018-09-04 04:35:23,397] INFO [Controller id=1497] Partition 8-1915-0 completed preferred replica leader election. New leader is 1497 (kafka.controller.KafkaController)

Trunk = 1.104s
KAFKA-6082 = 1.084s

Trunk (bf0675) - ~6k leadership movements

[2018-09-03 23:25:10,521] INFO [Controller id=1497] Starting preferred replica leader election for partitions ...
...
[2018-09-03 23:25:11,860] INFO [Controller id=1497] Partition 5-1613-2 completed preferred replica leader election. New leader is 1494 (kafka.controller.KafkaController)

KAFKA-6082 - broker 1494 (~6k leadership movements)

[2018-09-04 04:35:23,431] INFO [Controller id=1497] Starting preferred replica leader election for
...
[2018-09-04 04:35:24,583] INFO [Controller id=1497] Partition 5-1613-2 completed preferred replica leader election. New leader is 1494 (kafka.controller.KafkaController)

Trunk = 1.339s
KAFKA-6082 = 1.152s

3. Controlled shutdown
Trunk (bf0675) - run 1

[2018-09-04 00:05:28,098] INFO [Controller id=1497] Shutting down broker 1495 (kafka.controller.KafkaController)
[2018-09-04 00:05:31,773] TRACE [Controller id=1497] All leaders = ...

Trunk (bf0675) - run 2

[2018-09-04 04:15:05,235] INFO [Controller id=1494] Shutting down broker 1496 (kafka.controller.KafkaController)
[2018-09-04 04:15:08,823] TRACE [Controller id=1494] All leaders = ...

KAFKA-6082 - run 1

[2018-09-04 04:51:10,175] INFO [Controller id=1497] Shutting down broker 1495 (kafka.controller.KafkaController)
[2018-09-04 04:51:13,825] TRACE [Controller id=1497] All leaders ...

KAFKA-6082 - run 2

[2018-09-04 05:11:32,197] INFO [Controller id=1497] Shutting down broker 1496 (kafka.controller.KafkaController)
[2018-09-04 05:11:35,846] TRACE [Controller id=1497] All leaders ...

Trunk avg = 3.63s
KAFKA-6082 avg = 3.65s

4. Broker start
Trunk (bf0675) - run 1

[2018-09-04 04:19:20,353] INFO [Controller id=1494] Newly added brokers: 1496, deleted brokers: , all live brokers: 1493,1494,1495,1496,1497 (kafka.controller.KafkaController)
[2018-09-04 04:19:25,233] DEBUG [Controller id=1494] Register BrokerModifications handler for ArrayBuffer(1496) (kafka.controller.KafkaController)

Trunk (bf0675) - run 2

[2018-09-04 00:09:49,320] INFO [Controller id=1497] Newly added brokers: 1495, deleted brokers: , all live brokers: 1493,1494,1495,1496,1497 (kafka.controller.KafkaController)
[2018-09-04 00:09:53,772] DEBUG [Controller id=1497] Register BrokerModifications handler for ArrayBuffer(1495) (kafka.controller.KafkaController)

KAFKA-6082 - run 1

[2018-09-04 04:53:49,998] INFO [Controller id=1497] Newly added brokers: 1495, deleted brokers: , all live brokers: 1493,1494,1495,1496,1497 (kafka.controller.KafkaController)
[2018-09-04 04:53:52,314] DEBUG [Controller id=1497] Register BrokerModifications handler for ArrayBuffer(1495) (kafka.controller.KafkaController)

KAFKA-6082 - run 2

[2018-09-04 05:13:23,159] INFO [Controller id=1497] Newly added brokers: 1496, deleted brokers: , all live brokers: 1493,1494,1495,1496,1497 (kafka.controller.KafkaController)
[2018-09-04 05:13:25,254] DEBUG [Controller id=1497] Register BrokerModifications handler for ArrayBuffer(1496) (kafka.controller.KafkaController)

Trunk avg = 4.67s
KAFKA-6082 avg = 2.21s
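As a cross-check on the numbers above, each duration is the difference between the first and last log timestamps of a run, and each average is the mean of the two runs. A minimal Scala sketch of that arithmetic follows; `LogTimings`, `elapsedMs` and `avgSeconds` are illustrative names and are not part of the patch.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object LogTimings {
  // Timestamp layout used in the log lines above, e.g. "2018-09-04 04:19:20,353".
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")

  /** Milliseconds elapsed between two log timestamps. */
  def elapsedMs(start: String, end: String): Long =
    Duration.between(LocalDateTime.parse(start, fmt), LocalDateTime.parse(end, fmt)).toMillis

  /** Mean of several runs, converted from milliseconds to seconds. */
  def avgSeconds(runsMs: Seq[Long]): Double =
    runsMs.sum.toDouble / runsMs.size / 1000.0

  def main(args: Array[String]): Unit = {
    // The two trunk "broker start" runs quoted above.
    val trunk = Seq(
      elapsedMs("2018-09-04 04:19:20,353", "2018-09-04 04:19:25,233"),
      elapsedMs("2018-09-04 00:09:49,320", "2018-09-04 00:09:53,772"))
    println(s"Trunk runs (ms): $trunk, avg (s): ${avgSeconds(trunk)}")
  }
}
```

For the broker start runs, `elapsedMs` gives 4880 ms and 4452 ms on trunk, whose mean is about 4.67 s, in line with the figure reported above.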

Contributor

@junrao junrao left a comment

@hzxa21 : Thanks for the updated patch. Added a few more comments below.

debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion")
try {
val transaction = zooKeeperClient.createTransaction()
transaction.check(ControllerEpochZNode.path, expectedControllerEpochZkVersion)
Contributor

Since we are doing a conditional setData in line 117, we don't need the check operation here.

Contributor Author

That's right. Fixed.

transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp),
acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)
val results = transaction.commit()
Contributor

If the client loses a connection to a ZK server in the middle of an operation, the client will get a ConnectionLossException. Normally, we handle this by retrying through retryRequestsUntilConnected(). So, we probably need to create a similar routine to retry on ConnectionLossException when doing transaction.commit() too.

The controller path could have been created successfully when ConnectionLossException was incurred. A retry could result in either NodeExistsException or BadVersionException. You handled the former properly in the code below. We will need to do the same thing for the latter.

Contributor Author

Done.
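To illustrate why a retry after ConnectionLossException is ambiguous, here is a small in-memory model of the create-and-bump transaction. It is only a sketch: `FakeZk` is a hypothetical stand-in for the two znodes rather than the real ZooKeeper client, and the order in which it checks the two conditions is an assumption made for the example.

```scala
object MultiRetrySketch {
  sealed trait Result
  case object Ok extends Result
  case object NodeExists extends Result
  case object BadVersion extends Result

  // Hypothetical in-memory stand-in for /controller and /controller_epoch.
  final class FakeZk {
    var controller: Option[Int] = None // payload of /controller, if the znode exists
    var epoch: Int = 0                 // payload of /controller_epoch
    var epochZkVersion: Int = 0        // zkVersion of /controller_epoch

    /** All-or-nothing: create /controller and conditionally bump /controller_epoch. */
    def createControllerAndBumpEpoch(brokerId: Int, newEpoch: Int, expectedZkVersion: Int): Result =
      if (controller.isDefined) NodeExists
      else if (epochZkVersion != expectedZkVersion) BadVersion
      else {
        controller = Some(brokerId)
        epoch = newEpoch
        epochZkVersion += 1
        Ok
      }
  }

  /** Commit once, pretend the reply was lost, then retry with the same arguments. */
  def commitLoseReplyAndRetry(zk: FakeZk, brokerId: Int): (Result, Result) = {
    val expected = zk.epochZkVersion
    val newEpoch = zk.epoch + 1
    val applied = zk.createControllerAndBumpEpoch(brokerId, newEpoch, expected)  // applied on the server
    val observed = zk.createControllerAndBumpEpoch(brokerId, newEpoch, expected) // what the client sees
    (applied, observed)
  }
}
```

In this model the first commit succeeds while the retry reports an error, even though the broker did win the election, which is why the error on a retry cannot be taken at face value.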

throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
case _: BadVersionException =>
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
}
Contributor

For any other types of exceptions, we want to just propagate the KeeperException to the caller.

Contributor Author

Here we don't catch other types of exceptions, so the KeeperException is already propagated to the caller.

val timestamp = time.milliseconds()

// Create /controller_epoch if not exists
maybeCreateControllerEpochZNode()
Contributor

In the common case, the controller epoch path already exists. So, perhaps it would be better to always do getControllerEpoch first and then try maybeCreateControllerEpochZNode if we hit a NoNodeException.

Contributor Author

Done.

private def maybeCreateControllerEpochZNode(): Unit = {
createControllerEpochRaw(KafkaController.InitialControllerEpoch - 1).resultCode match {
case Code.OK =>
info(s"Successfully create ${ControllerEpochZNode.path} with initial epoch ${KafkaController.InitialControllerEpoch - 1}")
Contributor

The info level will be too verbose if we call maybeCreateControllerEpochZNode() on every controller election.

Contributor Author

This log is only printed when /controller_epoch is absent, which typically happens when the cluster is first initialized, not on every controller election.


val latch = new CountDownLatch(1)

// Let the controller event thread await on a latch before the pre-defined logic is triggered and before it processes controller change.
Contributor

"before the pre-defined logic is triggered and before it processes controller change." It seems that we just need one of the two before?

Contributor Author

Fixed.

Contributor

junrao commented Sep 5, 2018

@hzxa21 : Thanks for the perf results. They look good.

s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " +
s"Aborting controller startup procedure"))
if (controllerId == curControllerId) {
val (epoch, stat) = getControllerEpoch.getOrElse(throw new ControllerMovedException(
Contributor

Hmm, I am not sure this is safe. The controller path could have been deleted and grabbed by another broker in the window between line 123 and here. Then, we would have grabbed the wrong controller epoch.

Contributor Author

You are right. But in line 133 we check the epoch value, and if it differs from what we expected, we throw ControllerMovedException. If /controller gets deleted between lines 123 and 127 and another broker becomes the controller, the controller epoch will be incremented accordingly, causing the check in line 133 to fail.

Contributor

Ah. Ok. That's fine then.
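The argument in this thread can be sketched as follows: the two reads are not atomic, but any election that happens in between bumps the epoch, so the epoch comparison catches it. This is a simplified, self-contained model, not the method from the patch; the `readControllerId` and `readEpoch` parameters and the `ControllerMoved` class are hypothetical stand-ins for the ZooKeeper reads and for ControllerMovedException.

```scala
object CheckSketch {
  final class ControllerMoved(msg: String) extends RuntimeException(msg)

  /**
   * Returns (epoch, zkVersion) if this broker is the controller for `newControllerEpoch`.
   * `readEpoch` yields the epoch payload together with the znode's zkVersion.
   */
  def checkControllerAndEpoch(
      myId: Int,
      newControllerEpoch: Int,
      readControllerId: () => Option[Int],
      readEpoch: () => Option[(Int, Int)]): (Int, Int) = {
    val curControllerId = readControllerId().getOrElse(
      throw new ControllerMoved("/controller went away during the check"))
    if (curControllerId == myId) {
      // Second, separate read: another election may have happened since the first read.
      val (epoch, zkVersion) = readEpoch().getOrElse(
        throw new ControllerMoved("/controller_epoch could not be read"))
      // A newer election always writes a larger epoch, so equality proves nothing changed.
      if (epoch == newControllerEpoch)
        return (newControllerEpoch, zkVersion)
    }
    throw new ControllerMoved("Controller moved to another broker")
  }
}
```

If another broker wins between the two reads, `readEpoch` returns a larger epoch and the call throws instead of returning a stale zkVersion.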

// transaction succeeds based on the controller znode verification. Other rounds of controller
// election will result in larger epoch number written in zk.
if (epoch == newControllerEpoch)
(newControllerEpoch, stat.getVersion)
Contributor

Should we explicitly call return here? Otherwise, it seems that we are always throwing ControllerMovedException.

Contributor Author

Ah... My bad. Fixed.
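The bug discussed here comes from how Scala treats an `if` without `else` in the middle of a block: its value is discarded, so execution falls through to the `throw`. A minimal sketch, using illustrative names (`ReturnSketch`, `ControllerMoved`) that are not from the patch:

```scala
object ReturnSketch {
  final class ControllerMoved(msg: String) extends RuntimeException(msg)

  // Buggy shape: the tuple is computed in statement position and thrown away,
  // so the method throws even when the epochs match.
  def withoutReturn(epoch: Int, expected: Int, zkVersion: Int): (Int, Int) = {
    if (epoch == expected)
      (expected, zkVersion)
    throw new ControllerMoved("Controller moved to another broker")
  }

  // Fix used in the review: return explicitly from the matching branch.
  def withReturn(epoch: Int, expected: Int, zkVersion: Int): (Int, Int) = {
    if (epoch == expected)
      return (expected, zkVersion)
    throw new ControllerMoved("Controller moved to another broker")
  }

  // Alternative shape: make the if/else the final expression of the method.
  def asExpression(epoch: Int, expected: Int, zkVersion: Int): (Int, Int) =
    if (epoch == expected) (expected, zkVersion)
    else throw new ControllerMoved("Controller moved to another broker")
}
```

The compiler accepts all three variants, so only a test that exercises the matching case exposes the first one.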

Contributor

@junrao junrao left a comment

@hzxa21 : Thanks for the new update. Still a few more comments below.

throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
case _: BadVersionException =>
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
case _: ConnectionLossException => tryCreateControllerZNodeAndIncrementEpoch()
Contributor

On ConnectionLossException, it's not efficient to blindly retry immediately. Instead, it's better to wait until the ZK connection is ready before retry. You can check how this is done in retryRequestsUntilConnected().

Contributor Author

Got it. Thanks for pointing this out.

Contributor Author

Done.
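The wait-then-retry idea from this thread can be sketched in a few lines. This does not reproduce retryRequestsUntilConnected(); `retryUntilConnected`, `ConnectionLoss` and the `waitUntilConnected` callback are hypothetical names used only to show the control flow.

```scala
object RetrySketch {
  final class ConnectionLoss extends RuntimeException("connection loss")

  /**
   * Runs `op` until it completes without a ConnectionLoss. After each loss it calls
   * `waitUntilConnected`, which is expected to block until the session is usable again,
   * instead of retrying immediately in a tight loop.
   */
  def retryUntilConnected[T](waitUntilConnected: () => Unit)(op: => T): T = {
    var result: Option[T] = None
    while (result.isEmpty) {
      try result = Some(op)
      catch {
        case _: ConnectionLoss => waitUntilConnected()
      }
    }
    result.get
  }
}
```

Any other exception thrown by `op` propagates to the caller unchanged, which matches the earlier point about not swallowing unrelated KeeperExceptions.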

info(s"Successfully create ${ControllerEpochZNode.path} with initial epoch ${KafkaController.InitialControllerEpoch - 1}")
(KafkaController.InitialControllerEpoch - 1, KafkaController.InitialControllerEpochZkVersion - 1)
case Code.NODEEXISTS =>
throw new ControllerMovedException(s"Controller moved to another broker while creating ${ControllerEpochZNode.path}. Aborting controller startup procedure")
Contributor

It is possible that the controller_epoch path is created by another broker between getControllerEpoch() and createControllerEpochZNode(). In this case, we want to get the controller epoch again, instead of throwing ControllerMovedException.

Contributor Author

If the controller_epoch path is created by another broker between getControllerEpoch() and createControllerEpochZNode(), I was wondering whether we can infer that the other broker wins this round of controller election even if it hasn't created the controller znode yet.

On second thought, I think we should follow what you suggested for extra safety, because if the broker fails to talk to zk for some reason, the cluster will get into a no-controller state.

Contributor Author

Fixed.
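The flow agreed on in this thread (read first, create only if missing, and re-read if another broker created the node in between) could look roughly like the sketch below. `FakeEpochNode` is a hypothetical in-memory stand-in for /controller_epoch, `beforeCreate` is a hook added only so the race can be simulated, and the initial epoch and zkVersion of 0 are assumptions.

```scala
object EpochInitSketch {
  val InitialControllerEpoch = 0 // assumed initial value

  // Hypothetical stand-in for /controller_epoch: Some((epoch, zkVersion)) once created.
  final class FakeEpochNode {
    private var state: Option[(Int, Int)] = None
    def read(): Option[(Int, Int)] = state

    /** Returns true if this call created the node, false if it already existed. */
    def create(initialEpoch: Int): Boolean =
      if (state.isDefined) false
      else {
        state = Some((initialEpoch, 0))
        true
      }
  }

  /** Common case: one read. Rare case: create, or re-read after losing the create race. */
  def getOrCreateEpoch(node: FakeEpochNode, beforeCreate: () => Unit = () => ()): (Int, Int) =
    node.read().getOrElse {
      beforeCreate()
      if (node.create(InitialControllerEpoch)) (InitialControllerEpoch, 0)
      else node.read().getOrElse(
        throw new IllegalStateException("/controller_epoch existed but could not be read"))
    }
}
```

Losing the create race is not treated as losing the election here; the broker simply continues with whatever epoch it reads back.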

createControllerEpochRaw(KafkaController.InitialControllerEpoch - 1).resultCode match {
case Code.OK =>
info(s"Successfully create ${ControllerEpochZNode.path} with initial epoch ${KafkaController.InitialControllerEpoch - 1}")
(KafkaController.InitialControllerEpoch - 1, KafkaController.InitialControllerEpochZkVersion - 1)
Contributor

It seems that every usage of KafkaController.InitialControllerEpoch and KafkaController.InitialControllerEpochZkVersion as 0 requires subtraction by 1. Could we just define them as 0?

Contributor Author

Yes. Done.

}
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
case _: BadVersionException =>
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
Contributor

According to ZK doc, multi propagates the error from one of the operations. The BadVersionException could be the result of a retry after ConnectionLossException. So, it seems that we need to handle it in the same way as NodeExistsException.

Contributor Author

@hzxa21 hzxa21 Sep 6, 2018

Correct me if I am wrong, I think there are two cases when we see BadVersionException here:

  1. Another round of controller election kicks in and the controller does switch. It is safe to throw ControllerMovedException in this case.
  2. The current broker loses its zk connection after zk successfully finishes the transaction, but the controller znode is gone before the next retry. In this case, another round of controller election will be triggered by the zk watcher's handleDeleted. So I think it is also safe to throw ControllerMovedException here.

Contributor

I was thinking about case 2, but with the controller path still there. The ZK multi api doesn't say that it will run the operations in a multi request in any particular order. So, during the retry on a ConnectionLossException, it may be possible that the conditional update of the controller epoch path is executed first and a BadVersionException is thrown?

Contributor Author

Thanks for the prompt reply!
I actually checked both the client-side and server-side code of zookeeper: the implementation honors the order, and the thrown exception corresponds to the first error it sees. But since the multi api doesn't explicitly say that this is the case, or that it will maintain this guarantee in the future, I agree to also handle BadVersion the same way as NodeExists for safety, given that the performance overhead is small.

Contributor Author

Done.
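The outcome of this thread, treating both errors as "unknown, go verify" rather than as proof of having lost, might be expressed as in the sketch below. The `CommitResult` type and the `verify` callback (standing in for a check such as checkControllerAndEpoch) are assumptions made for illustration.

```scala
object AmbiguousCommitSketch {
  sealed trait CommitResult
  final case class Committed(epoch: Int, zkVersion: Int) extends CommitResult
  case object NodeExists extends CommitResult
  case object BadVersion extends CommitResult

  /**
   * After a retried commit either error may simply mean "your first attempt already
   * succeeded", so both are resolved by the same verification step. `verify` is
   * expected to throw if another broker is the controller.
   */
  def resolve(result: CommitResult, verify: () => (Int, Int)): (Int, Int) = result match {
    case Committed(epoch, zkVersion) => (epoch, zkVersion)
    case NodeExists | BadVersion     => verify()
  }
}
```

Handling the two cases in one branch means the code does not depend on which operation of the multi request reports its error first.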

@@ -1406,15 +1494,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* @return true if path gets deleted successfully, false if root path doesn't exist
* @throws KeeperException if there is an error while deleting the znodes
*/
-def deleteRecursive(path: String): Boolean = {
+def deleteRecursive(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion): Boolean = {
Contributor

nit: missing expectedControllerEpochZkVersion params in method comments

Contributor Author

Added.



private def controllerZkVersionCheck(version: Int): Option[ZkVersionCheck] = {
if (version < 0)
Contributor

@omkreddy omkreddy Sep 6, 2018

Can we use InitialControllerEpochZkVersion constant in place of zero?

Contributor Author

Sure. Done.
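The snippet under review is truncated, so the following is only a guess at its general shape: a helper that turns an expected zkVersion into an optional check operation, skipping the check for sentinel values below the first valid version. The `CheckOp` type, the constant values and the path string are assumptions, not taken from the patch.

```scala
object VersionCheckSketch {
  val MatchAnyVersion = -1                // assumed sentinel meaning "do not fence"
  val InitialControllerEpochZkVersion = 0 // assumed first valid zkVersion

  // Hypothetical description of a "check version" operation in a multi request.
  final case class CheckOp(path: String, expectedZkVersion: Int)

  /** None means the update is sent without a controller epoch check. */
  def controllerZkVersionCheck(version: Int): Option[CheckOp] =
    if (version < InitialControllerEpochZkVersion) None
    else Some(CheckOp("/controller_epoch", version))
}
```

Comparing against a named constant rather than a literal makes it clear that the cutoff is the first valid zkVersion and not an arbitrary number.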

@@ -39,10 +39,11 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
override def setUp() {
super.setUp()
zkClient.makeSurePersistentPathExists(TopicZNode.path(topic))
+zkClient.makeSurePersistentPathExists(ControllerEpochZNode.path)
Contributor

looks like this line is not required.

Contributor Author

Thanks for pointing this out. Removed.

Contributor

@junrao junrao left a comment

@hzxa21 : Thanks for the new updates. Just a couple more comments below.

tryCreateControllerZNodeAndIncrementEpoch()
}

private def createControllerEpochZNode(): (Int, Int) = {
Contributor

Perhaps it's better to name this maybeCreateControllerEpochZNode?

Contributor Author

Done.

Contributor

@junrao junrao left a comment

@hzxa21 : Thanks for the latest patch. LGTM. Just a minor comment below. I will let Dong make another pass of the latest patch before merging.

testControllerMove(() => zkClient.createPartitionReassignment(reassignment))
}

def testControllerMove(fun: () => Unit): Unit = {
Contributor

Could this be private?

Contributor Author

Done.

Contributor Author

hzxa21 commented Sep 6, 2018

@junrao Thanks so much for all of your comments and suggestions.

Member

@lindong28 lindong28 left a comment

Thanks for the update @hzxa21. Looks pretty good now. I left some comments regarding corner cases.

// transaction succeeds based on the controller znode verification. Other rounds of controller
// election will result in larger epoch number written in zk.
if (epoch == newControllerEpoch)
return (newControllerEpoch, stat.getVersion)
Member

I have two questions here:

  1. When would we enter a scenario where controllerId == curControllerId and epoch != newControllerEpoch?

  2. Currently when this happens, checkControllerAndEpoch will throw ControllerMovedException(), which is caught in KafkaController.elect() and triggers maybeResign(). However, maybeResign() will do nothing because controllerId == curControllerId and this broker is considered to be the active controller. In this case no other broker will become the controller, and the current broker will not function properly as controller because it has not executed onControllerFailover().

So maybe we should throw IllegalStateException here if controllerId == curControllerId and epoch != newControllerEpoch?

Contributor Author

  1. Because we first get /controller and then get /controller_epoch (rather than do it atomically), it is possible that after we see controllerId == curControllerId and before we get /controller_epoch, another round of controller election is triggered. In this case, we will see epoch != newControllerEpoch.
  2. When we see epoch != newControllerEpoch, controllerId == curControllerId does not hold because another broker must become the controller. In this case, maybeResign will work fine.

Member

Thanks for the explanation. This makes sense.

Contributor

@hzxa21 : Actually, I am wondering if it's simpler to replace lines 116-124 with a check that the ephemeral owner of the controller path equals the ZK session id (like we did before in checkedEphemeralCreate()). The controller path or the controller epoch path could change after that check. But that's fine and will be handled by the next ZK event on the controller path change.

Contributor Author

It is safe to replace the /controller payload check with a session id check, but we still need to read /controller_epoch to get back the corresponding zk version if we lose the connection while doing the zk transaction.

Contributor

Hmm, to me, if the ephemeral owner of the controller path equals the ZK session id, it means that at that particular time, the controller path and the controller epoch path were created by the current ZK session, and therefore the controller epoch used for creation should be valid. This seems to be equivalent to checking the value of the controller path and the controller epoch value. In both cases, after the check, the controller could change again. However, that will be handled by the ZK watcher event.

Member

My understanding is that the approach using ephemeral owner has the same performance and correctness guarantee as the current approach which uses epoch from the znode data.

The approach using the ephemeral owner is probably more intuitive/readable because it directly handles the root cause of NodeExistsException | _: BadVersionException, i.e. checkControllerAndEpoch() should effectively translate NodeExistsException/BadVersionException to Code.OK if and only if the ephemeral owner of the controller path equals the ZK session id. It is also more consistent with the existing logic in checkedEphemeralCreate().

@hzxa21 If it sounds reasonable, maybe we can have a minor followup patch (without requiring a JIRA ticket) to improve it.

Contributor

I was thinking that the ephemeral owner approach will be cheaper since we only need to read the controller path, not both controller path and the controller epoch path.

Contributor Author

@junrao From the correctness and performance point of view, checking the /controller payload and checking the /controller ephemeral owner are the same. The question is whether we need to read /controller_epoch. I do it because if NodeExistsException | BadVersionException happens, we no longer have the Stat (new zkVersion) of /controller_epoch, even though the /controller_epoch update succeeded on the zookeeper server. We could avoid the extra read on /controller_epoch if we could assume that zkVersion is always incremented by one. Since the /controller_epoch zkVersion is critical for us after this patch and the zookeeper doc does not explicitly say that this assumption holds, I think it is safer to do one extra read during controller election.

Contributor

@hzxa21 : That's a great point. Thanks. Then we can just keep the code as it is.

s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " +
s"Aborting controller startup procedure"))
if (controllerId == curControllerId) {
val (epoch, stat) = getControllerEpoch.getOrElse(throw new ControllerMovedException(
Member

In maybeCreateControllerEpochZNode(), we throw IllegalStateException(...) if we first find controller epoch znode exists and then find it disappeared. Following the same logic, it is probably consistent and reasonable to throw IllegalStateException(...) if checkControllerAndEpoch(...) can not read controller epoch, right?

Contributor Author

Makes sense to me. Will fix it.

Contributor Author

Fixed.

onControllerResignation()
activeControllerId = -1
zkClient.deleteController()
if (!isActive)
Member

If registerControllerAndIncrementControllerEpoch() has successfully written the broker id to the controller znode but then throws an IllegalStateException (e.g. in the case controllerId == curControllerId and epoch != newControllerEpoch described in the other comment), the exception is caught in elect() and triggerControllerMove() is executed. However, since activeControllerId has not been updated, isActive() evaluates to false and triggerControllerMove() will do nothing.

Maybe we should first do activeControllerId = zkClient.getControllerId.getOrElse(-1) in triggerControllerMove().

Also, to be consistent with most other usage of isActive(), can we do something like the code below instead of using if/else?

    if (!isActive) {
      warn("Controller has already moved when trying to trigger controller movement")
      return
    }

Contributor Author

Yes. Fixed.

val expectedControllerEpochZkVersion = controllerContext.epochZkVersion
activeControllerId = -1
onControllerResignation()
zkClient.deleteController(expectedControllerEpochZkVersion)
Member

nits: can we use controllerContext.epochZkVersion instead of using expectedControllerEpochZkVersion to be consistent with other usage of controllerContext.epochZkVersion in this patch?

Contributor Author

In onControllerResignation, controllerContext will be reset. So here we need to keep the value before onControllerResignation and reuse it in deletion.

Member

This makes sense.
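The ordering issue explained in this thread is easy to show in isolation: if the resignation callback resets the context, the expected zkVersion has to be captured beforehand. In the sketch below, `Context`, its reset value of 0 and `versionForDelete` are hypothetical names and values chosen for illustration.

```scala
object ResignSketch {
  // Hypothetical, stripped-down controller context.
  final class Context(var epochZkVersion: Int) {
    def reset(): Unit = {
      epochZkVersion = 0 // assumed reset value, for illustration only
    }
  }

  /** Returns the zkVersion that should be passed to the fenced delete of /controller. */
  def versionForDelete(ctx: Context, onResignation: () => Unit): Int = {
    val expected = ctx.epochZkVersion // capture before the context is reset
    onResignation()                   // may reset ctx, as onControllerResignation does
    expected                          // reading ctx.epochZkVersion here would be too late
  }
}
```

With a context at version 42 and a callback that resets it, the function still returns 42, whereas reading the field after the callback would return the reset value.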

private def maybeCreateControllerEpochZNode(): (Int, Int) = {
createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
case Code.OK =>
info(s"Successfully create ${ControllerEpochZNode.path} with initial epoch ${KafkaController.InitialControllerEpoch}")
Member

nits: Successfully create => Successfully created

Contributor Author

Done.

@lindong28 lindong28 closed this in 297fb39 Sep 7, 2018
@lindong28
Member

Thanks much for the patch @hzxa21. LGTM. Merged to trunk.

Contributor Author

hzxa21 commented Sep 7, 2018

@lindong28 Thanks a lot for the reviews.

Contributor

ijuma commented Sep 9, 2018

Did we check the performance impact of this change? If so, it would be great to include the details in the PR description.

Contributor

junrao commented Sep 11, 2018

@ijuma : There were some perf results. See the above comment on Sep. 4.

Contributor

ijuma commented Sep 11, 2018

Thanks @junrao. That's great. I suggest adding a summary of the results to the PR description.

Contributor Author

hzxa21 commented Sep 11, 2018

@ijuma Thanks for the suggestion. I have updated the description to include the perf test.

jonlee2 pushed a commit to linkedin/kafka that referenced this pull request Jan 8, 2019
… zkVersion

This PR aims to enforce that the controller can only update zookeeper states after checking the controller epoch zkVersion. The check and zookeeper state updates are wrapped in the zookeeper multi() operations to ensure that they are done atomically. This PR is necessary to resolve issues related to multiple controllers (i.e. old controller updates zookeeper states before resignation, which is possible during controller failover based on the single threaded event queue model we have)

This PR includes the following changes:
- Add MultiOp request and response in ZookeeperClient
- Ensure all zookeeper updates done by controller are protected by checking the current controller epoch zkVersion
- Modify test cases in KafkaZkClientTest to test mismatch controller epoch zkVersion

Tests Done:
- Unit tests (with updated tests to test mismatch controller epoch zkVersion)
- Existing integration tests

Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Dong Lin <lindong28@gmail.com>, Manikumar Reddy O <manikumar.reddy@gmail.com>

Closes apache#5101 from hzxa21/KAFKA-6082
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019