/
Actor.scala
442 lines (408 loc) · 13.1 KB
/
Actor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
/*
// FIXME add support for ActorWithNestedReceive
trait ActorWithNestedReceive extends Actor {
import Actor.actor
private var nestedReactsProcessors: List[ActorRef] = Nil
private val processNestedReacts: Receive = {
case message if !nestedReactsProcessors.isEmpty =>
val processors = nestedReactsProcessors.reverse
processors.head forward message
nestedReactsProcessors = processors.tail.reverse
}
protected def react: Receive
protected def reactAgain(pf: Receive) = nestedReactsProcessors ::= actor(pf)
protected def receive = processNestedReacts orElse react
}
*/
/**
 * Implements the Transactor abstraction, i.e. a transactional actor.
 * <p/>
 * Equivalent to invoking the <code>makeTransactionRequired</code> method in the body of the <code>Actor</code>.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait Transactor extends Actor {
// Executed as part of the construction of every actor that mixes in this trait.
self.makeTransactionRequired
}
/**
 * Extend this abstract class to create a remote actor.
 * <p/>
 * Equivalent to invoking the <code>makeRemote(..)</code> method in the body of the <code>Actor</code>.
 *
 * @param hostname the host name that is passed on to <code>self.makeRemote</code>
 * @param port the port that is passed on to <code>self.makeRemote</code>
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
abstract class RemoteActor(hostname: String, port: Int) extends Actor {
// Executed as part of the construction of every subclass instance.
self.makeRemote(hostname, port)
}
// Life-cycle messages for the Actors
// These messages are handled by the Actor trait itself (see 'lifeCycles' in the Actor trait)
// before the user-defined 'receive' is consulted.

/** Common super type of all life-cycle messages; sealed, so all variants are declared in this file. */
@serializable sealed trait LifeCycleMessage
/** Assigns 'code' to the 'hotswap' field of the receiving actor's ActorRef; when it is Some(..) it is used in place of 'receive'. */
case class HotSwap(code: Option[Actor.Receive]) extends LifeCycleMessage
/** Makes the receiving actor invoke 'self.restart(reason)'. */
case class Restart(reason: Throwable) extends LifeCycleMessage
/** Makes the receiving actor invoke 'self.handleTrapExit(dead, killer)'. */
case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage
/** Makes the receiving actor invoke 'self.link(child)'. */
case class Link(child: ActorRef) extends LifeCycleMessage
/** Makes the receiving actor invoke 'self.unlink(child)'. */
case class Unlink(child: ActorRef) extends LifeCycleMessage
/** Makes the receiving actor invoke 'self.unlink(child)' followed by 'child.stop'. */
case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
/** Makes the receiving actor throw an ActorKilledException. */
case object Kill extends LifeCycleMessage
// Exceptions for Actors
// The constructors are 'private[akka]', so these can only be instantiated from within the akka package.

/** NOTE(review): not thrown anywhere in this file; presumably signals a failure to start an actor - confirm against the ActorRef implementation. */
class ActorStartException private[akka](message: String) extends RuntimeException(message)
/** Thrown by the Actor trait when it processes a 'Kill' message. */
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
/** Thrown by the Actor trait when an Actor is instantiated while no ActorRef is registered as being in creation. */
class ActorInitializationException private[akka](message: String) extends RuntimeException(message)
/**
 * Actor factory module with factory methods for creating various kinds of Actors.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object Actor extends Logging {

  /**
   * Value of the 'akka.actor.timeout' configuration key, read with 5000 as the fallback argument.
   * NOTE(review): presumably milliseconds - confirm against the consumers of this value.
   */
  val TIMEOUT = config.getInt("akka.actor.timeout", 5000)

  /** Value of the 'akka.actor.serialize-messages' configuration key, read with 'false' as the fallback argument. */
  val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)

  /**
   * A Receive is a convenience type that defines actor message behavior currently modeled as
   * a PartialFunction[Any, Unit].
   */
  type Receive = PartialFunction[Any, Unit]

  /**
   * Holds the ActorRef of the actor that is currently being instantiated, if any.
   * It is read and reset to None by the constructor of the Actor trait ('optionSelf').
   * NOTE(review): the code that sets it is not in this file - presumably the ActorRef implementation.
   */
  private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)

  /**
   * Creates an ActorRef for the Actor with type T.
   * <pre>
   *   import Actor._
   *   val actor = actorOf[MyActor]
   *   actor.start
   *   actor ! message
   *   actor.stop
   * </pre>
   * You can create and start the actor in one statement like this:
   * <pre>
   *   val actor = actorOf[MyActor].start
   * </pre>
   */
  def actorOf[T <: Actor: Manifest]: ActorRef = new LocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])

  /**
   * Creates an ActorRef out of the Actor. Allows you to pass in a factory function
   * that creates the Actor. Please note that this function can be invoked multiple
   * times if for example the Actor is supervised and needs to be restarted.
   * <p/>
   * This function should <b>NOT</b> be used for remote actors.
   * <pre>
   *   import Actor._
   *   val actor = actorOf(new MyActor)
   *   actor.start
   *   actor ! message
   *   actor.stop
   * </pre>
   * You can create and start the actor in one statement like this:
   * <pre>
   *   val actor = actorOf(new MyActor).start
   * </pre>
   */
  def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)

  /**
   * Use to create an anonymous event-driven actor.
   * <p/>
   * The actor is created with a 'permanent' life-cycle configuration, which means that
   * if the actor is supervised and dies it will be restarted.
   * <p/>
   * The actor is started when created.
   * Example:
   * <pre>
   *   import Actor._
   *
   *   val a = actor {
   *     case msg => ... // handle message
   *   }
   * </pre>
   */
  def actor(body: Receive): ActorRef =
    actorOf(new Actor() {
      self.lifeCycle = Some(LifeCycle(Permanent))
      def receive: Receive = body
    }).start

  /**
   * Use to create an anonymous transactional event-driven actor.
   * <p/>
   * The actor is created with a 'permanent' life-cycle configuration, which means that
   * if the actor is supervised and dies it will be restarted.
   * <p/>
   * The actor is started when created.
   * Example:
   * <pre>
   *   import Actor._
   *
   *   val a = transactor {
   *     case msg => ... // handle message
   *   }
   * </pre>
   */
  def transactor(body: Receive): ActorRef =
    actorOf(new Transactor() {
      self.lifeCycle = Some(LifeCycle(Permanent))
      def receive: Receive = body
    }).start

  /**
   * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
   * which means that if the actor is supervised and dies it will *not* be restarted.
   * <p/>
   * The actor is started when created.
   * Example:
   * <pre>
   *   import Actor._
   *
   *   val a = temporaryActor {
   *     case msg => ... // handle message
   *   }
   * </pre>
   */
  def temporaryActor(body: Receive): ActorRef =
    actorOf(new Actor() {
      self.lifeCycle = Some(LifeCycle(Temporary))
      def receive: Receive = body
    }).start

  /**
   * Use to create an anonymous event-driven actor with both an init block and a message loop block.
   * <p/>
   * The actor is created with a 'permanent' life-cycle configuration, which means that
   * if the actor is supervised and dies it will be restarted.
   * <p/>
   * The actor is started when created.
   * <p/>
   * The init block is evaluated inside the constructor of the anonymous actor, i.e. every time the
   * actor factory is invoked. The type parameter 'A' is unused; it is kept for source compatibility.
   * Example:
   * <pre>
   *   val a = Actor.init {
   *     ... // init stuff
   *   } receive {
   *     case msg => ... // handle message
   *   }
   * </pre>
   */
  def init[A](body: => Unit) = new {
    def receive(handler: Receive) =
      actorOf(new Actor() {
        self.lifeCycle = Some(LifeCycle(Permanent))
        body
        def receive: Receive = handler
      }).start
  }

  /**
   * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
   * the block has been executed, also when the block terminates by throwing an exception.
   * <p/>
   * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
   * there is a method 'spawn[ActorType]' in the Actor trait already.
   * Example:
   * <pre>
   *   import Actor._
   *
   *   spawn {
   *     ... // do stuff
   *   }
   * </pre>
   */
  def spawn(body: => Unit): Unit = {
    case object Spawn
    // The anonymous actor starts itself and sends itself the trigger message from its constructor.
    // NOTE(review): this relies on the ActorRef instantiating the actor when the ActorRef is
    // created - confirm against LocalActorRef.
    actorOf(new Actor() {
      self.start
      self ! Spawn
      def receive = {
        // 'finally' makes sure the one-shot actor is stopped even if 'body' throws
        case Spawn => try { body } finally { self.stop }
      }
    })
  }
}
/**
 * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
 * <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
 * <p/>
 * An actor has a well-defined (non-cyclic) life-cycle.
 * <pre>
 * => NEW (newly created actor) - can't receive messages (yet)
 *     => STARTED (when 'start' is invoked) - can receive messages
 *         => SHUT DOWN (when 'stop' is invoked) - can't do anything
 * </pre>
 *
 * <p/>
 * The Actor's API is available in the 'self' member variable.
 *
 * <p/>
 * Here you find functions like:
 *   - !, !!, !!! and forward
 *   - link, unlink, startLink, spawnLink etc
 *   - makeTransactionRequired, makeRemote etc.
 *   - start, stop
 *   - etc.
 *
 * <p/>
 * Here you also find fields like
 *   - dispatcher = ...
 *   - id = ...
 *   - lifeCycle = ...
 *   - faultHandler = ...
 *   - trapExit = ...
 *   - etc.
 *
 * <p/>
 * This means that to use them you have to prefix them with 'self', like this: <tt>self ! Message</tt>
 *
 * However, for convenience you can import these functions and fields like below, which will allow you to
 * drop the 'self' prefix:
 * <pre>
 * class MyActor extends Actor {
 *   import self._
 *   id = ...
 *   dispatcher = ...
 *   spawnLink[OtherActor]
 *   ...
 * }
 * </pre>
 *
 * <p/>
 * The Actor trait also has a 'log' member field that can be used for logging within the Actor.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait Actor extends Logging {

  /**
   * Type alias for Actor.Receive, which allows actor implementations to refer to the
   * type as 'Receive' without having to qualify or import it.
   */
  type Receive = Actor.Receive

  /**
   * Option[ActorRef] representation of the 'self' ActorRef reference.
   * <p/>
   * Mainly for internal use, functions as the implicit sender references when invoking
   * one of the message send functions ('!', '!!' and '!!!').
   * <p/>
   * The reference is taken from (and cleared in) 'Actor.actorRefInCreation'. If no reference is
   * registered there an ActorInitializationException is thrown.
   */
  implicit val optionSelf: Option[ActorRef] = {
    val ref = Actor.actorRefInCreation.value
    // Reset, so that the reference can not be picked up by another actor instantiated afterwards.
    Actor.actorRefInCreation.value = None
    if (ref.isEmpty) throw new ActorInitializationException(
      "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
      "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
      "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
      "\n\tEither use:" +
      "\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
      "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
      "\n\t\t'val actor = Actor.actor { case msg => .. }'")
    else ref
  }

  /**
   * Some[ActorRef] representation of the 'self' ActorRef reference.
   * <p/>
   * Mainly for internal use, functions as the implicit sender references when invoking
   * the 'forward' function.
   */
  // The cast can not fail: 'optionSelf' throws during initialization if it is empty.
  implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   * Here you also find most of the Actor API.
   * <p/>
   * For example fields like:
   * <pre>
   * self.dispatcher = ...
   * self.trapExit = ...
   * self.faultHandler = ...
   * self.lifeCycle = ...
   * self.sender
   * </pre>
   * <p/>
   * Here you also find methods like:
   * <pre>
   * self.reply(..)
   * self.link(..)
   * self.unlink(..)
   * self.start(..)
   * self.stop(..)
   * </pre>
   * During initialization the 'id' of the ActorRef is set to the class name of this actor.
   */
  val self: ActorRef = {
    val zelf = optionSelf.get
    zelf.id = getClass.getName
    zelf
  }

  /**
   * User overridable callback/setting.
   * <p/>
   * Partial function implementing the actor logic.
   * To be implemented by concrete actor class.
   * <p/>
   * Example code:
   * <pre>
   *   def receive = {
   *     case Ping =>
   *       log.info("got a 'Ping' message")
   *       self.reply("pong")
   *
   *     case OneWay =>
   *       log.info("got a 'OneWay' message")
   *
   *     case unknown =>
   *       log.warning("unknown message [%s], ignoring", unknown)
   * }
   * </pre>
   */
  protected def receive: Receive

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started by invoking 'actor.start'.
   */
  def init {}

  /**
   * User overridable callback.
   * <p/>
   * Is called when 'actor.stop' is invoked.
   */
  def shutdown {}

  /**
   * User overridable callback.
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
   */
  def preRestart(reason: Throwable) {}

  /**
   * User overridable callback.
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  def postRestart(reason: Throwable) {}

  /**
   * User overridable callback.
   * <p/>
   * Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
   */
  def initTransactionalState {}

  // =========================================
  // ==== INTERNAL IMPLEMENTATION DETAILS ====
  // =========================================

  /**
   * The complete behavior of this actor: the life-cycle message handling first, then either
   * the hot-swapped behavior (if one is set on 'self') or the user-defined 'receive'.
   * <p/>
   * A NullPointerException raised while assembling the behavior is rethrown as an
   * IllegalStateException, with the original exception attached as its cause.
   */
  private[akka] def base: Receive = try {
    lifeCycles orElse (self.hotswap getOrElse receive)
  } catch {
    case e: NullPointerException => throw new IllegalStateException(
      "The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.", e)
  }

  /** Handling of the LifeCycleMessages; each of them is delegated to the 'self' ActorRef, except 'Kill' which throws. */
  private val lifeCycles: Receive = {
    case HotSwap(code)        => self.hotswap = code
    case Restart(reason)      => self.restart(reason)
    case Exit(dead, reason)   => self.handleTrapExit(dead, reason)
    case Link(child)          => self.link(child)
    case Unlink(child)        => self.unlink(child)
    case UnlinkAndStop(child) => self.unlink(child); child.stop
    case Kill                 => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
  }

  /** Delegates to the 'self' ActorRef. */
  override def hashCode: Int = self.hashCode

  /**
   * Delegates to the 'self' ActorRef. When compared with another Actor the comparison is made
   * between the two ActorRefs, which keeps 'equals' consistent with 'hashCode' and lets an actor
   * be equal to itself.
   * NOTE(review): assumes ActorRef.equals (defined elsewhere) is reflexive - confirm.
   */
  override def equals(that: Any): Boolean = that match {
    case other: Actor => self.equals(other.self)
    case _            => self.equals(that)
  }

  /** Delegates to the 'self' ActorRef. */
  override def toString = self.toString
}