
KAFKA-5028: convert kafka controller to a single-threaded event queue model #2816

Closed
wants to merge 19 commits

Conversation

onurkaraman
Contributor

The goal of this ticket is to improve controller maintainability by simplifying the controller's concurrency semantics. The controller code has a lot of shared state between several threads using several concurrency primitives. This makes the code hard to reason about.

This ticket proposes we convert the controller to a single-threaded event queue model. We add a new controller thread which processes events held in an event queue. Note that this does not mean we get rid of all threads used by the controller. We merely delegate all work that interacts with controller local state to this single thread. With only a single thread accessing and modifying the controller local state, we no longer need to worry about concurrent access, which means we can get rid of the various concurrency primitives used throughout the controller.

Performance is expected to match existing behavior since the bulk of the existing controller work today already happens sequentially in the ZkClient’s single ZkEventThread.
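As a rough sketch of the shape this takes (the ControllerEvent, controllerEventQueue, and ControllerEventThread names match this PR; the doWork body below is illustrative rather than the exact implementation):

  // Illustrative sketch: a single thread drains a queue of controller events.
  trait ControllerEvent {
    def process(): Unit
  }

  val controllerEventQueue = new java.util.concurrent.LinkedBlockingQueue[ControllerEvent]()

  class ControllerEventThread(name: String) extends kafka.utils.ShutdownableThread(name) {
    override def doWork(): Unit = {
      val event = controllerEventQueue.take() // blocks until an event is available
      event.process()                         // controller local state is only touched from this thread
    }
  }

  // ZK listeners, schedulers, and request callbacks only enqueue events via
  // controllerEventQueue.put(someEvent) and never touch controller state directly.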

@onurkaraman
Contributor Author

This PR passed the system tests:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/826/

@asfbot

asfbot commented Apr 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2782/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2778/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2778/
Test PASSed (JDK 8 and Scala 2.12).

Contributor

@junrao junrao left a comment

@onurkaraman : Thanks for the patch. A few quick comments.

private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
private val partitionReassignmentListener = new PartitionReassignmentListener(this, controllerEventQueue)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, controllerEventQueue)
private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, controllerEventQueue)
Contributor

There are quite a few listeners. The first thing that they all do is to read the current data from ZK and figure out the changes. Could we at least share that part of the code for all listeners?

Contributor Author

Sharing this logic at the listener layer would complicate the concurrency semantics since figuring out what has changed would require looking at controller local state while the listener is being executed from the ZkEventThread. This is what this PR is trying to avoid.

Even if we push the shared logic to the ControllerEvent layer, it doesn't seem like we'd actually simplify things since each listener reads and observes what's changed differently.

info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
info("starting the controller scheduler")
kafkaScheduler.startup()
kafkaScheduler.schedule("controller-metric-task", () => controllerEventQueue.put(UpdateMetrics), period = 10, unit = TimeUnit.SECONDS)
Contributor

Do we need to update metrics in a scheduler? It seems that those metrics can only change after the processing of each event. If so, we can just update the metrics at the end of each event processing.

Contributor Author
@onurkaraman onurkaraman Apr 20, 2017

Converting UpdateMetrics from an event to a method run at the end of every event uncovered a bug in existing code:
If a topic is created while the replica set is offline, the partition would be defined in partitionReplicaAssignment but not in partitionLeadershipInfo. Computing the PreferredReplicaImbalanceCount during this window results in a NoSuchElementException when looking up the partition in partitionLeadershipInfo.

newGauge(
  "PreferredReplicaImbalanceCount",
  new Gauge[Int] {
    def value(): Int = {
      inLock(controllerContext.controllerLock) {
        if (!isActive)
          0
        else
          controllerContext.partitionReplicaAssignment.count {
            case (topicPartition, replicas) => 
              (controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head 
              && (!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic))
              )
          }
      }
    }
  }
)

This actually breaks tests like AdminTest.testBasicPreferredReplicaElection which creates a topic before starting the cluster.

I think the fix could be to only use partitions that are in both partitionReplicaAssignment and partitionLeadershipInfo when computing PreferredReplicaImbalanceCount.
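Roughly, the counting predicate would first check membership (a sketch of the fix, not the final code):

  // Sketch: skip partitions that have a replica assignment but no leadership info yet.
  controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
    controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
      controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head &&
      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic)
  }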

Contributor

Good catch. The approach sounds good.

Contributor Author

fixed.

kafkaScheduler.startup()
kafkaScheduler.schedule("controller-metric-task", () => controllerEventQueue.put(UpdateMetrics), period = 10, unit = TimeUnit.SECONDS)
if (config.deleteTopicEnable) {
kafkaScheduler.schedule("topic-deletion-progress-check-task", () => controllerEventQueue.put(TopicDeletionProgressCheck),
Contributor

Hmm, adding a scheduled event into the queue just for checking seems like overkill. Would it be better to just do this check in the main event loop? This applies to other scheduled tasks like leader balancing. If we do this, we can get rid of the scheduler thread completely.

Contributor

It seems that we can probably get rid of this scheduled thread since the topic deletion check only needs to be triggered on other events that we are already tracking.

Contributor Author

metric updates are now done after every event in the main thread.
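Roughly (a sketch; updateMetrics() stands in for whatever the metric-refresh helper is called):

  override def doWork(): Unit = {
    val controllerEvent = controllerEventQueue.take()
    try {
      controllerEvent.process()
    } finally {
      updateMetrics() // refresh gauges after every event instead of on a fixed schedule
    }
  }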

info("Controller startup complete")
}
controllerEventQueue.put(Startup)
controllerEventQueue.put(Elect)
Contributor

Do we need to do this through the queue? It seems the event thread could just do these two steps at the beginning.

Contributor Author

Possible, but I'd rather we try to stick to the following pattern if we can:
all actions that modify controller state should be done as a ControllerEvent processed by the ControllerThread.

Contributor

The issue is that once the Startup event is enqueued, additional events could be added to the queue by the ZK event thread. Those new events could in theory show up before the Elect event. So, if there is something that we really want the ControllerThread to complete at the beginning, the safest thing is probably to do that in the thread.

Contributor Author

I ended up merging the actions into Startup to prevent interleavings.
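Roughly, Startup now ends with the election itself (a sketch of the shape, not the exact final code):

  case object Startup extends ControllerEvent {
    override def process(): Unit = {
      registerSessionExpirationListener()
      registerControllerChangeListener()
      elect() // election runs inside the same event, so nothing can interleave between startup and election
    }
  }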

}
def handleNewSession(): Unit = {
controllerEventQueue.put(Resign(getControllerID()))
controllerEventQueue.put(Elect)
Contributor

Hmm, instead of doing this in 2 separate events, could we just do resign and elect in a single event? Ditto in ControllerChangeListener.handleDataDeleted().

Contributor Author

I think it's possible but would prefer to not do it in this patch.

Here are the places where Resign and Elect are added to the queue:

  1. KafkaController.startup adds Elect to the queue.
  2. SessionExpirationListener.handleNewSession adds Resign and Elect to the queue.
  3. ControllerChangeListener.handleDataChange adds Resign to the queue.
  4. ControllerChangeListener.handleDataDeleted adds Resign and Elect to the queue.

I tried refactoring such that the above maintains the same behavior as today's KafkaController.startup, SessionExpirationListener.handleNewSession, LeaderChangeListener.handleDataChange, and LeaderChangeListener.handleDataDeleted. I'd like to try to maintain existing behavior in this patch wherever possible to minimize regressions.

If we merge the two into a single event, we'd end up doing some actions that would work but might not make sense:

  1. KafkaController.startup would unnecessarily clear all state even though state is guaranteed to already be empty.
  2. ControllerChangeListener.handleDataChange would run the election algorithm even though handleDataChange getting triggered should indicate that a new controller has already been elected without znode deletion (setData on /controller could have been used). It's not clear to me if there is some merit to doing this, as later controller changes would hopefully get picked up by ZkClient and enqueued.

Contributor

We want to be a bit careful with adding the two events separately. This is because if another event is enqueued between the two events, we may miss the processing of the event since the controller is not ready.

Contributor Author

That's an interesting point.

In the current PR, only the ZkClient's ZkEventThread enqueues Resign and Elect. This happens in ControllerChangeListener.handleDataDeleted and SessionExpirationListener.handleNewSession. Since ZkEventThread can't interleave itself, this leaves us with only two types of threads that can interleave an event between a Resign and Elect:

  1. the scheduler can interleave an UpdateMetrics, TopicDeletionProgressCheck, or AutoPreferredReplicaLeaderElection.
  2. the RequestSendThreads can interleave a TopicDeletionStopReplicaResult from the StopReplicaRequest callback.

I'm not too worried about missing one of the above events interleaved between resignation and election since an elected controller should be able to pick up where things left off by reading zookeeper state.

I am worried however about processing interleaved events while not the controller. For instance, I just noticed a bug in the PR where TopicDeletionProgressCheck and TopicDeletionStopReplicaResult don't first check isActive. I'll update the PR now.

Contributor Author

Updated the PR which adds the isActive checks to TopicDeletionProgressCheck and TopicDeletionStopReplicaResult.
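Concretely, the guard follows the same pattern as the other events (sketch):

  case object TopicDeletionProgressCheck extends ControllerEvent {
    override def process(): Unit = {
      if (!isActive) return // drop the scheduled check if this broker is no longer the controller
      // ... check topic deletion progress ...
    }
  }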

Contributor

A simple way to solve this issue is to change the queue element to be a list of ControllerEvents. That way, one can add multiple events atomically.
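For illustration, that would look roughly like this (a sketch, not code from this PR):

  // Sketch: each queue element is a batch of events that is enqueued atomically.
  val controllerEventQueue = new java.util.concurrent.LinkedBlockingQueue[Seq[ControllerEvent]]()
  controllerEventQueue.put(Seq(Resign(getControllerID()), Elect)) // nothing can land between the two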

Contributor Author
@onurkaraman onurkaraman Apr 11, 2017

Yes this would solve the issue.

Personally, I'd rather we just stick to a thread that processes a queue of events rather than a queue of queues. It seems simpler to reason about.

If interleaves are still a concern, we can either replace Elect and Resign with the merged equivalent or we can just keep Elect and Resign and add another event type that does both.

@asfbot

asfbot commented Apr 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2847/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2843/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2843/
Test FAILed (JDK 8 and Scala 2.12).

Contributor

@ijuma ijuma left a comment

Thanks for the PR. Great that we're starting to fix issues in the Controller. I had a quick look and left a couple of simple comments.

One important question: what's our testing strategy for these changes? How do we ensure that we are not regressing?


case class PartitionModifications(topic: String) extends ControllerEvent {
override def process(): Unit = {
if (!isActive) return
Contributor

Is there a reason not to do the !isActive check before we call this method? We did something like that in the previous listener classes.

Contributor Author

Yup there is a reason. I also wanted to extract this check but there were two issues with doing so:

  1. most but not all event types should do the isActive check (think Startup, Elect, Resign).
  2. different event types handle the isActive check differently. Most simply return if not active, while others such as the ControlledShutdown prepare a response stating that the controller has moved.

Because of these differences, I couldn't for example naively extract the check out to the ControllerEventThread's doWork. Some options I had considered but ultimately decided against:

  • pattern match in the ControllerEventThread's doWork checking event types and conditionally call isActive.
  • categorize the event types into those that should have the isActive check and those that shouldn't. With this categorization, we can let the ControllerEventThread's doWork do the pattern match based on these new types and conditionally run the check.
  • categorize the event types into those that should have the isActive check and those that shouldn't. With this categorization, we can let these new parent types do the check themselves upfront.

I essentially decided against all of these due to the unique behavior of the ControlledShutdown event.

@onurkaraman
Contributor Author

@ijuma in terms of testing, I'm relying on existing unit, integration, and system tests.

All our PRs already show unit/integration test results, and I provided a link to a passing system test I ran when I first opened the PR.

@asfbot

asfbot commented Apr 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2904/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2899/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2900/
Test PASSed (JDK 8 and Scala 2.12).

@onurkaraman
Contributor Author

Regarding the earlier comment on testing, I made a separate ticket and PR that adds controller integration tests:
ticket: https://issues.apache.org/jira/browse/KAFKA-5069
PR: #2853

I put the test in a separate PR with the intent of having the integration tests checked in before this PR so we can test for regressions when switching over to the single-threaded event queue model.

@asfbot

asfbot commented Apr 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3006/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3001/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3002/
Test PASSed (JDK 8 and Scala 2.12).

Contributor

@junrao junrao left a comment

@onurkaraman : Thanks for rebasing. A few more comments.

autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
kafkaScheduler.schedule("auto-leader-rebalance-task", () => controllerEventQueue.put(AutoPreferredReplicaLeaderElection),
delay = 5, period = config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
Contributor

There are a couple of issues with modeling the leader balancing tasks as new events in the queue. (1) If the controller event thread is busy for some reason, more than one rebalance event could be queued up, which adds unnecessary load to the event thread. (2) Auto leader balancing is a performance optimization, but is not critical. If there are other real ZK events (e.g., broker down), we want to be able to process those more critical events before leader balancing. If an auto leader balancing event is already in the queue, it will be a bit hard to take it out when a more important event is enqueued.

An alternative approach is to just do the periodic check in the ControllerEventThread.


override def doWork(): Unit = {
val controllerEvent = controllerEventQueue.take()
try {
controllerEvent.process()
Contributor

It's probably better to check if the controller is active here before processing each event, instead of doing that check in every event. The only exception is if the event is reelecting the controller.

Contributor

I'm also a bit uncomfortable with manually doing the check in every event. It seems easy to introduce problems when adding new events or changing existing ones. If reelecting the controller is really the only case that needs special treatment, then Jun's suggestion makes sense.

Otherwise, we could have a processIfActive method in ControllerEvent and by default process would do the isActive check, but process could be overridden if necessary.
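For illustration, such a default could look roughly like this (a sketch, not code from this PR; it assumes the events are defined inside KafkaController so isActive is in scope):

  trait ControllerEvent {
    // default: only run the body when this broker is the active controller;
    // events with special needs (e.g. ControlledShutdown) override process() directly
    def process(): Unit = if (isActive) processIfActive()
    def processIfActive(): Unit
  }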

Contributor Author

It's not quite just reelection.

ControlledShutdown, Startup, ControllerChange, and Reelect all have custom behavior, either not checking isActive at all or doing something special like providing a callback with a Failure containing a ControllerMovedException in the case of ControlledShutdown.

override def process(): Unit = {
registerSessionExpirationListener()
registerControllerChangeListener()
isRunning = true
Contributor

Now that we are moving to a single threaded model, it seems we don't really need isRunning. If the ControllerEventThread is running, it implies isRunning is true.

Contributor Author

You're right. I just removed isRunning.

class TopicDeletionManager(controller: KafkaController,
initialTopicsToBeDeleted: Set[String] = Set.empty,
initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
class TopicDeletionManager(controller: KafkaController, controllerEventQueue: LinkedBlockingQueue[ControllerEvent], initialTopicsToBeDeleted: Set[String] = Set.empty, initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
Contributor

We need to adjust the comments before the class accordingly. For example, 3.3 is probably no longer valid since preferred replica election can't be mixed with topic deletion any more because of the single threaded model.

Contributor Author

fixed

class TopicDeletionListener(protected val controller: KafkaController, controllerEventQueue: LinkedBlockingQueue[ControllerEvent]) extends IZkChildListener with Logging {
override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
import scala.collection.JavaConverters._
controllerEventQueue.put(controller.TopicDeletion(currentChilds.asScala.toSet))
Contributor

Hmm, there is a slight change of behavior here. Earlier, if a topic deletion is initiated, it will be processed immediately. Now, the topic deletion will only be processed when the scheduler adds a topic deletion event. This means that topic deletion could be delayed by up to 5 seconds, which is a degradation.

Contributor Author

I updated the PR to actually start doing topic deletion work in the TopicDeletion event instead of waiting for the scheduled event to make progress.
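Roughly (a sketch; the deletion-manager method name comes from the existing code path, and the exact wiring may differ):

  case class TopicDeletion(topics: Set[String]) extends ControllerEvent {
    override def process(): Unit = {
      if (!isActive) return
      val existing = topics.intersect(controllerContext.allTopics)
      // hand topics to the deletion manager immediately instead of waiting for
      // the next scheduled progress check
      topicDeletionManager.enqueueTopicsForDeletion(existing)
    }
  }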

info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId))
}
def handleNewSession(): Unit = {
controllerEventQueue.put(Resign(getControllerID()))
Contributor

Hmm, in all other listeners, we pass in controllerEventQueue. Here, we access controllerEventQueue directly. Does SessionExpirationListener need to be a local class?

Contributor Author

Nope. I just moved it out.


Contributor

@junrao junrao left a comment

@onurkaraman : Thanks for the patch. A few more comments.

@@ -658,13 +580,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
topicDeletionManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
Contributor

Now that we are moving to a single threaded model, it seems this can be simplified. We know the preferred leader election can't interleave with topic deletion. So, the only thing we need to do is to avoid balancing those topics pending for deletion.

/**
* This is the zookeeper listener that triggers all the state transitions for a replica
*/
class BrokerChangeListener(protected val controller: KafkaController, controllerEventQueue: LinkedBlockingQueue[ControllerEvent]) extends IZkChildListener with Logging {
Contributor

I am a bit worried about passing along the controllerEventQueue to each of the listeners since we may lose track of who can add new events to the queue. Since we are passing in KafkaController anyway, we could probably expose a public method like addToControllerEventQueue() in Controller and avoid passing in controllerEventQueue. The listener will call addToControllerEventQueue() to enqueue events. This way, it's much easier to track who is calling addToControllerEventQueue().
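i.e. roughly (a sketch only; the BrokerChange event name here is illustrative):

  // Sketch: the controller exposes a single enqueue method; listeners never hold the queue.
  def addToControllerEventQueue(event: ControllerEvent): Unit = controllerEventQueue.put(event)

  class BrokerChangeListener(protected val controller: KafkaController) extends IZkChildListener with Logging {
    override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit =
      controller.addToControllerEventQueue(controller.BrokerChange) // event name is illustrative
  }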

controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
Contributor

Again, since we know preferred leader election can't interleave, there is no need to check preferredReplicaElectionInProgress.

In general, we probably don't need to maintain ControllerContext.partitionsUndergoingPreferredReplicaElection. It's intended to track partitions undergoing preferred replica election. In a single threaded model, we don't really need that information in ControllerContext.

Contributor Author

There's one place where having ControllerContext.partitionsUndergoingPreferredReplicaElection still kind of helps: onControllerFailover.

The current behavior is to first load all zookeeper state into ControllerContext in initializeControllerContext and only later trigger in-progress actions like partition reassignment, preferred replica election, and topic deletion. Without being in ControllerContext, we'd need to pass partitionsUndergoingPreferredReplicaElection back out from initializeControllerContext into maybeTriggerPreferredReplicaElection, which is a bit awkward.

I do like all of the potential simplifications we can get regarding preferred replica election as you state in a few of the comments, but I'd rather we introduce those changes in a separate patch.

Contributor

Hmm, is there a particular reason that you want to do the simplification in a separate patch? Sometimes we defer cleanups to avoid the rebasing overhead. However, in this case, no one else is touching the controller code. So this shouldn't be an issue. Also, could you preserve the commit history when updating the patch? That will make it easier to review the delta changes.

Contributor Author

Again it's to minimize risk. I'd like to keep this PR as much as possible just a mechanical refactoring of existing code and to leave other changes to a separate PR.

Contributor

Ok, we can do these in a followup patch if you think it's more convenient. I just don't want us to forget about those simplification improvements.

Contributor

Let's make sure we file subtasks under the umbrella JIRA for things that we decide to do in follow-up PRs. That way, we can make sure we don't forget them.

@@ -178,7 +148,6 @@ class TopicDeletionManager(controller: KafkaController,
.format(replicasThatFailedToDelete.mkString(","), topics))
controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
markTopicIneligibleForDeletion(topics)
resumeTopicDeletionThread()
Contributor

The comment before markTopicIneligibleForDeletion() needs to be adjusted since we don't need to call that method due to preferred leader election.

// shutdown partition state machine
partitionStateMachine.shutdown()
deregisterTopicChangeListener()
partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
Contributor

Instead of maintaining partitionModificationsListeners, could we just do the deregistration based on ControllerContext.allTopics like what we did in controllerFailover()?

Contributor Author

We need to maintain some sort of mapping from path to PartitionModificationsListener objects somewhere, or at the very least some way of associating an existing IZkDataListener instance to its corresponding path because ZkClient's unsubscribe variants take both the path and corresponding listener instance. For example:

public void unsubscribeDataChanges(String path, IZkDataListener dataListener)

I've considered defining wrapper listener classes comprised of a zkclient, raw zkclient listener, and path such that you can just call MyWrapperListener.subscribe()/unsubscribe() and it'll internally call zkClient.subscribeDataChanges(path, rawZkClientListener) but decided to leave it out of this patch to minimize the changes.
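For reference, the wrapper I had in mind was roughly this (a sketch only, not part of this patch):

  // Sketch: bundle the zkClient, the path, and the raw listener so callers don't have
  // to remember the (path, listener) pair that the unsubscribe call requires.
  class MyWrapperListener(zkClient: ZkClient, path: String, listener: IZkDataListener) {
    def subscribe(): Unit = zkClient.subscribeDataChanges(path, listener)
    def unsubscribe(): Unit = zkClient.unsubscribeDataChanges(path, listener)
  }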

Contributor

Ok, thanks for the explanation. We can leave the code as it is then.

deregisterTopicDeletionListener()
// shutdown replica state machine
replicaStateMachine.shutdown()
deregisterBrokerChangeListener()
Contributor

Could we de-register all listeners together after line 291?

Errors.NONE, partitionsRemaining)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
} else {
request.requestObj.handleError(controlledShutdownResult.failed.get, requestChannel, request)
Contributor

request.requestObj can just be controlledShutdownRequest?

Contributor Author

fixed

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3073/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3068/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3069/
Test FAILed (JDK 8 and Scala 2.12).

@onurkaraman
Contributor Author

retest this please

@asfbot

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3080/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3075/
Test FAILed (JDK 7 and Scala 2.10).

@onurkaraman
Contributor Author

This PR passed the system tests after rebasing:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/274/

Contributor

@junrao junrao left a comment

@onurkaraman : Thanks for the patch. A few more comments. Some of them can be addressed in a followup patch if you prefer. Also, could you rebase?

controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
override def process(): Unit = {
scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
Contributor

Hmm, if the controller is not active, we don't want to schedule the leader balancing task right? So we probably want to schedule this after checkAndTriggerPartitionRebalance() is done.

Contributor Author

Since we aren't letting the scheduler periodically inject the event into the queue, any single manual event injection we miss means that there will be no later injection until we become re-elected as controller. This is another reason why I wanted the scheduler to do the periodic event injections.

We risk skipping the next AutoPreferredReplicaLeaderElection if we naively put it after checkAndTriggerPartitionRebalance(), as checkAndTriggerPartitionRebalance() could throw an exception.

Putting the schedule line before the checkAndTriggerPartitionRebalance() prevents this from happening.

We can alternatively wrap the call to checkAndTriggerPartitionRebalance() in a try/finally and put the schedule call in the finally block as well.

I'm open to doing the try/finally, keeping the code as is, or reverting the logic to just let the scheduler periodically inject the event.
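The try/finally variant would look roughly like this (sketch):

  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
    override def process(): Unit = {
      if (!isActive) return
      try {
        checkAndTriggerPartitionRebalance()
      } finally {
        // reschedule even if the rebalance check throws, so the periodic task is never lost
        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
      }
    }
  }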

Contributor

Doing this in a finally clause sounds good.

Contributor Author

fixed

this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
private def checkAndTriggerPartitionRebalance(): Unit = {
trace("checking need to trigger partition rebalance")
// get all the active brokers
Contributor

This comment is not accurate.

Contributor Author

removed it.

* createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
*/
if(activeControllerId.get() != -1) {
debug("Broker %d has been elected as leader, so stopping the election process.".format(activeControllerId.get()))
Contributor

Now that we don't have a generic ZookeeperLeaderElector, the log can be changed to "elected as the controller" to make it clear.

Contributor Author

fixed

controllerContext.zkUtils.zkConnection.getZookeeper,
controllerContext.zkUtils.isSecure)
zkCheckedEphemeral.create()
info(config.brokerId + " successfully elected as leader")
Contributor

Same as the above. Probably change to "elected as the controller". Same in line 1582 and 1585.

Contributor Author

fixed

if (wasActiveBeforeChange && !isActive) {
onControllerResignation()
}
elect()
Contributor

Hmm, this logic is changed slightly from before. Previously, the code was the following: if this broker is still the controller after reading the controller id from ZK, we skip elect().

      if (controllerElector.getControllerID() != config.brokerId) {
        onControllerResignation()
        inLock(controllerContext.controllerLock) {
          controllerElector.elect
        }
      } else {
        // This can happen when there are multiple consecutive session expiration and handleNewSession() are called multiple
        // times. The first call may already register the controller path using the newest ZK session. Therefore, the
        // controller path will exist in subsequent calls to handleNewSession().
        info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId))
      }
    }

partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
topicDeletionManager.resumeDeletionForTopics(partitions.map(_.topic))
Contributor

Since the preferred leader balancing won't be run concurrently now, it seems that we don't need to resume topic deletion after rebalance since that won't change the failure state of a replica.

Contributor Author

fixed

}
}

case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
Contributor

TopicDeletionStopReplicaResult => TopicDeletionStopReplicaResultEvent?

Contributor Author

None of the other events end in "Event" so to stay consistent, I'd rather keep it as is.

* (though this is not strictly required since it holds the controller lock for the entire duration from start to end)
* 3.1 broker hosting one of the replicas for that topic goes down
* 3.2 partition reassignment for partitions of that topic is in progress
* 3.3 preferred replica election for partitions of that topic is in progress
Contributor

3.3 is no longer true since a preferred replica election can't be in progress when a topic deletion event is being handled. Ditto for 4.2.

Contributor Author

fixed

@@ -156,7 +128,7 @@ class TopicDeletionManager(controller: KafkaController,
val topicsToResumeDeletion = topics & topicsToBeDeleted
if(topicsToResumeDeletion.nonEmpty) {
topicsIneligibleForDeletion --= topicsToResumeDeletion
resumeTopicDeletionThread()
resumeDeletions()
Contributor

This can be done in a followup patch, but it doesn't seem that resumeDeletionForTopics() needs to be called due to preferred leader election.

@asfbot

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3209/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3219/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3212/
Test PASSed (JDK 7 and Scala 2.10).

@junrao
Contributor

junrao commented Apr 27, 2017

@onurkaraman : Thanks for the patch. LGTM. Let's address the remaining minor comments in a followup patch.

@asfgit asfgit closed this in bb663d0 Apr 27, 2017
@ijuma
Contributor

ijuma commented Apr 27, 2017

Yay :)

@guozhangwang
Contributor

@onurkaraman This is a great improvement! I'm wondering if we can make a pass over the open JIRAs that are related to locking issues in controller side and mark all of them resolved in the next release :)

@onurkaraman
Contributor Author

@guozhangwang sounds good.
