From e2dfcf98b70dfd3a97b30cad25b6b74abbd191b1 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Tue, 16 Feb 2016 14:30:49 -0800 Subject: [PATCH 1/2] KAFKA-3242: minor rename / logging change to refererences to 'adding partitions' to indicate 'modifying partitions' --- .../controller/PartitionStateMachine.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 73b173e57c28a..b0d3d8fc1bd79 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -51,7 +51,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) private val topicChangeListener = new TopicChangeListener() private val deleteTopicsListener = new DeleteTopicsListener() - private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty + private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty private val stateChangeLogger = KafkaController.stateChangeLogger this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " @@ -82,11 +82,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // de-register topic and partition change listeners def deregisterListeners() { deregisterTopicChangeListener() - addPartitionsListener.foreach { + partitionModificationsListeners.foreach { case (topic, listener) => zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener) } - addPartitionsListener.clear() + partitionModificationsListeners.clear() if(controller.config.deleteTopicEnable) deregisterDeleteTopicListener() } @@ -379,13 +379,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } def registerPartitionChangeListener(topic: String) = { - addPartitionsListener.put(topic, new AddPartitionsListener(topic)) - zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic)) + partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic)) + zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) } def deregisterPartitionChangeListener(topic: String) = { - zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic)) - addPartitionsListener.remove(topic) + zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) + partitionModificationsListeners.remove(topic) } private def registerDeleteTopicListener() = { @@ -497,7 +497,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - class AddPartitionsListener(topic: String) extends IZkDataListener with Logging { + class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging { this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: " @@ -505,7 +505,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def handleDataChange(dataPath : String, data: Object) { inLock(controllerContext.controllerLock) { try { - info("Add Partition triggered " + data.toString + " for path " + dataPath) + info("Partition modification triggered " + data.toString + " for path " + dataPath) val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1)) From 881a7b618ee10f9ec09db83087a6cb0090ae590c Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Tue, 23 Feb 2016 09:33:49 -0800 Subject: [PATCH 2/2] KAFKA-3242: add string interpolation --- .../src/main/scala/kafka/controller/PartitionStateMachine.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index b0d3d8fc1bd79..ec03b84395bd8 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -505,7 +505,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def handleDataChange(dataPath : String, data: Object) { inLock(controllerContext.controllerLock) { try { - info("Partition modification triggered " + data.toString + " for path " + dataPath) + info(s"Partition modification triggered $data for path $dataPath") val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1))