-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
ActorRef.scala
828 lines (717 loc) · 29.9 KB
/
ActorRef.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import scala.collection.immutable
import akka.dispatch._
import akka.dispatch.sysmsg._
import java.lang.{ IllegalStateException, UnsupportedOperationException }
import akka.serialization.{ JavaSerializer, Serialization }
import akka.event.{ EventStream, Logging, LoggingAdapter, MarkerLoggingAdapter }
import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import scala.util.control.NonFatal
object ActorRef {
/**
* Use this value as an argument to [[ActorRef#tell]] if there is not actor to
* reply to (e.g. when sending from non-actor code).
*/
final val noSender: ActorRef = Actor.noSender
}
/**
* Immutable and serializable handle to an actor, which may or may not reside
* on the local host or inside the same [[akka.actor.ActorSystem]]. An ActorRef
* can be obtained from an [[akka.actor.ActorRefFactory]], an interface which
* is implemented by ActorSystem and [[akka.actor.ActorContext]]. This means
* actors can be created top-level in the ActorSystem or as children of an
* existing actor, but only from within that actor.
*
* ActorRefs can be freely shared among actors by message passing. Message
* passing conversely is their only purpose, as demonstrated in the following
* examples:
*
* Scala:
* {{{
* import akka.pattern.ask
* import scala.concurrent.Await
*
* class ExampleActor extends Actor {
* val other = context.actorOf(Props[OtherActor], "childName") // will be destroyed and re-created upon restart by default
*
* def receive {
* case Request1(msg) => other ! refine(msg) // uses this actor as sender reference, reply goes to us
* case Request2(msg) => other.tell(msg, sender()) // forward sender reference, enabling direct reply
* case Request3(msg) =>
* implicit val timeout = Timeout(5.seconds)
* (other ? msg) pipeTo sender()
* // the ask call will get a future from other's reply
* // when the future is complete, send its value to the original sender
* }
* }
* }}}
*
* Java:
* {{{
* import static akka.pattern.Patterns.ask;
* import static akka.pattern.Patterns.pipe;
*
* public class ExampleActor extends AbstractActor {
* // this child will be destroyed and re-created upon restart by default
* final ActorRef other = getContext().actorOf(Props.create(OtherActor.class), "childName");
* @Override
* public Receive createReceive() {
* return receiveBuilder()
* .match(Request1.class, msg ->
* // uses this actor as sender reference, reply goes to us
* other.tell(msg, getSelf()))
* .match(Request2.class, msg ->
* // forward sender reference, enabling direct reply
* other.tell(msg, getSender()))
* .match(Request3.class, msg ->
* // the ask call will get a future from other's reply
* // when the future is complete, send its value to the original sender
* pipe(ask(other, msg, 5000), context().dispatcher()).to(getSender()))
* .build();
* }
* }
* }}}
*
* ActorRef does not have a method for terminating the actor it points to, use
* [[akka.actor.ActorRefFactory]]`.stop(ref)`, or send a [[akka.actor.PoisonPill]],
* for this purpose.
*
* Two actor references are compared equal when they have the same path and point to
* the same actor incarnation. A reference pointing to a terminated actor doesn't compare
* equal to a reference pointing to another (re-created) actor with the same path.
*
* If you need to keep track of actor references in a collection and do not care
* about the exact actor incarnation you can use the ``ActorPath`` as key because
* the unique id of the actor is not taken into account when comparing actor paths.
*/
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
scalaRef: InternalActorRef ⇒
/**
* Returns the path for this actor (from this actor up to the root actor).
*/
def path: ActorPath
/**
* Comparison takes path and the unique id of the actor cell into account.
*/
final def compareTo(other: ActorRef) = {
val x = this.path compareTo other.path
if (x == 0) if (this.path.uid < other.path.uid) -1 else if (this.path.uid == other.path.uid) 0 else 1
else x
}
/**
* Sends the specified message to this ActorRef, i.e. fire-and-forget
* semantics, including the sender reference if possible.
*
* Pass [[akka.actor.ActorRef]] `noSender` or `null` as sender if there is nobody to reply to
*/
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
/**
* Forwards the message and passes the original sender actor as the sender.
*
* Works, no matter whether originally sent with tell/'!' or ask/'?'.
*/
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender())
/**
* INTERNAL API
* Is the actor shut down?
* The contract is that if this method returns true, then it will never be false again.
* But you cannot rely on that it is alive if it returns false, since this by nature is a racy method.
*/
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
private[akka] def isTerminated: Boolean
final override def hashCode: Int = {
if (path.uid == ActorCell.undefinedUid) path.hashCode
else path.uid
}
/**
* Equals takes path and the unique id of the actor cell into account.
*/
final override def equals(that: Any): Boolean = that match {
case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
case _ ⇒ false
}
override def toString: String =
if (path.uid == ActorCell.undefinedUid) s"Actor[${path}]"
else s"Actor[${path}#${path.uid}]"
}
/**
* This trait represents the Scala Actor API
* There are implicit conversions in package.scala
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef { ref: ActorRef ⇒
/**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
*
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
* <p/>
*
* This actor 'sender' reference is then available in the receiving actor in the 'sender()' member variable,
* if invoked from within an Actor. If not then no sender is available.
* <pre>
* actor ! message
* </pre>
* <p/>
*/
def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
}
/**
* All ActorRefs have a scope which describes where they live. Since it is
* often necessary to distinguish between local and non-local references, this
* is the only method provided on the scope.
*/
private[akka] trait ActorRefScope {
def isLocal: Boolean
}
/**
* Refs which are statically known to be local inherit from this Scope
*/
private[akka] trait LocalRef extends ActorRefScope {
final def isLocal = true
}
/**
* RepointableActorRef (and potentially others) may change their locality at
* runtime, meaning that isLocal might not be stable. RepointableActorRef has
* the feature that it starts out “not fully started” (but you can send to it),
* which is why `isStarted` features here; it is not improbable that cluster
* actor refs will have the same behavior.
*/
private[akka] trait RepointableRef extends ActorRefScope {
def isStarted: Boolean
}
/**
* Internal trait for assembling all the functionality needed internally on
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
*
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
*/
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒
/*
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
*/
def start(): Unit
def resume(causedByFailure: Throwable): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
def stop(): Unit
def sendSystemMessage(message: SystemMessage): Unit
/**
* Get a reference to the actor ref provider which created this ref.
*/
def provider: ActorRefProvider
/**
* Obtain parent of this ref; used by getChild for ".." paths.
*/
def getParent: InternalActorRef
/**
* Obtain ActorRef by possibly traversing the actor tree or looking it up at
* some provider-specific location. This method shall return the end result,
* i.e. not only the next step in the look-up; this will typically involve
* recursive invocation. A path element of ".." signifies the parent, a
* trailing "" element must be disregarded. If the requested path does not
* exist, return Nobody.
*/
def getChild(name: Iterator[String]): InternalActorRef
/**
* Scope: if this ref points to an actor which resides within the same JVM,
* i.e. whose mailbox is directly reachable etc.
*/
def isLocal: Boolean
/**
* INTERNAL API: Returns “true” if the actor is locally known to be terminated, “false” if
* alive or uncertain.
*/
private[akka] def isTerminated: Boolean
}
/**
* Common trait of all actor refs which actually have a Cell, most notably
* LocalActorRef and RepointableActorRef. The former specializes the return
* type of `underlying` so that follow-up calls can use invokevirtual instead
* of invokeinterface.
*/
private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒
def underlying: Cell
def children: immutable.Iterable[ActorRef]
def getSingleChild(name: String): InternalActorRef
}
/**
* This is an internal look-up failure token, not useful for anything else.
*/
private[akka] case object Nobody extends MinimalActorRef {
override val path: RootActorPath = new RootActorPath(Address("akka", "all-systems"), "/Nobody")
override def provider = throw new UnsupportedOperationException("Nobody does not provide")
private val serialized = new SerializedNobody
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = serialized
}
/**
* INTERNAL API
*/
@SerialVersionUID(1L) private[akka] class SerializedNobody extends Serializable {
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = Nobody
}
/**
* Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
*
* INTERNAL API
*/
private[akka] class LocalActorRef private[akka] (
_system: ActorSystemImpl,
_props: Props,
_dispatcher: MessageDispatcher,
_mailboxType: MailboxType,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends ActorRefWithCell with LocalRef {
/*
* Safe publication of this class’s fields is guaranteed by mailbox.setActor()
* which is called indirectly from actorCell.init() (if you’re wondering why
* this is at all important, remember that under the JMM final fields are only
* frozen at the _end_ of the constructor, but we are publishing “this” before
* that is reached).
* This means that the result of newActorCell needs to be written to the val
* actorCell before we call init and start, since we can start using "this"
* object from another thread as soon as we run init.
*/
private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor)
actorCell.init(sendSupervise = true, _mailboxType)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, dispatcher, supervisor)
protected def actorContext: ActorContext = actorCell
/**
* INTERNAL API: Is the actor terminated?
* If this method returns true, it will never return false again, but if it
* returns false, you cannot be sure if it's alive still (race condition)
*/
override private[akka] def isTerminated: Boolean = actorCell.isTerminated
/**
* Starts the actor after initialization.
*/
override def start(): Unit = actorCell.start()
/**
* Suspends the actor so that it will not process messages until resumed. The
* suspend request is processed asynchronously to the caller of this method
* as well as to normal message sends: the only ordering guarantee is that
* message sends done from the same thread after calling this method will not
* be processed until resumed.
*/
override def suspend(): Unit = actorCell.suspend()
/**
* Resumes a suspended actor.
*/
override def resume(causedByFailure: Throwable): Unit = actorCell.resume(causedByFailure)
/**
* Shuts down the actor and its message queue
*/
override def stop(): Unit = actorCell.stop()
override def getParent: InternalActorRef = actorCell.parent
override def provider: ActorRefProvider = actorCell.provider
def children: immutable.Iterable[ActorRef] = actorCell.children
/**
* Method for looking up a single child beneath this actor. Override in order
* to inject “synthetic” actor paths like “/temp”.
* It is racy if called from the outside.
*/
def getSingleChild(name: String): InternalActorRef = actorCell.getSingleChild(name)
override def getChild(names: Iterator[String]): InternalActorRef = {
/*
* The idea is to recursively descend as far as possible with LocalActor
* Refs and hand over to that “foreign” child when we encounter it.
*/
@tailrec
def rec(ref: InternalActorRef, name: Iterator[String]): InternalActorRef =
ref match {
case l: LocalActorRef ⇒
val next = name.next() match {
case ".." ⇒ l.getParent
case "" ⇒ l
case any ⇒ l.getSingleChild(any)
}
if (next == Nobody || name.isEmpty) next else rec(next, name)
case _ ⇒
ref.getChild(name)
}
if (names.isEmpty) this
else rec(this, names)
}
// ========= AKKA PROTECTED FUNCTIONS =========
def underlying: ActorCell = actorCell
override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)
override def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
/**
* Memento pattern for serializing ActorRefs transparently
* INTERNAL API
*/
@SerialVersionUID(1L)
private[akka] final case class SerializedActorRef private (path: String) {
import akka.serialization.JavaSerializer.currentSystem
def this(actorRef: ActorRef) = {
this(Serialization.serializedActorPath(actorRef))
}
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = currentSystem.value match {
case null ⇒
throw new IllegalStateException(
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'")
case someSystem ⇒
someSystem.provider.resolveActorRef(path)
}
}
/**
* INTERNAL API
*/
private[akka] object SerializedActorRef {
def apply(actorRef: ActorRef): SerializedActorRef = {
new SerializedActorRef(actorRef)
}
}
/**
* Trait for ActorRef implementations where all methods contain default stubs.
*
* INTERNAL API
*/
private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
override def getParent: InternalActorRef = Nobody
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
override def start(): Unit = ()
override def suspend(): Unit = ()
override def resume(causedByFailure: Throwable): Unit = ()
override def stop(): Unit = ()
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
override private[akka] def isTerminated = false
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = ()
override def sendSystemMessage(message: SystemMessage): Unit = ()
override def restart(cause: Throwable): Unit = ()
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
/** Subscribe to this class to be notified about all DeadLetters (also the suppressed ones). */
sealed trait AllDeadLetters {
def message: Any
def sender: ActorRef
def recipient: ActorRef
}
/**
* When a message is sent to an Actor that is terminated before receiving the message, it will be sent as a DeadLetter
* to the ActorSystem's EventStream.
*
* When this message was sent without a sender [[ActorRef]], `sender` will be `system.deadLetters`.
*/
@SerialVersionUID(1L)
final case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) extends AllDeadLetters {
require(sender ne null, "DeadLetter sender may not be null")
require(recipient ne null, "DeadLetter recipient may not be null")
}
/**
* Use with caution: Messages extending this trait will not be logged by the default dead-letters listener.
* Instead they will be wrapped as [[SuppressedDeadLetter]] and may be subscribed for explicitly.
*/
trait DeadLetterSuppression
/**
* Similar to [[DeadLetter]] with the slight twist of NOT being logged by the default dead letters listener.
* Messages which end up being suppressed dead letters are internal messages for which ending up as dead-letter is both expected and harmless.
*
* It is possible to subscribe to suppressed dead letters on the ActorSystem's EventStream explicitly.
*/
@SerialVersionUID(1L)
final case class SuppressedDeadLetter(message: DeadLetterSuppression, sender: ActorRef, recipient: ActorRef) extends AllDeadLetters {
require(sender ne null, "DeadLetter sender may not be null")
require(recipient ne null, "DeadLetter recipient may not be null")
}
private[akka] object DeadLetterActorRef {
@SerialVersionUID(1L)
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = JavaSerializer.currentSystem.value.deadLetters
}
val serialized = new SerializedDeadLetterActorRef
}
/**
* This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful.
*
* INTERNAL API
*/
private[akka] class EmptyLocalActorRef(
override val provider: ActorRefProvider,
override val path: ActorPath,
val eventStream: EventStream) extends MinimalActorRef {
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
override private[akka] def isTerminated = true
override def sendSystemMessage(message: SystemMessage): Unit = {
if (Mailbox.debug) println(s"ELAR $path having enqueued $message")
specialHandle(message, provider.deadLetters)
}
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match {
case null ⇒ throw InvalidMessageException("Message is null")
case d: DeadLetter ⇒
specialHandle(d.message, d.sender) // do NOT form endless loops, since deadLetters will resend!
case _ if !specialHandle(message, sender) ⇒
eventStream.publish(DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
case _ ⇒
}
protected def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match {
case w: Watch ⇒
if (w.watchee == this && w.watcher != this)
w.watcher.sendSystemMessage(
DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false))
true
case _: Unwatch ⇒ true // Just ignore
case Identify(messageId) ⇒
sender ! ActorIdentity(messageId, None)
true
case sel: ActorSelectionMessage ⇒
sel.identifyRequest match {
case Some(identify) ⇒
if (!sel.wildcardFanOut) sender ! ActorIdentity(identify.messageId, None)
case None ⇒
eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
}
true
case m: DeadLetterSuppression ⇒
eventStream.publish(SuppressedDeadLetter(m, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
true
case _ ⇒ false
}
}
/**
* Internal implementation of the dead letter destination: will publish any
* received message to the eventStream, wrapped as [[akka.actor.DeadLetter]].
*
* INTERNAL API
*/
private[akka] class DeadLetterActorRef(
_provider: ActorRefProvider,
_path: ActorPath,
_eventStream: EventStream) extends EmptyLocalActorRef(_provider, _path, _eventStream) {
override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
case null ⇒ throw InvalidMessageException("Message is null")
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, None)
case d: DeadLetter ⇒ if (!specialHandle(d.message, d.sender)) eventStream.publish(d)
case _ ⇒ if (!specialHandle(message, sender))
eventStream.publish(DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
}
override protected def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match {
case w: Watch ⇒
if (w.watchee != this && w.watcher != this)
w.watcher.sendSystemMessage(
DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false))
true
case _ ⇒ super.specialHandle(msg, sender)
}
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
/**
* Internal implementation detail used for paths like “/temp”
*
* INTERNAL API
*/
private[akka] class VirtualPathContainer(
override val provider: ActorRefProvider,
override val path: ActorPath,
override val getParent: InternalActorRef,
val log: MarkerLoggingAdapter) extends MinimalActorRef {
private val children = new ConcurrentHashMap[String, InternalActorRef]
/**
* In [[ActorSelectionMessage]]s only [[SelectChildName]] elements
* are supported, otherwise messages are sent to [[EmptyLocalActorRef]].
*/
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match {
case sel @ ActorSelectionMessage(msg, elements, wildcardFanOut) ⇒ {
require(elements.nonEmpty)
def emptyRef = new EmptyLocalActorRef(provider, path / sel.elements.map(_.toString),
provider.systemGuardian.underlying.system.eventStream)
elements.head match {
case SelectChildName(name) ⇒
getChild(name) match {
case null ⇒
if (!wildcardFanOut)
emptyRef.tell(msg, sender)
case child ⇒
if (elements.tail.isEmpty) {
child ! msg
} else if (!wildcardFanOut) {
emptyRef.tell(msg, sender)
}
}
case _ ⇒
if (!wildcardFanOut)
emptyRef.tell(msg, sender)
}
}
case _ ⇒ super.!(message)
}
def addChild(name: String, ref: InternalActorRef): Unit = {
children.put(name, ref) match {
case null ⇒ // okay
case old ⇒
// this can happen from RemoteSystemDaemon if a new child is created
// before the old is removed from RemoteSystemDaemon children
log.debug("{} replacing child {} ({} -> {})", path, name, old, ref)
old.stop()
}
}
def removeChild(name: String): Unit =
if (children.remove(name) eq null) log.warning("{} trying to remove non-child {}", path, name)
/**
* Remove a named child if it matches the ref.
*/
protected def removeChild(name: String, ref: ActorRef): Unit = {
val current = getChild(name)
if (current eq null)
log.warning("{} trying to remove non-child {}", path, name)
else if (current == ref)
children.remove(name, current) // remove when same value
}
def getChild(name: String): InternalActorRef = children.get(name)
override def getChild(name: Iterator[String]): InternalActorRef = {
if (name.isEmpty) this
else {
val n = name.next()
if (n.isEmpty) this
else children.get(n) match {
case null ⇒ Nobody
case some ⇒
if (name.isEmpty) some
else some.getChild(name)
}
}
}
def hasChildren: Boolean = !children.isEmpty
def foreachChild(f: ActorRef ⇒ Unit): Unit = {
val iter = children.values.iterator
while (iter.hasNext) f(iter.next)
}
}
/**
* INTERNAL API
*
* This kind of ActorRef passes all received messages to the given function for
* performing a non-blocking side-effect. The intended use is to transform the
* message before sending to the real target actor. Such references can be created
* by calling `ActorCell.addFunctionRef` and must be deregistered when no longer
* needed by calling `ActorCell.removeFunctionRef`. FunctionRefs do not count
* towards the live children of an actor, they do not receive the Terminate command
* and do not prevent the parent from terminating. FunctionRef is properly
* registered for remote lookup and ActorSelection.
*
* When using the watch() feature you must ensure that upon reception of the
* Terminated message the watched actorRef is unwatch()ed.
*/
private[akka] final class FunctionRef(
override val path: ActorPath,
override val provider: ActorRefProvider,
val eventStream: EventStream,
f: (ActorRef, Any) ⇒ Unit) extends MinimalActorRef {
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
f(sender, message)
}
override def sendSystemMessage(message: SystemMessage): Unit = {
message match {
case w: Watch ⇒ addWatcher(w.watchee, w.watcher)
case u: Unwatch ⇒ remWatcher(u.watchee, u.watcher)
case DeathWatchNotification(actorRef, _, _) ⇒
this.!(Terminated(actorRef)(existenceConfirmed = true, addressTerminated = false))(actorRef)
case _ ⇒ //ignore all other messages
}
}
private[this] var watching = ActorCell.emptyActorRefSet
private[this] val _watchedBy = new AtomicReference[Set[ActorRef]](ActorCell.emptyActorRefSet)
override def isTerminated = _watchedBy.get() == null
//noinspection EmptyCheck
protected def sendTerminated(): Unit = {
val watchedBy = _watchedBy.getAndSet(null)
if (watchedBy != null) {
if (watchedBy.nonEmpty) {
watchedBy foreach sendTerminated(ifLocal = false)
watchedBy foreach sendTerminated(ifLocal = true)
}
if (watching.nonEmpty) {
watching foreach unwatchWatched
watching = Set.empty
}
}
}
private def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal)
watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false))
private def unwatchWatched(watched: ActorRef): Unit =
watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this))
override def stop(): Unit = sendTerminated()
@tailrec private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit =
_watchedBy.get() match {
case null ⇒
sendTerminated(ifLocal = true)(watcher)
sendTerminated(ifLocal = false)(watcher)
case watchedBy ⇒
val watcheeSelf = watchee == this
val watcherSelf = watcher == this
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher))
if (!_watchedBy.compareAndSet(watchedBy, watchedBy + watcher))
addWatcher(watchee, watcher) // try again
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef"))
} else {
publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this"))
}
}
@tailrec private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
_watchedBy.get() match {
case null ⇒ // do nothing...
case watchedBy ⇒
val watcheeSelf = watchee == this
val watcherSelf = watcher == this
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher))
if (!_watchedBy.compareAndSet(watchedBy, watchedBy - watcher))
remWatcher(watchee, watcher) // try again
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered unwatch from $watcher to $watchee is illegal on FunctionRef"))
} else {
publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Unwatch($watchee,$watcher) for $this"))
}
}
}
private def publish(e: Logging.LogEvent): Unit = try eventStream.publish(e) catch { case NonFatal(_) ⇒ }
/**
* Have this FunctionRef watch the given Actor. This method must not be
* called concurrently from different threads, it should only be called by
* its parent Actor.
*
* Upon receiving the Terminated message, unwatch() must be called from a
* safe context (i.e. normally from the parent Actor).
*/
def watch(actorRef: ActorRef): Unit = {
watching += actorRef
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this))
}
/**
* Have this FunctionRef unwatch the given Actor. This method must not be
* called concurrently from different threads, it should only be called by
* its parent Actor.
*/
def unwatch(actorRef: ActorRef): Unit = {
watching -= actorRef
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this))
}
/**
* Query whether this FunctionRef is currently watching the given Actor. This
* method must not be called concurrently from different threads, it should
* only be called by its parent Actor.
*/
def isWatching(actorRef: ActorRef): Boolean = watching.contains(actorRef)
}