From e16d4fa8db4307e0e54f51f81cfc4c348163f9b3 Mon Sep 17 00:00:00 2001 From: jmlvanre Date: Wed, 22 Jan 2014 18:13:05 -0800 Subject: [PATCH] Add a rack id to the Kafka broker config. The rack id can be used during replica assignment by passing the max-rack-replication argument to the admin scripts (create topic, etc.). By default the original replica assignment algorithm is used, because max-rack-replication defaults to -1. A max-rack-replication value > -1 is not honored when manual replica assignment is used; the manual assignment takes precedence. --- .../kafka/admin/AddPartitionsCommand.scala | 12 +++- .../main/scala/kafka/admin/AdminUtils.scala | 59 +++++++++++++++---- .../kafka/admin/CreateTopicCommand.scala | 12 +++- .../admin/ReassignPartitionsCommand.scala | 11 +++- .../main/scala/kafka/client/ClientUtils.scala | 5 +- .../src/main/scala/kafka/cluster/Broker.scala | 19 +++--- .../main/scala/kafka/server/KafkaConfig.scala | 3 + .../scala/kafka/server/KafkaZooKeeper.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +- .../other/kafka/TestLogPerformance.scala | 2 +- .../unit/kafka/admin/AddPartitionsTest.scala | 15 +++-- .../scala/unit/kafka/admin/AdminTest.scala | 9 ++- .../RequestResponseSerializationTest.scala | 2 +- .../kafka/consumer/ConsumerIteratorTest.scala | 2 +- .../integration/AutoOffsetResetTest.scala | 2 +- .../unit/kafka/integration/FetcherTest.scala | 2 +- .../integration/LazyInitProducerTest.scala | 2 +- .../kafka/integration/PrimitiveApiTest.scala | 2 +- .../kafka/integration/RollingBounceTest.scala | 13 ++-- .../kafka/integration/TopicMetadataTest.scala | 2 +- .../scala/unit/kafka/log/LogManagerTest.scala | 8 +-- .../scala/unit/kafka/log/LogOffsetTest.scala | 5 +- .../test/scala/unit/kafka/log/LogTest.scala | 2 +- .../kafka/log4j/KafkaLog4jAppenderTest.scala | 3 +- .../kafka/producer/AsyncProducerTest.scala | 14 ++--- .../unit/kafka/producer/ProducerTest.scala | 10 ++-- .../kafka/server/LeaderElectionTest.scala | 12 ++-- .../kafka/server/ServerShutdownTest.scala | 2 +- .../scala/unit/kafka/utils/TestUtils.scala | 17 +++--- 29 files changed, 168 insertions(+), 87 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index 7f037081304c5..b32f59b010d74 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -46,6 +46,11 @@ object AddPartitionsCommand extends Logging { "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") .ofType(classOf[String]) .defaultsTo("") + val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack") .withRequiredArg .describedAs("max # of replicas per rack") .ofType(classOf[java.lang.Integer]) .defaultsTo(-1) val options = parser.parse(args : _*) @@ -62,10 +67,11 @@ object AddPartitionsCommand extends Logging { val zkConnect = options.valueOf(zkConnectOpt) val nPartitions = options.valueOf(nPartitionsOpt).intValue val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) + val rackReplication = options.valueOf(rackReplicationOpt).intValue var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) + addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, rackReplication) println("adding partitions succeeded!") } catch { case e: Throwable => @@ -77,7 +83,7 @@ object AddPartitionsCommand extends Logging
{ } } - def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", rackReplication: Int = -1) { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdministrationException("The topic %s does not exist".format(topic)) @@ -87,7 +93,7 @@ object AddPartitionsCommand extends Logging { // create the new partition replication list val brokerList = ZkUtils.getSortedBrokerList(zkClient) val newPartitionReplicaList = if (replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) + AdminUtils.assignReplicasToBrokers(zkClient, brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size, rackReplication) else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index d6ab275d6ed03..a8983457d8b52 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -47,8 +47,8 @@ object AdminUtils extends Logging { * p3 p4 p0 p1 p2 (3rd replica) * p7 p8 p9 p5 p6 (3rd replica) */ - def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, - fixedStartIndex: Int = -1, startPartitionId: Int = -1) + def assignReplicasToBrokers(zkClient: ZkClient, brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, + fixedStartIndex: Int = -1, startPartitionId: Int = -1, maxReplicaPerRack: Int = -1) : Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdministrationException("number of partitions must be larger than 0") @@ -62,15 +62,52 @@ object AdminUtils extends Logging { var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0 var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) - for (i <- 0 until nPartitions) { - if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) - nextReplicaShift += 1 - val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size - var replicaList = List(brokerList(firstReplicaIndex)) - for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) - ret.put(currentPartitionId, replicaList.reverse) - currentPartitionId = currentPartitionId + 1 + if (maxReplicaPerRack <= 0) { + for (i <- 0 until nPartitions) { + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) + nextReplicaShift += 1 + val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size + var replicaList = List(brokerList(firstReplicaIndex)) + for (j <- 0 until replicationFactor - 1) + replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) + ret.put(currentPartitionId, replicaList.reverse) + currentPartitionId = currentPartitionId + 1 + } + } else { + val brokerToRackMap: Map[Int, Int] = brokerList.map(brokerId => (brokerId -> (ZkUtils.getBrokerInfo(zkClient, brokerId) match { + case Some(broker) => broker.rack + case None => throw new AdministrationException("broker " + brokerId +
" must have rack id") + }) )).toMap + for (i <- 0 until nPartitions) { + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) + nextReplicaShift += 1 + val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size + var replicaList = List(brokerList(firstReplicaIndex)) + var rackReplicaCount: mutable.Map[Int, Int] = mutable.Map(brokerToRackMap(brokerList(firstReplicaIndex)) -> 1) + var k = 0 + for (j <- 0 until replicationFactor - 1) { + var done = false; + while (!done && k < brokerList.size) { + val broker = brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, k, brokerList.size)) + val rack = brokerToRackMap(broker) + if (!(rackReplicaCount contains rack)) { + replicaList ::= broker + rackReplicaCount += (rack -> 1) + done = true; + } else if (rackReplicaCount(rack) < maxReplicaPerRack) { + rackReplicaCount(rack) = rackReplicaCount(rack) + 1 + replicaList ::= broker + done = true; + } + k = k + 1 + } + if (!done) { + throw new AdministrationException("not enough brokers available in unique racks to meet maxReplicaPerRack limit of " + maxReplicaPerRack) + } + } + ret.put(currentPartitionId, replicaList.reverse) + currentPartitionId = currentPartitionId + 1 + } } ret.toMap } diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala index 84c20950fbf39..ef5229b3275f8 100644 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala @@ -52,6 +52,11 @@ object CreateTopicCommand extends Logging { "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") .ofType(classOf[String]) .defaultsTo("") + val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack") + .withRequiredArg + .describedAs("max # of replicas per rack") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) val options = parser.parse(args : _*) @@ -68,10 +73,11 @@ object CreateTopicCommand extends Logging { val nPartitions = options.valueOf(nPartitionsOpt).intValue val replicationFactor = options.valueOf(replicationFactorOpt).intValue val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) + val rackReplication = options.valueOf(rackReplicationOpt).intValue var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) + createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, rackReplication) println("creation succeeded!") } catch { case e: Throwable => @@ -83,13 +89,13 @@ object CreateTopicCommand extends Logging { } } - def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") { + def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "", rackReplication: Int = -1) { Topic.validate(topic) val brokerList = ZkUtils.getSortedBrokerList(zkClient) val partitionReplicaAssignment = if (replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor) + AdminUtils.assignReplicasToBrokers(zkClient, brokerList, numPartitions, replicationFactor, -1, -1, rackReplication) else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment)) diff 
--git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 2f706c94d340f..d30faf843745e 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -64,6 +64,12 @@ object ReassignPartitionsCommand extends Logging { .describedAs("partition reassignment json file path") .ofType(classOf[String]) + val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack") + .withRequiredArg + .describedAs("max # of replicas per rack") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) + val options = parser.parse(args : _*) for(arg <- List(zkConnectOpt)) { @@ -110,11 +116,12 @@ object ReassignPartitionsCommand extends Logging { val brokerListToReassign = brokerList.split(',') map (_.toInt) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) + val rackReplication = options.valueOf(rackReplicationOpt).intValue val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) groupedByTopic.foreach { topicInfo => - val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, - topicInfo._2.head._2.size) + val assignedReplicas = AdminUtils.assignReplicasToBrokers(zkClient, brokerListToReassign, topicInfo._2.size, + topicInfo._2.head._2.size, -1, -1, rackReplication) partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) } diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 1d2f81be4f980..17228a636bf0d 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -89,7 +89,7 @@ object ClientUtils extends Logging{ } /** - * Parse a list of broker urls in the form host1:port1, host2:port2, ... + * Parse a list of broker urls in the form host1:port1:rack1, host2:port2:rack2, ... 
*/ def parseBrokerList(brokerListStr: String): Seq[Broker] = { val brokersStr = Utils.parseCsvList(brokerListStr) @@ -100,7 +100,8 @@ object ClientUtils extends Logging{ val brokerInfos = brokerStr.split(":") val hostName = brokerInfos(0) val port = brokerInfos(1).toInt - new Broker(brokerId, hostName, port) + val rack = brokerInfos(2).toInt + new Broker(brokerId, hostName, port, rack) }) } diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 9407ed21fbbd5..7c2d4a208e44a 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -37,7 +37,8 @@ private[kafka] object Broker { val brokerInfo = m.asInstanceOf[Map[String, Any]] val host = brokerInfo.get("host").get.asInstanceOf[String] val port = brokerInfo.get("port").get.asInstanceOf[Int] - new Broker(id, host, port) + val rack = brokerInfo.get("rack").get.asInstanceOf[Int] + new Broker(id, host, port, rack) case None => throw new BrokerNotAvailableException("Broker id %d does not exist".format(id)) } @@ -50,32 +51,34 @@ private[kafka] object Broker { val id = buffer.getInt val host = readShortString(buffer) val port = buffer.getInt - new Broker(id, host, port) + val rack = buffer.getInt + new Broker(id, host, port, rack) } } -private[kafka] case class Broker(val id: Int, val host: String, val port: Int) { +private[kafka] case class Broker(val id: Int, val host: String, val port: Int, val rack: Int) { - override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port) + override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port + ",rack:" + rack) - def getConnectionString(): String = host + ":" + port + def getConnectionString(): String = host + ":" + port + ":" + rack def writeTo(buffer: ByteBuffer) { buffer.putInt(id) writeShortString(buffer, host) buffer.putInt(port) + buffer.putInt(rack) } - def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + 4 /* rack id*/ override def equals(obj: Any): Boolean = { obj match { case null => false - case n: Broker => id == n.id && host == n.host && port == n.port + case n: Broker => id == n.id && host == n.host && port == n.port && rack == n.rack case _ => false } } - override def hashCode(): Int = hashcode(id, host, port) + override def hashCode(): Int = hashcode(id, host, port, rack) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 41c962604854a..3290541239105 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -37,6 +37,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the broker id for this server */ val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) + /* the rack id for this server */ + val rackId: Int = props.getIntInRange("broker.rack", (0, Int.MaxValue)) + /* the maximum size of message that the server can receive */ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 553640f43dc7e..f96a548b9f7c4 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ 
b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -48,7 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { else config.hostName val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.rackId, config.zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index c21bc60f3ad1c..a443882366326 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -183,14 +183,14 @@ object ZkUtils extends Logging { replicas.contains(brokerId.toString) } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, rack: Int, timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val timestamp = "\"" + SystemTime.milliseconds.toString + "\"" val brokerInfo = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++ - Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp), + Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "rack" -> rack.toString, "timestamp" -> timestamp), valueInQuotes = false)) - val expectedBroker = new Broker(id, host, port) + val expectedBroker = new Broker(id, host, port, rack) try { createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker, diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala index 9f3bb40728fa6..0443c9775a634 100644 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala @@ -30,7 +30,7 @@ object TestLogPerformance { val messageSize = args(1).toInt val batchSize = args(2).toInt val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt) - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, -1, 1) val config = new KafkaConfig(props) val dir = TestUtils.tempDir() val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime) diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 24362890fe446..1ef17448e0fde 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -37,10 +37,15 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val port3 = TestUtils.choosePort() val port4 = TestUtils.choosePort() - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + val rack1 = 1 + val rack2 = 2 + val rack3 = 3 + val rack4 = 4 + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rack1) + val configProps2 = 
TestUtils.createBrokerConfig(brokerId2, port2, rack2) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, rack3) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, rack4) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var brokers: Seq[Broker] = Seq.empty[Broker] @@ -61,7 +66,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val server4 = TestUtils.createServer(new KafkaConfig(configProps4)) servers ++= List(server1, server2, server3, server4) - brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port, s.config.rackId)) // create topics with 1 partition, 2 replicas, one on each broker CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1") diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index a480881d7f351..71a228dbf55cb 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -24,7 +24,6 @@ import kafka.server.KafkaConfig import kafka.utils.{Logging, ZkUtils, TestUtils} import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition} - class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test @@ -33,7 +32,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // test 0 replication factor try { - AdminUtils.assignReplicasToBrokers(brokerList, 10, 0) + AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 0) fail("shouldn't allow replication factor 0") } catch { @@ -43,7 +42,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // test wrong replication factor try { - AdminUtils.assignReplicasToBrokers(brokerList, 10, 6) + AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 6) fail("shouldn't allow replication factor larger than # of brokers") } catch { @@ -66,7 +65,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { 9 -> List(4, 1, 2) ) - val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) + val actualAssignment = AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 3, 0) val e = (expectedAssignment.toList == actualAssignment.toList) assertTrue(expectedAssignment.toList == actualAssignment.toList) } @@ -155,7 +154,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { 11 -> 1 ) val topic = "test" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + TestUtils.createBrokersInZk(zkClient, List((0, 0), (1, 1), (2, 2), (3, 3), (4, 4))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index f43ac8f0f4ff3..6ca5020b7fecd 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -75,7 +75,7 @@ object SerializationTestUtils{ TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) ) - private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013)) + private val brokers = List(new Broker(0, "localhost", 1011, 
1), new Broker(1, "localhost", 1012, 2), new Broker(2, "localhost", 1013, 3)) private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0) private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1) private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 1ee34b95d5fe2..e283d7f9313ed 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -47,7 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val group = "group1" val consumer0 = "consumer0" val consumedOffset = 5 - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId))) val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, 0, diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 2317760ef977a..fc6a380286336 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -33,7 +33,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L val group = "default_group" val testConsumer = "consumer" val BrokerPort = 9892 - val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort))) + val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort, 0))) val NumMessages = 10 val LargeOffset = 10000 val SmallOffset = -1 diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index c5cddeac2da9b..d2eeaaf13033e 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { yield new KafkaConfig(props) val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId))) val shutdown = ZookeeperConsumerConnector.shutdownCommand val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index c3c7631659fbd..bf801a9acc04d 100644 --- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -34,7 +34,7 @@ import org.junit.Assert.assertEquals class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness { val port = TestUtils.choosePort - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, port, 0) val config = 
new KafkaConfig(props) val configs = List(config) val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index f764151b2a548..69ce6269f975c 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -37,7 +37,7 @@ import kafka.utils.{TestUtils, Utils} class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness { val port = TestUtils.choosePort - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, port, 0) val config = new KafkaConfig(props) val configs = List(config) val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index 26e9bd6e4e1f2..a9b826fb3f4c4 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -39,14 +39,19 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { val port3 = TestUtils.choosePort() val port4 = TestUtils.choosePort() + val rackId1 = 0 + val rackId2 = 1 + val rackId3 = 2 + val rackId4 = 3 + val enableShutdown = true - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rackId1) configProps1.put("controlled.shutdown.enable", "true") - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, rackId2) configProps2.put("controlled.shutdown.enable", "true") - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, rackId3) configProps3.put("controlled.shutdown.enable", "true") - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, rackId4) configProps4.put("controlled.shutdown.enable", "true") configProps4.put("controlled.shutdown.retry.backoff.ms", "100") diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index edf855528b27f..4d406ed69729f 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -34,7 +34,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p)) private var server1: KafkaServer = null - val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port)) + val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port,c.rackId)) override def setUp() { super.setUp() diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index ce893bfe84293..89fb5177d2db4 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -39,7 +39,7 @@ class LogManagerTest extends JUnit3Suite { override def setUp() { super.setUp() - config = new KafkaConfig(TestUtils.createBrokerConfig(0, 
-1)) { + config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1, 0)) { override val logSegmentBytes = 1024 override val logFlushIntervalMessages = 10000 override val logRetentionHours = maxLogAgeHours @@ -111,7 +111,7 @@ class LogManagerTest extends JUnit3Suite { val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes val retentionHours = 1 val retentionMs = 1000 * 60 * 60 * retentionHours - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, -1, 0) logManager.shutdown() config = new KafkaConfig(props) { override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages @@ -155,7 +155,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testTimeBasedFlush() { - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, -1, 0) logManager.shutdown() config = new KafkaConfig(props) { override val logSegmentBytes = 1024 *1024 *1024 @@ -179,7 +179,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, -1, 0) val dirs = Seq(TestUtils.tempDir().getAbsolutePath, TestUtils.tempDir().getAbsolutePath, TestUtils.tempDir().getAbsolutePath) diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index 1a9cc0167f95f..79153c5e1ce0a 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -48,7 +48,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { @Before override def setUp() { super.setUp() - val config: Properties = createBrokerConfig(1, brokerPort) + val config: Properties = createBrokerConfig(1, brokerPort, 1) val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime() @@ -196,9 +196,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(Seq(0L), consumerOffsets) } - private def createBrokerConfig(nodeId: Int, port: Int): Properties = { + private def createBrokerConfig(nodeId: Int, port: Int, rack: Int): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) + props.put("broker.rack", rack.toString) props.put("port", port.toString) props.put("log.dir", getLogDir.getAbsolutePath) props.put("log.flush.interval.messages", "1") diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index df906953fb241..d240d7cb9fcf6 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -38,7 +38,7 @@ class LogTest extends JUnitSuite { @Before def setUp() { logDir = TestUtils.tempDir() - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, -1, 0) config = new KafkaConfig(props) } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 67497dd042dfd..c703f8e3ff03f 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -43,6 +43,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with val tLogger = Logger.getLogger(getClass()) private val brokerZk = 0 + private val rackZk = 0 
private val ports = TestUtils.choosePorts(2) private val portZk = ports(0) @@ -51,7 +52,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with override def setUp() { super.setUp() - val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk) + val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk, rackZk) val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 69c88c73521d5..484a54eaf9385 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -166,8 +166,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - val broker1 = new Broker(0, "localhost", 9092) - val broker2 = new Broker(1, "localhost", 9093) + val broker1 = new Broker(0, "localhost", 9092, 0) + val broker2 = new Broker(1, "localhost", 9093, 1) broker1 // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) @@ -401,7 +401,7 @@ class AsyncProducerTest extends JUnit3Suite { val config = new ProducerConfig(props) val topic1 = "topic1" - val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092) + val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092, 0) val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata] topicPartitionInfos.put("topic1", topic1Metadata) @@ -488,12 +488,12 @@ class AsyncProducerTest extends JUnit3Suite { producerDataList } - private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort) + private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int = -1): TopicMetadata = { + getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort, rackId) } - private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - val broker1 = new Broker(brokerId, brokerHost, brokerPort) + private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int): TopicMetadata = { + val broker1 = new Broker(brokerId, brokerHost, brokerPort, rackId) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 2cabfbb761657..4aa9daa44500f 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -40,6 +40,8 @@ import org.junit.Assert.assertEquals class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private val brokerId1 = 0 private val brokerId2 = 1 + private val rackId1 = 0 + private val rackId2 = 1 private val ports = TestUtils.choosePorts(2) private val (port1, port2) = (ports(0), ports(1)) private var server1: KafkaServer = null @@ -49,12 +51,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ 
private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) private var servers = List.empty[KafkaServer] - private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, rackId1) private val config1 = new KafkaConfig(props1) { override val hostName = "localhost" override val numPartitions = 4 } - private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, rackId2) private val config2 = new KafkaConfig(props2) { override val hostName = "localhost" override val numPartitions = 4 @@ -99,7 +101,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) val props1 = new util.Properties() - props1.put("metadata.broker.list", "localhost:80,localhost:81") + props1.put("metadata.broker.list", "localhost:80:0,localhost:81:1") props1.put("serializer.class", "kafka.serializer.StringEncoder") val producerConfig1 = new ProducerConfig(props1) val producer1 = new Producer[String, String](producerConfig1) @@ -114,7 +116,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val props2 = new util.Properties() - props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) + props2.put("metadata.broker.list", "localhost:80:0," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) props2.put("serializer.class", "kafka.serializer.StringEncoder") val producerConfig2= new ProducerConfig(props2) val producer2 = new Producer[String, String](producerConfig2) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 70e4b51a84239..e0cff390bc81c 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -35,8 +35,11 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val port1 = TestUtils.choosePort() val port2 = TestUtils.choosePort() - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + val rackId1 = 0 + val rackId2 = 1 + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rackId1) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, rackId2) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var staleControllerEpochDetected = false @@ -122,8 +125,9 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // start another controller val controllerId = 2 - val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) - val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) + val controllerRackId = 2 + val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort(), controllerRackId)) + val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port, s.config.rackId)) val controllerContext = new ControllerContext(zkClient, 6000) controllerContext.liveBrokers = brokers.toSet val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 947e795345104..05b5afc989312 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -32,7 +32,7 @@ import kafka.utils.{TestUtils, Utils} class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val port = TestUtils.choosePort - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, port, 0) val config = new KafkaConfig(props) val host = "localhost" diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a06cfff406572..34d37210d6f1f 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -113,19 +113,20 @@ object TestUtils extends Logging { */ def createBrokerConfigs(numConfigs: Int): List[Properties] = { for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port) + yield createBrokerConfig(node, port, node) } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { - configs.map(c => c.hostName + ":" + c.port).mkString(",") + configs.map(c => c.hostName + ":" + c.port + ":" + c.rackId).mkString(",") } /** * Create a test config for the given node id */ - def createBrokerConfig(nodeId: Int, port: Int): Properties = { + def createBrokerConfig(nodeId: Int, port: Int, rack: Int): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) + props.put("broker.rack", rack.toString) props.put("host.name", "localhost") props.put("port", port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) @@ -344,14 +345,14 @@ object TestUtils extends Logging { } } - def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667)) - brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1)) + def createBrokersInZk(zkClient: ZkClient, ids: Seq[(Int, Int)]): Seq[Broker] = { + val brokers = ids.map(id => new Broker(id._1, "localhost", 6667, id._2)) + brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, b.rack, 6000, jmxPort = -1)) brokers } - def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667)) + def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[(Int, Int)]): Seq[Broker] = { + val brokers = ids.map(id => new Broker(id._1, "localhost", 6667, id._2)) brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b)) brokers }
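Usage notes:

The new option is exposed as --max-rack-replication in CreateTopicCommand, AddPartitionsCommand, and ReassignPartitionsCommand; when a manual replica assignment list is supplied, the manual assignment takes precedence and the rack limit is ignored. Programmatically, the equivalent entry point is the new createTopic overload. A minimal sketch of a call (topic name and counts are illustrative; zkClient is assumed to be an already-open ZkClient):

    // 8 partitions, replication factor 3, no manual assignment string,
    // and at most 1 replica of any given partition per rack.
    CreateTopicCommand.createTopic(zkClient, "events", 8, 3, "", 1)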
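Every broker must now set broker.rack: KafkaConfig reads it with getIntInRange("broker.rack", (0, Int.MaxValue)) and no default, so a broker without the property fails at startup. A minimal server.properties fragment (values illustrative):

    broker.id=3
    broker.rack=1
    port=9092

registerBrokerInZk publishes the rack alongside the rest of the registration, so the broker znode payload should look roughly like this (values illustrative, field order per the mergeJsonFields call above):

    {"host":"kafka3","version":1,"jmx_port":-1,"port":9092,"rack":1,"timestamp":"1390436000000"}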
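The rack-aware branch of AdminUtils.assignReplicasToBrokers builds a broker-to-rack map from ZooKeeper and then, per partition, walks the broker list, skipping any broker whose rack has already reached maxReplicaPerRack, and throws if it runs out of brokers before placing all replicas. A minimal standalone sketch of that selection loop, with the nextReplicaShift wrap-around simplified to a plain circular scan (all names and values here are illustrative, not the patch's exact code):

    import scala.collection.mutable

    object RackAwareAssignmentSketch {
      // Pick `replicationFactor` brokers for one partition, allowing at most
      // `maxReplicaPerRack` replicas of the partition on any single rack.
      def assignPartition(brokerList: Seq[Int],
                          brokerToRack: Map[Int, Int],
                          replicationFactor: Int,
                          maxReplicaPerRack: Int,
                          firstReplicaIndex: Int): Seq[Int] = {
        val first = brokerList(firstReplicaIndex)
        var replicas = List(first)
        // Replicas placed so far per rack; as in the patch, the first replica
        // is placed unconditionally.
        val rackReplicaCount = mutable.Map(brokerToRack(first) -> 1)
        var k = 1
        while (replicas.size < replicationFactor && k < brokerList.size) {
          val candidate = brokerList((firstReplicaIndex + k) % brokerList.size)
          val rack = brokerToRack(candidate)
          val count = rackReplicaCount.getOrElse(rack, 0)
          if (count < maxReplicaPerRack) { // skip brokers on saturated racks
            replicas ::= candidate
            rackReplicaCount(rack) = count + 1
          }
          k += 1
        }
        if (replicas.size < replicationFactor)
          sys.error("not enough brokers in distinct racks for maxReplicaPerRack=" + maxReplicaPerRack)
        replicas.reverse
      }

      def main(args: Array[String]): Unit = {
        val brokers = Seq(0, 1, 2, 3)
        val racks = Map(0 -> 0, 1 -> 0, 2 -> 1, 3 -> 1) // two racks of two
        // Prints List(0, 1, 2): rack 0 reaches its quota of 2, so the third
        // replica must come from rack 1.
        println(assignPartition(brokers, racks, replicationFactor = 3,
                                maxReplicaPerRack = 2, firstReplicaIndex = 0))
      }
    }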
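Broker.writeTo, readFrom, and sizeInBytes all grow by the 4-byte rack id, so broker metadata serialized with this patch would be misread by code built against the old layout. A self-contained round-trip of the new layout, assuming the host string uses the 2-byte length prefix of kafka.api.ApiUtils.writeShortString:

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    object BrokerWireFormatSketch {
      // id, length-prefixed UTF-8 host, port, and the rack id added here.
      def write(buf: ByteBuffer, id: Int, host: String, port: Int, rack: Int): Unit = {
        buf.putInt(id)
        val bytes = host.getBytes(StandardCharsets.UTF_8)
        buf.putShort(bytes.length.toShort)
        buf.put(bytes)
        buf.putInt(port)
        buf.putInt(rack) // the 4 extra bytes introduced by this patch
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.allocate(64)
        write(buf, 0, "localhost", 9092, 2)
        buf.flip()
        val id = buf.getInt
        val n = buf.getShort.toInt
        val hostBytes = new Array[Byte](n)
        buf.get(hostBytes)
        val host = new String(hostBytes, StandardCharsets.UTF_8)
        println((id, host, buf.getInt, buf.getInt)) // (0,localhost,9092,2)
      }
    }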