/
Dispatcher.scala
121 lines (106 loc) · 4.4 KB
/
Dispatcher.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import akka.event.Logging.Error
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorCell
import akka.util.Duration
import java.util.concurrent._
/**
* The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
* `BlockingQueue`.
*
* The preferred way of creating dispatchers is to define their configuration and use
* the `lookup` method in [[akka.dispatch.Dispatchers]].
*
* @param throughput a positive integer indicates that the dispatcher will only process that many messages at a time from the
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
* always continues until the mailbox is empty.
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
*/
class Dispatcher(
  _prerequisites: DispatcherPrerequisites,
  val id: String,
  val throughput: Int,
  val throughputDeadlineTime: Duration,
  val mailboxType: MailboxType,
  executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
  val shutdownTimeout: Duration)
  extends MessageDispatcher(_prerequisites) {

  /** Factory obtained from the provider for this dispatcher's `id` and the prerequisites' thread factory. */
  protected val executorServiceFactory: ExecutorServiceFactory =
    executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)

  /** Holds the delegate that tasks and mailboxes are currently submitted to; swapped out by `shutdown`. */
  protected val executorService =
    new AtomicReference[ExecutorServiceDelegate](newExecutorServiceDelegate())

  /**
   * Builds a delegate whose underlying executor is only created, via
   * `executorServiceFactory`, the first time `executor` is accessed.
   */
  private def newExecutorServiceDelegate(): ExecutorServiceDelegate =
    new ExecutorServiceDelegate {
      lazy val executor = executorServiceFactory.createExecutorService
    }

  /**
   * Enqueues `invocation` on the receiver's mailbox and then tries to register
   * that mailbox for execution, hinting that a regular message is present.
   */
  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
    val mailbox = receiver.mailbox
    mailbox.enqueue(receiver.self, invocation)
    registerForExecution(mailbox, hasMessageHint = true, hasSystemMessageHint = false)
  }

  /**
   * Enqueues the system message on the receiver's mailbox and then tries to
   * register that mailbox for execution, hinting that a system message is present.
   */
  protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) = {
    val mailbox = receiver.mailbox
    mailbox.systemEnqueue(receiver.self, invocation)
    registerForExecution(mailbox, hasMessageHint = false, hasSystemMessageHint = true)
  }

  /**
   * Submits `invocation` to the current executor service delegate. A rejected
   * submission is retried once; if the retry is rejected as well, an `Error`
   * event carrying the first rejection is published and the second rejection
   * is rethrown.
   */
  protected[akka] def executeTask(invocation: TaskInvocation) {
    try {
      executorService.get().execute(invocation)
    } catch {
      case firstRejection: RejectedExecutionException ⇒
        // Retry once; the delegate is re-read from the reference for the second attempt.
        try {
          executorService.get().execute(invocation)
        } catch {
          case secondRejection: RejectedExecutionException ⇒
            prerequisites.eventStream.publish(
              Error(firstRejection, getClass.getName, getClass, "executeTask was rejected twice!"))
            throw secondRejection
        }
    }
  }

  /**
   * Creates a `Mailbox` for `actor`, backed by the queue that `mailboxType`
   * creates and mixed with `DefaultSystemMessageQueue`.
   */
  protected[akka] def createMailbox(actor: ActorCell): Mailbox = {
    val messageQueue = mailboxType.create(Some(actor))
    new Mailbox(actor, messageQueue) with DefaultSystemMessageQueue
  }

  /**
   * Atomically replaces the current delegate with a fresh, not yet started one
   * and shuts down the delegate that was previously installed (if any).
   */
  protected[akka] def shutdown: Unit = {
    val previous = executorService.getAndSet(newExecutorServiceDelegate())
    if (previous ne null) previous.shutdown()
  }

  /**
   * Returns if it was registered
   *
   * The mailbox is submitted to the executor only when it reports that it can
   * be scheduled and `setAsScheduled()` succeeds. A rejected submission is
   * retried once; when the retry is rejected too, the mailbox is set back to
   * idle, an `Error` event is published and the second rejection is rethrown.
   */
  protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
    // The schedulability check must stay ahead of setAsScheduled(): the original
    // code notes it "needs to be here to ensure thread safety and no races".
    if (!mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) false
    else if (!mbox.setAsScheduled()) false
    else {
      try {
        executorService.get().execute(mbox)
        true
      } catch {
        case _: RejectedExecutionException ⇒
          // Retry once
          try {
            executorService.get().execute(mbox)
            true
          } catch {
            case secondRejection: RejectedExecutionException ⇒
              mbox.setAsIdle()
              prerequisites.eventStream.publish(
                Error(secondRejection, getClass.getName, getClass, "registerForExecution was rejected twice!"))
              throw secondRejection
          }
      }
    }
  }

  /** Simple class name followed by the dispatcher id in brackets; computed once at construction. */
  override val toString = getClass.getSimpleName + "[" + id + "]"
}
object PriorityGenerator {

  /**
   * Wraps `priorityFunction` in a [[PriorityGenerator]]: the returned
   * generator's `gen` simply delegates to the supplied function.
   *
   * @param priorityFunction maps a message to its integer priority
   * @return a PriorityGenerator backed by `priorityFunction`
   */
  def apply(priorityFunction: Any ⇒ Int): PriorityGenerator =
    new PriorityGenerator {
      def gen(message: Any): Int = priorityFunction.apply(message)
    }
}
/**
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
* PriorityDispatcher
*/
abstract class PriorityGenerator extends java.util.Comparator[Envelope] {

  /**
   * Computes the priority of a message payload. `compare` orders envelopes by
   * ascending value of this result.
   *
   * @param message the payload taken from an envelope
   * @return the integer priority of `message`
   */
  def gen(message: Any): Int

  /**
   * Compares two envelopes by the priorities `gen` assigns to their messages.
   *
   * The priorities are compared explicitly rather than subtracted: a plain
   * `gen(a) - gen(b)` overflows `Int` when the two values are far apart
   * (e.g. `Int.MinValue` and `1`) and then reports the wrong sign.
   *
   * @return -1, 0 or 1 when `thisMessage` has a lower, equal or higher
   *         priority value than `thatMessage`
   */
  final def compare(thisMessage: Envelope, thatMessage: Envelope): Int = {
    val thisPriority = gen(thisMessage.message)
    val thatPriority = gen(thatMessage.message)
    if (thisPriority < thatPriority) -1
    else if (thisPriority > thatPriority) 1
    else 0
  }
}