From 32d54551746d303b977a5deb1e1cbc19d8dc33f2 Mon Sep 17 00:00:00 2001 From: Balint Molnar Date: Fri, 9 Jun 2017 13:15:39 +0200 Subject: [PATCH 1/2] KAFKA-5388 Replace zkClient.subscribe*Changes method with an equivalent zkUtils method --- .../ZkNodeChangeNotificationListener.scala | 6 +-- .../consumer/ZookeeperConsumerConnector.scala | 8 ++-- .../consumer/ZookeeperTopicEventWatcher.scala | 10 ++--- .../kafka/controller/KafkaController.scala | 38 +++++++++---------- .../scala/kafka/server/KafkaHealthcheck.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 26 ++++++++++++- 7 files changed, 57 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 960f69025a3a6..6b4d8547e7c70 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -40,7 +40,7 @@ trait NotificationHandler { * notificationHandler's processNotification() method with the child's data as argument. As part of processing these changes it also * purges any children with currentTime - createTime > changeExpirationMs. * - * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications + * The caller/user of this class should ensure that they use zkUtils.subscribeStateChanges and call processAllNotifications * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session * is terminated and reestablished any missed notification will be processed immediately. * @param zkUtils @@ -64,8 +64,8 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, */ def init() { zkUtils.makeSurePersistentPathExists(seqNodeRoot) - zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener) - zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) + zkUtils.subscribeChildChanges(seqNodeRoot, NodeChangeListener) + zkUtils.subscribeStateChanges(ZkStateChangeListener) processAllNotifications() } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index ba2fce1e0c339..a9c2c239eeb15 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -682,7 +682,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // We log a warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers // are up. warn("no brokers found when trying to rebalance.") - zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener) + zkUtils.subscribeChildChanges(BrokerIdsPath, loadBalancerListener) true } else { @@ -963,14 +963,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, }) // listener to consumer and partition changes - zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener) + zkUtils.subscribeStateChanges(sessionExpirationListener) - zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) + zkUtils.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) topicStreamsMap.foreach { topicAndStreams => // register on broker partition path changes val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkUtils.zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) + zkUtils.subscribeDataChanges(topicPath, topicPartitionChangeListener) } // explicitly trigger load balancing for this consumer diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index 1a86227b6cf55..ba73deb890bad 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -34,17 +34,15 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils, val topicEventListener = new ZkTopicEventListener() zkUtils.makeSurePersistentPathExists(ZkUtils.BrokerTopicsPath) - zkUtils.zkClient.subscribeStateChanges( - new ZkSessionExpireListener(topicEventListener)) + zkUtils.subscribeStateChanges(new ZkSessionExpireListener(topicEventListener)) - val topics = zkUtils.zkClient.subscribeChildChanges( - ZkUtils.BrokerTopicsPath, topicEventListener) + val topics = zkUtils.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) // call to bootstrap topic list topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics) } - private def stopWatchingTopicEvents() { zkUtils.zkClient.unsubscribeAll() } + private def stopWatchingTopicEvents() { zkUtils.unsubscribeAll() } def shutdown() { lock.synchronized { @@ -90,7 +88,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils, lock.synchronized { if (zkUtils != null) { info("ZK expired: resubscribing topic event listener to topic registry") - zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) + zkUtils.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) } } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e00f1ddead16d..02103fd6e59d6 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -541,7 +541,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met reassignedReplicas.toSet) reassignedPartitionContext.isrChangeListener = isrChangeListener // register listener on the leader and isr path to wait until they catch up with the current leader - zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) + zkUtils.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) } def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, @@ -643,11 +643,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } private def registerSessionExpirationListener() = { - zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager)) + zkUtils.subscribeStateChanges(new SessionExpirationListener(this, eventManager)) } private def registerControllerChangeListener() = { - zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager)) + zkUtils.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager)) } private def initializeControllerContext() { @@ -842,70 +842,70 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } private def registerBrokerChangeListener() = { - zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) + zkUtils.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) } private def deregisterBrokerChangeListener() = { - zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) + zkUtils.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) } private def registerTopicChangeListener() = { - zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener) + zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener) } private def deregisterTopicChangeListener() = { - zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener) + zkUtils.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener) } def registerPartitionModificationsListener(topic: String) = { partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic)) - zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) + zkUtils.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) } def deregisterPartitionModificationsListener(topic: String) = { - zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) + zkUtils.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) partitionModificationsListeners.remove(topic) } private def registerTopicDeletionListener() = { - zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener) + zkUtils.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener) } private def deregisterTopicDeletionListener() = { - zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener) + zkUtils.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener) } private def registerPartitionReassignmentListener() = { - zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener) + zkUtils.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener) } private def deregisterPartitionReassignmentListener() = { - zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener) + zkUtils.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener) } private def registerIsrChangeNotificationListener() = { debug("Registering IsrChangeNotificationListener") - zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + zkUtils.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) } private def deregisterIsrChangeNotificationListener() = { debug("De-registering IsrChangeNotificationListener") - zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + zkUtils.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) } private def registerPreferredReplicaElectionListener() { - zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + zkUtils.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) } private def deregisterPreferredReplicaElectionListener() { - zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + zkUtils.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) } private def deregisterPartitionReassignmentIsrChangeListeners() { controllerContext.partitionsBeingReassigned.foreach { case (topicAndPartition, reassignedPartitionsContext) => val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition) - zkUtils.zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener) + zkUtils.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener) } } @@ -922,7 +922,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) { // stop watching the ISR changes for this partition - zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), + zkUtils.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) } // read the current list of reassigned partitions from zookeeper diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 0b62fca028b0c..b108bf65731aa 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -46,7 +46,7 @@ class KafkaHealthcheck(brokerId: Int, private[server] val sessionExpireListener = new SessionExpireListener def startup() { - zkUtils.zkClient.subscribeStateChanges(sessionExpireListener) + zkUtils.subscribeStateChanges(sessionExpireListener) register() } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0a87750d9495b..a19cc4017a50e 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -332,7 +332,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP secureAclsEnabled) zkClientForChrootCreation.makeSurePersistentPathExists(chroot) info(s"Created zookeeper path $chroot") - zkClientForChrootCreation.zkClient.close() + zkClientForChrootCreation.close() } val zkUtils = ZkUtils(config.zkConnect, diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 837b567414588..8c7b8ff8a8350 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -29,7 +29,7 @@ import kafka.server.ConfigType import kafka.utils.ZkUtils._ import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException} import org.I0Itec.zkclient.serialize.ZkSerializer -import org.I0Itec.zkclient.{ZkClient, ZkConnection} +import org.I0Itec.zkclient.{ZkClient, ZkConnection, IZkDataListener, IZkChildListener, IZkStateListener} import org.apache.kafka.common.config.ConfigException import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} import org.apache.zookeeper.KeeperException.Code @@ -635,6 +635,30 @@ class ZkUtils(val zkClient: ZkClient, zkClient.deleteRecursive(path) } + def subscribeDataChanges(path: String, listener: IZkDataListener): Unit = { + zkClient.subscribeDataChanges(path, listener) + } + + def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit = { + zkClient.unsubscribeDataChanges(path, dataListener) + } + + def subscribeStateChanges(listener: IZkStateListener): Unit = { + zkClient.subscribeStateChanges(listener) + } + + def subscribeChildChanges(path: String, listener: IZkChildListener): java.util.List[String] = { + zkClient.subscribeChildChanges(path, listener) + } + + def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit = { + zkClient.unsubscribeChildChanges(path, childListener) + } + + def unsubscribeAll(): Unit = { + zkClient.unsubscribeAll() + } + def readData(path: String): (String, Stat) = { val stat: Stat = new Stat() val dataStr: String = zkClient.readData(path, stat) From d3383e4bdfd964f40403bfa106ede5c0a4a566bb Mon Sep 17 00:00:00 2001 From: Balint Molnar Date: Tue, 18 Jul 2017 15:40:26 +0200 Subject: [PATCH 2/2] Address review comments, use scala Seq[String] instead of java List --- .../scala/kafka/consumer/ZookeeperTopicEventWatcher.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index ba73deb890bad..dd1c6aa24a60c 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -39,7 +39,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils, val topics = zkUtils.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) // call to bootstrap topic list - topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics) + topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics.asJava) } private def stopWatchingTopicEvents() { zkUtils.unsubscribeAll() } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 8c7b8ff8a8350..b0c3ebf454e07 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -647,8 +647,8 @@ class ZkUtils(val zkClient: ZkClient, zkClient.subscribeStateChanges(listener) } - def subscribeChildChanges(path: String, listener: IZkChildListener): java.util.List[String] = { - zkClient.subscribeChildChanges(path, listener) + def subscribeChildChanges(path: String, listener: IZkChildListener): Seq[String] = { + zkClient.subscribeChildChanges(path, listener).asScala } def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit = {