forked from akka/akka-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
PublisherRequestor.scala
64 lines (56 loc) · 1.95 KB
/
PublisherRequestor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package akka.camel
import akka.actor._
/**
* Base class for concrete (un)publish requestors. Subclasses are responsible for requesting
* (un)publication of consumer actors by calling <code>deliverCurrentEvent</code>.
*
* @author Martin Krasser
*/
private[camel] abstract class PublishRequestor extends Actor {
private val events = collection.mutable.Set[ConsumerEvent]()
private var publisher: Option[ActorRef] = None
def receiveActorRegistryEvent: Receive
/**
* Accepts
* <ul>
* <li><code>InitPublishRequestor</code> messages to configure a publisher for this requestor.</li>
* <li><code>ActorRegistryEvent</code> messages to be handled <code>receiveActorRegistryEvent</code>
* implementators</li>.
* </ul>
* Other messages are simply ignored. Calls to <code>deliverCurrentEvent</code> prior to setting a
* publisher are buffered. They will be sent after a publisher has been set.
*/
def receive = {
case InitPublishRequestor(pub) => {
publisher = Some(pub)
deliverBufferedEvents
}
case e: ActorRegistryEvent => receiveActorRegistryEvent(e)
case _ => { /* ignore */ }
}
/**
* Deliver the given <code>event</code> to <code>publisher</code> or buffer the event if
* <code>publisher</code> is not defined yet.
*/
protected def deliverCurrentEvent(event: ConsumerEvent) {
publisher match {
case Some(pub) => pub ! event
case None => events += event
}
}
private def deliverBufferedEvents {
for (event <- events) deliverCurrentEvent(event)
events.clear
}
}
/**
* @author Martin Krasser
*/
private[camel] object PublishRequestor {
def pastActorRegisteredEvents = for (actor <- Actor.registry.local.actors) yield ActorRegistered(actor.address, actor)
}
/**
* Command message to initialize a PublishRequestor to use <code>publisher</code>
* for publishing consumer actors.
*/
private[camel] case class InitPublishRequestor(publisher: ActorRef)