Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #416 from akka/wip-1896-gracefully-handle-identity…

…-recreates


#1896 - Sprinkling some magic sauce so that we can support recreating th...
  • Loading branch information...
commit 976f7dde38785a8ebcdccf993fbca0e1cf0dd10c 2 parents e462149 + 96d657f
@viktorklang viktorklang authored
View
49 akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
@@ -157,6 +157,55 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
expectNoMsg(1 second)
}
+ "restart properly when same instance is returned" in {
+ val restarts = 3 //max number of restarts
+ lazy val childInstance = new Actor {
+ var preRestarts = 0
+ var postRestarts = 0
+ var preStarts = 0
+ var postStops = 0
+ override def preRestart(reason: Throwable, message: Option[Any]) { preRestarts += 1; testActor ! ("preRestart" + preRestarts) }
+ override def postRestart(reason: Throwable) { postRestarts += 1; testActor ! ("postRestart" + postRestarts) }
+ override def preStart() { preStarts += 1; testActor ! ("preStart" + preStarts) }
+ override def postStop() { postStops += 1; testActor ! ("postStop" + postStops) }
+ def receive = {
+ case "crash" testActor ! "crashed"; throw new RuntimeException("Expected")
+ case "ping" sender ! "pong"
+ }
+ }
+ val master = system.actorOf(Props(new Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = restarts)(List(classOf[Exception]))
+ val child = context.actorOf(Props(childInstance))
+ def receive = {
+ case msg child forward msg
+ }
+ }))
+
+ expectMsg("preStart1")
+
+ master ! "ping"
+ expectMsg("pong")
+
+ filterEvents(EventFilter[RuntimeException]("Expected", occurrences = restarts + 1)) {
+ (1 to restarts) foreach {
+ i
+ master ! "crash"
+ expectMsg("crashed")
+
+ expectMsg("preRestart" + i)
+ expectMsg("postRestart" + i)
+
+ master ! "ping"
+ expectMsg("pong")
+ }
+ master ! "crash"
+ expectMsg("crashed")
+ expectMsg("postStop1")
+ }
+
+ expectNoMsg(1 second)
+ }
+
"not restart temporary actor" in {
val (temporaryActor, _) = temporaryActorAllForOne
View
25 akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -529,7 +529,7 @@ private[akka] class ActorCell(
try {
if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
} finally {
- clearActorFields()
+ clearActorFields(failedActor)
}
}
childrenRefs match {
@@ -537,7 +537,7 @@ private[akka] class ActorCell(
childrenRefs = ct.copy(reason = Recreation(cause))
dispatcher suspend this
case _
- doRecreate(cause)
+ doRecreate(cause, failedActor)
}
} catch {
case NonFatal(e) try {
@@ -696,18 +696,19 @@ private[akka] class ActorCell(
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped"))
} finally {
if (a ne null) a.clearBehaviorStack()
- clearActorFields()
+ clearActorFields(a)
actor = null
}
}
}
- private def doRecreate(cause: Throwable): Unit = try {
+ private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try {
// after all killed children have terminated, recreate the rest, then go on to start the new instance
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
+ if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
@@ -719,6 +720,7 @@ private[akka] class ActorCell(
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
actor.supervisorStrategy.handleSupervisorFailing(self, children)
+ clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called.
} finally {
parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self)
}
@@ -736,7 +738,7 @@ private[akka] class ActorCell(
childrenRefs = n
actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
- case Recreation(cause) doRecreate(cause)
+ case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
case Termination doTerminate()
case _
}
@@ -775,12 +777,12 @@ private[akka] class ActorCell(
}
}
- final def clearActorFields(): Unit = {
- setActorFields(context = null, self = system.deadLetters)
+ final def clearActorFields(actorInstance: Actor): Unit = {
+ setActorFields(actorInstance, context = null, self = system.deadLetters)
currentMessage = null
}
- final def setActorFields(context: ActorContext, self: ActorRef) {
+ final def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) {
@tailrec
def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = {
val success = try {
@@ -799,10 +801,9 @@ private[akka] class ActorCell(
lookupAndSetField(parent, actor, name, value)
}
}
- val a = actor
- if (a ne null) {
- lookupAndSetField(a.getClass, a, "context", context)
- lookupAndSetField(a.getClass, a, "self", self)
+ if (actorInstance ne null) {
+ lookupAndSetField(actorInstance.getClass, actorInstance, "context", context)
+ lookupAndSetField(actorInstance.getClass, actorInstance, "self", self)
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.