From 4cd2b6bd83558e536b2f330deec177c3dd28e8ff Mon Sep 17 00:00:00 2001 From: tuyang Date: Thu, 8 Dec 2016 11:49:11 +0800 Subject: [PATCH 1/6] fix Controller resigned but it also acts as a controller for a long time bug --- .../kafka/controller/KafkaController.scala | 19 +++++++++++++++++++ .../controller/PartitionStateMachine.scala | 15 +++++++++++++++ .../controller/ReplicaStateMachine.scala | 5 +++++ 3 files changed, 39 insertions(+) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index d3137c3c55d5..c32b61c9ae66 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1246,6 +1246,11 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL */ @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s" .format(dataPath, data)) val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString) @@ -1290,6 +1295,11 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: */ @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + inLock(controllerContext.controllerLock) { debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data)) val topicAndPartition = TopicAndPartition(topic, partition) @@ -1343,6 +1353,10 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = { import scala.collection.JavaConverters._ + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } inLock(controller.controllerContext.controllerLock) { debug("[IsrChangeNotificationListener] Fired!!!") @@ -1416,6 +1430,11 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD */ @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s" .format(dataPath, data.toString)) inLock(controllerContext.controllerLock) { diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index a5285c38838b..4cacde737449 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -411,6 +411,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { @throws(classOf[Exception]) def handleChildChange(parentPath : String, children : java.util.List[String]) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + inLock(controllerContext.controllerLock) { if (hasStarted.get) { try { @@ -453,6 +458,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { */ @throws(classOf[Exception]) def handleChildChange(parentPath : String, children : java.util.List[String]) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + inLock(controllerContext.controllerLock) { var topicsToBeDeleted = children.asScala.toSet debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(","))) @@ -504,6 +514,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { @throws(classOf[Exception]) def handleDataChange(dataPath : String, data: Object) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + inLock(controllerContext.controllerLock) { try { info(s"Partition modification triggered $data for path $dataPath") diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index a26f95a87ca4..f8ee93f504d9 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -351,6 +351,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { class BrokerChangeListener() extends IZkChildListener with Logging { this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: " def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { + // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order + if(!controller.isActive()) { + return + } + info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.asScala.sorted.mkString(","))) inLock(controllerContext.controllerLock) { if (hasStarted.get) { From c1d2128abe5ad96f4354eb82fce036486e30c2d8 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 19 Dec 2016 12:27:37 +0000 Subject: [PATCH 2/6] Introduce `ControllerZkListener` a couple of subclasses to reduce code duplication Also include a few clean-ups --- .../controller/ControllerZkListener.scala | 62 +++++++++++ .../kafka/controller/KafkaController.scala | 105 ++++++------------ .../controller/PartitionStateMachine.scala | 67 ++++------- .../controller/ReplicaStateMachine.scala | 19 ++-- .../main/scala/kafka/server/KafkaApis.scala | 4 +- .../kafka/server/ZookeeperLeaderElector.scala | 7 +- 6 files changed, 131 insertions(+), 133 deletions(-) create mode 100644 core/src/main/scala/kafka/controller/ControllerZkListener.scala diff --git a/core/src/main/scala/kafka/controller/ControllerZkListener.scala b/core/src/main/scala/kafka/controller/ControllerZkListener.scala new file mode 100644 index 000000000000..cef99c8af29d --- /dev/null +++ b/core/src/main/scala/kafka/controller/ControllerZkListener.scala @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.controller + +import kafka.utils.Logging +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener} + +import scala.collection.JavaConverters._ + +trait ControllerZkListener extends Logging { + logIdent = s"[$logName on Controller " + controller.config.brokerId + "]: " + protected def logName: String + protected def controller: KafkaController +} + +trait ControllerZkChildListener extends IZkChildListener with ControllerZkListener { + @throws(classOf[Exception]) + final def handleChildChange(parentPath: String, currentChildren: java.util.List[String]): Unit = { + // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved + if (controller.isActive) + doHandleChildChange(parentPath, currentChildren.asScala) + } + + @throws(classOf[Exception]) + def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit +} + +trait ControllerZkDataListener extends IZkDataListener with ControllerZkListener { + @throws[Exception] + final def handleDataChange(dataPath: String, data: AnyRef): Unit = { + // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved + if (controller.isActive) + doHandleDataChange(dataPath, data) + } + + @throws[Exception] + def doHandleDataChange(dataPath: String, data: AnyRef): Unit + + @throws[Exception] + final def handleDataDeleted(dataPath: String): Unit = { + // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved + if (controller.isActive) + doHandleDataDeleted(dataPath) + } + + @throws[Exception] + def doHandleDataDeleted(dataPath: String): Unit +} diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c32b61c9ae66..ee2b5527dc48 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -16,8 +16,6 @@ */ package kafka.controller -import java.util - import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException} import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse} @@ -39,7 +37,7 @@ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time -import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} import java.util.concurrent.locks.ReentrantLock @@ -152,7 +150,7 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { +class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger @@ -187,7 +185,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat new Gauge[Int] { def value(): Int = { inLock(controllerContext.controllerLock) { - if (!isActive()) + if (!isActive) 0 else controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader)) @@ -201,7 +199,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat new Gauge[Int] { def value(): Int = { inLock(controllerContext.controllerLock) { - if (!isActive()) + if (!isActive) 0 else controllerContext.partitionReplicaAssignment.count { @@ -230,7 +228,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat */ def shutdownBroker(id: Int) : Set[TopicAndPartition] = { - if (!isActive()) { + if (!isActive) { throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown") } @@ -398,7 +396,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat /** * Returns true if this broker is the current controller. */ - def isActive(): Boolean = { + def isActive: Boolean = { inLock(controllerContext.controllerLock) { controllerContext.controllerChannelManager != null } @@ -1176,7 +1174,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat } private def checkAndTriggerPartitionRebalance(): Unit = { - if (isActive()) { + if (isActive) { trace("checking need to trigger partition rebalance") // get all the active brokers var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null @@ -1234,10 +1232,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned * partitions. */ -class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging { - this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: " - val zkUtils = controller.controllerContext.zkUtils - val controllerContext = controller.controllerContext +class PartitionsReassignedListener(protected val controller: KafkaController) extends ControllerZkDataListener { + private val controllerContext = controller.controllerContext + + protected def logName = "PartitionsReassignedListener" /** * Invoked when some partitions are reassigned by the admin command @@ -1245,12 +1243,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL * @throws Exception On any error. */ @throws(classOf[Exception]) - def handleDataChange(dataPath: String, data: Object) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } - + def doHandleDataChange(dataPath: String, data: AnyRef) { debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s" .format(dataPath, data)) val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString) @@ -1271,22 +1264,16 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL } } - /** - * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader - * - * @throws Exception On any error. - */ @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - } + def doHandleDataDeleted(dataPath: String) {} } -class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int, - reassignedReplicas: Set[Int]) - extends IZkDataListener with Logging { - this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: " - val zkUtils = controller.controllerContext.zkUtils - val controllerContext = controller.controllerContext +class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaController, topic: String, partition: Int, + reassignedReplicas: Set[Int]) extends ControllerZkDataListener { + private val zkUtils = controller.controllerContext.zkUtils + private val controllerContext = controller.controllerContext + + protected def logName = "ReassignedPartitionsIsrChangeListener" /** * Invoked when some partitions need to move leader to preferred replica @@ -1294,12 +1281,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: * @throws Exception On any error. */ @throws(classOf[Exception]) - def handleDataChange(dataPath: String, data: Object) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } - + def doHandleDataChange(dataPath: String, data: AnyRef) { inLock(controllerContext.controllerLock) { debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data)) val topicAndPartition = TopicAndPartition(topic, partition) @@ -1335,13 +1317,9 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: } } - /** - * @throws Exception - * On any error. - */ @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - } + def doHandleDataDeleted(dataPath: String) {} + } /** @@ -1349,27 +1327,22 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: * * @param controller */ -class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging { +class IsrChangeNotificationListener(protected val controller: KafkaController) extends ControllerZkChildListener { - override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = { - import scala.collection.JavaConverters._ - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } + protected def logName = "IsrChangeNotificationListener" + override def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = { inLock(controller.controllerContext.controllerLock) { - debug("[IsrChangeNotificationListener] Fired!!!") - val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala + debug("ISR change notification listener fired") try { - val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet + val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet if (topicAndPartitions.nonEmpty) { controller.updateLeaderAndIsrCache(topicAndPartitions) processUpdateNotifications(topicAndPartitions) } } finally { // delete processed children - childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath( + currentChildren.map(x => controller.controllerContext.zkUtils.deletePath( ZkUtils.IsrChangeNotificationPath + "/" + x)) } } @@ -1418,10 +1391,10 @@ object IsrChangeNotificationListener { * Starts the preferred replica leader election for the list of partitions specified under * /admin/preferred_replica_election - */ -class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging { - this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: " - val zkUtils = controller.controllerContext.zkUtils - val controllerContext = controller.controllerContext +class PreferredReplicaElectionListener(protected val controller: KafkaController) extends ControllerZkDataListener { + private val controllerContext = controller.controllerContext + + protected def logName = "PreferredReplicaElectionListener" /** * Invoked when some partitions are reassigned by the admin command @@ -1429,22 +1402,17 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD * @throws Exception On any error. */ @throws(classOf[Exception]) - def handleDataChange(dataPath: String, data: Object) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } - + def doHandleDataChange(dataPath: String, data: AnyRef) { debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s" .format(dataPath, data.toString)) inLock(controllerContext.controllerLock) { val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString) - if(controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty) + if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty) info("These partitions are already undergoing preferred replica election: %s" .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(","))) val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic)) - if(partitionsForTopicsToBeDeleted.nonEmpty) { + if (partitionsForTopicsToBeDeleted.nonEmpty) { error("Skipping preferred replica election for partitions %s since the respective topics are being deleted" .format(partitionsForTopicsToBeDeleted)) } @@ -1456,8 +1424,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD * @throws Exception On any error. */ @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - } + def doHandleDataDeleted(dataPath: String) {} } case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 4cacde737449..6d3011876fac 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -17,13 +17,11 @@ package kafka.controller import collection._ -import collection.JavaConverters._ import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} import kafka.utils.{Logging, ReplicationUtils} import kafka.utils.ZkUtils._ -import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.controller.Callbacks.CallbackBuilder import kafka.utils.CoreUtils._ @@ -48,8 +46,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) private val hasStarted = new AtomicBoolean(false) private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) - private val topicChangeListener = new TopicChangeListener() - private val deleteTopicsListener = new DeleteTopicsListener() + private val topicChangeListener = new TopicChangeListener(controller) + private val deleteTopicsListener = new DeleteTopicsListener(controller) private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty private val stateChangeLogger = KafkaController.stateChangeLogger @@ -375,7 +373,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } def registerPartitionChangeListener(topic: String) = { - partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic)) + partitionModificationsListeners.put(topic, new PartitionModificationsListener(controller, topic)) zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) } @@ -406,22 +404,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { /** * This is the zookeeper listener that triggers all the state transitions for a partition */ - class TopicChangeListener extends IZkChildListener with Logging { - this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: " + class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener { - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, children : java.util.List[String]) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } + protected def logName = "TopicChangeListener" + @throws(classOf[Exception]) + def doHandleChildChange(parentPath: String, children: Seq[String]) { inLock(controllerContext.controllerLock) { if (hasStarted.get) { try { val currentChildren = { - debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(","))) - children.asScala.toSet + debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) + children.toSet } val newTopics = currentChildren -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- currentChildren @@ -436,7 +430,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { if (newTopics.nonEmpty) controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet) } catch { - case e: Throwable => error("Error while handling new topic", e ) + case e: Throwable => error("Error while handling new topic", e) } } } @@ -448,23 +442,19 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists * 2. If there are topics to be deleted, it signals the delete topic thread */ - class DeleteTopicsListener() extends IZkChildListener with Logging { - this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: " - val zkUtils = controllerContext.zkUtils + class DeleteTopicsListener(protected val controller: KafkaController) extends ControllerZkChildListener { + private val zkUtils = controllerContext.zkUtils + + protected def logName = "DeleteTopicsListener" /** * Invoked when a topic is being deleted * @throws Exception On any error. */ @throws(classOf[Exception]) - def handleChildChange(parentPath : String, children : java.util.List[String]) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } - + def doHandleChildChange(parentPath: String, children: Seq[String]) { inLock(controllerContext.controllerLock) { - var topicsToBeDeleted = children.asScala.toSet + var topicsToBeDeleted = children.toSet debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(","))) val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics if(nonExistentTopics.nonEmpty) { @@ -491,34 +481,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics for (topic <- topicsToBeDeleted) { info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled") - val zkUtils = controllerContext.zkUtils zkUtils.zkClient.delete(getDeleteTopicPath(topic)) } } } } - /** - * - * @throws Exception - * On any error. - */ @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - } + def doHandleDataDeleted(dataPath: String) {} } - class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging { + class PartitionModificationsListener(protected val controller: KafkaController, topic: String) extends ControllerZkDataListener { - this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: " + protected def logName = "AddPartitionsListener" @throws(classOf[Exception]) - def handleDataChange(dataPath : String, data: Object) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } - + def doHandleDataChange(dataPath: String, data: AnyRef) { inLock(controllerContext.controllerLock) { try { info(s"Partition modification triggered $data for path $dataPath") @@ -541,10 +519,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } + // this is not implemented for partition change @throws(classOf[Exception]) - def handleDataDeleted(parentPath : String) { - // this is not implemented for partition change - } + def doHandleDataDeleted(parentPath: String): Unit = {} } } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index f8ee93f504d9..b106b0101eba 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -17,13 +17,11 @@ package kafka.controller import collection._ -import collection.JavaConverters._ import java.util.concurrent.atomic.AtomicBoolean import kafka.common.{StateChangeFailedException, TopicAndPartition} import kafka.controller.Callbacks.CallbackBuilder import kafka.utils.{Logging, ReplicationUtils, ZkUtils} -import org.I0Itec.zkclient.IZkChildListener import kafka.utils.CoreUtils._ /** @@ -49,7 +47,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val controllerId = controller.config.brokerId private val zkUtils = controllerContext.zkUtils private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty - private val brokerChangeListener = new BrokerChangeListener() + private val brokerChangeListener = new BrokerChangeListener(controller) private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) private val hasStarted = new AtomicBoolean(false) private val stateChangeLogger = KafkaController.stateChangeLogger @@ -348,20 +346,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { /** * This is the zookeeper listener that triggers all the state transitions for a replica */ - class BrokerChangeListener() extends IZkChildListener with Logging { - this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: " - def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { - // Avoid continue to exectue handleChildChange after controller is not active because of zkclient's callback order - if(!controller.isActive()) { - return - } + class BrokerChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener { + + protected def logName = "BrokerChangeListener" - info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.asScala.sorted.mkString(","))) + def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) { + info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(","))) inLock(controllerContext.controllerLock) { if (hasStarted.get) { ControllerStats.leaderElectionTimer.time { try { - val curBrokers = currentBrokerList.asScala.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo) + val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo) val curBrokerIds = curBrokers.map(_.id) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 447e4e2b4bb9..8620fbaad909 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1159,7 +1159,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) } - if (!controller.isActive()) { + if (!controller.isActive) { val results = createTopicsRequest.topics.asScala.map { case (topic, _) => (topic, Errors.NOT_CONTROLLER) } @@ -1209,7 +1209,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) } - if (!controller.isActive()) { + if (!controller.isActive) { val results = deleteTopicRequest.topics.asScala.map { topic => (topic, Errors.NOT_CONTROLLER) }.toMap diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index d9e2b5b01ec2..3221890e9a8e 100755 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -16,7 +16,6 @@ */ package kafka.server -import kafka.utils.ZkUtils._ import kafka.utils.CoreUtils._ import kafka.utils.{Json, Logging, ZKCheckedEphemeral} import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -131,9 +130,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, amILeaderBeforeDataChange && !amILeader } - if (shouldResign) { + if (shouldResign) onResigningAsLeader() - } } /** @@ -149,9 +147,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, amILeader } - if(shouldResign) { + if (shouldResign) onResigningAsLeader() - } inLock(controllerContext.controllerLock) { elect From d64963fc8037f9eda47517436bc5cf93f61f0398 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 19 Dec 2016 14:41:52 +0000 Subject: [PATCH 3/6] More `isActive()` -> `isActive` changes --- .../scala/unit/kafka/controller/ControllerFailoverTest.scala | 2 +- core/src/test/scala/unit/kafka/server/BaseRequestTest.scala | 4 ++-- .../test/scala/unit/kafka/server/MetadataRequestTest.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index 63afd4e1b1b9..fd23894d9e9e 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -68,7 +68,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { val epochMap: mutable.Map[Int, Int] = mutable.Map.empty for (server <- this.servers) { epochMap += (server.config.brokerId -> server.kafkaController.epoch) - if(server.kafkaController.isActive()) { + if(server.kafkaController.isActive) { controller = server } } diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 35dbbf0739c8..a166495b4313 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -56,13 +56,13 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { def controllerSocketServer = { servers.find { server => - server.kafkaController.isActive() + server.kafkaController.isActive }.map(_.socketServer).getOrElse(throw new IllegalStateException("No controller broker is available")) } def notControllerSocketServer = { servers.find { server => - !server.kafkaController.isActive() + !server.kafkaController.isActive }.map(_.socketServer).getOrElse(throw new IllegalStateException("No non-controller broker is available")) } diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index 11dd6fe3bf4a..9bcf4fd3695a 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -49,7 +49,7 @@ class MetadataRequestTest extends BaseRequestTest { @Test def testControllerId() { - val controllerServer = servers.find(_.kafkaController.isActive()).get + val controllerServer = servers.find(_.kafkaController.isActive).get val controllerId = controllerServer.config.brokerId val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1) @@ -60,7 +60,7 @@ class MetadataRequestTest extends BaseRequestTest { controllerServer.shutdown() controllerServer.startup() - val controllerServer2 = servers.find(_.kafkaController.isActive()).get + val controllerServer2 = servers.find(_.kafkaController.isActive).get val controllerId2 = controllerServer2.config.brokerId assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2) TestUtils.waitUntilTrue(() => { From 0d39f681773050fac5958966ea4740d161c91e2c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 19 Dec 2016 14:44:08 +0000 Subject: [PATCH 4/6] Use @throws[Exception] instead of throws(classOf[Exception]) consistently --- .../consumer/ZookeeperConsumerConnector.scala | 8 ++++---- .../consumer/ZookeeperTopicEventWatcher.scala | 6 +++--- .../kafka/controller/ControllerZkListener.scala | 4 ++-- .../scala/kafka/controller/KafkaController.scala | 16 ++++++++-------- .../kafka/controller/PartitionStateMachine.scala | 10 +++++----- .../scala/kafka/server/KafkaHealthcheck.scala | 4 ++-- .../kafka/server/ZookeeperLeaderElector.scala | 4 ++-- 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 0b894778bd29..4ef32e9c2057 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -497,7 +497,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val topicCount: TopicCount, val loadBalancerListener: ZKRebalancerListener) extends IZkStateListener { - @throws(classOf[Exception]) + @throws[Exception] def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. } @@ -509,7 +509,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * @throws Exception * On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def handleNewSession() { /** * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a @@ -545,7 +545,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - @throws(classOf[Exception]) + @throws[Exception] def handleDataDeleted(dataPath : String) { // TODO: This need to be implemented when we support delete topic warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time") @@ -597,7 +597,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } watcherExecutorThread.start() - @throws(classOf[Exception]) + @throws[Exception] def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { rebalanceEventTriggered() } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index b9f2d41fc573..d00f465db1cc 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -59,7 +59,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils, class ZkTopicEventListener extends IZkChildListener { - @throws(classOf[Exception]) + @throws[Exception] def handleChildChange(parent: String, children: java.util.List[String]) { lock.synchronized { try { @@ -81,10 +81,10 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils, class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener) extends IZkStateListener { - @throws(classOf[Exception]) + @throws[Exception] def handleStateChanged(state: KeeperState) { } - @throws(classOf[Exception]) + @throws[Exception] def handleNewSession() { lock.synchronized { if (zkUtils != null) { diff --git a/core/src/main/scala/kafka/controller/ControllerZkListener.scala b/core/src/main/scala/kafka/controller/ControllerZkListener.scala index cef99c8af29d..f7557ed10753 100644 --- a/core/src/main/scala/kafka/controller/ControllerZkListener.scala +++ b/core/src/main/scala/kafka/controller/ControllerZkListener.scala @@ -28,14 +28,14 @@ trait ControllerZkListener extends Logging { } trait ControllerZkChildListener extends IZkChildListener with ControllerZkListener { - @throws(classOf[Exception]) + @throws[Exception] final def handleChildChange(parentPath: String, currentChildren: java.util.List[String]): Unit = { // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved if (controller.isActive) doHandleChildChange(parentPath, currentChildren.asScala) } - @throws(classOf[Exception]) + @throws[Exception] def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index ee2b5527dc48..767a1faccfe2 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1148,7 +1148,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState class SessionExpirationListener() extends IZkStateListener with Logging { this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " - @throws(classOf[Exception]) + @throws[Exception] def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. } @@ -1159,7 +1159,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState * * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def handleNewSession() { info("ZK expired; shut down all controller components and try to re-elect") onControllerResignation() @@ -1242,7 +1242,7 @@ class PartitionsReassignedListener(protected val controller: KafkaController) ex * * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataChange(dataPath: String, data: AnyRef) { debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s" .format(dataPath, data)) @@ -1264,7 +1264,7 @@ class PartitionsReassignedListener(protected val controller: KafkaController) ex } } - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } @@ -1280,7 +1280,7 @@ class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaContr * * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataChange(dataPath: String, data: AnyRef) { inLock(controllerContext.controllerLock) { debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data)) @@ -1317,7 +1317,7 @@ class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaContr } } - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } @@ -1401,7 +1401,7 @@ class PreferredReplicaElectionListener(protected val controller: KafkaController * * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataChange(dataPath: String, data: AnyRef) { debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s" .format(dataPath, data.toString)) @@ -1423,7 +1423,7 @@ class PreferredReplicaElectionListener(protected val controller: KafkaController /** * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 6d3011876fac..34319e8adb92 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -408,7 +408,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { protected def logName = "TopicChangeListener" - @throws(classOf[Exception]) + @throws[Exception] def doHandleChildChange(parentPath: String, children: Seq[String]) { inLock(controllerContext.controllerLock) { if (hasStarted.get) { @@ -451,7 +451,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * Invoked when a topic is being deleted * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def doHandleChildChange(parentPath: String, children: Seq[String]) { inLock(controllerContext.controllerLock) { var topicsToBeDeleted = children.toSet @@ -487,7 +487,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } @@ -495,7 +495,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { protected def logName = "AddPartitionsListener" - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataChange(dataPath: String, data: AnyRef) { inLock(controllerContext.controllerLock) { try { @@ -520,7 +520,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } // this is not implemented for partition change - @throws(classOf[Exception]) + @throws[Exception] def doHandleDataDeleted(parentPath: String): Unit = {} } } diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 0ae912456d4e..4133145f0cb1 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -92,12 +92,12 @@ class KafkaHealthcheck(brokerId: Int, } } - @throws(classOf[Exception]) + @throws[Exception] override def handleStateChanged(state: KeeperState) { stateToMeterMap.get(state).foreach(_.mark()) } - @throws(classOf[Exception]) + @throws[Exception] override def handleNewSession() { info("re-registering broker info in ZK for broker " + brokerId) register() diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 3221890e9a8e..ca0f6a03d8eb 100755 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -120,7 +120,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, * Called when the leader information stored in zookeeper has changed. Record the new leader in memory * @throws Exception On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def handleDataChange(dataPath: String, data: Object) { val shouldResign = inLock(controllerContext.controllerLock) { val amILeaderBeforeDataChange = amILeader @@ -139,7 +139,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, * @throws Exception * On any error. */ - @throws(classOf[Exception]) + @throws[Exception] def handleDataDeleted(dataPath: String) { val shouldResign = inLock(controllerContext.controllerLock) { debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader" From efc7221db24c17564001524801499adc7e48df7f Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 19 Dec 2016 14:48:31 +0000 Subject: [PATCH 5/6] Remove `override` when implementing an abstract method --- core/src/main/scala/kafka/controller/KafkaController.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 767a1faccfe2..f745df8c7aa6 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1148,6 +1148,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState class SessionExpirationListener() extends IZkStateListener with Logging { this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " + @throws[Exception] def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. @@ -1168,7 +1169,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } } - override def handleSessionEstablishmentError(error: Throwable): Unit = { + def handleSessionEstablishmentError(error: Throwable): Unit = { //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError } } @@ -1331,7 +1332,7 @@ class IsrChangeNotificationListener(protected val controller: KafkaController) e protected def logName = "IsrChangeNotificationListener" - override def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = { + def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = { inLock(controller.controllerContext.controllerLock) { debug("ISR change notification listener fired") try { From bb082dcb1c77754276e5810be2b57f2c68380efa Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 19 Dec 2016 15:46:29 +0000 Subject: [PATCH 6/6] Remove incorrect @throws. --- .../main/scala/kafka/controller/KafkaController.scala | 10 ---------- .../scala/kafka/controller/PartitionStateMachine.scala | 8 ++------ 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f745df8c7aa6..0aec81d9fe83 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1149,7 +1149,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState class SessionExpirationListener() extends IZkStateListener with Logging { this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " - @throws[Exception] def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. } @@ -1265,7 +1264,6 @@ class PartitionsReassignedListener(protected val controller: KafkaController) ex } } - @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } @@ -1278,10 +1276,7 @@ class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaContr /** * Invoked when some partitions need to move leader to preferred replica - * - * @throws Exception On any error. */ - @throws[Exception] def doHandleDataChange(dataPath: String, data: AnyRef) { inLock(controllerContext.controllerLock) { debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data)) @@ -1318,7 +1313,6 @@ class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaContr } } - @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } @@ -1421,10 +1415,6 @@ class PreferredReplicaElectionListener(protected val controller: KafkaController } } - /** - * @throws Exception On any error. - */ - @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 34319e8adb92..c0b94b17cac1 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -408,7 +408,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { protected def logName = "TopicChangeListener" - @throws[Exception] def doHandleChildChange(parentPath: String, children: Seq[String]) { inLock(controllerContext.controllerLock) { if (hasStarted.get) { @@ -457,7 +456,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { var topicsToBeDeleted = children.toSet debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(","))) val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics - if(nonExistentTopics.nonEmpty) { + if (nonExistentTopics.nonEmpty) { warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(",")) nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic))) } @@ -487,7 +486,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - @throws[Exception] def doHandleDataDeleted(dataPath: String) {} } @@ -495,7 +493,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { protected def logName = "AddPartitionsListener" - @throws[Exception] def doHandleDataChange(dataPath: String, data: AnyRef) { inLock(controllerContext.controllerLock) { try { @@ -514,13 +511,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } } catch { - case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e ) + case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e) } } } // this is not implemented for partition change - @throws[Exception] def doHandleDataDeleted(parentPath: String): Unit = {} } }