From 10071378d38bab1bcf00aefa0ef678fdb77c8f5b Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 15 Dec 2015 13:15:22 -0800 Subject: [PATCH 1/3] KAFKA-2547: Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211 --- .../ZkNodeChangeNotificationListener.scala | 21 +++- .../security/auth/SimpleAclAuthorizer.scala | 18 ---- .../kafka/server/DynamicConfigManager.scala | 96 ++++--------------- .../server/DynamicConfigChangeTest.scala | 10 +- 4 files changed, 42 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index a600d5de8bdf0..6f2c93fd4149b 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -17,7 +17,8 @@ package kafka.common import kafka.utils.{Time, SystemTime, ZkUtils, Logging} -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection.JavaConverters._ /** @@ -37,7 +38,7 @@ trait NotificationHandler { * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session * is terminated and reestablished any missed notification will be processed immediately. - * @param zkClient + * @param zkUtils * @param seqNodeRoot * @param seqNodePrefix * @param notificationHandler @@ -58,6 +59,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, def init() { zkUtils.makeSurePersistentPathExists(seqNodeRoot) zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener) + zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) processAllNotifications() } @@ -125,5 +127,20 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, } } + object ZkStateChangeListener extends IZkStateListener { + + override def handleNewSession() { + processAllNotifications + } + + override def handleSessionEstablishmentError(error: Throwable) { + fatal("Could not establish session with zookeeper", error) + } + + override def handleStateChanged(state: KeeperState) { + //no op + } + } + } diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index d0d226cc9f12d..76e5e19f0aeb9 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -104,8 +104,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging { zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler) aclChangeListener.init() - - zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { @@ -274,20 +272,4 @@ class SimpleAclAuthorizer extends Authorizer with Logging { updateCache(resource, acls) } } - - object ZkStateChangeListener extends IZkStateListener { - - override def handleNewSession() { - aclChangeListener.processAllNotifications - } - - override def handleSessionEstablishmentError(error: Throwable) { - fatal("Could not establish session with zookeeper", error) - } - - override def handleStateChanged(state: KeeperState) { - //no op - } - } - } diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index cb4b8f18391fd..cbc8a255d493f 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -17,15 +17,18 @@ package kafka.server +import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} +import kafka.security.auth.Resource import kafka.utils.Json import kafka.utils.Logging import kafka.utils.SystemTime import kafka.utils.Time import kafka.utils.ZkUtils +import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ import kafka.admin.AdminUtils -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} /** @@ -70,68 +73,28 @@ object ConfigType { * on startup where a change might be missed between the initial config load and registering for change notifications. * */ + class DynamicConfigManager(private val zkUtils: ZkUtils, private val configHandlers: Map[String, ConfigHandler], private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L - /** - * Begin watching for config changes - */ - def startup() { - zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath) - zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener) - processAllConfigChanges() - } - - /** - * Process all config changes - */ - private def processAllConfigChanges() { - val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath) - import JavaConversions._ - processConfigChanges((configChanges: mutable.Buffer[String]).sorted) - } - - /** - * Process the given list of config changes - */ - private def processConfigChanges(notifications: Seq[String]) { - if (notifications.size > 0) { - info("Processing config change notification(s)...") - val now = time.milliseconds - for (notification <- notifications) { - val changeId = changeNumber(notification) - - if (changeId > lastExecutedChange) { - val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification - - val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode) - processNotification(jsonOpt) - } - lastExecutedChange = changeId - } - purgeObsoleteNotifications(now, notifications) - } - } - - def processNotification(jsonOpt: Option[String]) = { - if(jsonOpt.isDefined) { - val json = jsonOpt.get + object ConfigChangedNotificaitonHandler extends NotificationHandler { + override def processNotification(json: String) = { Json.parseFull(json) match { case None => // There are no config overrides. - // Ignore non-json notifications because they can be from the deprecated TopicConfigManager + // Ignore non-json notifications because they can be from the deprecated TopicConfigManager case Some(mapAnon: Map[_, _]) => val map = mapAnon collect - { case (k: String, v: Any) => k -> v } + { case (k: String, v: Any) => k -> v } require(map("version") == 1) val entityType = map.get("entity_type") match { case Some(ConfigType.Topic) => ConfigType.Topic case Some(ConfigType.Client) => ConfigType.Client case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." + - " Received: " + json) + " Received: " + json) } val entity = map.get("entity_name") match { @@ -143,43 +106,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, configHandlers(entityType).processConfigChanges(entity, entityConfig) case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" + - "{\"version\" : 1," + - " \"entity_type\":\"topic/client\"," + - " \"entity_name\" : \"topic_name/client_id\"}." + - " Received: " + json) - } - } - } - - private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { - for(notification <- notifications.sorted) { - val (jsonOpt, stat) = zkUtils.readDataMaybeNull(ZkUtils.EntityConfigChangesPath + "/" + notification) - if(jsonOpt.isDefined) { - val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification - if (now - stat.getCtime > changeExpirationMs) { - debug("Purging config change notification " + notification) - zkUtils.deletePath(changeZnode) - } else { - return - } + "{\"version\" : 1," + + " \"entity_type\":\"topic/client\"," + + " \"entity_name\" : \"topic_name/client_id\"}." + + " Received: " + json) } } } - /* get the change number from a change notification znode */ - private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong + private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificaitonHandler) //TODO: null should be replaced with config handlers. /** - * A listener that applies config changes to logs + * Begin watching for config changes */ - object ConfigChangeListener extends IZkChildListener { - override def handleChildChange(path: String, chillins: java.util.List[String]) { - try { - import JavaConversions._ - processConfigChanges(chillins: mutable.Buffer[String]) - } catch { - case e: Exception => error("Error processing config change:", e) - } - } + def startup(): Unit = { + configChangeListener.init() } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 8984f175bfcd1..e75d67ce268ca 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -109,12 +109,12 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler)) // Notifications created using the old TopicConfigManager are ignored. - configManager.processNotification(Some("not json")) + configManager.ConfigChangedNotificaitonHandler.processNotification("not json") // Incorrect Map. No version try { val jsonMap = Map("v" -> 1, "x" -> 2) - configManager.processNotification(Some(Json.encode(jsonMap))) + configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -123,7 +123,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Version is provided. EntityType is incorrect try { val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x") - configManager.processNotification(Some(Json.encode(jsonMap))) + configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -133,7 +133,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // EntityName isn't provided try { val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic) - configManager.processNotification(Some(Json.encode(jsonMap))) + configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -142,7 +142,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Everything is provided val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x") - configManager.processNotification(Some(Json.encode(jsonMap))) + configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) // Verify that processConfigChanges was only called once EasyMock.verify(handler) From a13b963896cade76776eb4b398e3e673544e7321 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Thu, 17 Dec 2015 14:58:37 -0800 Subject: [PATCH 2/3] Addressing comments from Reviewers. --- .../common/ZkNodeChangeNotificationListener.scala | 2 +- .../main/scala/kafka/server/DynamicConfigManager.scala | 5 ++--- .../unit/kafka/server/DynamicConfigChangeTest.scala | 10 +++++----- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 6f2c93fd4149b..07d8c0c0d93f3 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -138,7 +138,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, } override def handleStateChanged(state: KeeperState) { - //no op + debug(s"new zookeeper state: ${state}") } } diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index cbc8a255d493f..eb406affa7242 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -73,14 +73,13 @@ object ConfigType { * on startup where a change might be missed between the initial config load and registering for change notifications. * */ - class DynamicConfigManager(private val zkUtils: ZkUtils, private val configHandlers: Map[String, ConfigHandler], private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L - object ConfigChangedNotificaitonHandler extends NotificationHandler { + object ConfigChangedNotificationHandler extends NotificationHandler { override def processNotification(json: String) = { Json.parseFull(json) match { case None => // There are no config overrides. @@ -114,7 +113,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, } } - private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificaitonHandler) //TODO: null should be replaced with config handlers. + private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler) /** * Begin watching for config changes diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index e75d67ce268ca..d1ad3a3e2ab14 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -109,12 +109,12 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler)) // Notifications created using the old TopicConfigManager are ignored. - configManager.ConfigChangedNotificaitonHandler.processNotification("not json") + configManager.ConfigChangedNotificationHandler.processNotification("not json") // Incorrect Map. No version try { val jsonMap = Map("v" -> 1, "x" -> 2) - configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -123,7 +123,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Version is provided. EntityType is incorrect try { val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x") - configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -133,7 +133,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // EntityName isn't provided try { val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic) - configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -142,7 +142,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Everything is provided val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x") - configManager.ConfigChangedNotificaitonHandler.processNotification(Json.encode(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap)) // Verify that processConfigChanges was only called once EasyMock.verify(handler) From 1722c76326370fe42b4f570e6ed44229396ebed7 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Wed, 6 Jan 2016 10:53:40 -0800 Subject: [PATCH 3/3] Addressing review comments. --- .../scala/kafka/common/ZkNodeChangeNotificationListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 688c586fd4ac3..baddecc149012 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -152,7 +152,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, } override def handleStateChanged(state: KeeperState) { - debug(s"new zookeeper state: ${state}") + debug(s"New zookeeper state: ${state}") } }