/
ActorLifeCycleSpec.scala
154 lines (137 loc) · 5.35 KB
/
ActorLifeCycleSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import language.postfixOps
import org.scalatest.BeforeAndAfterEach
import akka.actor.Actor._
import akka.testkit._
import scala.concurrent.duration._
import java.util.concurrent.atomic._
import scala.concurrent.Await
import akka.pattern.ask
import java.util.UUID.{ randomUUID ⇒ newUuid }
object ActorLifeCycleSpec {
class LifeCycleTestActor(testActor: ActorRef, id: String, generationProvider: AtomicInteger) extends Actor {
def report(msg: Any) = testActor ! message(msg)
def message(msg: Any): Tuple3[Any, String, Int] = (msg, id, currentGen)
val currentGen = generationProvider.getAndIncrement()
override def preStart(): Unit = { report("preStart") }
override def postStop(): Unit = { report("postStop") }
def receive = { case "status" ⇒ sender() ! message("OK") }
}
}
class ActorLifeCycleSpec extends AkkaSpec("akka.actor.serialize-messages=off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
import ActorLifeCycleSpec._
"An Actor" must {
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
filterException[ActorKilledException] {
val id = newUuid.toString
val supervisor = system.actorOf(Props(classOf[Supervisor], OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))))
val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preRestart(reason: Throwable, message: Option[Any]): Unit = { report("preRestart") }
override def postRestart(reason: Throwable): Unit = { report("postRestart") }
}).withDeploy(Deploy.local)
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0))
restarter ! Kill
expectMsg(("preRestart", id, 0))
expectMsg(("postRestart", id, 1))
restarter ! "status"
expectMsg(("OK", id, 1))
restarter ! Kill
expectMsg(("preRestart", id, 1))
expectMsg(("postRestart", id, 2))
restarter ! "status"
expectMsg(("OK", id, 2))
restarter ! Kill
expectMsg(("preRestart", id, 2))
expectMsg(("postRestart", id, 3))
restarter ! "status"
expectMsg(("OK", id, 3))
restarter ! Kill
expectMsg(("postStop", id, 3))
expectNoMsg(1 seconds)
system.stop(supervisor)
}
}
"default for preRestart and postRestart is to call postStop and preStart respectively" in {
filterException[ActorKilledException] {
val id = newUuid().toString
val supervisor = system.actorOf(Props(classOf[Supervisor], OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))))
val gen = new AtomicInteger(0)
val restarterProps = Props(classOf[LifeCycleTestActor], testActor, id, gen)
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0))
restarter ! Kill
expectMsg(("postStop", id, 0))
expectMsg(("preStart", id, 1))
restarter ! "status"
expectMsg(("OK", id, 1))
restarter ! Kill
expectMsg(("postStop", id, 1))
expectMsg(("preStart", id, 2))
restarter ! "status"
expectMsg(("OK", id, 2))
restarter ! Kill
expectMsg(("postStop", id, 2))
expectMsg(("preStart", id, 3))
restarter ! "status"
expectMsg(("OK", id, 3))
restarter ! Kill
expectMsg(("postStop", id, 3))
expectNoMsg(1 seconds)
system.stop(supervisor)
}
}
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
val id = newUuid().toString
val supervisor = system.actorOf(Props(
classOf[Supervisor],
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))))
val gen = new AtomicInteger(0)
val props = Props(classOf[LifeCycleTestActor], testActor, id, gen)
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0))
a ! "status"
expectMsg(("OK", id, 0))
system.stop(a)
expectMsg(("postStop", id, 0))
expectNoMsg(1 seconds)
system.stop(supervisor)
}
"log failues in postStop" in {
val a = system.actorOf(Props(new Actor {
def receive = Actor.emptyBehavior
override def postStop: Unit = { throw new Exception("hurrah") }
}))
EventFilter[Exception]("hurrah", occurrences = 1) intercept {
a ! PoisonPill
}
}
"clear the behavior stack upon restart" in {
final case class Become(recv: ActorContext ⇒ Receive)
val a = system.actorOf(Props(new Actor {
def receive = {
case Become(beh) ⇒ { context.become(beh(context), discardOld = false); sender() ! "ok" }
case x ⇒ sender() ! 42
}
}))
a ! "hello"
expectMsg(42)
a ! Become(ctx ⇒ {
case "fail" ⇒ throw new RuntimeException("buh")
case x ⇒ ctx.sender() ! 43
})
expectMsg("ok")
a ! "hello"
expectMsg(43)
EventFilter[RuntimeException]("buh", occurrences = 1) intercept {
a ! "fail"
}
a ! "hello"
expectMsg(42)
}
}
}