/
ActorWithBoundedStashSpec.scala
153 lines (125 loc) · 4.63 KB
/
ActorWithBoundedStashSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.BoundedDequeBasedMailbox
import akka.pattern.ask
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem.Settings
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.Assertions.intercept
import org.scalatest.BeforeAndAfterEach
/**
 * Fixtures shared by the bounded-stash spec: the actors under test, the bounded
 * mailbox types and the configuration that wires them to dispatcher / mailbox ids.
 */
object ActorWithBoundedStashSpec {

  /**
   * Stashes every string message starting with "hello" and acknowledges it with "ok".
   * On "world" it first installs [[afterWorldBehaviour]] and then releases the stash.
   */
  class StashingActor extends Actor with Stash {
    def receive = {
      case greeting: String if greeting.startsWith("hello") ⇒
        stash()
        sender() ! "ok"
      case "world" ⇒
        // the new behaviour is installed before the stash is released
        context.become(afterWorldBehaviour)
        unstashAll()
    }

    /** Behaviour used after "world": every incoming message is stashed. */
    def afterWorldBehaviour: Receive = {
      case _ ⇒ stash()
    }
  }

  /**
   * Stashes "hello"-prefixed messages while counting them. When stashing fails with a
   * StashOverflowException on the 21st message it answers "STASHOVERFLOW" and stops
   * itself; an overflow at any other count is reported back to the sender instead.
   */
  class StashingActorWithOverflow extends Actor with Stash {
    // number of "hello" messages seen so far, including the one currently being handled
    var numStashed = 0

    def receive = {
      case greeting: String if greeting.startsWith("hello") ⇒
        numStashed += 1
        try {
          stash()
          sender() ! "ok"
        } catch {
          case _: StashOverflowException if numStashed == 21 ⇒
            sender() ! "STASHOVERFLOW"
            context.stop(self)
          case _: StashOverflowException ⇒
            sender() ! "Unexpected StashOverflowException: " + numStashed
        }
    }
  }

  // bounded deque-based mailboxes with capacity 10 and 100 respectively
  class Bounded10(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 500.millis)
  class Bounded100(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(100, 500.millis)

  // configuration section names referenced by the tests via withDispatcher / withMailbox
  val dispatcherId1 = "my-dispatcher-1"
  val dispatcherId2 = "my-dispatcher-2"
  val mailboxId1 = "my-mailbox-1"
  val mailboxId2 = "my-mailbox-2"

  // the "1" sections use Bounded10, the "2" sections use Bounded100; all set stash-capacity = 20
  val testConf: Config = ConfigFactory.parseString(s"""
$dispatcherId1 {
mailbox-type = "${classOf[Bounded10].getName}"
stash-capacity = 20
}
$dispatcherId2 {
mailbox-type = "${classOf[Bounded100].getName}"
stash-capacity = 20
}
$mailboxId1 {
mailbox-type = "${classOf[Bounded10].getName}"
stash-capacity = 20
}
$mailboxId2 {
mailbox-type = "${classOf[Bounded100].getName}"
stash-capacity = 20
}
""")
}
/**
 * Checks what happens to stashed messages when the mailbox or the stash of an actor
 * runs out of capacity, for bounds configured via a dispatcher and via a mailbox id.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorWithBoundedStashSpec
  extends AkkaSpec(ActorWithBoundedStashSpec.testConf)
  with BeforeAndAfterEach
  with DefaultTimeout
  with ImplicitSender {

  import ActorWithBoundedStashSpec._

  // mute the dead-letter warnings that the "hello" messages produce in these tests
  override def atStartup: Unit = {
    val deadLetterWarning = EventFilter.warning(pattern = ".*received dead letter from.*hello.*")
    system.eventStream.publish(Mute(deadLetterWarning))
  }

  // testActor listens for DeadLetter events only for the duration of each test
  override def beforeEach(): Unit = {
    system.eventStream.subscribe(testActor, classOf[DeadLetter])
  }

  override def afterEach(): Unit = {
    system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
  }

  /**
   * Sends "hello1".."hello11" followed by "world" to `stasher`, then expects "hello1" to
   * show up as a DeadLetter and, once the actor is stopped, "hello2".."hello11" as well.
   */
  def testDeadLetters(stasher: ActorRef): Unit = {
    // fill up stash
    (1 to 11) foreach { i ⇒
      stasher ! ("hello" + i)
      expectMsg("ok")
    }

    // cause unstashAll with capacity violation
    stasher ! "world"
    expectMsg(DeadLetter("hello1", testActor, stasher))

    stasher ! PoisonPill
    // stashed messages are sent to deadletters when stasher is stopped
    (2 to 11) foreach { i ⇒
      expectMsg(DeadLetter("hello" + i, testActor, stasher))
    }
  }

  /**
   * Sends "hello1".."hello20" (each acknowledged with "ok") and then "hello21", which must
   * be answered with "STASHOVERFLOW"; afterwards the 20 stashed messages are expected as
   * DeadLetters.
   */
  def testStashOverflowException(stasher: ActorRef): Unit = {
    // fill up stash
    (1 to 20) foreach { i ⇒
      stasher ! ("hello" + i)
      expectMsg("ok")
    }

    stasher ! "hello21"
    expectMsg("STASHOVERFLOW")

    // stashed messages are sent to deadletters when stasher is stopped
    (1 to 20) foreach { i ⇒
      expectMsg(DeadLetter("hello" + i, testActor, stasher))
    }
  }

  "An Actor with Stash" must {

    "end up in DeadLetters in case of a capacity violation when configured via dispatcher" in {
      testDeadLetters(system.actorOf(Props[StashingActor].withDispatcher(dispatcherId1)))
    }

    "end up in DeadLetters in case of a capacity violation when configured via mailbox" in {
      testDeadLetters(system.actorOf(Props[StashingActor].withMailbox(mailboxId1)))
    }

    "throw a StashOverflowException in case of a stash capacity violation when configured via dispatcher" in {
      testStashOverflowException(system.actorOf(Props[StashingActorWithOverflow].withDispatcher(dispatcherId2)))
    }

    "throw a StashOverflowException in case of a stash capacity violation when configured via mailbox" in {
      testStashOverflowException(system.actorOf(Props[StashingActorWithOverflow].withMailbox(mailboxId2)))
    }
  }
}