-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
RoutersSpec.scala
251 lines (205 loc) · 8.2 KB
/
RoutersSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Dropped
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.internal.routing.GroupRouterImpl
import akka.actor.typed.internal.routing.RoutingLogics
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.adapter._
import akka.testkit.EventFilter
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
class RoutersSpec extends ScalaTestWithActorTestKit("""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel=debug
""") with WordSpecLike with Matchers {
// needed for the event filter
implicit val untypedSystem = system.toUntyped
def compileOnlyApiCoverage(): Unit = {
Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting()
Routers.pool(10)(() => Behaviors.empty[Any]).withRandomRouting()
Routers.pool(10)(() => Behaviors.empty[Any]).withRoundRobinRouting()
}
"The router pool" must {
"create n children and route messages to" in {
val childCounter = new AtomicInteger(0)
case class Ack(msg: String, recipient: Int)
val probe = createTestProbe[AnyRef]()
val pool = spawn(Routers.pool[String](4)(() =>
Behaviors.setup { _ =>
val id = childCounter.getAndIncrement()
probe.ref ! s"started $id"
Behaviors.receiveMessage { msg =>
probe.ref ! Ack(msg, id)
Behaviors.same
}
}))
// ordering of these msgs is not guaranteed
val expectedStarted = (0 to 3).map { n =>
s"started $n"
}.toSet
probe.receiveMessages(4).toSet should ===(expectedStarted)
// send one message at a time and see we rotate over all children, note that we don't necessarily
// know what order the logic is rotating over the children, so we just check we reach all of them
val sent = (0 to 8).map { n =>
val msg = s"message-$n"
pool ! msg
msg
}
val acks = (0 to 8).map(_ => probe.expectMessageType[Ack])
val (recipients, messages) = acks.foldLeft[(Set[Int], Set[String])]((Set.empty, Set.empty)) {
case ((recipients, messages), ack) =>
(recipients + ack.recipient, messages + ack.msg)
}
recipients should ===(Set(0, 1, 2, 3))
messages should ===(sent.toSet)
}
"keep routing to the rest of the children if some children stops" in {
val probe = createTestProbe[String]()
val pool = spawn(Routers.pool[String](4)(() =>
Behaviors.receiveMessage {
case "stop" =>
Behaviors.stopped
case msg =>
probe.ref ! msg
Behaviors.same
}))
EventFilter.debug(start = "Pool child stopped", occurrences = 2).intercept {
pool ! "stop"
pool ! "stop"
}
// there is a race here where the child stopped but the router did not see that message yet, and may
// deliver messages to it, which will end up in dead letters.
// this test protects against that by waiting for the log entry to show up
val responses = (0 to 4).map { n =>
val msg = s"message-$n"
pool ! msg
probe.expectMessageType[String]
}
(responses should contain).allOf("message-0", "message-1", "message-2", "message-3", "message-4")
}
"stops if all children stops" in {
val probe = createTestProbe()
val pool = spawn(Routers.pool[String](4)(() =>
Behaviors.receiveMessage { _ =>
Behaviors.stopped
}))
EventFilter.info(start = "Last pool child stopped, stopping pool", occurrences = 1).intercept {
(0 to 3).foreach { _ =>
pool ! "stop"
}
probe.expectTerminated(pool)
}
}
}
"The router group" must {
val receptionistDelayMs = 250
"route messages across routees registered to the receptionist" in {
val serviceKey = ServiceKey[String]("group-routing-1")
val probe = createTestProbe[String]()
val routeeBehavior: Behavior[String] = Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
}
(0 to 3).foreach { n =>
val ref = spawn(routeeBehavior, s"group-1-routee-$n")
system.receptionist ! Receptionist.register(serviceKey, ref)
}
val group = spawn(Routers.group(serviceKey), "group-router-1")
// ok to do right away
(0 to 3).foreach { n =>
val msg = s"message-$n"
group ! msg
probe.expectMessage(msg)
}
testKit.stop(group)
}
"publish Dropped messages when there are no routees available" in {
val serviceKey = ServiceKey[String]("group-routing-2")
val group = spawn(Routers.group(serviceKey), "group-router-2")
val probe = TestProbe[Dropped]()
system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[Dropped])
(0 to 3).foreach { n =>
val msg = s"message-$n"
// EventFilter.info(start = "Message [java.lang.String] ... was not delivered.", occurrences = 1).intercept { */
EventFilter.warning(start = "dropped message", occurrences = 1).intercept {
EventFilter.info(pattern = ".*was dropped. No routees in group router", occurrences = 1).intercept {
group ! msg
probe.expectMessageType[Dropped]
}
}
/* } */
}
testKit.stop(group)
}
"handle a changing set of routees" in {
val serviceKey = ServiceKey[String]("group-routing-3")
val probe = createTestProbe[String]()
val routeeBehavior: Behavior[String] = Behaviors.receiveMessage {
case "stop" =>
Behaviors.stopped
case msg =>
probe.ref ! msg
Behaviors.same
}
val ref1 = spawn(routeeBehavior, s"group-3-routee-1")
system.receptionist ! Receptionist.register(serviceKey, ref1)
val ref2 = spawn(routeeBehavior, s"group-3-routee-2")
system.receptionist ! Receptionist.register(serviceKey, ref2)
val ref3 = spawn(routeeBehavior, s"group-3-routee-3")
system.receptionist ! Receptionist.register(serviceKey, ref3)
val group = spawn(Routers.group(serviceKey), "group-router-3")
// give the group a little time to get a listing from the receptionist
Thread.sleep(receptionistDelayMs)
(0 to 3).foreach { n =>
val msg = s"message-$n"
group ! msg
probe.expectMessage(msg)
}
ref2 ! "stop"
// give the group a little time to get an updated listing from the receptionist
Thread.sleep(receptionistDelayMs)
(0 to 3).foreach { n =>
val msg = s"message-$n"
group ! msg
probe.expectMessage(msg)
}
testKit.stop(group)
}
"not route to unreachable when there are reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))
val reachableProbe = createTestProbe[String]
val unreachableProbe = createTestProbe[String]
router
.unsafeUpcast[Any] ! Receptionist.Listing(serviceKey, Set(reachableProbe.ref), Set(unreachableProbe.ref), false)
router ! "one"
router ! "two"
reachableProbe.expectMessage("one")
reachableProbe.expectMessage("two")
}
"route to unreachable when there are no reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))
val unreachableProbe = createTestProbe[String]
router.unsafeUpcast[Any] ! Receptionist.Listing(
serviceKey,
Set.empty[ActorRef[String]],
Set(unreachableProbe.ref),
true)
router ! "one"
router ! "two"
unreachableProbe.expectMessage("one")
unreachableProbe.expectMessage("two")
}
}
}