AdminUtils.scala
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.common._
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
import kafka.utils.ZkUtils._
import java.util.Random
import java.util.Properties
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException}
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
import scala.collection._
import JavaConverters._
import mutable.ListBuffer
import scala.collection.mutable
import collection.Map
import collection.Set
import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
val rand = new Random
val AdminClientId = "__admin_client"
val EntityConfigChangeZnodePrefix = "config_change_"
/**
* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible.
*
* To achieve these goals without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
* Here is an example of assigning 10 partitions across 5 brokers with a replication factor of 3:
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
* p4 p0 p1 p2 p3 (2nd replica)
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3rd replica)
* p7 p8 p9 p5 p6 (3rd replica)
*
* To create rack aware assignment, this API will first create a rack alternated broker list. For example,
* from this brokerID -> rack mapping:
*
* 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
*
* The rack alternated list will be:
*
* 0, 3, 1, 5, 4, 2
*
* Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment
* will be:
*
* 0 -> 0,3,1
* 1 -> 3,1,5
* 2 -> 1,5,4
* 3 -> 5,4,2
* 4 -> 4,2,0
* 5 -> 2,0,3
*
* Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
* shifting the followers. This is to ensure we will not always get the same set of sequences.
* In this case, if there is another partition to assign (partition #6), the assignment will be:
*
* 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
*
* The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
* broker list. For the rest of the replicas, it will be biased towards brokers on racks that do not have
* any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
* the broker list.
*
* As a result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
* each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
* situation where the number of replicas is the same as the number of racks and each rack has the same number of
* brokers, it guarantees that the replica distribution is even across brokers and racks.
*
* @return a Map from partition id to replica ids
* @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
* assign each replica to a unique rack.
*
*/
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (replicationFactor > brokerMetadatas.size)
throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
if (brokerMetadatas.forall(_.rack.isEmpty))
assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
startPartitionId)
else {
if (brokerMetadatas.exists(_.rack.isEmpty))
throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId)
}
}
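// Example (an illustrative sketch, not invoked by the production code): a
// rack-unaware assignment of 10 partitions across 5 brokers with replication
// factor 3. Passing fixedStartIndex = 0 is assumed here purely to make the
// result deterministic; by default both start values are randomized.
//
//   val brokers = (0 to 4).map(id => BrokerMetadata(id, None))
//   val assignment = AdminUtils.assignReplicasToBrokers(brokers, nPartitions = 10,
//     replicationFactor = 3, fixedStartIndex = 0, startPartitionId = 0)
//   // assignment(0) == Seq(0, 1, 2); once the shift increments at partition 5,
//   // assignment(5) == Seq(0, 2, 3) rather than repeating Seq(0, 1, 2)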
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
private def assignReplicasToBrokersRackAware(nPartitions: Int,
replicationFactor: Int,
brokerMetadatas: Seq[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
id -> rack
}.toMap
val numRacks = brokerRackMap.values.toSet.size
val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
val numBrokers = arrangedBrokerList.size
val ret = mutable.Map[Int, Seq[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
val leader = arrangedBrokerList(firstReplicaIndex)
val replicaBuffer = mutable.ArrayBuffer(leader)
val racksWithReplicas = mutable.Set(brokerRackMap(leader))
val brokersWithReplicas = mutable.Set(leader)
var k = 0
for (_ <- 0 until replicationFactor - 1) {
var done = false
while (!done) {
val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
val rack = brokerRackMap(broker)
// Skip this broker if
// 1. there is already a broker in the same rack that has a replica assigned AND one or more racks
//    do not yet have any replica, or
// 2. the broker already has a replica assigned AND one or more brokers do not yet have a replica assigned
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
replicaBuffer += broker
racksWithReplicas += rack
brokersWithReplicas += broker
done = true
}
k += 1
}
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
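// Example (an illustrative sketch): the rack-aware path applied to the
// broker -> rack mapping from the assignReplicasToBrokers scaladoc above,
// with fixedStartIndex = 0 assumed for determinism.
//
//   val brokers = Seq(
//     BrokerMetadata(0, Some("rack1")), BrokerMetadata(1, Some("rack3")),
//     BrokerMetadata(2, Some("rack3")), BrokerMetadata(3, Some("rack2")),
//     BrokerMetadata(4, Some("rack2")), BrokerMetadata(5, Some("rack1")))
//   val assignment = AdminUtils.assignReplicasToBrokers(brokers, nPartitions = 6,
//     replicationFactor = 3, fixedStartIndex = 0, startPartitionId = 0)
//   // The rack alternated broker list is 0, 3, 1, 5, 4, 2, so
//   // assignment(0) == Seq(0, 3, 1), matching the scaladoc example.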
/**
* Given broker and rack information, returns a list of brokers alternating by rack. Assume
* this is the rack and its brokers:
*
* rack1: 0, 1, 2
* rack2: 3, 4, 5
* rack3: 6, 7, 8
*
* This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
*
* This is essential to make sure that the assignReplicasToBrokers API can use such list and
* assign replicas to brokers in a simple round-robin fashion, while ensuring an even
* distribution of leader and replica counts on each broker and that replicas are
* distributed to all racks.
*/
private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
(rack, brokers.toIterator)
}
val racks = brokersIteratorByRack.keys.toArray.sorted
val result = new mutable.ArrayBuffer[Int]
var rackIndex = 0
while (result.size < brokerRackMap.size) {
val rackIterator = brokersIteratorByRack(racks(rackIndex))
if (rackIterator.hasNext)
result += rackIterator.next()
rackIndex = (rackIndex + 1) % racks.length
}
result
}
private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = {
brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
.groupBy { case (rack, _) => rack }
.map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
}
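// Example (illustrative): for the rack layout in the getRackAlternatedBrokerList
// scaladoc above,
//
//   val rackMap = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1",
//                     3 -> "rack2", 4 -> "rack2", 5 -> "rack2",
//                     6 -> "rack3", 7 -> "rack3", 8 -> "rack3")
//   getInverseMap(rackMap)               // Map("rack1" -> Seq(0, 1, 2), "rack2" -> Seq(3, 4, 5), "rack3" -> Seq(6, 7, 8))
//   getRackAlternatedBrokerList(rackMap) // IndexedSeq of 0, 3, 6, 1, 4, 7, 2, 5, 8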
/**
* Add partitions to existing topic with optional replica assignment
*
* @param zkUtils Zookeeper utilities
* @param topic Topic for adding partitions to
* @param numPartitions The desired total number of partitions for the topic (not the number of partitions to add)
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Whether to check that the assigned replica brokers are available. Disabled only in tests
*/
def addPartitions(zkUtils: ZkUtils,
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.isEmpty)
throw new AdminOperationException("The topic %s does not exist".format(topic))
val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
case None => throw new AdminOperationException("the topic does not have a partition with id 0; this should never happen")
case Some(headPartitionReplica) => headPartitionReplica._2
}
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val newPartitionReplicaList =
if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
startIndex, existingPartitionsReplicaList.size)
}
else
getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
existingPartitionsReplicaList.size, checkBrokerAvailable)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size)
if (unmatchedRepFactorList.nonEmpty)
throw new AdminOperationException("The replication factor in the manual replica assignment " +
"is not equal to the existing replication factor for the topic (" + existingReplicaListForPartitionZero.size + ")")
info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
}
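// Example (an illustrative sketch; the topic name is assumed): growing an
// existing 3-partition topic to 6 partitions and letting the assignment
// algorithm place the new replicas. Note that numPartitions is the desired
// total, not the number of partitions to add.
//
//   AdminUtils.addPartitions(zkUtils, "my-topic", numPartitions = 6)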
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
var partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList.mkString(":"))
if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
throw new AdminOperationException("Some specified brokers are not available. Specified brokers: " + brokerList.mkString(",") +
", available brokers: " + availableBrokerList.mkString(","))
ret.put(partitionId, brokerList.toList)
if (ret(partitionId).size != ret(startPartitionId).size)
throw new AdminOperationException("partition " + partitionId + " has a different replication factor: " + brokerList.mkString(":"))
partitionId = partitionId + 1
}
ret.toMap
}
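// Example (illustrative): the assignment string is colon-separated within a
// partition and comma-separated across partitions, and it must include entries
// for the existing partitions, which are skipped. With startPartitionId = 2
// (i.e. two partitions already exist):
//
//   getManualReplicaAssignment("0:1,0:2,1:2,3:4", Set(0, 1, 2, 3, 4), startPartitionId = 2)
//   // returns Map(2 -> List(1, 2), 3 -> List(3, 4))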
def deleteTopic(zkUtils: ZkUtils, topic: String) {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e: Throwable => throw new AdminOperationException(e.toString)
}
}
def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
zkUtils.getConsumersInGroup(group).nonEmpty
}
/**
* Delete the whole directory of the given consumer group if the group is inactive.
*
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupDirs(group)
zkUtils.deletePathRecursive(dir.consumerGroupDir)
true
}
else false
}
/**
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
* If the consumer group consumes no other topics, delete the whole consumer group directory.
*
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
*/
def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics == Seq(topic)) {
deleteConsumerGroupInZK(zkUtils, group)
}
else if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupTopicDirs(group, topic)
zkUtils.deletePathRecursive(dir.consumerOwnerDir)
zkUtils.deletePathRecursive(dir.consumerOffsetDir)
true
}
else false
}
/**
* Delete every inactive consumer group's information about the given topic in Zookeeper.
*
* @param zkUtils Zookeeper utilities
* @param topic Topic of the consumer group information we wish to delete
*/
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
}
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))
def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
val allBrokers = zkUtils.getAllBrokersInCluster()
val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
val brokersWithRack = brokers.filter(_.rack.nonEmpty)
if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
" to make replica assignment without rack information.")
}
val brokerMetadatas = rackAwareMode match {
case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
brokers.map(broker => BrokerMetadata(broker.id, None))
case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
}
brokerMetadatas.sortBy(_.id)
}
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
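// Example (an illustrative sketch; the topic name and retention value are
// assumed): creating a 4-partition topic with replication factor 2 and a
// per-topic retention override.
//
//   val topicConfig = new Properties()
//   topicConfig.put(LogConfig.RetentionMsProp, "604800000") // 7 days
//   AdminUtils.createTopic(zkUtils, "my-topic", partitions = 4,
//     replicationFactor = 2, topicConfig = topicConfig)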
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
// validate arguments
Topic.validate(topic)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
val topicPath = getTopicPath(topic)
if (!update) {
if (zkUtils.zkClient.exists(topicPath))
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
else if (Topic.hasCollisionChars(topic)) {
val allTopics = zkUtils.getAllTopics()
val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
if (collidingTopics.nonEmpty) {
throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
}
}
}
partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
// Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
LogConfig.validate(config)
writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
}
// create the partition assignment
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = getTopicPath(topic)
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
if (!update) {
info("Topic creation " + jsonPartitionData.toString)
zkUtils.createPersistentPath(zkPath, jsonPartitionData)
} else {
info("Topic update " + jsonPartitionData.toString)
zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
}
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) {
changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
}
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
if(!topicExists(zkUtils, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
// validate the topic-level config overrides before writing them out
LogConfig.validate(configs)
changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
}
private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) {
// write the new config; the config znode may not exist if there were previously no overrides
writeEntityConfig(zkUtils, entityType, entityName, configs)
// create the change notification
val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
zkUtils.zkClient.createPersistentSequential(seqNode, content)
}
def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName)
}
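// Example (illustrative): for entityType = ConfigType.Topic and
// entityName = "my-topic", the encoded change-notification payload is
//
//   {"version":1,"entity_type":"topics","entity_name":"my-topic"}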
/**
* Write out the entity (topic or client) config to zk, if there is any
*/
private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, config: Properties) {
val configMap: mutable.Map[String, String] = config.asScala
val map = Map("version" -> 1, "config" -> configMap)
zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
}
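// Example (illustrative): a topic with a single retention override is stored
// under its entity config path (e.g. /config/topics/my-topic) as
//
//   {"version":1,"config":{"retention.ms":"604800000"}}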
/**
* Read the entity (topic or client) config (if any) from zk
*/
def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = {
val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
case None => // there are no config overrides
case Some(mapAnon: Map[_, _]) =>
val map = mapAnon collect { case (k: String, v: Any) => k -> v }
require(map("version") == 1)
map.get("config") match {
case Some(config: Map[_, _]) =>
for(configTup <- config)
configTup match {
case (k: String, v: String) =>
props.setProperty(k, v)
case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
case _ => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)"
.format(str, entityType, entity))
}
}
props
}
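// Example (an illustrative sketch; the topic name is assumed): reading back
// any per-topic overrides.
//
//   val overrides = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "my-topic")
//   // overrides.getProperty("retention.ms") returns the override if one is set,
//   // or null if the topic has no config znode or no such override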
def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] =
fetchTopicMetadataFromZk(topics, zkUtils, SecurityProtocol.PLAINTEXT)
def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, protocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, protocol))
}
private def fetchTopicMetadataFromZk(topic: String,
zkUtils: ZkUtils,
cachedBrokerInfo: mutable.HashMap[Int, Broker],
protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = {
if(zkUtils.pathExists(getTopicPath(topic))) {
val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortBy { case (partition, _) => partition }
val partitionMetadata = sortedPartitions.map { case (partition, replicas) =>
val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition)
val leader = zkUtils.getLeaderForPartition(topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
var leaderInfo: Node = Node.noNode()
var replicaInfo: Seq[Node] = Nil
var isrInfo: Seq[Node] = Nil
try {
leaderInfo = leader match {
case Some(l) =>
try {
getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(protocol)
} catch {
case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
}
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
try {
replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(protocol))
isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(protocol))
} catch {
case e: Throwable => throw new ReplicaNotAvailableException(e)
}
if (replicaInfo.size < replicas.size)
throw new ReplicaNotAvailableException("Replica information not available for the following brokers: " +
replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
if (isrInfo.size < inSyncReplicas.size)
throw new ReplicaNotAvailableException("In-sync replica information not available for the following brokers: " +
inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
} catch {
case e: Throwable =>
debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
}
}
new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava)
} else {
// topic doesn't exist, send appropriate error code
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList())
}
}
private def getBrokerInfoFromCache(zkUtils: ZkUtils,
cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
brokerIds: Seq[Int]): Seq[Broker] = {
val failedBrokerIds = new ListBuffer[Int]()
val brokerMetadata = brokerIds.map { id =>
val optionalBrokerInfo = cachedBrokerInfo.get(id)
optionalBrokerInfo match {
case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
case None => // fetch it from zookeeper
zkUtils.getBrokerInfo(id) match {
case Some(brokerInfo) =>
cachedBrokerInfo += (id -> brokerInfo)
Some(brokerInfo)
case None =>
failedBrokerIds += id
None
}
}
}
brokerMetadata.flatten
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
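// Worked example (illustrative): with nBrokers = 5, firstReplicaIndex = 0 and
// secondReplicaShift = 0, the follower indices for replicaIndex = 0, 1, 2 are
// shift = 1 + (0 + j) % 4, i.e. 1, 2 and 3. Because shift is always in
// 1..nBrokers-1, a follower can never land on the first replica's index.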
}