forked from apache/kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
/
PartitionStateMachine.scala
executable file
·532 lines (497 loc) · 28.6 KB
/
PartitionStateMachine.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import collection._
import collection.JavaConversions
import collection.mutable.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ReplicationUtils}
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.utils.CoreUtils._
/**
* This class represents the state machine for partitions. It defines the states that a partition can be in, and
* transitions to move the partition to another legal state. The different states that a partition can be in are -
* 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
* deleted. Valid previous state, if one exists, is OfflinePartition
* 2. NewPartition : After creation, the partition is in the NewPartition state. In this state, the partition should have
* replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
* 3. OnlinePartition : Once a leader is elected for a partition, it is in the OnlinePartition state.
* Valid previous states are NewPartition/OnlinePartition/OfflinePartition (Online -> Online is a leader re-election)
* 4. OfflinePartition : If, after successful leader election, the leader for partition dies, then the partition
* moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition/OfflinePartition
*/
class PartitionStateMachine(controller: KafkaController) extends Logging {
// controller-wide context; this class reads replica assignments, leadership info, live broker ids and the ZK handle from it
private val controllerContext = controller.controllerContext
// broker id of this controller, used in log messages
private val controllerId = controller.config.brokerId
private val zkUtils = controllerContext.zkUtils
// current state of every partition tracked by this state machine
private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
// collects the broker requests produced during one round of state changes
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
// set by startup() and cleared by shutdown(); handleStateChange refuses to run while it is false
private val hasStarted = new AtomicBoolean(false)
// default leader selector for handleStateChanges when the caller does not supply one
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
private val topicChangeListener = new TopicChangeListener()
private val deleteTopicsListener = new DeleteTopicsListener()
// one data-change listener per topic, keyed by topic name
private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
private val stateChangeLogger = KafkaController.stateChangeLogger
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
/**
 * Brings the partition state machine up. The initial state of every known partition is derived
 * from the controller context, the machine is flagged as started, and every partition that is in
 * the NewPartition or OfflinePartition state is then pushed towards OnlinePartition.
 * ZooKeeper listeners are NOT registered here; that is done by registerListeners().
 */
def startup(): Unit = {
  // seed the in-memory state map from the controller context
  initializePartitionState()
  // handleStateChange refuses to run until this flag is set
  hasStarted.set(true)
  // attempt to bring every New/Offline partition online
  triggerOnlinePartitionStateChange()
  info(s"Started partition state machine with initial state -> ${partitionState.toString()}")
}
/**
 * Subscribes the ZooKeeper listeners owned by this state machine: the topic change listener
 * always, and the delete-topics listener only when topic deletion is enabled in the config.
 */
def registerListeners(): Unit = {
  registerTopicChangeListener()
  val topicDeletionEnabled = controller.config.deleteTopicEnable
  if (topicDeletionEnabled) {
    registerDeleteTopicListener()
  }
}
/**
 * Unsubscribes every ZooKeeper listener this state machine may have registered: the topic change
 * listener, all per-topic partition modification listeners (which are then forgotten), and the
 * delete-topics listener when topic deletion is enabled in the config.
 */
def deregisterListeners(): Unit = {
  deregisterTopicChangeListener()
  for ((topic, listener) <- partitionModificationsListeners)
    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
  partitionModificationsListeners.clear()
  val topicDeletionEnabled = controller.config.deleteTopicEnable
  if (topicDeletionEnabled) {
    deregisterDeleteTopicListener()
  }
}
/**
 * Stops the partition state machine: marks it as not started, forgets all cached partition
 * states and unsubscribes the ZooKeeper listeners.
 */
def shutdown(): Unit = {
  // from here on handleStateChange rejects every transition request
  hasStarted.set(false)
  // drop the cached per-partition states
  partitionState.clear()
  // unsubscribe all ZK listeners owned by this machine
  deregisterListeners()
  info("Stopped partition state machine")
}
/**
 * Attempts the OnlinePartition transition for every tracked partition that is currently in the
 * NewPartition or OfflinePartition state, skipping partitions whose topic is queued up for
 * deletion. All resulting broker requests are sent as one batch. startup() calls this once the
 * initial partition state has been loaded.
 */
def triggerOnlinePartitionStateChange(): Unit = {
  try {
    brokerRequestBatch.newBatch()
    // entries are examined one at a time so the deletion check always sees the latest queue
    partitionState.foreach { case (topicAndPartition, currentState) =>
      val queuedForDeletion = controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)
      if (!queuedForDeletion && (currentState == OfflinePartition || currentState == NewPartition)) {
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition,
          controller.offlinePartitionSelector)
      }
    }
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
    case e: Throwable => error("Error while moving some partitions to the online state", e)
  }
}
/**
 * Returns the partitions whose currently tracked state equals the given state.
 * @param state The state to look for
 * @return The set of partitions that are in that state right now
 */
def partitionsInState(state: PartitionState): Set[TopicAndPartition] =
  partitionState.filter { case (_, current) => current == state }.keySet
/**
 * Moves a set of partitions to the target state, sending all broker requests produced by the
 * individual transitions as a single batch. Errors raised while batching or sending are logged
 * and not propagated to the caller.
 * @param partitions The list of partitions that need to be transitioned to the target state
 * @param targetState The state that the partitions should be moved to
 * @param leaderSelector The leader selector handed to each transition; defaults to the no-op selector
 */
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                       leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector): Unit = {
  info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
  try {
    brokerRequestBatch.newBatch()
    for (topicAndPartition <- partitions)
      handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
  }
}
/**
 * Drives a single partition through one transition of the state machine, refusing any move whose
 * current state is not a legal predecessor of the target state. Legal transitions and what this
 * method does for each of them:
 * NonExistentPartition -> NewPartition
 * --mark the partition as New; the assigned replicas are read from the controller cache for the log
 *
 * NewPartition -> OnlinePartition
 * --initialize the leader and isr path through initializeLeaderAndIsrForPartition
 *
 * OnlinePartition,OfflinePartition -> OnlinePartition
 * --elect a leader through electLeaderForPartition using the supplied leader selector
 *
 * NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
 * --nothing other than marking partition state as Offline
 *
 * OfflinePartition -> NonExistentPartition
 * --nothing other than marking the partition state as NonExistentPartition
 *
 * A failure raised while performing the transition is written to the state change log and is not
 * propagated; only the "state machine has not started" check throws to the caller.
 * @param topic The topic of the partition for which the state transition is invoked
 * @param partition The partition for which the state transition is invoked
 * @param targetState The end state that the partition should be moved to
 * @param leaderSelector The leader selector used when the transition requires a leader election
 */
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector): Unit = {
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get) {
    val notStartedMsg = ("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
      "the partition state machine has not started")
      .format(controllerId, controller.epoch, topicAndPartition, targetState)
    throw new StateChangeFailedException(notStartedMsg)
  }
  // a partition that was never seen before starts being tracked as NonExistentPartition
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)

  // trace line shared by the transitions that only flip the recorded state
  def traceStateOnlyTransition(): Unit =
    stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
      .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))

  try {
    targetState match {
      case NewPartition =>
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
        partitionState.put(topicAndPartition, NewPartition)
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, assignedReplicas))
        // post: partition has been assigned replicas
      case OnlinePartition =>
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        currState match {
          case NewPartition =>
            // first time online: create the leader and isr path
            initializeLeaderAndIsrForPartition(topicAndPartition)
          case OfflinePartition | OnlinePartition =>
            // leader is gone, or needs to be re-elected
            electLeaderForPartition(topic, partition, leaderSelector)
          case _ => // should never come here since illegal previous states are checked above
        }
        partitionState.put(topicAndPartition, OnlinePartition)
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
        // post: partition has a leader
      case OfflinePartition =>
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        // should be called when the leader for a partition is no longer alive
        traceStateOnlyTransition()
        partitionState.put(topicAndPartition, OfflinePartition)
        // post: partition has no alive leader
      case NonExistentPartition =>
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
        traceStateOnlyTransition()
        partitionState.put(topicAndPartition, NonExistentPartition)
        // post: partition state is deleted from all brokers and zookeeper
    }
  } catch {
    case t: Throwable =>
      val failureMsg = "Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)
      stateChangeLogger.error(failureMsg, t)
  }
}
/**
 * Seeds the state map for every partition that has a replica assignment in the controller
 * context: no cached leadership info means NewPartition, a leader that is among the live brokers
 * means OnlinePartition, and any other leader means OfflinePartition.
 */
private def initializePartitionState(): Unit = {
  for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
    val initialState: PartitionState = controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case None =>
        // no leader and isr information is cached for this partition yet
        NewPartition
      case Some(currentLeaderIsrAndEpoch) =>
        val leaderIsAlive = controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader)
        if (leaderIsAlive) OnlinePartition else OfflinePartition
    }
    partitionState.put(topicPartition, initialState)
  }
}
/**
 * Verifies that the partition's currently recorded state is one of the allowed predecessor states.
 * @param topicAndPartition The partition being transitioned; it must already be present in the state map
 * @param fromStates The states from which the transition is legal
 * @param targetState The state the partition is about to move to (used in the error message only)
 * @throws IllegalStateException if the current state is not contained in fromStates
 */
private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState],
                                      targetState: PartitionState): Unit = {
  val actualState = partitionState(topicAndPartition)
  if (!fromStates.contains(actualState)) {
    val expectation = "Partition %s should be in the %s states before moving to %s state"
      .format(topicAndPartition, fromStates.mkString(","), targetState)
    val observation = ". Instead it is in %s state".format(actualState)
    throw new IllegalStateException(expectation + observation)
  }
}
/**
 * Performs the work of the NewPartition -> OnlinePartition transition: the first live assigned
 * replica becomes the leader, all live assigned replicas form the isr, the result is written to a
 * freshly created leader-and-isr path in zookeeper, cached in the controller context, and a
 * LeaderAndIsr request for the live replicas is added to the current request batch.
 * @param topicAndPartition The topic/partition whose leader and isr path is to be initialized
 * @throws StateChangeFailedException if no assigned replica is alive, or if the leader and isr
 *                                    path already exists in zookeeper
 */
private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition): Unit = {
  val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  val liveAssignedReplicas = replicaAssignment.filter(replicaId => controllerContext.liveBrokerIds.contains(replicaId))

  if (liveAssignedReplicas.isEmpty) {
    val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
      "live brokers are [%s]. No assigned replica is alive.")
      .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
    stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
    throw new StateChangeFailedException(failMsg)
  }

  debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
  // the first live replica in the assignment order becomes the leader
  val leader = liveAssignedReplicas.head
  val initialLeaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
  val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(initialLeaderAndIsr, controller.epoch)
  debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
  try {
    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
    zkUtils.createPersistentPath(leaderAndIsrPath,
      zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
    // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
    // took over and initialized this partition. This can happen if the current controller went into a long
    // GC pause
    controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
      topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
  } catch {
    case e: ZkNodeExistsException =>
      // somebody else created the path first: report what is stored there, including its controller epoch
      val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
        topicAndPartition.partition).get
      val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
        "exists with value %s and controller epoch %d")
        .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
      stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
      throw new StateChangeFailedException(failMsg)
  }
}
/**
* Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
* Asks the given leader selector for a new leader and isr, writes the result to zookeeper, updates the
* controller's leadership cache and adds a LeaderAndIsr request to the current request batch.
* The read-select-write sequence is repeated until ReplicationUtils.updateLeaderAndIsr reports success.
* A LeaderElectionNotNeededException is swallowed, a NoReplicaOnlineException is rethrown unchanged, and
* every other failure is rethrown wrapped in a StateChangeFailedException.
* @param topic The topic of the partition to elect a leader for
* @param partition The partition to elect a leader for
* @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
*/
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
// handle leader election for the partitions whose leader is no longer alive
stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
.format(controllerId, controller.epoch, topicAndPartition))
try {
// loop state: overwritten on every attempt, only the values of the successful attempt are used afterwards
var zookeeperPathUpdateSucceeded: Boolean = false
var newLeaderAndIsr: LeaderAndIsr = null
var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
while(!zookeeperPathUpdateSucceeded) {
// re-read the stored leader/isr on every attempt so the selector and the write see the latest version
val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
// a stored epoch newer than ours means another controller has already written this path
if (controllerEpoch > controller.epoch) {
val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
"already written by another controller. This probably means that the current controller %d went through " +
"a soft failure and another controller was elected with epoch %d.")
.format(topic, partition, controllerId, controllerEpoch)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg)
}
// elect new leader or throw exception
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
// the zkVersion read above is passed along, presumably as the expected version of a conditional write
// NOTE(review): confirm against ReplicationUtils.updateLeaderAndIsr
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
newLeaderAndIsr = leaderAndIsr
// stamps the version returned by the update onto the object returned by the selector
newLeaderAndIsr.zkVersion = newVersion
zookeeperPathUpdateSucceeded = updateSucceeded
replicasForThisPartition = replicas
}
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// update the leader cache
controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
// NOTE(review): this message says "Offline partition" although the method also runs for Online partitions
stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
.format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
// queue a LeaderAndIsr request for the replicas chosen by the selector; the full assignment is passed along
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, replicas)
} catch {
case lenne: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
case sce: Throwable =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg, sce)
}
// reached after a successful election and also after a swallowed LeaderElectionNotNeededException
debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
}
/** Subscribes the topic change listener to child changes of the broker topics path. */
private def registerTopicChangeListener() =
  zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
/** Unsubscribes the topic change listener from child changes of the broker topics path. */
private def deregisterTopicChangeListener() =
  zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
/**
 * Creates a partition modification listener for the topic, remembers it under the topic name
 * (replacing any listener remembered earlier) and subscribes it to data changes of the topic path.
 * @param topic The topic whose path should be watched
 */
def registerPartitionChangeListener(topic: String) = {
  val listener = new PartitionModificationsListener(topic)
  partitionModificationsListeners.put(topic, listener)
  zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener)
}
/**
 * Unsubscribes the topic's partition modification listener from the topic path and forgets it.
 * Fails with the map's lookup exception if no listener is registered for the topic.
 * @param topic The topic whose listener should be removed
 * @return The removed listener, as returned by the listener map
 */
def deregisterPartitionChangeListener(topic: String) = {
  val topicPath = getTopicPath(topic)
  val listener = partitionModificationsListeners(topic)
  zkUtils.zkClient.unsubscribeDataChanges(topicPath, listener)
  partitionModificationsListeners.remove(topic)
}
/** Subscribes the delete-topics listener to child changes of the delete topics path. */
private def registerDeleteTopicListener() =
  zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
/** Unsubscribes the delete-topics listener from child changes of the delete topics path. */
private def deregisterDeleteTopicListener() =
  zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
/**
 * Looks up the leader, isr and controller epoch of a partition through ReplicationUtils.
 * @param topic The topic of the partition
 * @param partition The partition id
 * @return The stored leader/isr together with the controller epoch
 * @throws StateChangeFailedException if no leader and isr information exists for the partition
 */
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
  val topicAndPartition = TopicAndPartition(topic, partition)
  ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition).getOrElse {
    val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
      .format(topicAndPartition, partitionState(topicAndPartition))
    throw new StateChangeFailedException(failMsg)
  }
}
/**
 * ZooKeeper child listener for the broker topics path. On every change it recomputes which topics
 * were added and which were removed, refreshes the topic list and the replica assignment cache in
 * the controller context, and notifies the controller about newly created topics. Changes are
 * ignored while the partition state machine has not been started.
 */
class TopicChangeListener extends IZkChildListener with Logging {
  this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "

  /**
   * Handles a change of the children of the watched path while holding the controller lock.
   * Any failure is logged and not propagated.
   * @param parentPath The path whose children changed
   * @param children The current children of that path
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]): Unit = {
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        try {
          val topicsInZk = {
            import JavaConversions._
            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
            (children: Buffer[String]).toSet
          }
          // both differences are taken against the cached topic list before it is overwritten
          val createdTopics = topicsInZk -- controllerContext.allTopics
          val removedTopics = controllerContext.allTopics -- topicsInZk
          controllerContext.allTopics = topicsInZk

          val createdAssignment = zkUtils.getReplicaAssignmentForTopics(createdTopics.toSeq)
          // forget the assignment of removed topics, then merge in the assignment of the new ones
          controllerContext.partitionReplicaAssignment =
            controllerContext.partitionReplicaAssignment.filter { case (topicAndPartition, _) =>
              !removedTopics.contains(topicAndPartition.topic)
            }
          controllerContext.partitionReplicaAssignment ++= createdAssignment
          info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(createdTopics,
            removedTopics, createdAssignment))
          if (createdTopics.nonEmpty) {
            controller.onNewTopicCreation(createdTopics, createdAssignment.keySet.toSet)
          }
        } catch {
          case e: Throwable => error("Error while handling new topic", e )
        }
      }
    }
  }
}
/**
 * ZooKeeper child listener for the delete topics path. For every change it
 * 1. removes the delete request of topics that do not exist in the controller's topic list,
 * 2. marks a topic ineligible for deletion while a preferred replica election or a partition
 *    reassignment is in progress for it,
 * 3. hands the remaining existing topics to the delete topic manager.
 */
class DeleteTopicsListener() extends IZkChildListener with Logging {
  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
  val zkUtils = controllerContext.zkUtils

  /**
   * Invoked when the children of the delete topics path change, i.e. when a topic is being deleted.
   * Runs while holding the controller lock.
   * @param parentPath The path whose children changed
   * @param children The topics currently requested for deletion; ZkClient passes null when the
   *                 parent path itself was deleted, which is treated as "no topics requested"
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]): Unit = {
    inLock(controllerContext.controllerLock) {
      // explicit conversion instead of the implicit JavaConversions, which would wrap a null list
      // and throw a NullPointerException on first use
      val requestedTopics: immutable.Set[String] =
        if (children == null) immutable.Set.empty[String]
        else {
          import scala.collection.JavaConverters._
          children.asScala.toSet
        }
      debug("Delete topics listener fired for topics %s to be deleted".format(requestedTopics.mkString(",")))

      // split the request into topics the controller knows about and topics it does not
      val (topicsToBeDeleted, nonExistentTopics) =
        requestedTopics.partition(topic => controllerContext.allTopics.contains(topic))

      if (nonExistentTopics.nonEmpty) {
        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
      }

      if (topicsToBeDeleted.nonEmpty) {
        info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
        // mark topic ineligible for deletion if other state changes are in progress
        topicsToBeDeleted.foreach { topic =>
          val preferredReplicaElectionInProgress =
            controllerContext.partitionsUndergoingPreferredReplicaElection.exists(_.topic == topic)
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.keySet.exists(_.topic == topic)
          if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // add topic to deletion list
        controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    }
  }

  /**
   * Intentionally empty.
   * NOTE(review): IZkChildListener appears to declare only handleChildChange, so this method looks
   * like a leftover that ZkClient never calls; confirm before removing it.
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String): Unit = {
  }
}
/**
 * ZooKeeper data listener for the path of a single topic. When the topic data changes, the replica
 * assignment of the topic is re-read and every partition that is not yet in the controller's
 * assignment cache is added to the cache and reported to the controller as a new partition.
 * Nothing is added while the topic is queued up for deletion.
 * @param topic The topic this listener watches
 */
class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging {
  this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "

  /**
   * Handles a data change of the topic path while holding the controller lock. Any failure is
   * logged and not propagated.
   * @param dataPath The path whose data changed
   * @param data The new data of that path (only used for logging)
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath : String, data: Object): Unit = {
    inLock(controllerContext.controllerLock) {
      try {
        info(s"Partition modification triggered $data for path $dataPath")
        val assignmentInZk = zkUtils.getReplicaAssignmentForTopics(List(topic))
        // keep only the partitions the controller has not cached yet
        val partitionsToBeAdded = assignmentInZk.filter { case (topicAndPartition, _) =>
          !controllerContext.partitionReplicaAssignment.contains(topicAndPartition)
        }
        if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic)) {
          error("Skipping adding partitions %s for topic %s since it is currently being deleted"
            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
        } else if (partitionsToBeAdded.nonEmpty) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
          controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
        }
      } catch {
        case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
      }
    }
  }

  /**
   * Deletion of the topic path is deliberately not handled by this listener.
   * @param parentPath The path whose data was deleted
   */
  @throws(classOf[Exception])
  def handleDataDeleted(parentPath : String): Unit = {
    // this is not implemented for partition change
  }
}
}
/**
 * The states a partition can be in inside the partition state machine. Each state carries a
 * distinct byte code, exposed as `state`.
 */
sealed trait PartitionState { def state: Byte }
/** Byte code 0. */
case object NewPartition extends PartitionState { val state: Byte = 0 }
/** Byte code 1. */
case object OnlinePartition extends PartitionState { val state: Byte = 1 }
/** Byte code 2. */
case object OfflinePartition extends PartitionState { val state: Byte = 2 }
/** Byte code 3. */
case object NonExistentPartition extends PartitionState { val state: Byte = 3 }