-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Actor.scala
138 lines (119 loc) · 4.25 KB
/
Actor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package fpinscala.parallelism
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.{Callable,ExecutorService}
import annotation.tailrec
/*
* Implementation is taken from `scalaz` library, with only minor changes. See:
*
* https://github.com/scalaz/scalaz/blob/scalaz-seven/concurrent/src/main/scala/scalaz/concurrent/Actor.scala
*
* This code is copyright Andriy Plokhotnyuk, Runar Bjarnason, and other contributors,
* and is licensed using 3-clause BSD, see LICENSE file at:
*
* https://github.com/scalaz/scalaz/blob/scalaz-seven/etc/LICENCE
*/
/**
 * Processes messages of type `A`, one at a time. Messages are submitted to
 * the actor with the method `!` (or with `apply`, which delegates to `!`). Processing is
 * typically performed asynchronously; this is controlled by the provided `strategy`.
 *
 * Memory consistency guarantee: when each message is processed by the `handler`, any memory that it
 * mutates is guaranteed to be visible by the `handler` when it processes the next message, even if
 * the `strategy` runs the invocations of `handler` on separate threads. This is achieved because
 * the `Actor` reads a volatile memory location before entering its event loop, and writes to the same
 * location before suspending.
 *
 * Implementation based on non-intrusive MPSC node-based queue, described by Dmitriy Vyukov:
 * [[http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue]]
 *
 * NOTE(review): if `onError` itself throws -- which the default `throw(_)` does -- the exception
 * leaves `act()` before `suspended` is set back to 1, so it looks like later messages are still
 * enqueued but no longer scheduled for processing. Confirm whether this is intended.
 *
 * @see scalaz.concurrent.Promise for a use case.
 *
 * @param strategy Execution strategy, for example, a strategy that is backed by an `ExecutorService`
 * @param handler The message handler
 * @param onError Exception handler, called if the message handler throws any `Throwable`.
 *                Defaults to rethrowing the `Throwable`.
 * @tparam A The type of messages accepted by this actor.
 */
final class Actor[A](strategy: Strategy)(handler: A => Unit, onError: Throwable => Unit = throw(_)) {
  self =>
  // Consumer end of the mailbox: `act` starts reading from the node stored here. It initially
  // holds an empty placeholder node and is afterwards moved to the last node that was handled.
  private val tail = new AtomicReference(new Node[A]())
  // 1 while no run of `act` is scheduled, 0 once one has been scheduled (see `trySchedule`).
  private val suspended = new AtomicInteger(1)
  // Producer end of the mailbox: the most recently enqueued node. Starts out as the same
  // placeholder node as `tail`.
  private val head = new AtomicReference(tail.get)
  /**
   * Pass the message `a` to the mailbox of this actor and make sure processing is scheduled.
   * `apply` delegates to this method.
   */
  def !(a: A): Unit = {
    val n = new Node(a)
    // Swap the new node in as `head`, then link the previous head to it so that the
    // consumer can reach it by following the previous node's reference.
    head.getAndSet(n).lazySet(n)
    trySchedule()
  }
  /** Pass the message `a` to the mailbox of this actor (same as `!`). */
  def apply(a: A): Unit = {
    this ! a
  }
  /**
   * Returns a new actor accepting messages of type `B`. Its handler applies `f` and sends the
   * result to this actor; it uses the same `strategy` and `onError` as this actor.
   */
  def contramap[B](f: B => A): Actor[B] =
    new Actor[B](strategy)((b: B) => (this ! f(b)), onError)
  // Schedules a run of `act` only if none is scheduled yet: the compare-and-set succeeds for
  // exactly one caller while `suspended` is 1.
  private def trySchedule(): Unit = {
    if (suspended.compareAndSet(1, 0)) schedule()
  }
  // Hands `act()` to the strategy as a by-name argument; the thunk returned by the strategy
  // is discarded.
  private def schedule(): Unit = {
    strategy(act())
  }
  // One run of the event loop: handles a batch of messages starting after `tail`, then either
  // schedules another run or marks the actor as suspended.
  private def act(): Unit = {
    val t = tail.get
    val n = batchHandle(t, 1024)
    if (n ne t) {
      // At least one message was handled and `n` is the last handled node. Clear its payload
      // (presumably so the handled message is not kept reachable) and make it the new `tail`.
      n.a = null.asInstanceOf[A]
      tail.lazySet(n)
      schedule()
    } else {
      // Nothing was linked after `tail`: mark the actor as suspended, then look once more for
      // a linked node. This covers a sender whose `trySchedule` ran while `suspended` was
      // still 0 and therefore did not schedule anything.
      suspended.set(1)
      if (n.get ne null) trySchedule()
    }
  }
  /**
   * Handles the messages linked after `t`, in order, passing any `Throwable` thrown by
   * `handler` to `onError`.
   *
   * @param t node after which handling starts; its own payload is not handled
   * @param i remaining budget; the call with `i == 0` still handles one message, so a batch
   *          started with 1024 handles at most 1025 messages
   * @return the last node whose message was handled, or `t` if no node was linked after it
   */
  @tailrec
  private def batchHandle(t: Node[A], i: Int): Node[A] = {
    val n = t.get
    if (n ne null) {
      try {
        handler(n.a)
      } catch {
        case ex: Throwable => onError(ex)
      }
      if (i > 0) batchHandle(n, i - 1) else n
    } else t
  }
}
/**
 * Mailbox link used by `Actor`. It carries one message in the mutable field `a` and, through
 * the inherited `AtomicReference`, a reference to the node enqueued after it (`null` until
 * one is linked). When no payload is given, `a` is `null` cast to `A`.
 */
private class Node[A](var a: A = null.asInstanceOf[A])
  extends AtomicReference[Node[A]]
object Actor {

  /**
   * Builds an `Actor` whose processing is run through the given `ExecutorService`.
   *
   * @param es      executor service, wrapped into a `Strategy` with `Strategy.fromExecutorService`
   * @param handler function invoked for each message
   * @param onError receives any `Throwable` thrown by `handler`; rethrows it by default
   */
  def apply[A](es: ExecutorService)(handler: A => Unit, onError: Throwable => Unit = throw(_)): Actor[A] = {
    val executorStrategy = Strategy.fromExecutorService(es)
    new Actor[A](executorStrategy)(handler, onError)
  }
}
/**
 * An evaluation policy for expressions, which may run them asynchronously.
 * Implementations should typically start evaluating the argument of `apply` right away.
 * The thunk that `apply` returns yields the resulting `A`, and can be used to block until
 * that value is available.
 */
trait Strategy {
  def apply[A](a: => A): () => A
}

object Strategy {

  /**
   * Adapts any `ExecutorService` into a `Strategy`, which is a little more convenient than
   * submitting `Callable` objects by hand. Each expression is submitted to `es` as a task;
   * the returned thunk calls `get` on the resulting future.
   */
  def fromExecutorService(es: ExecutorService): Strategy = new Strategy {
    def apply[A](a: => A): () => A = {
      // Wrap the by-name argument so that it is evaluated by the executor, not here.
      val task: Callable[A] = new Callable[A] { def call(): A = a }
      val pending = es.submit(task)
      () => pending.get()
    }
  }

  /**
   * A `Strategy` that evaluates its argument immediately, in the calling thread. The
   * returned thunk hands back the value that was already computed.
   */
  def sequential: Strategy = new Strategy {
    def apply[A](a: => A): () => A = {
      val computed = a
      () => computed
    }
  }
}