From 0e668ec5cf1d9a600d6a36580d8676076b138822 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 16 Nov 2012 14:12:32 +0100 Subject: [PATCH 1/7] Making RoutingSpec a bit more elaborate, to put pressure on the implementation --- .../test/scala/akka/routing/RoutingSpec.scala | 47 ++++++++++--------- .../scala/akka/actor/ActorRefProvider.scala | 4 +- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 6d643005aa2..fce00f51654 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -5,7 +5,6 @@ package akka.routing import language.postfixOps -import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import scala.collection.mutable.LinkedList import akka.testkit._ @@ -18,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.Config import akka.dispatch.Dispatchers import akka.util.Timeout +import java.util.concurrent.atomic.AtomicInteger object RoutingSpec { @@ -101,33 +101,36 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "be able to send their routees" in { - class TheActor extends Actor { - val routee1 = context.actorOf(Props[TestActor], "routee1") - val routee2 = context.actorOf(Props[TestActor], "routee2") - val routee3 = context.actorOf(Props[TestActor], "routee3") - val router = context.actorOf(Props[TestActor].withRouter( - ScatterGatherFirstCompletedRouter( - routees = List(routee1, routee2, routee3), - within = 5 seconds))) + val actor = system.actorOf(Props(new Actor { def receive = { - case "doIt" ⇒ router ! CurrentRoutees - case routees: RouterRoutees ⇒ testActor forward routees + case (id: String, names: Iterable[_], actors: Int) ⇒ + val routerProps = Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter( + routees = names collect { case name: String ⇒ context.actorOf(Props(new TestActor), name) }, + within = 5 seconds)) + + 1 to actors foreach { i ⇒ context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) } } - } + })) - val theActor = system.actorOf(Props(new TheActor), "theActor") - theActor ! "doIt" - val routees = expectMsgPF() { - case RouterRoutees(routees) ⇒ routees.toSet - } + val actors = 15 + val names = 1 to 20 map { "routee" + _ } toList - routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3")) + actor ! (("test", names, actors)) + + 1 to actors foreach { _ ⇒ + val routees = expectMsgType[RouterRoutees].routees + routees.map(_.path.name) must be === names + } + expectNoMsg(500.millis) + actor ! PoisonPill } "use configured nr-of-instances when FromConfig" in { val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) watch(router) system.stop(router) expectMsgType[Terminated] @@ -135,7 +138,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "use configured nr-of-instances when router is specified" in { val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2") - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) system.stop(router) } @@ -150,7 +154,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3") Await.ready(latch, remaining) - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) system.stop(router) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 818982f1345..df8e6ccbb30 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -323,9 +323,9 @@ private[akka] object SystemGuardian { /** * Local ActorRef provider. - * + * * INTERNAL API! - * + * * Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported. */ class LocalActorRefProvider( From df39691db8a82e16482c73a422072a9f2cbf420b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 13 Nov 2012 20:07:33 +0100 Subject: [PATCH 2/7] Attempting to hunt down and find the race in the RoutingSpec --- .../akka/actor/RepointableActorRef.scala | 119 +++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 29 ++--- 2 files changed, 69 insertions(+), 79 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 8ecb1cbb725..1f1c9cae0a9 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -5,11 +5,11 @@ package akka.actor import java.io.ObjectStreamException +import java.util.{ LinkedList ⇒ JLinkedList, Queue ⇒ JQueue } import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.annotation.tailrec -import scala.collection.mutable.Queue import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.dungeon.ChildrenContainer @@ -122,24 +122,26 @@ private[akka] class RepointableActorRef( protected def writeReplace(): AnyRef = SerializedActorRef(path) } -private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, val supervisor: InternalActorRef, val uid: Int) - extends Cell { +private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, + val self: RepointableActorRef, + val props: Props, + val supervisor: InternalActorRef, + val uid: Int) extends Cell { /* * This lock protects all accesses to this cell’s queues. It also ensures * safe switching to the started ActorCell. */ - val lock = new ReentrantLock + private[this] final val lock = new ReentrantLock - // use Envelope to keep on-send checks in the same place - val queue: Queue[Envelope] = Queue() - val systemQueue: Queue[SystemMessage] = Queue() - var suspendCount: Int = 0 + // use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK + private[this] final val queue: JQueue[Envelope] = new JLinkedList() + private[this] final val systemQueue: JQueue[SystemMessage] = new JLinkedList() + private[this] var suspendCount: Int = 0 - private def timeout = system.settings.UnstartedPushTimeout.duration.toMillis + import systemImpl.settings.UnstartedPushTimeout.{ duration ⇒ timeout } - def replaceWith(cell: Cell): Unit = { - lock.lock() + def replaceWith(cell: Cell): Unit = locked { try { /* * The CallingThreadDispatcher nicely dives under the ReentrantLock and @@ -149,13 +151,13 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep * lock, double-tap (well, N-tap, really); concurrent modification is * still not possible because we’re the only thread accessing the queues. */ - while (systemQueue.nonEmpty || queue.nonEmpty) { - while (systemQueue.nonEmpty) { - val msg = systemQueue.dequeue() + while (!systemQueue.isEmpty || !queue.isEmpty) { + while (!systemQueue.isEmpty) { + val msg = systemQueue.poll() cell.sendSystemMessage(msg) } - if (queue.nonEmpty) { - val envelope = queue.dequeue() + if (!queue.isEmpty) { + val envelope = queue.poll() cell.tell(envelope.message, envelope.sender) } } @@ -163,76 +165,67 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep self.swapCell(cell) finally try for (_ ← 1 to suspendCount) cell.suspend() - finally - lock.unlock() } def system: ActorSystem = systemImpl - def suspend(): Unit = { - lock.lock() - try suspendCount += 1 - finally lock.unlock() - } - def resume(causedByFailure: Throwable): Unit = { - lock.lock() - try suspendCount -= 1 - finally lock.unlock() - } - def restart(cause: Throwable): Unit = { - lock.lock() - try suspendCount -= 1 - finally lock.unlock() - } + def suspend(): Unit = locked { suspendCount += 1 } + def resume(causedByFailure: Throwable): Unit = locked { suspendCount -= 1 } + def restart(cause: Throwable): Unit = locked { suspendCount -= 1 } def stop(): Unit = sendSystemMessage(Terminate()) def isTerminated: Boolean = false def parent: InternalActorRef = supervisor def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def getChildByName(name: String): Option[ChildRestartStats] = None + def tell(message: Any, sender: ActorRef): Unit = { val useSender = if (sender eq Actor.noSender) system.deadLetters else sender - if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + if (lock.tryLock(timeout.length, timeout.unit)) { try { - if (self.underlying eq this) queue enqueue Envelope(message, useSender, system) - else self.underlying.tell(message, useSender) - } finally { - lock.unlock() - } + val cell = self.underlying + if (cell ne this) { + cell.tell(message, useSender) + } else if (!queue.offer(Envelope(message, useSender, system))) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure")) + system.deadLetters ! DeadLetter(message, useSender, self) + } + } finally lock.unlock() } else { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout")) system.deadLetters ! DeadLetter(message, useSender, self) } } - def sendSystemMessage(msg: SystemMessage): Unit = { - if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + + // FIXME: once we have guaranteed delivery of system messages, hook this in! + def sendSystemMessage(msg: SystemMessage): Unit = + if (lock.tryLock(timeout.length, timeout.unit)) { try { - if (self.underlying eq this) systemQueue enqueue msg - else self.underlying.sendSystemMessage(msg) - } finally { - lock.unlock() - } + val cell = self.underlying + if (cell ne this) { + cell.sendSystemMessage(msg) + } else if (!systemQueue.offer(msg)) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) + system.deadLetters ! DeadLetter(msg, self, self) + } + } finally lock.unlock() } else { - // FIXME: once we have guaranteed delivery of system messages, hook this in! system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout")) system.deadLetters ! DeadLetter(msg, self, self) } - } + def isLocal = true - def hasMessages: Boolean = { - lock.lock() - try { - if (self.underlying eq this) !queue.isEmpty - else self.underlying.hasMessages - } finally { - lock.unlock() - } + def hasMessages: Boolean = locked { + val cell = self.underlying + if (cell eq this) !queue.isEmpty else cell.hasMessages + } + + def numberOfMessages: Int = locked { + val cell = self.underlying + if (cell eq this) queue.size else cell.numberOfMessages } - def numberOfMessages: Int = { + + private[this] final def locked[T](body: ⇒ T): T = { lock.lock() - try { - if (self.underlying eq this) queue.size - else self.underlying.numberOfMessages - } finally { - lock.unlock() - } + try body finally lock.unlock() } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 9ccf43fb584..8c3d05916b9 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -92,7 +92,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo * `RouterConfig.createRoute` and `Resizer.resize` */ private[akka] def addRoutees(newRoutees: Iterable[ActorRef]): Unit = { - _routees = _routees ++ newRoutees + _routees ++= newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor newRoutees foreach watch } @@ -108,30 +108,27 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo } override def tell(message: Any, sender: ActorRef): Unit = { - resize() - + resize() // Mucho importante val s = if (sender eq null) system.deadLetters else sender - - val msg = message match { - case wrapped: RouterEnvelope ⇒ wrapped.message - case m ⇒ m - } - applyRoute(s, message) match { - case Destination(_, x) :: Nil if x == self ⇒ super.tell(message, s) - case refs ⇒ - refs foreach (p ⇒ - if (p.recipient == self) super.tell(msg, p.sender) - else p.recipient.!(msg)(p.sender)) + case Destination(_, x) :: Nil if x == self ⇒ + super.tell(message, s) + case refs ⇒ refs foreach { p ⇒ + val msg = message match { + case wrapped: RouterEnvelope ⇒ wrapped.message + case m ⇒ m + } + if (p.recipient == self) super.tell(msg, p.sender) + else p.recipient.!(msg)(p.sender) + } } } - def resize(): Unit = { + def resize(): Unit = for (r ← routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) super.tell(Router.Resize, self) } - } } /** From 4b514343849155e40d31c9c3a5057d4accc32add Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 14 Nov 2012 14:34:31 +0100 Subject: [PATCH 3/7] Simplifying the code inside Children and RepointableActorRef --- .../akka/actor/RepointableActorRef.scala | 24 ++++++----- .../scala/akka/actor/dungeon/Children.scala | 43 ++++++++++--------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 1f1c9cae0a9..87c02455bc6 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -52,12 +52,15 @@ private[akka] class RepointableActorRef( * * This is protected so that others can have different initialization. */ - def initialize(): this.type = { - val uid = ThreadLocalRandom.current.nextInt() - swapCell(new UnstartedCell(system, this, props, supervisor, uid)) - supervisor.sendSystemMessage(Supervise(this, uid)) - this - } + def initialize(): this.type = + underlying match { + case null ⇒ + val uid = ThreadLocalRandom.current.nextInt() + swapCell(new UnstartedCell(system, this, props, supervisor, uid)) + supervisor.sendSystemMessage(Supervise(this, uid)) + this + case other ⇒ this + } /** * This method is supposed to be called by the supervisor in handleSupervise() @@ -65,13 +68,12 @@ private[akka] class RepointableActorRef( * modification of the `underlying` field, though it is safe to send messages * at any time. */ - def activate(): this.type = { + def activate(): this.type = underlying match { - case u: UnstartedCell ⇒ u.replaceWith(newCell(u)) - case _ ⇒ // this happens routinely for things which were created async=false + case u: UnstartedCell ⇒ u.replaceWith(newCell(u)); this + case null ⇒ throw new IllegalStateException("underlying cell is null") + case _ ⇒ this // this happens routinely for things which were created async=false } - this - } /** * This is called by activate() to obtain the cell which is to replace the diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index d59b0b71da8..8cd571bd589 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -53,19 +53,24 @@ private[akka] trait Children { this: ActorCell ⇒ } final def stop(actor: ActorRef): Unit = { - val started = actor match { - case r: RepointableRef ⇒ r.isStarted - case _ ⇒ true + if (childrenRefs.getByRef(actor).isDefined) { + @tailrec def shallDie(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) + } + + if (actor match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + }) shallDie(actor) } - if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor) actor.asInstanceOf[InternalActorRef].stop() } /* * low level CAS helpers */ - - @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = + @inline private final def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) @tailrec final def reserveChild(name: String): Boolean = { @@ -90,18 +95,6 @@ private[akka] trait Children { this: ActorCell ⇒ } } - @tailrec final protected def shallDie(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) - } - - @tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = { - val c = childrenRefs - val n = c.remove(ref) - if (swapChildrenRefs(c, n)) n - else removeChild(ref) - } - @tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = { childrenRefs match { case c: ChildrenContainer.TerminatingChildrenContainer ⇒ @@ -144,10 +137,18 @@ private[akka] trait Children { this: ActorCell ⇒ protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = { - childrenRefs match { + @tailrec def removeChild(ref: ActorRef): ChildrenContainer = { + val c = childrenRefs + val n = c.remove(ref) + if (swapChildrenRefs(c, n)) n else removeChild(ref) + } + + childrenRefs match { // The match must be performed BEFORE the removeChild case TerminatingChildrenContainer(_, _, reason) ⇒ - val newContainer = removeChild(child) - if (!newContainer.isInstanceOf[TerminatingChildrenContainer]) Some(reason) else None + removeChild(child) match { + case c: TerminatingChildrenContainer ⇒ None + case _ ⇒ Some(reason) + } case _ ⇒ removeChild(child) None From 2060528b6e9f6cde728a8e599e3a36b2ad6ef3df Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 13 Nov 2012 20:07:33 +0100 Subject: [PATCH 4/7] Attempting to hunt down and find the race in the RoutingSpec --- .../src/test/scala/akka/routing/RoutingSpec.scala | 8 ++++---- akka-actor/src/main/scala/akka/routing/Routing.scala | 3 ++- akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala | 2 +- .../main/scala/akka/remote/RemoteActorRefProvider.scala | 4 ++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fce00f51654..e52eb9152f8 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -102,12 +102,13 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "be able to send their routees" in { + case class TestRun(id: String, names: Iterable[String], actors: Int) val actor = system.actorOf(Props(new Actor { def receive = { - case (id: String, names: Iterable[_], actors: Int) ⇒ + case TestRun(id, names, actors) ⇒ val routerProps = Props[TestActor].withRouter( ScatterGatherFirstCompletedRouter( - routees = names collect { case name: String ⇒ context.actorOf(Props(new TestActor), name) }, + routees = names map { context.actorOf(Props(new TestActor), _) }, within = 5 seconds)) 1 to actors foreach { i ⇒ context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) } @@ -117,14 +118,13 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with val actors = 15 val names = 1 to 20 map { "routee" + _ } toList - actor ! (("test", names, actors)) + actor ! TestRun("test", names, actors) 1 to actors foreach { _ ⇒ val routees = expectMsgType[RouterRoutees].routees routees.map(_.path.name) must be === names } expectNoMsg(500.millis) - actor ! PoisonPill } "use configured nr-of-instances when FromConfig" in { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 8c3d05916b9..b22c9a12ead 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -5,8 +5,8 @@ package akka.routing import language.implicitConversions import language.postfixOps -import akka.actor._ import scala.concurrent.duration._ +import akka.actor._ import akka.ConfigurationException import akka.pattern.pipe import com.typesafe.config.Config @@ -17,6 +17,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import akka.dispatch.Dispatchers import scala.annotation.tailrec import concurrent.ExecutionContext +import akka.event.Logging.Warning /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to diff --git a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala index a51b8649534..5e6ec5d50f6 100644 --- a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala @@ -29,7 +29,7 @@ object ZeromqDocSpec { class HealthProbe extends Actor { - val pubSocket = ZeroMQExtension(context.system).newSocket(SocketType.Pub, + val pubSocket = ZeroMQExtension(context.system).newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1235")) val memory = ManagementFactory.getMemoryMXBean val os = ManagementFactory.getOperatingSystemMXBean diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 28e651639db..4ac79904b50 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -14,9 +14,9 @@ import scala.util.control.NonFatal /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. - * + * * INTERNAL API! - * + * * Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported. */ class RemoteActorRefProvider( From a0418f14b4f481ea3ada27ae2efae9515e3f59e5 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 16 Nov 2012 15:26:01 +0100 Subject: [PATCH 5/7] #2575 - attempting to fix RepointableActors --- .../akka/actor/RepointableActorRef.scala | 83 +++++++++++-------- .../scala/akka/actor/dungeon/Children.scala | 2 +- .../src/main/scala/akka/routing/Routing.scala | 1 + 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 87c02455bc6..8621ca116e2 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -5,7 +5,7 @@ package akka.actor import java.io.ObjectStreamException -import java.util.{ LinkedList ⇒ JLinkedList, Queue ⇒ JQueue } +import java.util.{ LinkedList ⇒ JLinkedList, ListIterator ⇒ JListIterator } import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -13,9 +13,10 @@ import scala.annotation.tailrec import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.dungeon.ChildrenContainer -import akka.dispatch.{ Envelope, Supervise, SystemMessage, Terminate } import akka.event.Logging.Warning import akka.util.Unsafe +import akka.dispatch._ +import util.Try /** * This actor ref starts out with some dummy cell (by default just enqueuing @@ -93,7 +94,11 @@ private[akka] class RepointableActorRef( def restart(cause: Throwable): Unit = underlying.restart(cause) - def isStarted: Boolean = !underlying.isInstanceOf[UnstartedCell] + def isStarted: Boolean = underlying match { + case _: UnstartedCell ⇒ false + case null ⇒ throw new IllegalStateException("isStarted called before initialized") + case _ ⇒ true + } def isTerminated: Boolean = underlying.isTerminated @@ -137,44 +142,36 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, private[this] final val lock = new ReentrantLock // use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK - private[this] final val queue: JQueue[Envelope] = new JLinkedList() - private[this] final val systemQueue: JQueue[SystemMessage] = new JLinkedList() - private[this] var suspendCount: Int = 0 + private[this] final val queue = new JLinkedList[Any]() + // ACCESS MUST BE PROTECTED BY THE LOCK, is used to detect when messages are sent during replace + private[this] final var isBeingReplaced = false import systemImpl.settings.UnstartedPushTimeout.{ duration ⇒ timeout } def replaceWith(cell: Cell): Unit = locked { + isBeingReplaced = true try { - /* - * The CallingThreadDispatcher nicely dives under the ReentrantLock and - * breaks things by enqueueing into stale queues from within the message - * processing which happens in-line for sendSystemMessage() and tell(). - * Since this is the only possible way to f*ck things up within this - * lock, double-tap (well, N-tap, really); concurrent modification is - * still not possible because we’re the only thread accessing the queues. - */ - while (!systemQueue.isEmpty || !queue.isEmpty) { - while (!systemQueue.isEmpty) { - val msg = systemQueue.poll() - cell.sendSystemMessage(msg) - } - if (!queue.isEmpty) { - val envelope = queue.poll() - cell.tell(envelope.message, envelope.sender) + while (!queue.isEmpty) { + queue.poll() match { + case s: SystemMessage ⇒ cell.sendSystemMessage(s) + case e: Envelope ⇒ cell.tell(e.message, e.sender) } } - } finally try + } finally { + isBeingReplaced = false self.swapCell(cell) - finally try - for (_ ← 1 to suspendCount) cell.suspend() + } } def system: ActorSystem = systemImpl - def suspend(): Unit = locked { suspendCount += 1 } - def resume(causedByFailure: Throwable): Unit = locked { suspendCount -= 1 } - def restart(cause: Throwable): Unit = locked { suspendCount -= 1 } + def suspend(): Unit = sendSystemMessage(Suspend()) + def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure)) + def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) def stop(): Unit = sendSystemMessage(Terminate()) - def isTerminated: Boolean = false + def isTerminated: Boolean = locked { + val cell = self.underlying + if (cellIsReady(cell)) cell.isTerminated else false + } def parent: InternalActorRef = supervisor def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def getChildByName(name: String): Option[ChildRestartStats] = None @@ -184,7 +181,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, if (lock.tryLock(timeout.length, timeout.unit)) { try { val cell = self.underlying - if (cell ne this) { + if (cellIsReady(cell)) { cell.tell(message, useSender) } else if (!queue.offer(Envelope(message, useSender, system))) { system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure")) @@ -202,11 +199,22 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, if (lock.tryLock(timeout.length, timeout.unit)) { try { val cell = self.underlying - if (cell ne this) { + if (cellIsReady(cell)) { cell.sendSystemMessage(msg) - } else if (!systemQueue.offer(msg)) { - system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) - system.deadLetters ! DeadLetter(msg, self, self) + } else { + // systemMessages that are sent during replace need to jump to just after the last system message in the queue, so it's processed before other messages + val wasEnqueued = if (isBeingReplaced && !queue.isEmpty()) { + @tailrec def tryEnqueue(i: JListIterator[Any] = queue.listIterator(), insertIntoIndex: Int = -1): Boolean = + if (i.hasNext()) tryEnqueue(i, if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() else insertIntoIndex) + else if (insertIntoIndex == -1) queue.offer(msg) + else Try(queue.add(insertIntoIndex, msg)).isSuccess + tryEnqueue() + } else queue.offer(msg) + + if (!wasEnqueued) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) + system.deadLetters ! DeadLetter(msg, self, self) + } } } finally lock.unlock() } else { @@ -215,14 +223,17 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, } def isLocal = true + + private[this] final def cellIsReady(cell: Cell): Boolean = (cell ne this) && (cell ne null) + def hasMessages: Boolean = locked { val cell = self.underlying - if (cell eq this) !queue.isEmpty else cell.hasMessages + if (cellIsReady(cell)) cell.hasMessages else !queue.isEmpty } def numberOfMessages: Int = locked { val cell = self.underlying - if (cell eq this) queue.size else cell.numberOfMessages + if (cellIsReady(cell)) cell.numberOfMessages else queue.size } private[this] final def locked[T](body: ⇒ T): T = { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 8cd571bd589..d701f22a3d5 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -146,7 +146,7 @@ private[akka] trait Children { this: ActorCell ⇒ childrenRefs match { // The match must be performed BEFORE the removeChild case TerminatingChildrenContainer(_, _, reason) ⇒ removeChild(child) match { - case c: TerminatingChildrenContainer ⇒ None + case _: TerminatingChildrenContainer ⇒ None case _ ⇒ Some(reason) } case _ ⇒ diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b22c9a12ead..8b2b7b9d6cb 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -13,6 +13,7 @@ import com.typesafe.config.Config import scala.collection.JavaConverters.iterableAsScalaIterableConverter import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit +import akka.event.Logging.Warning import scala.concurrent.forkjoin.ThreadLocalRandom import akka.dispatch.Dispatchers import scala.annotation.tailrec From b024a2c6a7c9e5d5ceea896fb6c9a20b40ad422a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 23 Nov 2012 17:53:33 +0100 Subject: [PATCH 6/7] Roland found a nasty race between activate in supervisor and locally, this commit should fix it, and rename activate to point --- .../src/main/scala/akka/actor/ActorCell.scala | 18 +++++++++--------- .../scala/akka/actor/ActorRefProvider.scala | 7 +++---- .../scala/akka/actor/RepointableActorRef.scala | 14 +++++++------- .../scala/akka/actor/dungeon/Dispatch.scala | 2 +- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../src/main/scala/akka/routing/Routing.scala | 2 +- 6 files changed, 22 insertions(+), 23 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index ac5abda1833..d47d67814c5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -361,10 +361,10 @@ private[akka] class ActorCell( case null ⇒ faultResume(inRespToFailure) case w: WaitingForChildren ⇒ w.enqueue(message) } - case Terminate() ⇒ terminate() - case Supervise(child, uid) ⇒ supervise(child, uid) - case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning + case Terminate() ⇒ terminate() + case Supervise(child, async, uid) ⇒ supervise(child, async, uid) + case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, "error while processing " + message) @@ -492,21 +492,21 @@ private[akka] class ActorCell( } } - private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { + private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() initChild(child) match { case Some(crs) ⇒ crs.uid = uid - handleSupervise(child) + handleSupervise(child, async) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) } } // future extension point - protected def handleSupervise(child: ActorRef): Unit = child match { - case r: RepointableActorRef ⇒ r.activate() - case _ ⇒ + protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match { + case r: RepointableActorRef if async ⇒ r.point() + case _ ⇒ } final protected def clearActorFields(actorInstance: Actor): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index df8e6ccbb30..3a12aead934 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -391,7 +391,7 @@ class LocalActorRefProvider( override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case Supervise(_, _, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case ChildTerminated(_) ⇒ stop() case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } @@ -595,14 +595,13 @@ class LocalActorRefProvider( if (settings.DebugRouterMisconfiguration && deployer.lookup(path).isDefined) log.warning("Configuration says that {} should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.") - if (async) new RepointableActorRef(system, props, supervisor, path).initialize() + if (async) new RepointableActorRef(system, props, supervisor, path).initialize(async) else new LocalActorRef(system, props, supervisor, path) case router ⇒ val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a) - val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize() - if (async) ref else ref.activate() + new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async) } } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 8621ca116e2..ddc49b1e228 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -53,14 +53,15 @@ private[akka] class RepointableActorRef( * * This is protected so that others can have different initialization. */ - def initialize(): this.type = + def initialize(async: Boolean): this.type = underlying match { case null ⇒ val uid = ThreadLocalRandom.current.nextInt() swapCell(new UnstartedCell(system, this, props, supervisor, uid)) - supervisor.sendSystemMessage(Supervise(this, uid)) + supervisor.sendSystemMessage(Supervise(this, async, uid)) + if (!async) point() this - case other ⇒ this + case other ⇒ throw new IllegalStateException("initialize called more than once!") } /** @@ -69,7 +70,7 @@ private[akka] class RepointableActorRef( * modification of the `underlying` field, though it is safe to send messages * at any time. */ - def activate(): this.type = + def point(): this.type = underlying match { case u: UnstartedCell ⇒ u.replaceWith(newCell(u)); this case null ⇒ throw new IllegalStateException("underlying cell is null") @@ -80,9 +81,8 @@ private[akka] class RepointableActorRef( * This is called by activate() to obtain the cell which is to replace the * unstarted cell. The cell must be fully functional. */ - def newCell(old: Cell): Cell = - new ActorCell(system, this, props, supervisor). - init(old.asInstanceOf[UnstartedCell].uid, sendSupervise = false).start() + def newCell(old: UnstartedCell): Cell = + new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false).start() def start(): Unit = () diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index aefd2bcc556..469aac78c28 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -55,7 +55,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ if (sendSupervise) { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(akka.dispatch.Supervise(self, uid)) + parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false, uid)) parent ! NullMessage // read ScalaDoc of NullMessage to see why } this diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 36afc8a24cd..959cb416aec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -108,7 +108,7 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from /** * INTERNAL API */ -private[akka] case class Supervise(child: ActorRef, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 8b2b7b9d6cb..b7fbcfbf703 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -36,7 +36,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _props.routerConfig.verifyConfig() - override def newCell(old: Cell): Cell = new RoutedActorCell(system, this, props, supervisor, old.asInstanceOf[UnstartedCell].uid) + override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor, old.uid) } From 5ce0cb8032842d705754c0f2ddef30559a26a467 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 26 Nov 2012 11:18:26 +0100 Subject: [PATCH 7/7] =?UTF-8?q?fix=20lookup=20of=20top-level=E2=80=99s=20c?= =?UTF-8?q?hildren,=20see=20#2675?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - the problem existed only when creating children within the constructor of a top-level actor - add a test which verifies this --- .../scala/akka/actor/ActorSystemSpec.scala | 58 ++++++++++++++++++- .../java/akka/actor/AbstractActorRef.java | 2 + .../src/main/scala/akka/actor/ActorCell.scala | 5 ++ .../akka/actor/RepointableActorRef.scala | 48 ++++++++++++--- .../src/main/scala/akka/routing/Routing.scala | 6 +- 5 files changed, 107 insertions(+), 12 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index bf0128a7e36..257c7be377c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -14,6 +14,10 @@ import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLat import akka.util.Timeout import scala.concurrent.Future import akka.pattern.ask +import akka.dispatch._ +import com.typesafe.config.Config +import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue, TimeUnit } +import akka.util.Switch class JavaExtensionSpec extends JavaExtension with JUnitSuite @@ -67,10 +71,57 @@ object ActorSystemSpec { } } + case class FastActor(latch: TestLatch, testActor: ActorRef) extends Actor { + val ref1 = context.actorOf(Props.empty) + val ref2 = context.actorFor(ref1.path.toString) + testActor ! ref2.getClass + latch.countDown() + + def receive = { + case _ ⇒ + } + } + + class SlowDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(_config, _prerequisites) { + private val instance = new Dispatcher( + prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { + val doneIt = new Switch + override protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { + val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint) + doneIt.switchOn { + TestKit.awaitCond(mbox.actor.actor != null, 1.second) + mbox.actor.actor match { + case FastActor(latch, _) ⇒ Await.ready(latch, 1.second) + } + } + ret + } + } + + /** + * Returns the same dispatcher instance for each invocation + */ + override def dispatcher(): MessageDispatcher = instance + } + + val config = s""" + akka.extensions = ["akka.actor.TestExtension"] + slow { + type="${classOf[SlowDispatcher].getName}" + }""" + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension"]""") with ImplicitSender { +class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSender { + + import ActorSystemSpec.FastActor "An ActorSystem" must { @@ -168,6 +219,11 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done") } + "find actors that just have been created" in { + system.actorOf(Props(new FastActor(TestLatch(), testActor)).withDispatcher("slow")) + expectMsgType[Class[_]] must be(classOf[LocalActorRef]) + } + "reliable deny creation of actors while shutting down" in { val system = ActorSystem() import system.dispatcher diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorRef.java b/akka-actor/src/main/java/akka/actor/AbstractActorRef.java index 97ef09c5018..650182a457c 100644 --- a/akka-actor/src/main/java/akka/actor/AbstractActorRef.java +++ b/akka-actor/src/main/java/akka/actor/AbstractActorRef.java @@ -8,10 +8,12 @@ final class AbstractActorRef { final static long cellOffset; + final static long lookupOffset; static { try { cellOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_cellDoNotCallMeDirectly")); + lookupOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_lookupDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index d47d67814c5..fca0a26451c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -208,6 +208,11 @@ private[akka] trait Cell { * The system internals where this Cell lives. */ def systemImpl: ActorSystemImpl + /** + * Start the cell: enqueued message must not be processed before this has + * been called. The usual action is to attach the mailbox to a dispatcher. + */ + def start(): this.type /** * Recursively suspend this actor and all its children. Must not throw exceptions. */ diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index ddc49b1e228..1385e7053cc 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -33,17 +33,34 @@ private[akka] class RepointableActorRef( val path: ActorPath) extends ActorRefWithCell with RepointableRef { - import AbstractActorRef.cellOffset + import AbstractActorRef.{ cellOffset, lookupOffset } + /* + * H E R E B E D R A G O N S ! + * + * There are two main functions of a Cell: message queueing and child lookup. + * When switching out the UnstartedCell for its real replacement, the former + * must be switched after all messages have been drained from the temporary + * queue into the real mailbox, while the latter must be switched before + * processing the very first message (i.e. before Cell.start()). Hence there + * are two refs here, one for each function, and they are switched just so. + */ @volatile private var _cellDoNotCallMeDirectly: Cell = _ + @volatile private var _lookupDoNotCallMeDirectly: Cell = _ def underlying: Cell = Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell] + private def lookup = Unsafe.instance.getObjectVolatile(this, lookupOffset).asInstanceOf[Cell] @tailrec final def swapCell(next: Cell): Cell = { val old = underlying if (Unsafe.instance.compareAndSwapObject(this, cellOffset, old, next)) old else swapCell(next) } + @tailrec final def swapLookup(next: Cell): Cell = { + val old = lookup + if (Unsafe.instance.compareAndSwapObject(this, lookupOffset, old, next)) old else swapLookup(next) + } + /** * Initialize: make a dummy cell which holds just a mailbox, then tell our * supervisor that we exist so that he can create the real Cell in @@ -58,6 +75,7 @@ private[akka] class RepointableActorRef( case null ⇒ val uid = ThreadLocalRandom.current.nextInt() swapCell(new UnstartedCell(system, this, props, supervisor, uid)) + swapLookup(underlying) supervisor.sendSystemMessage(Supervise(this, async, uid)) if (!async) point() this @@ -72,9 +90,21 @@ private[akka] class RepointableActorRef( */ def point(): this.type = underlying match { - case u: UnstartedCell ⇒ u.replaceWith(newCell(u)); this - case null ⇒ throw new IllegalStateException("underlying cell is null") - case _ ⇒ this // this happens routinely for things which were created async=false + case u: UnstartedCell ⇒ + /* + * The problem here was that if the real actor (which will start running + * at cell.start()) creates children in its constructor, then this may + * happen before the swapCell in u.replaceWith, meaning that those + * children cannot be looked up immediately, e.g. if they shall become + * routees. + */ + val cell = newCell(u) + swapLookup(cell) + cell.start() + u.replaceWith(cell) + this + case null ⇒ throw new IllegalStateException("underlying cell is null") + case _ ⇒ this // this happens routinely for things which were created async=false } /** @@ -82,7 +112,7 @@ private[akka] class RepointableActorRef( * unstarted cell. The cell must be fully functional. */ def newCell(old: UnstartedCell): Cell = - new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false).start() + new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false) def start(): Unit = () @@ -114,7 +144,7 @@ private[akka] class RepointableActorRef( case ".." ⇒ getParent.getChild(name) case "" ⇒ getChild(name) case other ⇒ - underlying.getChildByName(other) match { + lookup.getChildByName(other) match { case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) case _ ⇒ Nobody } @@ -164,6 +194,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, } def system: ActorSystem = systemImpl + def start(): this.type = this def suspend(): Unit = sendSystemMessage(Suspend()) def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure)) def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) @@ -205,7 +236,10 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, // systemMessages that are sent during replace need to jump to just after the last system message in the queue, so it's processed before other messages val wasEnqueued = if (isBeingReplaced && !queue.isEmpty()) { @tailrec def tryEnqueue(i: JListIterator[Any] = queue.listIterator(), insertIntoIndex: Int = -1): Boolean = - if (i.hasNext()) tryEnqueue(i, if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() else insertIntoIndex) + if (i.hasNext()) + tryEnqueue(i, + if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() // update last sysmsg seen so far + else insertIntoIndex) // or just keep the last seen one else if (insertIntoIndex == -1) queue.offer(msg) else Try(queue.add(insertIntoIndex, msg)).isSuccess tryEnqueue() diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b7fbcfbf703..a664c8b0d68 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -36,11 +36,11 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _props.routerConfig.verifyConfig() - override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor, old.uid) + override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false) } -private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef, _uid: Int) +private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef) extends ActorCell( _system, _ref, @@ -71,8 +71,6 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo r } - init(_uid, sendSupervise = false).start() - /* * end of construction */