
KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for storing per-topic configurations in zookeeper and dynamically making config changes across the cluster. Reviewed by Neha and Jun.
1 parent 4f2742d commit c1ed12e44ddebe41dc464683e3d7eeb4e6d39a45, @jkreps committed Mar 8, 2013
Showing with 906 additions and 496 deletions.
  1. +0 −19 bin/kafka-create-topic.sh
  2. +0 −19 bin/kafka-delete-topic.sh
  3. +1 −1 bin/{kafka-list-topic.sh → kafka-topics.sh}
  4. +6 −18 core/src/main/scala/kafka/{common/KafkaZookeperClient.scala → admin/AdminOperationException.scala}
  5. +115 −22 core/src/main/scala/kafka/admin/AdminUtils.scala
  6. +0 −117 core/src/main/scala/kafka/admin/CreateTopicCommand.scala
  7. +3 −3 core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
  8. +185 −0 core/src/main/scala/kafka/admin/TopicCommand.scala
  9. +4 −1 core/src/main/scala/kafka/cluster/Partition.scala
  10. +1 −1 core/src/main/scala/kafka/log/Log.scala
  11. +95 −2 core/src/main/scala/kafka/log/LogConfig.scala
  12. +14 −17 core/src/main/scala/kafka/log/LogManager.scala
  13. +1 −1 core/src/main/scala/kafka/network/SocketServer.scala
  14. +22 −21 core/src/main/scala/kafka/server/KafkaApis.scala
  15. +24 −28 core/src/main/scala/kafka/server/{KafkaZooKeeper.scala → KafkaHealthcheck.scala}
  16. +1 −1 core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  17. +40 −37 core/src/main/scala/kafka/server/KafkaServer.scala
  18. +1 −1 core/src/main/scala/kafka/server/ReplicaManager.scala
  19. +133 −0 core/src/main/scala/kafka/server/TopicConfigManager.scala
  20. +1 −1 core/src/main/scala/kafka/utils/CommandLineUtils.scala
  21. +31 −0 core/src/main/scala/kafka/utils/Json.scala
  22. +19 −0 core/src/main/scala/kafka/utils/Utils.scala
  23. +15 −10 core/src/main/scala/kafka/utils/ZkUtils.scala
  24. +79 −79 core/src/test/scala/unit/kafka/admin/AdminTest.scala
  25. +3 −2 core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
  26. +2 −2 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
  27. +1 −2 core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
  28. +2 −2 core/src/test/scala/unit/kafka/integration/FetcherTest.scala
  29. +7 −20 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
  30. +4 −4 core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
  31. +6 −6 core/src/test/scala/unit/kafka/log/LogManagerTest.scala
  32. +8 −15 core/src/test/scala/unit/kafka/producer/ProducerTest.scala
  33. +4 −4 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
  34. +3 −3 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
  35. +3 −3 core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
  36. +8 −8 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
  37. +13 −10 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
  38. +0 −1 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
  39. +6 −4 core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
  40. +2 −2 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
  41. +2 −2 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
  42. +27 −0 core/src/test/scala/unit/kafka/utils/JsonTest.scala
  43. +6 −2 core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
  44. +8 −5 core/src/test/scala/unit/kafka/utils/TestUtils.scala
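
The changes center on three pieces: a single kafka-topics.sh CLI backed by the new TopicCommand, per-topic config overrides stored in zookeeper, and a TopicConfigManager on each broker that applies changes to live logs. A rough sketch of the new programmatic entry points, using the AdminUtils signatures from the diff below (the connection string is a placeholder):

  import java.util.Properties
  import org.I0Itec.zkclient.ZkClient
  import kafka.admin.AdminUtils
  import kafka.utils.ZKStringSerializer

  val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)

  // create a topic with a per-topic override stored in zookeeper
  val props = new Properties()
  props.setProperty("segment.bytes", "104857600")
  AdminUtils.createTopic(zkClient, "my-topic", 2, 1, props)

  // later, change the override; this also writes a change notification znode
  props.setProperty("retention.ms", "3600000")
  AdminUtils.changeTopicConfig(zkClient, "my-topic", props)

  zkClient.close()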
19 bin/kafka-create-topic.sh
@@ -1,19 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
-$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@
19 bin/kafka-delete-topic.sh
@@ -1,19 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
-$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@
2 bin/kafka-list-topic.sh → bin/kafka-topics.sh
@@ -16,4 +16,4 @@
base_dir=$(dirname $0)
export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
-$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@
+$base_dir/kafka-run-class.sh kafka.admin.TopicCommand $@
24 ...la/kafka/common/KafkaZookeperClient.scala → ...kafka/admin/AdminOperationException.scala
@@ -13,23 +13,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-package kafka.common
+package kafka.admin
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, ZKConfig}
-import java.util.concurrent.atomic.AtomicReference
-
-object KafkaZookeeperClient {
- private val INSTANCE = new AtomicReference[ZkClient](null)
-
- def getZookeeperClient(config: ZKConfig): ZkClient = {
- // TODO: This cannot be a singleton since unit tests break if we do that
-// INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-// ZKStringSerializer))
- INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer))
- INSTANCE.get()
- }
-}
+class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) {
+ def this(error: Throwable) = this(error.getMessage, error)
+ def this(msg: String) = this(msg, null)
+}
137 core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,9 +18,13 @@
package kafka.admin
import java.util.Random
+import java.util.Properties
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.cluster.Broker
import kafka.utils.{Logging, ZkUtils}
+import kafka.log.LogConfig
+import kafka.server.TopicConfigManager
+import kafka.utils.{Logging, Utils, ZkUtils, Json}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection._
@@ -30,7 +34,7 @@ import scala.Some
object AdminUtils extends Logging {
val rand = new Random
- val AdminEpoch = -1
+ val TopicConfigChangeZnodePrefix = "config_change_"
/**
* There are 2 goals of replica assignment:
@@ -50,43 +54,136 @@ object AdminUtils extends Logging {
* p3 p4 p0 p1 p2 (3rd replica)
* p7 p8 p9 p5 p6 (3rd replica)
*/
- def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
+ def assignReplicasToBrokers(brokers: Seq[Int],
+ partitions: Int,
+ replicationFactor: Int,
fixedStartIndex: Int = -1) // for testing only
: Map[Int, Seq[Int]] = {
- if (nPartitions <= 0)
- throw new AdministrationException("number of partitions must be larger than 0")
+ if (partitions <= 0)
+ throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
- throw new AdministrationException("replication factor must be larger than 0")
- if (replicationFactor > brokerList.size)
- throw new AdministrationException("replication factor: " + replicationFactor +
- " larger than available brokers: " + brokerList.size)
+ throw new AdminOperationException("replication factor must be larger than 0")
+ if (replicationFactor > brokers.size)
+ throw new AdminOperationException("replication factor: " + replicationFactor +
+ " larger than available brokers: " + brokers.size)
val ret = new mutable.HashMap[Int, List[Int]]()
- val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
+ val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
- var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
- for (i <- 0 until nPartitions) {
- if (i > 0 && (i % brokerList.size == 0))
+ var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokers.size)
+ for (i <- 0 until partitions) {
+ if (i > 0 && (i % brokers.size == 0))
secondReplicaShift += 1
- val firstReplicaIndex = (i + startIndex) % brokerList.size
- var replicaList = List(brokerList(firstReplicaIndex))
+ val firstReplicaIndex = (i + startIndex) % brokers.size
+ var replicaList = List(brokers(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
- replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
+ replicaList ::= brokers(replicaIndex(firstReplicaIndex, secondReplicaShift, j, brokers.size))
ret.put(i, replicaList.reverse)
}
ret.toMap
}
+
+ def deleteTopic(zkClient: ZkClient, topic: String) {
+ zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
+ zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+ }
+
+ def topicExists(zkClient: ZkClient, topic: String): Boolean =
+ zkClient.exists(ZkUtils.getTopicPath(topic))
+
+ def createTopic(zkClient: ZkClient,
+ topic: String,
+ partitions: Int,
+ replicationFactor: Int,
+ topicConfig: Properties = new Properties) {
+ val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, replicaAssignment, topicConfig)
+ }
+
+ def createTopicWithAssignment(zkClient: ZkClient,
+ topic: String,
+ partitionReplicaAssignment: Map[Int, Seq[Int]],
+ config: Properties = new Properties) {
+ // validate arguments
+ Topic.validate(topic)
+ LogConfig.validate(config)
+ require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
- def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) {
+ val topicPath = ZkUtils.getTopicPath(topic)
+ if(zkClient.exists(topicPath))
+ throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
+ partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
+
+ // write out the config if there is any, this isn't transactional with the partition assignments
+ writeTopicConfig(zkClient, topic, config)
+
+ // create the partition assignment
+ writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment)
+ }
+
+ private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]]) {
try {
val zkPath = ZkUtils.getTopicPath(topic)
val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2 => throw new AdminOperationException(e2.toString)
+ }
+ }
+
+ /**
+ * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+ */
+ def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
+ LogConfig.validate(config)
+ if(!topicExists(zkClient, topic))
+ throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+
+ // write the new config--may not exist if there were previously no overrides
+ writeTopicConfig(zkClient, topic, config)
+
+ // create the change notification
+ zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
+ }
+
+ /**
+ * Write out the topic config to zk, if there is any
+ */
+ private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
+ if(config.size > 0) {
+ val map = Map("version" -> 1, "config" -> JavaConversions.asMap(config))
+ ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
}
}
+
+ /**
+ * Read the topic config (if any) from zk
+ */
+ def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
+ val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
+ val props = new Properties()
+ if(str != null) {
+ Json.parseFull(str) match {
+ case None => // there are no config overrides
+ case Some(map: Map[String, _]) =>
+ require(map("version") == 1)
+ map.get("config") match {
+ case Some(config: Map[String, String]) =>
+ for((k,v) <- config)
+ props.setProperty(k, v)
+ case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+ }
+
+ case o => throw new IllegalArgumentException("Unexpected value in config: " + str)
+ }
+ }
+ props
+ }
+
+ def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
+ ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
@@ -158,12 +255,8 @@ object AdminUtils extends Logging {
}
}
- private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
+ private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
}
-
-class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) {
- def this() = this(null)
-}
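
For reference, writeTopicConfig stores a small versioned JSON map under /config/topics/<topic>, and fetchTopicConfig parses it back into a Properties. A sketch of the round trip, reusing the zkClient and topic from the sketch above:

  // stored at /config/topics/my-topic:
  //   {"version":1,"config":{"segment.bytes":"104857600"}}
  val overrides = AdminUtils.fetchTopicConfig(zkClient, "my-topic")
  assert(overrides.getProperty("segment.bytes") == "104857600")

  // or fetch the overrides for every topic in one pass
  val all = AdminUtils.fetchAllTopicConfigs(zkClient)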
117 core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import scala.collection.mutable
-import kafka.common.Topic
-
-object CreateTopicCommand extends Logging {
-
- def main(args: Array[String]): Unit = {
- val parser = new OptionParser
- val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
- val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
- "Multiple URLS can be given to allow fail-over.")
- .withRequiredArg
- .describedAs("urls")
- .ofType(classOf[String])
- val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic")
- .withRequiredArg
- .describedAs("# of partitions")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
- val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic")
- .withRequiredArg
- .describedAs("replication factor")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
- val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
- .withRequiredArg
- .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
- "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
- .ofType(classOf[String])
- .defaultsTo("")
-
- val options = parser.parse(args : _*)
-
- for(arg <- List(topicOpt, zkConnectOpt)) {
- if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
- }
-
- val topic = options.valueOf(topicOpt)
- val zkConnect = options.valueOf(zkConnectOpt)
- val nPartitions = options.valueOf(nPartitionsOpt).intValue
- val replicationFactor = options.valueOf(replicationFactorOpt).intValue
- val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
- var zkClient: ZkClient = null
- try {
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
- createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
- println("creation succeeded!")
- } catch {
- case e =>
- println("creation failed because of " + e.getMessage)
- println(Utils.stackTrace(e))
- } finally {
- if (zkClient != null)
- zkClient.close()
- }
- }
-
- def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
- Topic.validate(topic)
-
- val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-
- val partitionReplicaAssignment = if (replicaAssignmentStr == "")
- AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
- else
- getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
- debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
- }
-
- def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = {
- val partitionList = replicaAssignmentList.split(",")
- val ret = new mutable.HashMap[Int, List[Int]]()
- for (i <- 0 until partitionList.size) {
- val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
- if (brokerList.size <= 0)
- throw new AdministrationException("replication factor must be larger than 0")
- if (brokerList.size != brokerList.toSet.size)
- throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList)
- if (!brokerList.toSet.subsetOf(availableBrokerList))
- throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
- "available broker:" + availableBrokerList.toString)
- ret.put(i, brokerList.toList)
- if (ret(i).size != ret(0).size)
- throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
- }
- ret.toMap
- }
-}
6 core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -85,7 +85,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
TopicAndPartition(topic, partition)
}
- case None => throw new AdministrationException("Preferred replica election data is empty")
+ case None => throw new AdminOperationException("Preferred replica election data is empty")
}
}
@@ -102,9 +102,9 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case nee: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
- throw new AdministrationException("Preferred replica leader election currently in progress for " +
+ throw new AdminOperationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2 => throw new AdminOperationException(e2.toString)
}
}
}
185 core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple._
+import java.util.Properties
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.collection._
+import scala.collection.JavaConversions._
+import kafka.common.Topic
+import kafka.cluster.Broker
+
+object TopicCommand {
+
+ def main(args: Array[String]): Unit = {
+
+ val opts = new TopicCommandOptions(args)
+
+ // should have exactly one action
+ val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+ if(actions != 1) {
+ System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
+ opts.parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
+
+ val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+
+ if(opts.options.has(opts.createOpt))
+ createTopic(zkClient, opts)
+ else if(opts.options.has(opts.alterOpt))
+ alterTopic(zkClient, opts)
+ else if(opts.options.has(opts.deleteOpt))
+ deleteTopic(zkClient, opts)
+ else if(opts.options.has(opts.listOpt))
+ listTopics(zkClient)
+ else if(opts.options.has(opts.describeOpt))
+ describeTopic(zkClient, opts)
+
+ zkClient.close()
+ }
+
+ def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+ val topics = opts.options.valuesOf(opts.topicOpt)
+ val configs = parseTopicConfigs(opts)
+ for (topic <- topics) {
+ if (opts.options.has(opts.replicaAssignmentOpt)) {
+ val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+ AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs)
+ } else {
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
+ val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+ val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+ AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
+ }
+ println("Created topic \"%s\".".format(topic))
+ }
+ }
+
+ def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+ val topics = opts.options.valuesOf(opts.topicOpt)
+ val configs = parseTopicConfigs(opts)
+ if(opts.options.has(opts.partitionsOpt))
+ Utils.croak("Changing the number of partitions is not supported.")
+ if(opts.options.has(opts.replicationFactorOpt))
+ Utils.croak("Changing the replication factor is not supported.")
+ for(topic <- topics) {
+ AdminUtils.changeTopicConfig(zkClient, topic, configs)
+ println("Updated config for topic \"%s\".".format(topic))
+ }
+ }
+
+ def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+ for(topic <- opts.options.valuesOf(opts.topicOpt)) {
+ AdminUtils.deleteTopic(zkClient, topic)
+ println("Topic \"%s\" deleted.".format(topic))
+ }
+ }
+
+ def listTopics(zkClient: ZkClient) {
+ for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
+ println(topic)
+ }
+
+ def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
+ val topics = opts.options.valuesOf(opts.topicOpt)
+ val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient)
+ for(md <- metadata) {
+ println(md.topic)
+ val config = AdminUtils.fetchTopicConfig(zkClient, md.topic)
+ println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
+ println("\tpartitions: " + md.partitionsMetadata.size)
+ for(pd <- md.partitionsMetadata) {
+ println("\t\tpartition " + pd.partitionId)
+ println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none"))
+ println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", "))
+ println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", "))
+ }
+ }
+ }
+
+ def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+
+ def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
+ val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
+ require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
+ val props = new Properties
+ configs.foreach(pair => props.setProperty(pair(0), pair(1)))
+ props
+ }
+
+ def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
+ val partitionList = replicaAssignmentList.split(",")
+ val ret = new mutable.HashMap[Int, List[Int]]()
+ for (i <- 0 until partitionList.size) {
+ val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+ ret.put(i, brokerList.toList)
+ if (ret(i).size != ret(0).size)
+ throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
+ }
+ ret.toMap
+ }
+
+ class TopicCommandOptions(args: Array[String]) {
+ val parser = new OptionParser
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ "Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+ val listOpt = parser.accepts("list", "List all available topics.")
+ val createOpt = parser.accepts("create", "Create a new topic.")
+ val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
+ val deleteOpt = parser.accepts("delete", "Delete the topic.")
+ val describeOpt = parser.accepts("describe", "List details for the given topics.")
+ val helpOpt = parser.accepts("help", "Print usage information.")
+ val topicOpt = parser.accepts("topic", "The topic to create, alter, delete, or describe.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
+ .withRequiredArg
+ .describedAs("name=value")
+ .ofType(classOf[String])
+ val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
+ .withRequiredArg
+ .describedAs("# of partitions")
+ .ofType(classOf[java.lang.Integer])
+ val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
+ .withRequiredArg
+ .describedAs("replication factor")
+ .ofType(classOf[java.lang.Integer])
+ val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
+ .withRequiredArg
+ .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+ "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
+ .ofType(classOf[String])
+
+
+ val options = parser.parse(args : _*)
+ }
+
+}
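
Since kafka-topics.sh dispatches to TopicCommand.main, typical invocations look like the following (a sketch; the flag values are examples):

  // equivalent to: bin/kafka-topics.sh --zookeeper localhost:2181 --create ...
  TopicCommand.main(Array(
    "--zookeeper", "localhost:2181",
    "--create", "--topic", "my-topic",
    "--partitions", "2", "--replication-factor", "1",
    "--config", "segment.bytes=104857600"))

  // change an override on a live topic
  TopicCommand.main(Array(
    "--zookeeper", "localhost:2181",
    "--alter", "--topic", "my-topic",
    "--config", "retention.ms=3600000"))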
5 core/src/main/scala/kafka/cluster/Partition.scala
@@ -17,9 +17,11 @@
package kafka.cluster
import scala.collection._
+import kafka.admin.AdminUtils
import kafka.utils._
import java.lang.Object
import kafka.api.LeaderAndIsr
+import kafka.log.LogConfig
import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
@@ -74,7 +76,8 @@ class Partition(val topic: String,
case Some(replica) => replica
case None =>
if (isReplicaLocal(replicaId)) {
- val log = logManager.getOrCreateLog(topic, partitionId)
+ val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
+ val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
val localReplica = new Replica(replicaId, this, time, offset, Some(log))
2 core/src/main/scala/kafka/log/Log.scala
@@ -49,7 +49,7 @@ import com.yammer.metrics.core.Gauge
*/
@threadsafe
class Log(val dir: File,
- val config: LogConfig,
+ @volatile var config: LogConfig,
val needsRecovery: Boolean,
val scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
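
Making config a @volatile var is what lets TopicConfigManager (added below) swap a log's settings in place with no locking; readers simply see the new object on their next access. A minimal sketch, assuming log is an existing kafka.log.Log:

  // LogConfig is a case class, so derive the new config from the old one;
  // @volatile guarantees the write is visible to other threads
  log.config = log.config.copy(maxMessageSize = 2 * 1024 * 1024)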
97 core/src/main/scala/kafka/log/LogConfig.scala
@@ -17,7 +17,7 @@
package kafka.log
-import java.io.File
+import java.util.Properties
import scala.collection._
import kafka.common._
@@ -46,6 +46,99 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
val indexInterval: Int = 4096,
val fileDeleteDelayMs: Long = 60*1000,
val minCleanableRatio: Double = 0.5,
- val dedupe: Boolean = false)
+ val dedupe: Boolean = false) {
+
+ def toProps: Properties = {
+ val props = new Properties()
+ import LogConfig._
+ props.put(SegmentBytesProp, segmentSize.toString)
+ props.put(SegmentMsProp, segmentMs.toString)
+ props.put(SegmentIndexBytesProp, maxIndexSize.toString)
+ props.put(FlushMessagesProp, flushInterval.toString)
+ props.put(FlushMsProp, flushMs.toString)
+ props.put(RetentionBytesProp, retentionSize.toString)
+ props.put(RetentionMsProp, retentionMs.toString)
+ props.put(MaxMessageBytesProp, maxMessageSize.toString)
+ props.put(IndexIntervalBytesProp, indexInterval.toString)
+ props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
+ props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
+ props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
+ props
+ }
+
+}
+
+object LogConfig {
+ val SegmentBytesProp = "segment.bytes"
+ val SegmentMsProp = "segment.ms"
+ val SegmentIndexBytesProp = "segment.index.bytes"
+ val FlushMessagesProp = "flush.messages"
+ val FlushMsProp = "flush.ms"
+ val RetentionBytesProp = "retention.bytes"
+ val RetentionMsProp = "retention.ms"
+ val MaxMessageBytesProp = "max.message.bytes"
+ val IndexIntervalBytesProp = "index.interval.bytes"
+ val FileDeleteDelayMsProp = "file.delete.delay.ms"
+ val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
+ val CleanupPolicyProp = "cleanup.policy"
+
+ val ConfigNames = Set(SegmentBytesProp,
+ SegmentMsProp,
+ SegmentIndexBytesProp,
+ FlushMessagesProp,
+ FlushMsProp,
+ RetentionBytesProp,
+ RetentionMsProp,
+ MaxMessageBytesProp,
+ IndexIntervalBytesProp,
+ FileDeleteDelayMsProp,
+ MinCleanableDirtyRatioProp,
+ CleanupPolicyProp)
+
+
+ /**
+ * Parse the given properties instance into a LogConfig object
+ */
+ def fromProps(props: Properties): LogConfig = {
+ new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt,
+ segmentMs = props.getProperty(SegmentMsProp).toLong,
+ maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt,
+ flushInterval = props.getProperty(FlushMessagesProp).toLong,
+ flushMs = props.getProperty(FlushMsProp).toLong,
+ retentionSize = props.getProperty(RetentionBytesProp).toLong,
+ retentionMs = props.getProperty(RetentionMsProp).toLong,
+ maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
+ indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
+ fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
+ minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
+ dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
+ }
+
+ /**
+ * Create a log config instance using the given properties and defaults
+ */
+ def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
+ val props = new Properties(defaults)
+ props.putAll(overrides)
+ fromProps(props)
+ }
+
+ /**
+ * Check that property names are valid
+ */
+ private def validateNames(props: Properties) {
+ for(name <- JavaConversions.asMap(props).keys)
+ require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
+ }
+
+ /**
+ * Check that the given properties contain only valid log config names, and that all values can be parsed.
+ */
+ def validate(props: Properties) {
+ validateNames(props)
+ LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
+ }
+
+}
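
fromProps(defaults, overrides) leans on java.util.Properties defaults chaining: getProperty falls through to the defaults instance for any key the overrides do not set. A sketch, assuming broker-level defaults plus one per-topic override:

  val defaults = LogConfig().toProps        // full set of default values
  val overrides = new java.util.Properties()
  overrides.setProperty(LogConfig.SegmentBytesProp, "104857600")

  val merged = LogConfig.fromProps(defaults, overrides)
  assert(merged.segmentSize == 104857600)                      // overridden
  assert(merged.maxMessageSize == LogConfig().maxMessageSize)  // inherited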
31 core/src/main/scala/kafka/log/LogManager.scala
@@ -174,31 +174,19 @@ class LogManager(val logDirs: Array[File],
/**
* Get the log if it exists, otherwise return None
*/
- def getLog(topic: String, partition: Int): Option[Log] = {
- val topicAndPartiton = TopicAndPartition(topic, partition)
- val log = logs.get(topicAndPartiton)
+ def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
+ val log = logs.get(topicAndPartition)
if (log == null)
None
else
Some(log)
}
/**
- * Create the log if it does not exist, otherwise just return it
- */
- def getOrCreateLog(topic: String, partition: Int): Log = {
- val topicAndPartition = TopicAndPartition(topic, partition)
- logs.get(topicAndPartition) match {
- case null => createLogIfNotExists(topicAndPartition)
- case log: Log => log
- }
- }
-
- /**
* Create a log for the given topic and the given partition
* If the log already exists, just return a copy of the existing log
*/
- private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = {
+ def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
logCreationLock synchronized {
var log = logs.get(topicAndPartition)
@@ -211,12 +199,16 @@ class LogManager(val logDirs: Array[File],
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
log = new Log(dir,
- defaultConfig,
+ config,
needsRecovery = false,
scheduler,
time)
- info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
logs.put(topicAndPartition, log)
+ info("Created log for topic %s partition %d in %s with properties {%s}."
+ .format(topicAndPartition.topic,
+ topicAndPartition.partition,
+ dataDir.getAbsolutePath,
+ JavaConversions.asMap(config.toProps).mkString(", ")))
log
}
}
@@ -289,6 +281,11 @@ class LogManager(val logDirs: Array[File],
* Get all the partition logs
*/
def allLogs(): Iterable[Log] = logs.values
+
+ /**
+ * Get a map of TopicAndPartition => Log
+ */
+ def logsByTopicPartition = logs.toMap
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
2 core/src/main/scala/kafka/network/SocketServer.scala
@@ -281,7 +281,7 @@ private[kafka] class Processor(val id: Int,
debug("Ignoring response for closed socket.")
close(key)
}
- }finally {
+ } finally {
curr = requestChannel.receiveResponse(id)
}
}
43 core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,14 +17,15 @@
package kafka.server
-import kafka.admin.{CreateTopicCommand, AdminUtils}
+import kafka.admin.AdminUtils
import kafka.api._
import kafka.message._
import kafka.network._
import kafka.log._
import kafka.utils.ZKGroupTopicDirs
import org.apache.log4j.Logger
import scala.collection._
+import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import kafka.metrics.KafkaMetricsGroup
@@ -367,7 +368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
- logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
+ logManager.getLog(topicAndPartition) match {
case Some(log) =>
fetchOffsetsBefore(log, timestamp, maxNumOffsets)
case None =>
@@ -442,7 +443,7 @@ class KafkaApis(val requestChannel: RequestChannel,
/* check if auto creation of topics is turned on */
if (config.autoCreateTopicsEnable) {
try {
- CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+ AdminUtils.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
} catch {
@@ -478,24 +479,25 @@ class KafkaApis(val requestChannel: RequestChannel,
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString)
trace("Handling offset commit request " + offsetCommitRequest.toString)
- val responseInfo = offsetCommitRequest.requestInfo.map( t => {
- val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic)
- try {
- if(t._2.metadata != null && t._2.metadata.length > config.offsetMetadataMaxSize) {
- (t._1, ErrorMapping.OffsetMetadataTooLargeCode)
- } else {
- ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
- t._1.partition, t._2.offset.toString)
- (t._1, ErrorMapping.NoError)
+ val responseInfo = offsetCommitRequest.requestInfo.map{
+ case (topicAndPartition, metaAndError) => {
+ val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+ try {
+ if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
+ (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+ } else {
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+ topicAndPartition.partition, metaAndError.offset.toString)
+ (topicAndPartition, ErrorMapping.NoError)
+ }
+ } catch {
+ case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
- } catch {
- case e =>
- (t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
- })
+ }
val response = new OffsetCommitResponse(responseInfo,
- offsetCommitRequest.correlationId,
- offsetCommitRequest.clientId)
+ offsetCommitRequest.correlationId,
+ offsetCommitRequest.clientId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
@@ -506,7 +508,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString)
- trace("Handling offset fetch request " + offsetFetchRequest.toString)
val responseInfo = offsetFetchRequest.requestInfo.map( t => {
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
try {
@@ -525,8 +526,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*),
- offsetFetchRequest.correlationId,
- offsetFetchRequest.clientId)
+ offsetFetchRequest.correlationId,
+ offsetFetchRequest.clientId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
52 ...n/scala/kafka/server/KafkaZooKeeper.scala → ...scala/kafka/server/KafkaHealthcheck.scala
@@ -25,30 +25,36 @@ import java.net.InetAddress
/**
- * Handles registering broker with zookeeper in the following path:
+ * This class registers the broker in zookeeper to allow
+ * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
* /brokers/[0...N] --> host:port
+ *
+ * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
+ * we are dead.
*/
-class KafkaZooKeeper(config: KafkaConfig) extends Logging {
+class KafkaHealthcheck(private val brokerId: Int,
+ private val host: String,
+ private val port: Int,
+ private val zkClient: ZkClient) extends Logging {
- val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
- private var zkClient: ZkClient = null
-
- def startup() {
- /* start client */
- info("connecting to ZK: " + config.zkConnect)
- zkClient = KafkaZookeeperClient.getZookeeperClient(config)
- zkClient.subscribeStateChanges(new SessionExpireListener)
- registerBrokerInZk()
- }
+ val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+
+ def startup() {
+ zkClient.subscribeStateChanges(new SessionExpireListener)
+ register()
+ }
- private def registerBrokerInZk() {
+ /**
+ * Register this broker as "alive" in zookeeper
+ */
+ def register() {
val hostName =
- if(config.hostName == null || config.hostName.trim.isEmpty)
+ if(host == null || host.trim.isEmpty)
InetAddress.getLocalHost.getCanonicalHostName
else
- config.hostName
+ host
val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
- ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
+ ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, jmxPort)
}
/**
@@ -70,21 +76,11 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
*/
@throws(classOf[Exception])
def handleNewSession() {
- info("re-registering broker info in ZK for broker " + config.brokerId)
- registerBrokerInZk()
+ info("re-registering broker info in ZK for broker " + brokerId)
+ register()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
}
- def shutdown() {
- if (zkClient != null) {
- info("Closing zookeeper client...")
- zkClient.close()
- }
- }
-
- def getZookeeperClient = {
- zkClient
- }
}
2 core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -67,7 +67,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
handler.shutdown
for(thread <- threads)
thread.join
- info("shutted down completely")
+ info("shut down completely")
}
}
77 core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,6 +18,7 @@
package kafka.server
import kafka.network.SocketServer
+import kafka.admin._
import kafka.log.LogConfig
import kafka.log.CleanerConfig
import kafka.log.LogManager
@@ -39,7 +40,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
- var kafkaZookeeper: KafkaZooKeeper = null
+ var kafkaHealthcheck: KafkaHealthcheck = null
+ var topicConfigManager: TopicConfigManager = null
var replicaManager: ReplicaManager = null
var apis: KafkaApis = null
var kafkaController: KafkaController = null
@@ -57,9 +59,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
/* start scheduler */
kafkaScheduler.startup()
+
+ /* setup zookeeper */
+ zkClient = initZk()
/* start log manager */
- logManager = createLogManager(config)
+ logManager = createLogManager(zkClient)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
@@ -68,31 +73,40 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketRequestMaxBytes)
+ socketServer.startup()
- socketServer.startup
-
- /* start client */
- kafkaZookeeper = new KafkaZooKeeper(config)
- // starting relevant replicas and leader election for partitions assigned to this broker
- kafkaZookeeper.startup
-
- info("Connecting to ZK: " + config.zkConnect)
-
- replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
-
- kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config)
+ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager)
+ kafkaController = new KafkaController(config, zkClient)
+
+ /* start processing requests */
+ apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
- Mx4jLoader.maybeLoad
+
+ Mx4jLoader.maybeLoad()
- // start the replica manager
replicaManager.startup()
- // start the controller
+
kafkaController.startup()
- // register metrics beans
+
+ topicConfigManager = new TopicConfigManager(zkClient, logManager)
+ topicConfigManager.startup()
+
+ /* tell everyone we are alive */
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient)
+ kafkaHealthcheck.startup()
+
+
registerStats()
+
info("started")
}
+
+ private def initZk(): ZkClient = {
+ info("Connecting to zookeeper on " + config.zkConnect)
+ val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ ZkUtils.setupCommonPaths(zkClient)
+ zkClient
+ }
/**
* Forces some dynamic jmx beans to be registered on server startup.
@@ -118,15 +132,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Utils.swallow(kafkaScheduler.shutdown())
if(apis != null)
Utils.swallow(apis.close())
- if(kafkaZookeeper != null)
- Utils.swallow(kafkaZookeeper.shutdown())
if(replicaManager != null)
Utils.swallow(replicaManager.shutdown())
if(logManager != null)
Utils.swallow(logManager.shutdown())
-
if(kafkaController != null)
Utils.swallow(kafkaController.shutdown())
+ if(zkClient != null)
+ Utils.swallow(zkClient.close())
shutdownLatch.countDown()
info("shut down completed")
@@ -140,13 +153,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def getLogManager(): LogManager = logManager
- private def createLogManager(config: KafkaConfig): LogManager = {
- val topics = config.logCleanupPolicyMap.keys ++
- config.logSegmentBytesPerTopicMap.keys ++
- config.logFlushIntervalMsPerTopicMap.keys ++
- config.logRollHoursPerTopicMap.keys ++
- config.logRetentionBytesPerTopicMap.keys ++
- config.logRetentionHoursPerTopicMap.keys
+ private def createLogManager(zkClient: ZkClient): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = 60 * 60 * 1000 * config.logRollHours,
flushInterval = config.logFlushIntervalMessages,
@@ -159,13 +166,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
fileDeleteDelayMs = config.logDeleteDelayMs,
minCleanableRatio = config.logCleanerMinCleanRatio,
dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
- val logConfigs = for(topic <- topics) yield
- topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes),
- segmentMs = 60 * 60 * 1000 * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours),
- flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong,
- retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes),
- retentionMs = 60 * 60 * 1000 * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours),
- dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe")
+ val defaultProps = defaultLogConfig.toProps
+ val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
+ // read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
ioBufferSize = config.logCleanerIoBufferSize,
@@ -174,7 +177,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
- topicConfigs = logConfigs.toMap,
+ topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = config.logFlushSchedulerIntervalMs,
2 core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -175,7 +175,7 @@ class ReplicaManager(val config: KafkaConfig,
case Some(leaderReplica) => leaderReplica
case None =>
throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d"
- .format(topic, partitionId, config.brokerId))
+ .format(topic, partitionId, config.brokerId))
}
}
}
133 core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+import scala.collection._
+import kafka.log._
+import kafka.utils._
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+/**
+ * This class initiates and carries out topic config changes.
+ *
+ * It works as follows.
+ *
+ * Config is stored under the path
+ * /config/topics/<topic_name>
+ * This znode stores the topic-overrides for this topic (but no defaults) in properties format.
+ *
+ * To avoid watching every topic for changes, we instead have a notification path
+ * /config/changes
+ * The TopicConfigManager has a child watch on this path.
+ *
+ * To update a topic config we first update the topic config properties. Then we create a new sequential
+ * znode under the change path which contains the name of the topic that was updated, say
+ * /config/changes/config_change_13321
+ *
+ * This will fire a watcher on all brokers. The watcher works as follows: it reads all the config change notifications
+ * and keeps track of the highest config change suffix number it has applied previously. For any previously applied
+ * change it finds, it checks whether the notification is older than a static expiration time (say 10 mins) and, if so,
+ * deletes the notification. For any new change it reads the new configuration, combines it with the defaults, and
+ * updates the log config for all logs it has for that topic (if any).
+ *
+ * Note that config is always read from the config path in zk; the notification is just a trigger to do so. So if a broker is
+ * down and misses a change, that is fine--when it restarts it will load the full config anyway. Note also that
+ * if there are two consecutive config changes, it is possible that only the last one will be applied (since by the time the
+ * broker reads the config, both changes may have been made). In that case the broker would needlessly refresh the config
+ * twice, but that is harmless.
+ *
+ * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
+ * on startup where a change might be missed between the initial config load and registering for change notifications.
+ *
+ */
+class TopicConfigManager(private val zkClient: ZkClient,
+ private val logManager: LogManager,
+ private val changeExpirationMs: Long = 10*60*1000,
+ private val time: Time = SystemTime) extends Logging {
+ private var lastExecutedChange = -1L
+
+ /**
+ * Begin watching for config changes
+ */
+ def startup() {
+ ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
+ zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
+ processAllConfigChanges()
+ }
+
+ /**
+ * Process all config changes
+ */
+ private def processAllConfigChanges() {
+ val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
+ processConfigChanges(JavaConversions.asBuffer(configChanges).sorted)
+ }
+
+ /**
+ * Process the given list of config changes
+ */
+ private def processConfigChanges(notifications: Seq[String]) {
+ if (notifications.size > 0) {
+ info("Processing %d topic config change notification(s)...".format(notifications.size))
+ val now = time.milliseconds
+ val logs = logManager.logsByTopicPartition.toBuffer
+ val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
+ val lastChangeId = notifications.map(changeNumber).max
+ for (notification <- notifications) {
+ val changeId = changeNumber(notification)
+ if (changeId > lastExecutedChange) {
+ val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
+ val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode)
+ val topic = topicJson.substring(1, topicJson.length - 1) // dequote
+ if (logsByTopic.contains(topic)) {
+ /* combine the default properties with the overrides in zk to create the new LogConfig */
+ val props = new Properties(logManager.defaultConfig.toProps)
+ props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
+ val logConfig = LogConfig.fromProps(props)
+ for (log <- logsByTopic(topic))
+ log.config = logConfig
+ lastExecutedChange = changeId
+ info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
+ } else if (now - stat.getCtime > changeExpirationMs) {
+ /* this change is now obsolete, try to delete it unless it is the last change left */
+ ZkUtils.deletePath(zkClient, changeZnode)
+ }
+ }
+ }
+ }
+ }
+
+ /* get the change number from a change notification znode */
+ private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
+
+ /**
+ * A listener that applies config changes to logs
+ */
+ object ConfigChangeListener extends IZkChildListener {
+ override def handleChildChange(path: String, children: java.util.List[String]) {
+ try {
+ processConfigChanges(JavaConversions.asBuffer(children))
+ } catch {
+ case e: Exception => error("Error processing config change:", e)
+ }
+ }
+ }
+
+}
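
The sequential znodes give notifications a total order; changeNumber strips the prefix so a broker can skip anything at or below lastExecutedChange. A sketch of the bookkeeping, using the znode names from the class comment above:

  val prefix = AdminUtils.TopicConfigChangeZnodePrefix      // "config_change_"
  def changeNumber(name: String): Long = name.substring(prefix.length).toLong

  var lastExecutedChange = 13319L
  for (n <- Seq("config_change_13319", "config_change_13321").sorted
       if changeNumber(n) > lastExecutedChange) {
    // re-read /config/topics/<topic> and rebuild the LogConfig here
    lastExecutedChange = changeNumber(n)
  }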
2 core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -10,7 +10,7 @@ object CommandLineUtils extends Logging {
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
- error("Missing required argument \"" + arg + "\"")
+ System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
31 core/src/main/scala/kafka/utils/Json.scala
@@ -1,6 +1,7 @@
package kafka.utils
import kafka.common._
+import scala.collection._
import util.parsing.json.JSON
/**
@@ -11,6 +12,9 @@ object Json extends Logging {
JSON.globalNumberParser = myConversionFunc
val lock = new Object
+ /**
+ * Parse a JSON string into an object
+ */
def parseFull(input: String): Option[Any] = {
lock synchronized {
try {
@@ -21,4 +25,31 @@ object Json extends Logging {
}
}
}
+
+ /**
+ * Encode an object into a JSON string. This method accepts any value of a type T where
+ * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
+ * Any other type will result in an IllegalArgumentException.
+ *
+ * Note: this method does not escape embedded quotes or properly handle non-ASCII characters.
+ */
+ def encode(obj: Any): String = {
+ obj match {
+ case null => "null"
+ case b: Boolean => b.toString
+ case s: String => "\"" + s + "\""
+ case n: Number => n.toString
+ case m: Map[_, _] =>
+ "{" + m.map { case (k, v) => encode(k) + ":" + encode(v) }.mkString(",") + "}"
+ case a: Array[_] => encode(a.toSeq)
+ case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]"
+ case other: AnyRef => throw new IllegalArgumentException("Unknown argument of type " + other.getClass + ": " + other)
+ }
+ }
}
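A quick illustration of the encoder on a nested value; the output follows directly from the cases above:

import kafka.utils.Json

val json = Json.encode(Map("version" -> 1, "partitions" -> Map("0" -> List(0, 1, 2))))
// yields: {"version":1,"partitions":{"0":[0,1,2]}}
val parsed = Json.parseFull(json)  // round-trips through the parser above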
19 core/src/main/scala/kafka/utils/Utils.scala
@@ -577,6 +577,25 @@ object Utils extends Logging {
}
/**
+ * Turn a properties map into a string
+ */
+ def asString(props: Properties): String = {
+ val writer = new StringWriter()
+ props.store(writer, "")
+ writer.toString
+ }
+
+ /**
+ * Read some properties with the given default values
+ */
+ def readProps(s: String, defaults: Properties): Properties = {
+ val reader = new StringReader(s)
+ val props = new Properties(defaults)
+ props.load(reader)
+ props
+ }
+
+ /**
* Read a big-endian integer from a byte array
*/
def readInt(bytes: Array[Byte], offset: Int): Int = {
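The two new helpers above are inverses. A small round-trip sketch (the property keys here are illustrative only):

import java.util.Properties
import kafka.utils.Utils

val defaults = new Properties()
defaults.setProperty("retention.ms", "604800000")

val overrides = new Properties()
overrides.setProperty("max.message.bytes", "1024")

// store() serializes only the explicit entries, so the defaults travel separately
val merged = Utils.readProps(Utils.asString(overrides), defaults)
merged.getProperty("max.message.bytes")  // "1024", the explicit override
merged.getProperty("retention.ms")       // "604800000", falls through to the defaults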
25 core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -38,6 +38,8 @@ object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
+ val TopicConfigPath = "/config/topics"
+ val TopicConfigChangesPath = "/config/changes"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
@@ -51,24 +53,24 @@ object ZkUtils extends Logging {
getTopicPath(topic) + "/partitions"
}
+ def getTopicConfigPath(topic: String): String =
+ TopicConfigPath + "/" + topic
+
def getController(zkClient: ZkClient): Int= {
readDataMaybeNull(zkClient, ControllerPath)._1 match {
case Some(controller) => controller.toInt
case None => throw new KafkaException("Controller doesn't exist")
}
}
- def getTopicPartitionPath(topic: String, partitionId: Int): String ={
+ def getTopicPartitionPath(topic: String, partitionId: Int): String =
getTopicPartitionsPath(topic) + "/" + partitionId
- }
- def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={
+ def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
getTopicPartitionPath(topic, partitionId) + "/" + "state"
- }
- def getSortedBrokerList(zkClient: ZkClient): Seq[Int] ={
+ def getSortedBrokerList(zkClient: ZkClient): Seq[Int] =
ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
- }
def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
@@ -89,6 +91,11 @@ object ZkUtils extends Logging {
def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
}
+
+ def setupCommonPaths(zkClient: ZkClient) {
+ for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath))
+ makeSurePersistentPathExists(zkClient, path)
+ }
def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
: Option[LeaderIsrAndControllerEpoch] = {
@@ -179,7 +186,7 @@ object ZkUtils extends Logging {
debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
replicas.contains(brokerId.toString)
}
-
+
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val brokerInfo =
@@ -312,10 +319,8 @@ object ZkUtils extends Logging {
case e: ZkNodeExistsException =>
stat = client.writeData(path, data)
return stat.getVersion
- case e2 => throw e2
}
}
- case e2 => throw e2
}
}
@@ -596,7 +601,7 @@ object ZkUtils extends Logging {
case nne: ZkNoNodeException =>
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2 => throw new AdminOperationException(e2.toString)
}
}
}
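Taken together, the new constants give the following znode layout (shown schematically; the notification prefix is AdminUtils.TopicConfigChangeZnodePrefix):

/config
  /topics
    /<topic>                    data: the topic's current config overrides
  /changes
    /<prefix><sequence-number>  data: the JSON-quoted topic name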
158 core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -19,6 +19,9 @@ package kafka.admin
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.utils._
+import kafka.log._
import kafka.zk.ZooKeeperTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{Logging, ZkUtils, TestUtils}
@@ -32,28 +35,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val brokerList = List(0, 1, 2, 3, 4)
// test 0 replication factor
- try {
+ intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
- fail("shouldn't allow replication factor 0")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
}
// test wrong replication factor
- try {
+ intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
- fail("shouldn't allow replication factor larger than # of brokers")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
}
// correct assignment
- {
- val expectedAssignment = Map(
+ val expectedAssignment = Map(
0 -> List(0, 1, 2),
1 -> List(1, 2, 3),
2 -> List(2, 3, 4),
@@ -63,65 +55,34 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
6 -> List(1, 3, 4),
7 -> List(2, 4, 0),
8 -> List(3, 0, 1),
- 9 -> List(4, 1, 2)
- )
+ 9 -> List(4, 1, 2))
- val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
- val e = (expectedAssignment.toList == actualAssignment.toList)
- assertTrue(expectedAssignment.toList == actualAssignment.toList)
- }
+ val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
+ assertTrue(expectedAssignment.toList == actualAssignment.toList)
}
@Test
def testManualReplicaAssignment() {
- val brokerList = Set(0, 1, 2, 3, 4)
-
- // duplicated brokers
- try {
- val replicationAssignmentStr = "0,0,1:1,2,3"
- CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- fail("replication assginment shouldn't have duplicated brokers")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
- }
+ val brokers = List(0, 1, 2, 3, 4)
+ TestUtils.createBrokersInZk(zkClient, brokers)
- // non-exist brokers
- try {
- val replicationAssignmentStr = "0,1,2:1,2,7"
- CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- fail("replication assginment shouldn't contain non-exist brokers")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
+ // duplicate brokers
+ intercept[IllegalArgumentException] {
+ AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0)))
}
// inconsistent replication factor
- try {
- val replicationAssignmentStr = "0,1,2:1,2"
- CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- fail("all partitions should have the same replication factor")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
+ intercept[IllegalArgumentException] {
+ AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
}
// good assignment
- {
- val replicationAssignmentStr = "0:1:2,1:2:3"
- val expectedReplicationAssignment = Map(
- 0 -> List(0, 1, 2),
- 1 -> List(1, 2, 3)
- )
- val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
- for( (part, replicas) <- expectedReplicationAssignment ) {
- assertEquals(replicas, actualReplicationAssignment(part))
- }
- }
+ val assignment = Map(0 -> List(0, 1, 2),
+ 1 -> List(1, 2, 3))
+ AdminUtils.createTopicWithAssignment(zkClient, "test", assignment)
+ val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
+ assertEquals(assignment, found("test"))
}
@Test
@@ -157,7 +118,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val topic = "test"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
@@ -166,28 +127,23 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
for(i <- 0 until actualReplicaList.size)
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
- try {
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
- fail("shouldn't be able to create a topic already exists")
- } catch {
- case e: TopicExistsException => // this is good
- case e2 => throw e2
+ intercept[TopicExistsException] {
+ // shouldn't be able to create a topic that already exists
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
}
}
@Test
def testGetTopicMetadata() {
val expectedReplicaAssignment = Map(
0 -> List(0, 1, 2),
- 1 -> List(1, 2, 3)
- )
+ 1 -> List(1, 2, 3))
val leaderForPartitionMap = Map(
0 -> 0,
- 1 -> 1
- )
+ 1 -> 1)
val topic = "auto-topic"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
@@ -215,7 +171,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(0, 2, 3)
val partitionToBeReassigned = 0
@@ -240,7 +196,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(1, 2, 3)
val partitionToBeReassigned = 0
@@ -266,7 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@@ -307,7 +263,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// put the partition in the reassigned path as well
// reassign partition 0
val newReplicas = Seq(0, 1)
@@ -346,7 +302,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@@ -367,7 +323,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
@@ -404,6 +360,50 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
servers.foreach(_.shutdown())
}
}
+
+ /**
+ * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic,
+ * then changes the config and checks that the new values take effect.
+ */
+ @Test
+ def testTopicConfigChange() {
+ val partitions = 3
+ val topic = "my-topic"
+ val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
+ def makeConfig(messageSize: Int, retentionMs: Long) = {
+ val props = new Properties()
+ props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
+ props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
+ props
+ }
+
+ def checkConfig(messageSize: Int, retentionMs: Long) {
+ TestUtils.retry(10000) {
+ for(part <- 0 until partitions) {
+ val logOpt = server.logManager.getLog(TopicAndPartition(topic, part))
+ assertTrue(logOpt.isDefined)
+ assertEquals(retentionMs, logOpt.get.config.retentionMs)
+ assertEquals(messageSize, logOpt.get.config.maxMessageSize)
+ }
+ }
+ }
+
+ try {
+ // create a topic with a few config overrides and check that they are applied
+ val maxMessageSize = 1024
+ val retentionMs = 1000*1000
+ AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
+ checkConfig(maxMessageSize, retentionMs)
+
+ // now double the config values for the topic and check that they are applied
+ AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
+ checkConfig(2*maxMessageSize, 2 * retentionMs)
+ } finally {
+ server.shutdown()
+ server.config.logDirs.foreach(Utils.rm(_))
+ }
+ }
private def checkIfReassignPartitionPathExists(): Boolean = {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
5 core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -20,14 +20,15 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
+import java.util.Properties
import scala.collection._
import junit.framework.Assert._
import kafka.message._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import org.junit.Test
import kafka.serializer._
import kafka.cluster.{Broker, Cluster}
@@ -60,7 +61,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
override def setUp() {
super.setUp
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}
4 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.serializer._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
@@ -298,7 +298,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
// create topic topic1 with 1 partition on broker 0
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
// send some messages to each broker
val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
3 core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -32,8 +32,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
val topic = "test_topic"
val group = "default_group"
val testConsumer = "consumer"
- val BrokerPort = 9892
- val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
+ val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val NumMessages = 10
val LargeOffset = 10000
val SmallOffset = -1
4 core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -30,7 +30,7 @@ import kafka.serializer._
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.TestUtils._
import kafka.utils.TestUtils
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -55,7 +55,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
override def setUp() {
super.setUp
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
fetcher.stopAllConnections()
27 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.AdminUtils
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
import kafka.utils.{TestUtils, Utils}
@@ -42,19 +42,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val configs = List(config)
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- override def setUp() {
- super.setUp
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
- }
-
- override def tearDown() {
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
-
- super.tearDown
- }
-
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
.clientId("test-client")
@@ -299,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
- CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+ AdminUtils.createTopic(zkClient, newTopic, 1, 1)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
@@ -327,10 +314,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
// wait until the messages are published
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000)
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000)
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000)
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000)
val replicaId = servers.head.config.brokerId
val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs
@@ -354,7 +341,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
*/
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
for( topic <- topics ) {
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}
}
8 core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala