-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
TypedActor.scala
633 lines (557 loc) · 24.1 KB
/
TypedActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
package akka.actor
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
import akka.japi.{ Creator, Option ⇒ JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Timeout, NonFatal }
import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.dispatch._
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.lang.IllegalStateException
import akka.util.Duration
trait TypedActorFactory {
protected def actorFactory: ActorRefFactory
protected def typedActor: TypedActorExtension
/**
* Stops the underlying ActorRef for the supplied TypedActor proxy,
* if any, returns whether it could find the find the ActorRef or not
*/
def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null ⇒ false
case ref ⇒ ref.asInstanceOf[InternalActorRef].stop; true
}
/**
* Sends a PoisonPill the underlying ActorRef for the supplied TypedActor proxy,
* if any, returns whether it could find the find the ActorRef or not
*/
def poisonPill(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null ⇒ false
case ref ⇒ ref ! PoisonPill; true
}
/**
* Returns wether the supplied AnyRef is a TypedActor proxy or not
*/
def isTypedActor(proxyOrNot: AnyRef): Boolean
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
def getActorRefFor(proxy: AnyRef): ActorRef
/**
* Creates a new TypedActor with the specified properties
*/
def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T]): R = {
val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
val c = props.creator //Cache this to avoid closing over the Props
val ap = props.actorProps.withCreator(new TypedActor.TypedActor[R, T](proxyVar, c()))
typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap))
}
/**
* Creates a new TypedActor with the specified properties
*/
def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], name: String): R = {
val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
val c = props.creator //Cache this to avoid closing over the Props
val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c()))
typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap, name))
}
/**
* Creates a TypedActor that intercepts the calls and forwards them as [[akka.actor.TypedActor.MethodCall]]
* to the provided ActorRef.
*/
def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], actorRef: ActorRef): R =
typedActor.createActorRefProxy(props, null: AtomVar[R], actorRef)
}
object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): TypedActorExtension = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem): TypedActorExtension = new TypedActorExtension(system)
/**
* Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
* will be children to the specified context, this allows for creating hierarchies of TypedActors.
* Do _not_ let this instance escape the TypedActor since that will not be thread-safe.
*/
def apply(context: ActorContext): TypedActorFactory = ContextualTypedActorFactory(apply(context.system), context)
/**
* Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
* will be children to the specified context, this allows for creating hierarchies of TypedActors.
* Do _not_ let this instance escape the TypedActor since that will not be thread-safe.
*
* Java API
*/
def get(context: ActorContext): TypedActorFactory = apply(context)
/**
* This class represents a Method call, and has a reference to the Method to be called and the parameters to supply
* It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized
*/
case class MethodCall(method: Method, parameters: Array[AnyRef]) {
def isOneWay = method.getReturnType == java.lang.Void.TYPE
def returnsFuture_? = classOf[Future[_]].isAssignableFrom(method.getReturnType)
def returnsJOption_? = classOf[akka.japi.Option[_]].isAssignableFrom(method.getReturnType)
def returnsOption_? = classOf[scala.Option[_]].isAssignableFrom(method.getReturnType)
/**
* Invokes the Method on the supplied instance
*
* @throws the underlying exception if there's an InvocationTargetException thrown on the invocation
*/
def apply(instance: AnyRef): AnyRef = try {
parameters match {
case null ⇒ method.invoke(instance)
case args if args.length == 0 ⇒ method.invoke(instance)
case args ⇒ method.invoke(instance, args: _*)
}
} catch { case i: InvocationTargetException ⇒ throw i.getTargetException }
private def writeReplace(): AnyRef = parameters match {
case null ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null)
case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array())
case ps ⇒
val serialization = SerializationExtension(akka.serialization.JavaSerializer.currentSystem.value)
val serializedParameters = Array.ofDim[(Int, Class[_], Array[Byte])](ps.length)
for (i ← 0 until ps.length) {
val p = ps(i)
val s = serialization.findSerializerFor(p)
val m = if (s.includeManifest) p.getClass else null
serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity
}
SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializedParameters)
}
}
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
@deprecated("Will become private[akka] in 2.1, this is not user-api", "2.0.2")
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, Class[_], Array[Byte])]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = {
val system = akka.serialization.JavaSerializer.currentSystem.value
if (system eq null) throw new IllegalStateException(
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
val serialization = SerializationExtension(system)
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null ⇒ null
case a if a.length == 0 ⇒ Array[AnyRef]()
case a ⇒
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
for (i ← 0 until a.length) {
val (sId, manifest, bytes) = a(i)
deserializedParameters(i) =
serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
}
deserializedParameters
})
}
}
private val selfReference = new ThreadLocal[AnyRef]
private val currentContext = new ThreadLocal[ActorContext]
/**
* Returns the reference to the proxy when called inside a method call in a TypedActor
*
* Example:
* <p/>
* class FooImpl extends Foo {
* def doFoo {
* val myself = TypedActor.self[Foo]
* }
* }
*
* Useful when you want to send a reference to this TypedActor to someone else.
*
* NEVER EXPOSE "this" to someone else, always use "self[TypeOfInterface(s)]"
*
* @throws IllegalStateException if called outside of the scope of a method on this TypedActor
* @throws ClassCastException if the supplied type T isn't the type of the proxy associated with this TypedActor
*/
def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match {
case null ⇒ throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!")
case some ⇒ some
}
/**
* Returns the ActorContext (for a TypedActor) when inside a method call in a TypedActor.
*/
def context: ActorContext = currentContext.get match {
case null ⇒ throw new IllegalStateException("Calling TypedActor.context outside of a TypedActor implementation method!")
case some ⇒ some
}
/**
* Returns the default dispatcher (for a TypedActor) when inside a method call in a TypedActor.
*/
implicit def dispatcher = context.dispatcher
/**
* Implementation of TypedActor as an Actor
*/
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor {
val me = try {
TypedActor.selfReference set proxyVar.get
TypedActor.currentContext set context
createInstance
} finally {
TypedActor.selfReference set null
TypedActor.currentContext set null
}
override def supervisorStrategy(): SupervisorStrategy = me match {
case l: Supervisor ⇒ l.supervisorStrategy
case _ ⇒ super.supervisorStrategy
}
override def preStart(): Unit = withContext {
me match {
case l: PreStart ⇒ l.preStart()
case _ ⇒ super.preStart()
}
}
override def postStop(): Unit = try {
withContext {
me match {
case l: PostStop ⇒ l.postStop()
case _ ⇒ super.postStop()
}
}
} finally {
TypedActor(context.system).invocationHandlerFor(proxyVar.get) match {
case null ⇒
case some ⇒
some.actorVar.set(context.system.deadLetters) //Point it to the DLQ
proxyVar.set(null.asInstanceOf[R])
}
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = withContext {
me match {
case l: PreRestart ⇒ l.preRestart(reason, message)
case _ ⇒ super.preRestart(reason, message)
}
}
override def postRestart(reason: Throwable): Unit = withContext {
me match {
case l: PostRestart ⇒ l.postRestart(reason)
case _ ⇒ super.postRestart(reason)
}
}
protected def withContext[T](unitOfWork: ⇒ T): T = {
TypedActor.selfReference set proxyVar.get
TypedActor.currentContext set context
try unitOfWork finally {
TypedActor.selfReference set null
TypedActor.currentContext set null
}
}
def receive = {
case m: MethodCall ⇒ withContext {
if (m.isOneWay) m(me)
else {
try {
if (m.returnsFuture_?) {
val s = sender
m(me).asInstanceOf[Future[Any]] onComplete {
case Left(f) ⇒ s ! Status.Failure(f)
case Right(r) ⇒ s ! r
}
} else {
sender ! m(me)
}
} catch {
case NonFatal(e) ⇒
sender ! Status.Failure(e)
throw e
}
}
}
case msg if me.isInstanceOf[Receiver] ⇒ withContext {
me.asInstanceOf[Receiver].onReceive(msg, sender)
}
}
}
/**
* Mix this into your TypedActor to be able to define supervisor strategy
*/
trait Supervisor {
/**
* User overridable definition the strategy to use for supervising
* child actors.
*/
def supervisorStrategy(): SupervisorStrategy
}
/**
* Mix this into your TypedActor to be able to intercept Terminated messages
*/
trait Receiver {
def onReceive(message: Any, sender: ActorRef): Unit
}
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
trait PreStart {
/**
* User overridable callback.
* <p/>
* Is called when an Actor is started by invoking 'actor'.
*/
def preStart(): Unit
}
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
trait PostStop {
/**
* User overridable callback.
* <p/>
* Is called when 'actor.stop()' is invoked.
*/
def postStop(): Unit
}
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
trait PreRestart {
/**
* User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
* @param reason the Throwable that caused the restart to happen
* @param message optionally the current message the actor processed when failing, if applicable
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated.
* By default it terminates all children and calls postStop()
*/
def preRestart(reason: Throwable, message: Option[Any]): Unit
}
trait PostRestart {
/**
* User overridable callback: By default it calls `preStart()`.
* @param reason the Throwable that caused the restart to happen
* <p/>
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
*/
def postRestart(reason: Throwable): Unit
}
private[akka] class TypedActorInvocationHandler(val extension: TypedActorExtension, val actorVar: AtomVar[ActorRef], val timeout: Timeout) extends InvocationHandler {
def actor = actorVar.get
@throws(classOf[Throwable])
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" ⇒ actor.toString
case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef]
case _ ⇒
import akka.pattern.ask
MethodCall(method, args) match {
case m if m.isOneWay ⇒ actor ! m; null //Null return value
case m if m.returnsFuture_? ⇒ ask(actor, m)(timeout)
case m if m.returnsJOption_? || m.returnsOption_? ⇒
val f = ask(actor, m)(timeout)
(try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match {
case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None
case Some(Right(joption: AnyRef)) ⇒ joption
case Some(Left(ex)) ⇒ throw ex
}
case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef]
}
}
}
}
/**
* TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
* It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
*/
object TypedProps {
val defaultDispatcherId: String = Dispatchers.DefaultDispatcherId
val defaultTimeout: Option[Timeout] = None
val defaultLoader: Option[ClassLoader] = None
/**
* @return a sequence of interfaces that the specified class implements,
* or a sequence containing only itself, if itself is an interface.
*/
def extractInterfaces(clazz: Class[_]): Seq[Class[_]] =
if (clazz.isInterface) Seq[Class[_]](clazz) else clazz.getInterfaces.toList
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* proxying all the interfaces it implements.
*
* Scala API
*/
def apply[T <: AnyRef](implementation: Class[T]): TypedProps[T] =
new TypedProps[T](implementation)
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Scala API
*/
def apply[T <: AnyRef](interface: Class[_ >: T], implementation: Class[T]): TypedProps[T] =
new TypedProps[T](extractInterfaces(interface), () ⇒ implementation.newInstance())
/**
* Uses the supplied thunk as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Scala API
*/
def apply[T <: AnyRef](interface: Class[_ >: T], creator: ⇒ T): TypedProps[T] =
new TypedProps[T](extractInterfaces(interface), () ⇒ creator)
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* proxying all the interfaces it implements.
*
* Scala API
*/
def apply[T <: AnyRef: ClassManifest](): TypedProps[T] =
new TypedProps[T](implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[T]])
}
/**
* TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
* It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
case class TypedProps[T <: AnyRef] protected[TypedProps] (
interfaces: Seq[Class[_]],
creator: () ⇒ T,
dispatcher: String = TypedProps.defaultDispatcherId,
deploy: Deploy = Props.defaultDeploy,
timeout: Option[Timeout] = TypedProps.defaultTimeout,
loader: Option[ClassLoader] = TypedProps.defaultLoader) {
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*/
def this(implementation: Class[T]) =
this(interfaces = TypedProps.extractInterfaces(implementation),
creator = () ⇒ implementation.newInstance())
/**
* Uses the supplied Creator as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Java API.
*/
def this(interface: Class[_ >: T], implementation: Creator[T]) =
this(interfaces = TypedProps.extractInterfaces(interface),
creator = () ⇒ implementation.create())
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Java API.
*/
def this(interface: Class[_ >: T], implementation: Class[T]) =
this(interfaces = TypedProps.extractInterfaces(interface),
creator = () ⇒ implementation.newInstance())
/**
* Returns a new TypedProps with the specified dispatcher set.
*/
def withDispatcher(d: String): TypedProps[T] = copy(dispatcher = d)
/**
* Returns a new TypedProps with the specified deployment configuration.
*/
def withDeploy(d: Deploy): TypedProps[T] = copy(deploy = d)
/**
* @return a new TypedProps that will use the specified ClassLoader to create its proxy class in
* If loader is null, it will use the bootstrap classloader.
*
* Java API
*/
def withLoader(loader: ClassLoader): TypedProps[T] = withLoader(Option(loader))
/**
* @return a new TypedProps that will use the specified ClassLoader to create its proxy class in
* If loader is null, it will use the bootstrap classloader.
*
* Scala API
*/
def withLoader(loader: Option[ClassLoader]): TypedProps[T] = this.copy(loader = loader)
/**
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if null is specified, it will use the default timeout as specified in the configuration.
*
* Java API
*/
def withTimeout(timeout: Timeout): TypedProps[T] = this.copy(timeout = Option(timeout))
/**
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if None is specified, it will use the default timeout as specified in the configuration.
*
* Scala API
*/
def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout)
/**
* Returns a new TypedProps that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*/
def withInterface(interface: Class[_ >: T]): TypedProps[T] =
this.copy(interfaces = interfaces ++ TypedProps.extractInterfaces(interface))
/**
* Returns a new TypedProps without the specified interface,
* or if the interface class is not an interface, all the interfaces it implements.
*/
def withoutInterface(interface: Class[_ >: T]): TypedProps[T] =
this.copy(interfaces = interfaces diff TypedProps.extractInterfaces(interface))
import akka.actor.{ Props ⇒ ActorProps }
def actorProps(): ActorProps =
if (dispatcher == ActorProps().dispatcher) ActorProps()
else ActorProps(dispatcher = dispatcher)
}
case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory {
override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy)
override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot)
}
class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory with Extension {
import TypedActor._ //Import the goodies from the companion object
protected def actorFactory: ActorRefFactory = system
protected def typedActor = this
val serialization = SerializationExtension(system)
val settings = system.settings
/**
* Default timeout for typed actor methods with non-void return type
*/
final val DefaultReturnTimeout = Timeout(Duration(settings.config.getMilliseconds("akka.actor.typed.timeout"), MILLISECONDS))
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match {
case null ⇒ null
case handler ⇒ handler.actor
}
/**
* Returns wether the supplied AnyRef is a TypedActor proxy or not
*/
def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null
// Private API
private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val classLoader: ClassLoader = if (props.loader.nonEmpty) props.loader.get else props.interfaces.headOption.map(_.getClassLoader).orNull //If we have no loader, we arbitrarily take the loader of the first interface
val proxy = Proxy.newProxyInstance(
classLoader,
props.interfaces.toArray,
new TypedActorInvocationHandler(
this,
actorVar,
if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R]
proxyVar match {
case null ⇒
actorVar.set(actorRef)
proxy
case _ ⇒
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
actorVar.set(actorRef) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
}
private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {
case null ⇒ null
case other ⇒ Proxy.getInvocationHandler(other) match {
case null ⇒ null
case handler: TypedActorInvocationHandler ⇒ handler
case _ ⇒ null
}
}
else null
}