-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathAbstractDispatcher.scala
407 lines (352 loc) · 16 KB
/
AbstractDispatcher.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.dispatch
import java.util.concurrent._
import java.{ util ⇒ ju }
import akka.actor._
import akka.dispatch.affinity.AffinityPoolConfigurator
import akka.dispatch.sysmsg._
import akka.event.EventStream
import akka.event.Logging.{ Debug, Error, LogEventException }
import akka.util.{ Index, Unsafe }
import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.control.NonFatal
final case class Envelope private (val message: Any, val sender: ActorRef)
object Envelope {
def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = {
if (message == null) throw InvalidMessageException("Message is null")
new Envelope(message, if (sender ne Actor.noSender) sender else system.deadLetters)
}
}
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable {
final override def isBatchable: Boolean = runnable match {
case b: Batchable ⇒ b.isBatchable
case _: scala.concurrent.OnCompleteRunnable ⇒ true
case _ ⇒ false
}
def run(): Unit =
try runnable.run() catch {
case NonFatal(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
} finally cleanup()
}
/**
* INTERNAL API
*/
private[akka] trait LoadMetrics { self: Executor ⇒
def atFullThrottle(): Boolean
}
/**
* INTERNAL API
*/
private[akka] object MessageDispatcher {
val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
val SCHEDULED = 1
val RESCHEDULED = 2
// dispatcher debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
final val debug = false // Deliberately without type ascription to make it a compile-time constant
lazy val actors = new Index[MessageDispatcher, ActorRef](16, new ju.Comparator[ActorRef] {
override def compare(a: ActorRef, b: ActorRef): Int = a.compareTo(b)
})
def printActors(): Unit =
if (debug) {
for {
d ← actors.keys
a ← { println(d + " inhabitants: " + d.inhabitants); actors.valueIterator(d) }
} {
val status = if (a.isTerminated) " (terminated)" else " (alive)"
val messages = a match {
case r: ActorRefWithCell ⇒ " " + r.underlying.numberOfMessages + " messages"
case _ ⇒ " " + a.getClass
}
val parent = a match {
case i: InternalActorRef ⇒ ", parent: " + i.getParent
case _ ⇒ ""
}
println(" -> " + a + status + messages + parent)
}
}
}
abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContextExecutor {
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
import MessageDispatcher._
import configurator.prerequisites
val mailboxes = prerequisites.mailboxes
val eventStream = prerequisites.eventStream
@volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
@volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!
private final def addInhabitants(add: Long): Long = {
val old = Unsafe.instance.getAndAddLong(this, inhabitantsOffset, add)
val ret = old + add
if (ret < 0) {
// We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to
// go below zero means that there is an imbalance and we might as well throw the exception
val e = new IllegalStateException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!")
reportFailure(e)
throw e
}
ret
}
final def inhabitants: Long = Unsafe.instance.getLongVolatile(this, inhabitantsOffset)
private final def shutdownSchedule: Int = Unsafe.instance.getIntVolatile(this, shutdownScheduleOffset)
private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = Unsafe.instance.compareAndSwapInt(this, shutdownScheduleOffset, expect, update)
/**
* Creates and returns a mailbox for the given actor.
*/
protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox
/**
* Identifier of this dispatcher, corresponds to the full key
* of the dispatcher configuration.
*/
def id: String
/**
* Attaches the specified actor instance to this dispatcher, which includes
* scheduling it to run for the first time (Create() is expected to have
* been enqueued by the ActorCell upon mailbox creation).
*/
final def attach(actor: ActorCell): Unit = {
register(actor)
registerForExecution(actor.mailbox, false, true)
}
/**
* Detaches the specified actor instance from this dispatcher
*/
final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()
final protected def resubmitOnBlock: Boolean = true // We want to avoid starvation
final override protected def unbatchedExecute(r: Runnable): Unit = {
val invocation = TaskInvocation(eventStream, r, taskCleanup)
addInhabitants(+1)
try {
executeTask(invocation)
} catch {
case t: Throwable ⇒
addInhabitants(-1)
throw t
}
}
override def reportFailure(t: Throwable): Unit = t match {
case e: LogEventException ⇒ eventStream.publish(e.event)
case _ ⇒ eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
}
@tailrec
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = {
if (inhabitants <= 0) shutdownSchedule match {
case UNSCHEDULED ⇒
if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction()
else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED ⇒
if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
else ifSensibleToDoSoThenScheduleShutdown()
case RESCHEDULED ⇒
}
}
private def scheduleShutdownAction(): Unit = {
// IllegalStateException is thrown if scheduler has been shutdown
try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext {
override def execute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t)
}) catch {
case _: IllegalStateException ⇒
shutdown()
// Since there is no scheduler anymore, restore the state to UNSCHEDULED.
// When this dispatcher is used again,
// shutdown is only attempted if the state is UNSCHEDULED
// (as per ifSensibleToDoSoThenScheduleShutdown above)
updateShutdownSchedule(SCHEDULED, UNSCHEDULED)
}
}
private final val taskCleanup: () ⇒ Unit = () ⇒ if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown()
/**
* If you override it, you must call it. But only ever once. See "attach" for only invocation.
*
* INTERNAL API
*/
protected[akka] def register(actor: ActorCell) {
if (debug) actors.put(this, actor.self)
addInhabitants(+1)
}
/**
* If you override it, you must call it. But only ever once. See "detach" for the only invocation
*
* INTERNAL API
*/
protected[akka] def unregister(actor: ActorCell) {
if (debug) actors.remove(this, actor.self)
addInhabitants(-1)
val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox)
mailBox.becomeClosed()
mailBox.cleanUp()
}
private val shutdownAction = new Runnable {
@tailrec
final def run() {
shutdownSchedule match {
case SCHEDULED ⇒
try {
if (inhabitants == 0) shutdown() //Warning, racy
} finally {
while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {}
}
case RESCHEDULED ⇒
if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
else run()
case UNSCHEDULED ⇒
}
}
}
/**
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
* defaulting to your akka configs "akka.actor.default-dispatcher.shutdown-timeout" or default specified in
* reference.conf
*
* INTERNAL API
*/
protected[akka] def shutdownTimeout: FiniteDuration
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
protected[akka] def suspend(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
mbox.suspend()
}
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
protected[akka] def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
registerForExecution(mbox, false, false)
}
/**
* Will be called when the dispatcher is to queue an invocation for execution
*
* INTERNAL API
*/
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage)
/**
* Will be called when the dispatcher is to queue an invocation for execution
*
* INTERNAL API
*/
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope)
/**
* Suggest to register the provided mailbox for execution
*
* INTERNAL API
*/
protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean
// TODO check whether this should not actually be a property of the mailbox
/**
* INTERNAL API
*/
protected[akka] def throughput: Int
/**
* INTERNAL API
*/
protected[akka] def throughputDeadlineTime: Duration
/**
* INTERNAL API
*/
@inline protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0
/**
* INTERNAL API
*/
protected[akka] def executeTask(invocation: TaskInvocation)
/**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
* Must be idempotent
*
* INTERNAL API
*/
protected[akka] def shutdown(): Unit
}
/**
* An ExecutorServiceConfigurator is a class that given some prerequisites and a configuration can create instances of ExecutorService
*/
abstract class ExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceFactoryProvider
/**
* Base class to be used for hooking in new dispatchers into Dispatchers.
*/
abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: DispatcherPrerequisites) {
val config: Config = new CachingConfig(_config)
/**
* Returns an instance of MessageDispatcher given the configuration.
* Depending on the needs the implementation may return a new instance for
* each invocation or return the same instance every time.
*/
def dispatcher(): MessageDispatcher
def configureExecutor(): ExecutorServiceConfigurator = {
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case "affinity-pool-executor" ⇒ new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites)
case fqcn ⇒
val args = List(
classOf[Config] → config,
classOf[DispatcherPrerequisites] → prerequisites)
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
case exception ⇒ throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}).get
}
config.getString("executor") match {
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
case other ⇒ configurator(other)
}
}
}
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
import akka.util.Helpers.ConfigOps
val builder =
ThreadPoolConfigBuilder(ThreadPoolConfig())
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
.configure(
Some(config getInt "task-queue-size") flatMap {
case size if size > 0 ⇒
Some(config getString "task-queue-type") map {
case "array" ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size)
case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
} map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) }
case _ ⇒ None
})
if (config.getString("fixed-pool-size") == "off")
builder
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
.setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
else
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
}
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
}
class DefaultExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites, fallback: ExecutorServiceConfigurator) extends ExecutorServiceConfigurator(config, prerequisites) {
val provider: ExecutorServiceFactoryProvider =
prerequisites.defaultExecutionContext match {
case Some(ec) ⇒
prerequisites.eventStream.publish(Debug("DefaultExecutorServiceConfigurator", this.getClass, s"Using passed in ExecutionContext as default executor for this ActorSystem. If you want to use a different executor, please specify one in akka.actor.default-dispatcher.default-executor."))
new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
def createExecutorService: ExecutorService = this
def shutdown(): Unit = ()
def isTerminated: Boolean = false
def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
def execute(command: Runnable): Unit = ec.execute(command)
def isShutdown: Boolean = false
}
case None ⇒ fallback
}
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
provider.createExecutorServiceFactory(id, threadFactory)
}