/
EventBus.scala
414 lines (340 loc) · 13.7 KB
/
EventBus.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.event
import akka.actor.{ ActorSystem, ActorRef }
import akka.util.Index
import java.util.concurrent.ConcurrentSkipListSet
import java.util.Comparator
import akka.util.{ Subclassification, SubclassifiedIndex }
import scala.collection.immutable
import java.util.concurrent.atomic.{ AtomicReference }
/**
 * Base type for all event buses.
 *
 * An implementation fixes three abstract type members: the `Event` that is published,
 * the `Classifier` used when subscribing, and the `Subscriber` that is registered.
 *
 * For the Java API, see akka.event.japi.*
 */
trait EventBus {
  /** The type of the events published on this bus. */
  type Event

  /** The type of the classifiers that subscriptions are registered against. */
  type Classifier

  /** The type of the subscribers registered on this bus. */
  type Subscriber

  //#event-bus-api
  /**
   * Attempts to register the subscriber to the specified Classifier.
   *
   * @param subscriber the subscriber to register
   * @param to the classifier to register the subscriber for
   * @return true if successful and false if not (because it was already
   *   subscribed to that Classifier, or otherwise)
   */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean

  /**
   * Attempts to deregister the subscriber from the specified Classifier.
   *
   * @param subscriber the subscriber to deregister
   * @param from the classifier to remove the subscriber from
   * @return true if successful and false if not (because it wasn't subscribed
   *   to that Classifier, or otherwise)
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean

  /**
   * Attempts to deregister the subscriber from all Classifiers it may be subscribed to.
   *
   * @param subscriber the subscriber to deregister everywhere
   */
  def unsubscribe(subscriber: Subscriber): Unit

  /**
   * Publishes the specified Event to this bus.
   *
   * @param event the event to publish
   */
  def publish(event: Event): Unit
  //#event-bus-api
}
/**
 * Represents an EventBus where the Subscriber type is ActorRef
 */
trait ActorEventBus extends EventBus {
  type Subscriber = ActorRef

  /**
   * Compares two subscribers by delegating to `a compareTo b`.
   *
   * The result type is declared explicitly so that this non-private member does not
   * depend on type inference.
   *
   * @param a the first subscriber
   * @param b the second subscriber
   * @return the result of `a compareTo b`
   */
  protected def compareSubscribers(a: ActorRef, b: ActorRef): Int = a compareTo b
}
/**
 * Mix-in for an EventBus that fixes the Classifier type to ActorRef.
 */
trait ActorClassifier { self: EventBus ⇒
  type Classifier = ActorRef
}
/**
 * Mix-in for an EventBus that fixes the Classifier type to a predicate,
 * i.e. a Function from Event to Boolean.
 */
trait PredicateClassifier { self: EventBus ⇒
  type Classifier = Event ⇒ Boolean
}
/**
 * Maps Subscribers to Classifiers using equality on Classifier to store a Set of Subscribers
 * (hence the need for compareSubscribers).
 * Maps Events to Classifiers through the classify-method (so it knows who to publish to).
 *
 * The compareSubscribers need to provide a total ordering of the Subscribers.
 */
trait LookupClassification { this: EventBus ⇒

  /**
   * Subscribers per Classifier. Created with `mapSize()` as the size hint and a
   * Comparator that forwards to `compareSubscribers`.
   */
  protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
    def compare(left: Subscriber, right: Subscriber): Int = compareSubscribers(left, right)
  })

  /**
   * This is a size hint for the number of Classifiers you expect to have (use powers of 2)
   */
  protected def mapSize(): Int

  /**
   * Provides a total ordering of Subscribers (think java.util.Comparator.compare)
   */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /**
   * Returns the Classifier associated with the given Event
   */
  protected def classify(event: Event): Classifier

  /**
   * Publishes the given Event to the given Subscriber
   */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  /** Registers the subscriber under the classifier; the result is whatever `subscribers.put` reports. */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, subscriber)

  /** Removes the subscriber from the classifier; the result is whatever `subscribers.remove` reports. */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, subscriber)

  /** Removes the subscriber from the index via `subscribers.removeValue`. */
  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(subscriber)

  /**
   * Classifies the event and hands it to every subscriber that the index
   * yields for that classifier.
   */
  def publish(event: Event): Unit = {
    val classifier = classify(event)
    val recipients = subscribers.valueIterator(classifier)
    while (recipients.hasNext) {
      val recipient = recipients.next()
      publish(event, recipient)
    }
  }
}
/**
 * Classification which respects relationships between channels: subscribing
 * to one channel automatically and idempotently subscribes to all sub-channels.
 */
trait SubchannelClassification { this: EventBus ⇒

  /**
   * The logic to form sub-class hierarchy
   */
  protected implicit def subclassification: Subclassification[Classifier]

  // must be lazy to avoid initialization order problem with subclassification
  private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()

  // Classifier -> subscribers snapshot. Every write happens inside
  // `subscriptions.synchronized`; `publish` and `hasSubscriptions` read it without the lock.
  @volatile
  private var cache = Map.empty[Classifier, Set[Subscriber]]

  /**
   * Returns the Classifier associated with the given Event
   */
  protected def classify(event: Event): Classifier

  /**
   * Publishes the given Event to the given Subscriber
   */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  /**
   * Adds the subscription to the index and merges the resulting changes into the cache.
   *
   * @return true if the index reported any change
   */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized {
    val added = subscriptions.addValue(to, subscriber)
    addToCache(added)
    added.nonEmpty
  }

  /**
   * Removes the subscription from the index and refreshes the affected cache entries.
   *
   * @return true if the index reported any change
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized {
    val refreshed = subscriptions.removeValue(from, subscriber)
    // removeValue(K, V) does not return the diff to remove from or add to the cache
    // but instead the whole set of keys and values that should be updated in the cache
    cache = cache ++ refreshed
    refreshed.nonEmpty
  }

  /**
   * Removes the subscriber from the index altogether and subtracts the reported
   * changes from the cache.
   */
  def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized {
    val removed = subscriptions.removeValue(subscriber)
    removeFromCache(removed)
  }

  /**
   * Publishes the event to every subscriber cached for its classifier. A classifier
   * that is not cached yet is first added to the index under the lock.
   */
  def publish(event: Event): Unit = {
    val c = classify(event)
    val receivers = cache.get(c) match {
      case Some(cached) ⇒ cached // c will never be removed from cache
      case None ⇒
        subscriptions.synchronized {
          // look again under the lock: another thread may have added the key meanwhile
          cache.get(c) match {
            case Some(cached) ⇒ cached
            case None ⇒
              addToCache(subscriptions.addKey(c))
              cache(c)
          }
        }
    }
    receivers.foreach(subscriber ⇒ publish(event, subscriber))
  }

  /**
   * INTERNAL API
   * Expensive call! Avoid calling directly from event bus subscribe / unsubscribe.
   */
  private[akka] def hasSubscriptions(subscriber: Subscriber): Boolean =
    // FIXME binary incompatible, but I think it is safe to filter out this problem,
    // since it is only called from new functionality in EventStreamUnsubscriber
    cache.valuesIterator.exists(_.contains(subscriber))

  // Subtracts each reported subscriber set from the cached set of its classifier.
  private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
    cache = changes.foldLeft(cache) {
      case (acc, (classifier, gone)) ⇒
        acc.updated(classifier, acc.getOrElse(classifier, Set.empty[Subscriber]).diff(gone))
    }

  // Merges each reported subscriber set into the cached set of its classifier.
  private def addToCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
    cache = changes.foldLeft(cache) {
      case (acc, (classifier, fresh)) ⇒
        acc.updated(classifier, acc.getOrElse(classifier, Set.empty[Subscriber]).union(fresh))
    }
}
/**
 * Maps Classifiers to Subscribers and selects which Subscriber should receive which publication
 * by scanning through all Subscribers with the matches(classifier, event) method.
 *
 * Note: the compareClassifiers and compareSubscribers must together form an absolute ordering
 * (think java.util.Comparator.compare)
 */
trait ScanningClassification { self: EventBus ⇒

  /**
   * All (Classifier, Subscriber) pairs, ordered by classifier first and by
   * subscriber when the classifiers compare as equal.
   */
  protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] {
    def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = {
      val byClassifier = compareClassifiers(a._1, b._1)
      if (byClassifier != 0) byClassifier
      else compareSubscribers(a._2, b._2)
    }
  })

  /**
   * Provides a total ordering of Classifiers (think java.util.Comparator.compare)
   */
  protected def compareClassifiers(a: Classifier, b: Classifier): Int

  /**
   * Provides a total ordering of Subscribers (think java.util.Comparator.compare)
   */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /**
   * Returns whether the specified Classifier matches the specified Event
   */
  protected def matches(classifier: Classifier, event: Event): Boolean

  /**
   * Publishes the specified Event to the specified Subscriber
   */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  /** Adds the (classifier, subscriber) pair; returns the result of the set's `add`. */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
    val entry = (to, subscriber)
    subscribers.add(entry)
  }

  /** Removes the (classifier, subscriber) pair; returns the result of the set's `remove`. */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = {
    val entry = (from, subscriber)
    subscribers.remove(entry)
  }

  /**
   * Walks all pairs and removes every one whose subscriber compares as equal
   * (per compareSubscribers) to the given subscriber.
   */
  def unsubscribe(subscriber: Subscriber): Unit = {
    val entries = subscribers.iterator()
    while (entries.hasNext) {
      val candidate = entries.next()._2
      if (compareSubscribers(subscriber, candidate) == 0) entries.remove()
    }
  }

  /**
   * Walks all pairs and publishes the event to the subscriber of every pair
   * whose classifier matches the event.
   */
  def publish(event: Event): Unit = {
    val entries = subscribers.iterator()
    while (entries.hasNext) {
      val entry = entries.next()
      if (matches(entry._1, event)) publish(event, entry._2)
    }
  }
}
/**
 * Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs.
 *
 * All subscribers will be watched by an `akka.event.ActorClassificationUnsubscriber` and unsubscribed when they terminate.
 * The unsubscriber actor will not be stopped automatically, and if you want to stop using the bus you should stop it yourself.
 */
trait ManagedActorClassification { this: ActorEventBus with ActorClassifier ⇒
  import scala.annotation.tailrec

  protected def system: ActorSystem

  /**
   * Immutable snapshot of all subscriptions: `backing` maps a monitored ActorRef (the
   * classifier) to the set of its monitors (the subscribers). Every modifying method
   * returns a new snapshot whose `seqNr` is one higher.
   */
  private class ManagedActorClassificationMappings(val seqNr: Int, val backing: Map[ActorRef, immutable.TreeSet[ActorRef]]) {

    /** The monitors registered for `monitored`, or the empty set if there is no entry. */
    def get(monitored: ActorRef): immutable.TreeSet[ActorRef] = backing.getOrElse(monitored, empty)

    /** New snapshot in which `monitor` is registered for `monitored`. */
    def add(monitored: ActorRef, monitor: ActorRef): ManagedActorClassificationMappings = {
      val watchers = get(monitored) + monitor
      new ManagedActorClassificationMappings(seqNr + 1, backing.updated(monitored, watchers))
    }

    /** New snapshot in which `monitor` is no longer registered for `monitored`; the entry itself is kept. */
    def remove(monitored: ActorRef, monitor: ActorRef): ManagedActorClassificationMappings = {
      val monitors = get(monitored) - monitor
      new ManagedActorClassificationMappings(seqNr + 1, backing.updated(monitored, monitors))
    }

    /** New snapshot without any entry for `monitored`. */
    def remove(monitored: ActorRef): ManagedActorClassificationMappings = {
      val v = backing - monitored
      new ManagedActorClassificationMappings(seqNr + 1, v)
    }
  }

  // Current snapshot; replaced only through compareAndSet, so updates are retried on contention.
  private val mappings = new AtomicReference[ManagedActorClassificationMappings](
    new ManagedActorClassificationMappings(0, Map.empty[ActorRef, immutable.TreeSet[ActorRef]]))

  private val empty = immutable.TreeSet.empty[ActorRef]

  /** The unsubscriber takes care of unsubscribing actors, which have terminated. */
  protected lazy val unsubscriber = ActorClassificationUnsubscriber.start(system, this)

  /**
   * Registers `monitor` as a subscriber of `monitored`.
   *
   * The duplicate check looks for `monitor` in the existing monitor set. Checking for
   * `monitored` there instead would reject every new subscriber once an actor is
   * subscribed to itself as classifier.
   *
   * @return false if `monitor` was already registered for `monitored`; otherwise the
   *   result of `registerWithUnsubscriber` once the new snapshot has been installed
   */
  @tailrec
  protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = {
    val current = mappings.get
    current.backing.get(monitored) match {
      case Some(monitors) if monitors.contains(monitor) ⇒
        false
      case _ ⇒
        val added = current.add(monitored, monitor)
        if (mappings.compareAndSet(current, added)) registerWithUnsubscriber(monitor, added.seqNr)
        else associate(monitored, monitor) // lost the race against a concurrent update, retry
    }
  }

  /**
   * Removes `actor` from the bus in both roles: first its entry as a monitored
   * classifier, then (also if the first step throws) every subscription in which
   * it is the monitor.
   */
  protected final def dissociate(actor: ActorRef): Unit = {
    @tailrec
    def dissociateAsMonitored(monitored: ActorRef): Unit = {
      val current = mappings.get
      if (current.backing.contains(monitored)) {
        val removed = current.remove(monitored)
        if (!mappings.compareAndSet(current, removed))
          dissociateAsMonitored(monitored)
      }
    }

    def dissociateAsMonitor(monitor: ActorRef): Unit = {
      // iterates the snapshot read here; the actual removal goes through dissociate(key, monitor)
      val current = mappings.get
      val i = current.backing.iterator
      while (i.hasNext) {
        val (key, value) = i.next()
        value match {
          case null ⇒
          // do nothing
          case monitors ⇒
            if (monitors.contains(monitor))
              dissociate(key, monitor)
        }
      }
    }

    try { dissociateAsMonitored(actor) } finally { dissociateAsMonitor(actor) }
  }

  /**
   * Removes `monitor` from the subscribers of `monitored`.
   *
   * @return false if there is no entry for `monitored` or the removal changes nothing;
   *   otherwise the result of `unregisterFromUnsubscriber` once the new snapshot
   *   has been installed
   */
  @tailrec
  protected final def dissociate(monitored: ActorRef, monitor: ActorRef): Boolean = {
    val current = mappings.get
    current.backing.get(monitored) match {
      case None ⇒ false
      case Some(monitors) ⇒
        val removed = current.remove(monitored, monitor)
        val removedMonitors = removed.get(monitored)
        if (monitors.isEmpty || monitors == removedMonitors) false
        else if (mappings.compareAndSet(current, removed)) unregisterFromUnsubscriber(monitor, removed.seqNr)
        else dissociate(monitored, monitor) // lost the race against a concurrent update, retry
    }
  }

  /**
   * Returns the Classifier associated with the specified Event
   */
  protected def classify(event: Event): Classifier

  /**
   * This is a size hint for the number of Classifiers you expect to have (use powers of 2)
   */
  protected def mapSize: Int

  /**
   * Sends the event to every subscriber registered for the event's classifier
   * in the current snapshot; does nothing if there is no entry.
   */
  def publish(event: Event): Unit = {
    mappings.get.backing.get(classify(event)) match {
      case None       ⇒ ()
      case Some(refs) ⇒ refs.foreach { _ ! event }
    }
  }

  /**
   * Subscribes `subscriber` to `to`.
   *
   * @throws IllegalArgumentException if either argument is null
   */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    if (subscriber eq null) throw new IllegalArgumentException("Subscriber is null")
    else if (to eq null) throw new IllegalArgumentException("Classifier is null")
    else associate(to, subscriber)

  /**
   * Unsubscribes `subscriber` from `from`.
   *
   * @throws IllegalArgumentException if either argument is null
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    if (subscriber eq null) throw new IllegalArgumentException("Subscriber is null")
    else if (from eq null) throw new IllegalArgumentException("Classifier is null")
    else dissociate(from, subscriber)

  /**
   * Removes `subscriber` from the bus entirely, see `dissociate(actor)`.
   *
   * @throws IllegalArgumentException if `subscriber` is null
   */
  def unsubscribe(subscriber: Subscriber): Unit =
    if (subscriber eq null) throw new IllegalArgumentException("Subscriber is null")
    else dissociate(subscriber)

  /**
   * INTERNAL API
   */
  private[akka] def registerWithUnsubscriber(subscriber: ActorRef, seqNr: Int): Boolean = {
    unsubscriber ! ActorClassificationUnsubscriber.Register(subscriber, seqNr)
    true
  }

  /**
   * INTERNAL API
   */
  private[akka] def unregisterFromUnsubscriber(subscriber: ActorRef, seqNr: Int): Boolean = {
    unsubscriber ! ActorClassificationUnsubscriber.Unregister(subscriber, seqNr)
    true
  }
}