-
Notifications
You must be signed in to change notification settings - Fork 13.6k
/
FinalizedFeatureChangeListener.scala
258 lines (231 loc) · 11.3 KB
/
FinalizedFeatureChangeListener.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import kafka.utils.{Logging, ShutdownableThread}
import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
import org.apache.kafka.common.internals.FatalExitError
import scala.concurrent.TimeoutException
/**
* Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
* is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
* to the latest features read from ZK. The cache updates are serialized through a single
* notification processor thread.
*
* @param finalizedFeatureCache the finalized feature cache
* @param zkClient the Zookeeper client
*/
class FinalizedFeatureChangeListener(private val finalizedFeatureCache: FinalizedFeatureCache,
private val zkClient: KafkaZkClient) extends Logging {
/**
* Helper class used to update the FinalizedFeatureCache.
*
* @param featureZkNodePath the path to the ZK feature node to be read
* @param maybeNotifyOnce an optional latch that can be used to notify the caller when an
* updateOrThrow() operation is over
*/
private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
/**
* Updates the feature cache in FinalizedFeatureCache with the latest features read from the
* ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
* exception is raised.
*
* NOTE: if a notifier was provided in the constructor, then, this method can be invoked exactly
* once successfully. A subsequent invocation will raise an exception.
*
* @throws IllegalStateException, if a non-empty notifier was provided in the constructor, and
* this method is called again after a successful previous invocation.
* @throws FeatureCacheUpdateException, if there was an error in updating the
* FinalizedFeatureCache.
*/
def updateLatestOrThrow(): Unit = {
maybeNotifyOnce.foreach(notifier => {
if (notifier.getCount != 1) {
throw new IllegalStateException(
"Can not notify after updateLatestOrThrow was called more than once successfully.")
}
})
debug(s"Reading feature ZK node at path: $featureZkNodePath")
val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
// There are 4 cases:
//
// (empty dataBytes, valid version) => The empty dataBytes will fail FeatureZNode deserialization.
// FeatureZNode, when present in ZK, can not have empty contents.
// (non-empty dataBytes, valid version) => This is a valid case, and should pass FeatureZNode deserialization
// if dataBytes contains valid data.
// (empty dataBytes, unknown version) => This is a valid case, and this can happen if the FeatureZNode
// does not exist in ZK.
// (non-empty dataBytes, unknown version) => This case is impossible, since, KafkaZkClient.getDataAndVersion
// API ensures that unknown version is returned only when the
// ZK node is absent. Therefore dataBytes should be empty in such
// a case.
if (version == ZkVersion.UnknownVersion) {
info(s"Feature ZK node at path: $featureZkNodePath does not exist")
finalizedFeatureCache.clear()
} else {
var maybeFeatureZNode: Option[FeatureZNode] = Option.empty
try {
maybeFeatureZNode = Some(FeatureZNode.decode(mayBeFeatureZNodeBytes.get))
} catch {
case e: IllegalArgumentException => {
error(s"Unable to deserialize feature ZK node at path: $featureZkNodePath", e)
finalizedFeatureCache.clear()
}
}
maybeFeatureZNode.foreach(featureZNode => {
featureZNode.status match {
case FeatureZNodeStatus.Disabled => {
info(s"Feature ZK node at path: $featureZkNodePath is in disabled status.")
finalizedFeatureCache.clear()
}
case FeatureZNodeStatus.Enabled => {
finalizedFeatureCache.updateOrThrow(featureZNode.features, version)
}
case _ => throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
}
})
}
maybeNotifyOnce.foreach(notifier => notifier.countDown())
}
/**
* Waits until at least a single updateLatestOrThrow completes successfully. This method returns
* immediately if an updateLatestOrThrow call had already completed successfully.
*
* @param waitTimeMs the timeout for the wait operation
*
* @throws TimeoutException if the wait can not be completed in waitTimeMs
* milli seconds
*/
def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
maybeNotifyOnce.foreach(notifier => {
if (!notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)) {
throw new TimeoutException(
s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
}
})
}
}
/**
* A shutdownable thread to process feature node change notifications that are populated into the
* queue. If any change notification can not be processed successfully (unless it is due to an
* interrupt), the thread treats it as a fatal event and triggers Broker exit.
*
* @param name name of the thread
*/
private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
override def doWork(): Unit = {
try {
queue.take.updateLatestOrThrow()
} catch {
case ie: InterruptedException =>
// While the queue is empty and this thread is blocking on taking an item from the queue,
// a concurrent call to FinalizedFeatureChangeListener.close() could interrupt the thread
// and cause an InterruptedException to be raised from queue.take(). In such a case, it is
// safe to ignore the exception if the thread is being shutdown. We raise the exception
// here again, because, it is ignored by ShutdownableThread if it is shutting down.
throw ie
case cacheUpdateException: FeatureCacheUpdateException =>
error("Failed to process feature ZK node change event. The broker will eventually exit.", cacheUpdateException)
throw new FatalExitError(1)
case e: Exception =>
// do not exit for exceptions unrelated to cache change processing (e.g. ZK session expiration)
warn("Unexpected exception in feature ZK node change event processing; will continue processing.", e)
}
}
}
// Feature ZK node change handler.
object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
override val path: String = FeatureZNode.path
override def handleCreation(): Unit = {
info(s"Feature ZK node created at path: $path")
queue.add(new FeatureCacheUpdater(path))
}
override def handleDataChange(): Unit = {
info(s"Feature ZK node updated at path: $path")
queue.add(new FeatureCacheUpdater(path))
}
override def handleDeletion(): Unit = {
warn(s"Feature ZK node deleted at path: $path")
// This event may happen, rarely (ex: ZK corruption or operational error).
// In such a case, we prefer to just log a warning and treat the case as if the node is absent,
// and populate the FinalizedFeatureCache with empty finalized features.
queue.add(new FeatureCacheUpdater(path))
}
}
object ZkStateChangeHandler extends StateChangeHandler {
val path: String = FeatureZNode.path
override val name: String = path
override def afterInitializingSession(): Unit = {
queue.add(new FeatureCacheUpdater(path))
}
}
private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
/**
* This method initializes the feature ZK node change listener. Optionally, it also ensures to
* update the FinalizedFeatureCache once with the latest contents of the feature ZK node
* (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers
* are conveniently detected before the initOrThrow() method returns to the caller. If feature
* incompatibilities are detected, this method will throw an Exception to the caller, and the Broker
* will exit eventually.
*
* @param waitOnceForCacheUpdateMs # of milli seconds to wait for feature cache to be updated once.
* (should be > 0)
*
* @throws Exception if feature incompatibility check could not be finished in a timely manner
*/
def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
if (waitOnceForCacheUpdateMs <= 0) {
throw new IllegalArgumentException(
s"Expected waitOnceForCacheUpdateMs > 0, but provided: $waitOnceForCacheUpdateMs")
}
thread.start()
zkClient.registerStateChangeHandler(ZkStateChangeHandler)
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
val ensureCacheUpdateOnce = new FeatureCacheUpdater(
FeatureZNodeChangeHandler.path, Some(new CountDownLatch(1)))
queue.add(ensureCacheUpdateOnce)
try {
ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
} catch {
case e: Exception => {
close()
throw e
}
}
}
/**
* Closes the feature ZK node change listener by unregistering the listener from ZK client,
* clearing the queue and shutting down the ChangeNotificationProcessorThread.
*/
def close(): Unit = {
zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name)
zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)
queue.clear()
thread.shutdown()
}
// For testing only.
def isListenerInitiated: Boolean = {
thread.isRunning && thread.isAlive
}
// For testing only.
def isListenerDead: Boolean = {
!thread.isRunning && !thread.isAlive
}
}