-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Mailbox.scala
479 lines (401 loc) · 16.9 KB
/
Mailbox.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import akka.util._
import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
import akka.actor.ActorContext
import com.typesafe.config.Config
import akka.actor.ActorSystem
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@deprecated("Will become private[akka] in 2.1, this is not user-api", "2.0.2")
object Mailbox {
type Status = Int
/*
* the following assigned numbers CANNOT be changed without looking at the code which uses them!
*/
// primary status: only first three
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero!
final val Suspended = 1
final val Closed = 2
// secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4
// mailbox debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
final val debug = false
}
/**
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
*/
private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
import Mailbox._
/**
* Try to enqueue the message to this queue, or throw an exception.
*/
def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
/**
* Try to dequeue the next message from this queue, return null failing that.
*/
def dequeue(): Envelope = messageQueue.dequeue()
/**
* Indicates whether this queue is non-empty.
*/
def hasMessages: Boolean = messageQueue.hasMessages
/**
* Should return the current number of messages held in this queue; may
* always return 0 if no other value is available efficiently. Do not use
* this for testing for presence of messages, use `hasMessages` instead.
*/
def numberOfMessages: Int = messageQueue.numberOfMessages
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@volatile
protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default
@inline
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def shouldProcessMessage: Boolean = (status & 3) == Open
@inline
final def isSuspended: Boolean = (status & 3) == Suspended
@inline
final def isClosed: Boolean = status == Closed
@inline
final def isScheduled: Boolean = (status & Scheduled) != 0
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
@inline
protected final def setStatus(newStatus: Status): Unit =
Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)
/**
* set new primary status Open. Caller does not need to worry about whether
* status was Scheduled or not.
*/
@tailrec
final def becomeOpen(): Boolean = status match {
case Closed ⇒ setStatus(Closed); false
case s ⇒ updateStatus(s, Open | s & Scheduled) || becomeOpen()
}
/**
* set new primary status Suspended. Caller does not need to worry about whether
* status was Scheduled or not.
*/
@tailrec
final def becomeSuspended(): Boolean = status match {
case Closed ⇒ setStatus(Closed); false
case s ⇒ updateStatus(s, Suspended | s & Scheduled) || becomeSuspended()
}
/**
* set new primary status Closed. Caller does not need to worry about whether
* status was Scheduled or not.
*/
@tailrec
final def becomeClosed(): Boolean = status match {
case Closed ⇒ setStatus(Closed); false
case s ⇒ updateStatus(s, Closed) || becomeClosed()
}
/**
* Set Scheduled status, keeping primary status as is.
*/
@tailrec
final def setAsScheduled(): Boolean = {
val s = status
/*
* only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set (this is one of the reasons why the numbers
* cannot be changed in object Mailbox above)
*/
if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled()
else false
}
/**
* Reset Scheduled status, keeping primary status as is.
*/
@tailrec
final def setAsIdle(): Boolean = {
val s = status
/*
* only try to remove Scheduled bit if currently Scheduled, not Closed or
* without Scheduled bit set (this is one of the reasons why the numbers
* cannot be changed in object Mailbox above)
*/
updateStatus(s, s & ~Scheduled) || setAsIdle()
}
/*
* AtomicReferenceFieldUpdater for system queue
*/
protected final def systemQueueGet: SystemMessage =
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed ⇒ false
case _ ⇒ hasSystemMessageHint || hasSystemMessages
}
final def run = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
/**
* Process the messages in the mailbox
*/
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue()
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
}
}
final def processAllSystemMessages() {
var nextMessage = systemDrain()
try {
while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs)
actor systemInvoke nextMessage
nextMessage = nextMessage.next
// don’t ever execute normal message when system message present!
if (nextMessage eq null) nextMessage = systemDrain()
}
} catch {
case NonFatal(e) ⇒
actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
throw e
}
}
@inline
final def dispatcher: MessageDispatcher = actor.dispatcher
/**
* Overridable callback to clean up the mailbox,
* called when an actor is unregistered.
* By default it dequeues all system messages + messages and ships them to the owning actors' systems' DeadLetterMailbox
*/
protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox
if (hasSystemMessages) {
var message = systemDrain()
while (message ne null) {
// message must be “virgin” before being able to systemEnqueue again
val next = message.next
message.next = null
dlm.systemEnqueue(actor.self, message)
message = next
}
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
}
}
trait MessageQueue {
/**
* Try to enqueue the message to this queue, or throw an exception.
*/
def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed
/**
* Try to dequeue the next message from this queue, return null failing that.
*/
def dequeue(): Envelope
/**
* Should return the current number of messages held in this queue; may
* always return 0 if no other value is available efficiently. Do not use
* this for testing for presence of messages, use `hasMessages` instead.
*/
def numberOfMessages: Int
/**
* Indicates whether this queue is non-empty.
*/
def hasMessages: Boolean
/**
* Called when the mailbox this queue belongs to is disposed of. Normally it
* is expected to transfer all remaining messages into the dead letter queue
* which is passed in. The owner of this MessageQueue is passed in if
* available (e.g. for creating DeadLetters()), “/deadletters” otherwise.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit
}
/**
* Internal mailbox implementation detail.
*/
private[akka] trait SystemMessageQueue {
/**
* Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
*/
def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit
/**
* Dequeue all messages from system queue and return them as single-linked list.
*/
def systemDrain(): SystemMessage
def hasSystemMessages: Boolean
}
/**
* Internal mailbox implementation detail.
*/
private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet
/*
* this write is safely published by the compareAndSet contained within
* systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec
* guarantees that “head” uses the value obtained from systemQueueGet above.
* Hence, SystemMessage.next does not need to be volatile.
*/
message.next = head
if (!systemQueuePut(head, message)) {
message.next = null
systemEnqueue(receiver, message)
}
}
@tailrec
final def systemDrain(): SystemMessage = {
val head = systemQueueGet
if (systemQueuePut(head, null)) SystemMessage.reverse(head) else systemDrain()
}
def hasSystemMessages: Boolean = systemQueueGet ne null
}
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
envelope = dequeue
}
}
}
}
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def dequeue(): Envelope = queue.poll()
}
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope) {
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
} else queue put handle
}
def dequeue(): Envelope = queue.poll()
}
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue: Deque[Envelope]
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit
}
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
def dequeue(): Envelope = queue.poll()
}
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingDeque[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue put handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue putFirst handle
def dequeue(): Envelope = queue.poll()
}
/**
* Mailbox configuration.
*/
trait MailboxType {
def create(owner: Option[ActorContext]): MessageQueue
}
/**
* It's a case class for Java (new UnboundedMailbox)
*/
case class UnboundedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
/**
* Extend me to provide the comparator
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
final override def create(owner: Option[ActorContext]): MessageQueue =
new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
/**
* Extend me to provide the comparator
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
case class UnboundedDequeBasedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
final val queue = this
}
}
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
final val queue = this
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut
}
}