diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index bf0128a7e36..257c7be377c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -14,6 +14,10 @@ import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLat import akka.util.Timeout import scala.concurrent.Future import akka.pattern.ask +import akka.dispatch._ +import com.typesafe.config.Config +import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue, TimeUnit } +import akka.util.Switch class JavaExtensionSpec extends JavaExtension with JUnitSuite @@ -67,10 +71,57 @@ object ActorSystemSpec { } } + case class FastActor(latch: TestLatch, testActor: ActorRef) extends Actor { + val ref1 = context.actorOf(Props.empty) + val ref2 = context.actorFor(ref1.path.toString) + testActor ! ref2.getClass + latch.countDown() + + def receive = { + case _ ⇒ + } + } + + class SlowDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(_config, _prerequisites) { + private val instance = new Dispatcher( + prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { + val doneIt = new Switch + override protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { + val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint) + doneIt.switchOn { + TestKit.awaitCond(mbox.actor.actor != null, 1.second) + mbox.actor.actor match { + case FastActor(latch, _) ⇒ Await.ready(latch, 1.second) + } + } + ret + } + } + + /** + * Returns the same dispatcher instance for each invocation + */ + override def dispatcher(): MessageDispatcher = instance + } + + val config = s""" + akka.extensions = ["akka.actor.TestExtension"] + slow { + type="${classOf[SlowDispatcher].getName}" + }""" + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension"]""") with ImplicitSender { +class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSender { + + import ActorSystemSpec.FastActor "An ActorSystem" must { @@ -168,6 +219,11 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done") } + "find actors that just have been created" in { + system.actorOf(Props(new FastActor(TestLatch(), testActor)).withDispatcher("slow")) + expectMsgType[Class[_]] must be(classOf[LocalActorRef]) + } + "reliable deny creation of actors while shutting down" in { val system = ActorSystem() import system.dispatcher diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 6d643005aa2..e52eb9152f8 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -5,7 +5,6 @@ package akka.routing import language.postfixOps -import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import scala.collection.mutable.LinkedList import akka.testkit._ @@ -18,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.Config import akka.dispatch.Dispatchers import akka.util.Timeout +import java.util.concurrent.atomic.AtomicInteger object RoutingSpec { @@ -101,33 +101,36 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "be able to send their routees" in { - class TheActor extends Actor { - val routee1 = context.actorOf(Props[TestActor], "routee1") - val routee2 = context.actorOf(Props[TestActor], "routee2") - val routee3 = context.actorOf(Props[TestActor], "routee3") - val router = context.actorOf(Props[TestActor].withRouter( - ScatterGatherFirstCompletedRouter( - routees = List(routee1, routee2, routee3), - within = 5 seconds))) + case class TestRun(id: String, names: Iterable[String], actors: Int) + val actor = system.actorOf(Props(new Actor { def receive = { - case "doIt" ⇒ router ! CurrentRoutees - case routees: RouterRoutees ⇒ testActor forward routees + case TestRun(id, names, actors) ⇒ + val routerProps = Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter( + routees = names map { context.actorOf(Props(new TestActor), _) }, + within = 5 seconds)) + + 1 to actors foreach { i ⇒ context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) } } - } + })) - val theActor = system.actorOf(Props(new TheActor), "theActor") - theActor ! "doIt" - val routees = expectMsgPF() { - case RouterRoutees(routees) ⇒ routees.toSet - } + val actors = 15 + val names = 1 to 20 map { "routee" + _ } toList - routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3")) + actor ! TestRun("test", names, actors) + + 1 to actors foreach { _ ⇒ + val routees = expectMsgType[RouterRoutees].routees + routees.map(_.path.name) must be === names + } + expectNoMsg(500.millis) } "use configured nr-of-instances when FromConfig" in { val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) watch(router) system.stop(router) expectMsgType[Terminated] @@ -135,7 +138,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "use configured nr-of-instances when router is specified" in { val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2") - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) system.stop(router) } @@ -150,7 +154,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3") Await.ready(latch, remaining) - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) system.stop(router) } diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorRef.java b/akka-actor/src/main/java/akka/actor/AbstractActorRef.java index 97ef09c5018..650182a457c 100644 --- a/akka-actor/src/main/java/akka/actor/AbstractActorRef.java +++ b/akka-actor/src/main/java/akka/actor/AbstractActorRef.java @@ -8,10 +8,12 @@ final class AbstractActorRef { final static long cellOffset; + final static long lookupOffset; static { try { cellOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_cellDoNotCallMeDirectly")); + lookupOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_lookupDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index ac5abda1833..fca0a26451c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -208,6 +208,11 @@ private[akka] trait Cell { * The system internals where this Cell lives. */ def systemImpl: ActorSystemImpl + /** + * Start the cell: enqueued message must not be processed before this has + * been called. The usual action is to attach the mailbox to a dispatcher. + */ + def start(): this.type /** * Recursively suspend this actor and all its children. Must not throw exceptions. */ @@ -361,10 +366,10 @@ private[akka] class ActorCell( case null ⇒ faultResume(inRespToFailure) case w: WaitingForChildren ⇒ w.enqueue(message) } - case Terminate() ⇒ terminate() - case Supervise(child, uid) ⇒ supervise(child, uid) - case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning + case Terminate() ⇒ terminate() + case Supervise(child, async, uid) ⇒ supervise(child, async, uid) + case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, "error while processing " + message) @@ -492,21 +497,21 @@ private[akka] class ActorCell( } } - private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { + private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() initChild(child) match { case Some(crs) ⇒ crs.uid = uid - handleSupervise(child) + handleSupervise(child, async) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) } } // future extension point - protected def handleSupervise(child: ActorRef): Unit = child match { - case r: RepointableActorRef ⇒ r.activate() - case _ ⇒ + protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match { + case r: RepointableActorRef if async ⇒ r.point() + case _ ⇒ } final protected def clearActorFields(actorInstance: Actor): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index df8e6ccbb30..3a12aead934 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -391,7 +391,7 @@ class LocalActorRefProvider( override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case Supervise(_, _, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case ChildTerminated(_) ⇒ stop() case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } @@ -595,14 +595,13 @@ class LocalActorRefProvider( if (settings.DebugRouterMisconfiguration && deployer.lookup(path).isDefined) log.warning("Configuration says that {} should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.") - if (async) new RepointableActorRef(system, props, supervisor, path).initialize() + if (async) new RepointableActorRef(system, props, supervisor, path).initialize(async) else new LocalActorRef(system, props, supervisor, path) case router ⇒ val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a) - val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize() - if (async) ref else ref.activate() + new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async) } } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 8ecb1cbb725..1385e7053cc 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -5,17 +5,18 @@ package akka.actor import java.io.ObjectStreamException +import java.util.{ LinkedList ⇒ JLinkedList, ListIterator ⇒ JListIterator } import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.annotation.tailrec -import scala.collection.mutable.Queue import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.dungeon.ChildrenContainer -import akka.dispatch.{ Envelope, Supervise, SystemMessage, Terminate } import akka.event.Logging.Warning import akka.util.Unsafe +import akka.dispatch._ +import util.Try /** * This actor ref starts out with some dummy cell (by default just enqueuing @@ -32,17 +33,34 @@ private[akka] class RepointableActorRef( val path: ActorPath) extends ActorRefWithCell with RepointableRef { - import AbstractActorRef.cellOffset + import AbstractActorRef.{ cellOffset, lookupOffset } + /* + * H E R E B E D R A G O N S ! + * + * There are two main functions of a Cell: message queueing and child lookup. + * When switching out the UnstartedCell for its real replacement, the former + * must be switched after all messages have been drained from the temporary + * queue into the real mailbox, while the latter must be switched before + * processing the very first message (i.e. before Cell.start()). Hence there + * are two refs here, one for each function, and they are switched just so. + */ @volatile private var _cellDoNotCallMeDirectly: Cell = _ + @volatile private var _lookupDoNotCallMeDirectly: Cell = _ def underlying: Cell = Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell] + private def lookup = Unsafe.instance.getObjectVolatile(this, lookupOffset).asInstanceOf[Cell] @tailrec final def swapCell(next: Cell): Cell = { val old = underlying if (Unsafe.instance.compareAndSwapObject(this, cellOffset, old, next)) old else swapCell(next) } + @tailrec final def swapLookup(next: Cell): Cell = { + val old = lookup + if (Unsafe.instance.compareAndSwapObject(this, lookupOffset, old, next)) old else swapLookup(next) + } + /** * Initialize: make a dummy cell which holds just a mailbox, then tell our * supervisor that we exist so that he can create the real Cell in @@ -52,12 +70,17 @@ private[akka] class RepointableActorRef( * * This is protected so that others can have different initialization. */ - def initialize(): this.type = { - val uid = ThreadLocalRandom.current.nextInt() - swapCell(new UnstartedCell(system, this, props, supervisor, uid)) - supervisor.sendSystemMessage(Supervise(this, uid)) - this - } + def initialize(async: Boolean): this.type = + underlying match { + case null ⇒ + val uid = ThreadLocalRandom.current.nextInt() + swapCell(new UnstartedCell(system, this, props, supervisor, uid)) + swapLookup(underlying) + supervisor.sendSystemMessage(Supervise(this, async, uid)) + if (!async) point() + this + case other ⇒ throw new IllegalStateException("initialize called more than once!") + } /** * This method is supposed to be called by the supervisor in handleSupervise() @@ -65,21 +88,31 @@ private[akka] class RepointableActorRef( * modification of the `underlying` field, though it is safe to send messages * at any time. */ - def activate(): this.type = { + def point(): this.type = underlying match { - case u: UnstartedCell ⇒ u.replaceWith(newCell(u)) - case _ ⇒ // this happens routinely for things which were created async=false + case u: UnstartedCell ⇒ + /* + * The problem here was that if the real actor (which will start running + * at cell.start()) creates children in its constructor, then this may + * happen before the swapCell in u.replaceWith, meaning that those + * children cannot be looked up immediately, e.g. if they shall become + * routees. + */ + val cell = newCell(u) + swapLookup(cell) + cell.start() + u.replaceWith(cell) + this + case null ⇒ throw new IllegalStateException("underlying cell is null") + case _ ⇒ this // this happens routinely for things which were created async=false } - this - } /** * This is called by activate() to obtain the cell which is to replace the * unstarted cell. The cell must be fully functional. */ - def newCell(old: Cell): Cell = - new ActorCell(system, this, props, supervisor). - init(old.asInstanceOf[UnstartedCell].uid, sendSupervise = false).start() + def newCell(old: UnstartedCell): Cell = + new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false) def start(): Unit = () @@ -91,7 +124,11 @@ private[akka] class RepointableActorRef( def restart(cause: Throwable): Unit = underlying.restart(cause) - def isStarted: Boolean = !underlying.isInstanceOf[UnstartedCell] + def isStarted: Boolean = underlying match { + case _: UnstartedCell ⇒ false + case null ⇒ throw new IllegalStateException("isStarted called before initialized") + case _ ⇒ true + } def isTerminated: Boolean = underlying.isTerminated @@ -107,7 +144,7 @@ private[akka] class RepointableActorRef( case ".." ⇒ getParent.getChild(name) case "" ⇒ getChild(name) case other ⇒ - underlying.getChildByName(other) match { + lookup.getChildByName(other) match { case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) case _ ⇒ Nobody } @@ -122,117 +159,120 @@ private[akka] class RepointableActorRef( protected def writeReplace(): AnyRef = SerializedActorRef(path) } -private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, val supervisor: InternalActorRef, val uid: Int) - extends Cell { +private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, + val self: RepointableActorRef, + val props: Props, + val supervisor: InternalActorRef, + val uid: Int) extends Cell { /* * This lock protects all accesses to this cell’s queues. It also ensures * safe switching to the started ActorCell. */ - val lock = new ReentrantLock + private[this] final val lock = new ReentrantLock - // use Envelope to keep on-send checks in the same place - val queue: Queue[Envelope] = Queue() - val systemQueue: Queue[SystemMessage] = Queue() - var suspendCount: Int = 0 + // use Envelope to keep on-send checks in the same place ACCESS MUST BE PROTECTED BY THE LOCK + private[this] final val queue = new JLinkedList[Any]() + // ACCESS MUST BE PROTECTED BY THE LOCK, is used to detect when messages are sent during replace + private[this] final var isBeingReplaced = false - private def timeout = system.settings.UnstartedPushTimeout.duration.toMillis + import systemImpl.settings.UnstartedPushTimeout.{ duration ⇒ timeout } - def replaceWith(cell: Cell): Unit = { - lock.lock() + def replaceWith(cell: Cell): Unit = locked { + isBeingReplaced = true try { - /* - * The CallingThreadDispatcher nicely dives under the ReentrantLock and - * breaks things by enqueueing into stale queues from within the message - * processing which happens in-line for sendSystemMessage() and tell(). - * Since this is the only possible way to f*ck things up within this - * lock, double-tap (well, N-tap, really); concurrent modification is - * still not possible because we’re the only thread accessing the queues. - */ - while (systemQueue.nonEmpty || queue.nonEmpty) { - while (systemQueue.nonEmpty) { - val msg = systemQueue.dequeue() - cell.sendSystemMessage(msg) - } - if (queue.nonEmpty) { - val envelope = queue.dequeue() - cell.tell(envelope.message, envelope.sender) + while (!queue.isEmpty) { + queue.poll() match { + case s: SystemMessage ⇒ cell.sendSystemMessage(s) + case e: Envelope ⇒ cell.tell(e.message, e.sender) } } - } finally try + } finally { + isBeingReplaced = false self.swapCell(cell) - finally try - for (_ ← 1 to suspendCount) cell.suspend() - finally - lock.unlock() + } } def system: ActorSystem = systemImpl - def suspend(): Unit = { - lock.lock() - try suspendCount += 1 - finally lock.unlock() - } - def resume(causedByFailure: Throwable): Unit = { - lock.lock() - try suspendCount -= 1 - finally lock.unlock() - } - def restart(cause: Throwable): Unit = { - lock.lock() - try suspendCount -= 1 - finally lock.unlock() - } + def start(): this.type = this + def suspend(): Unit = sendSystemMessage(Suspend()) + def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure)) + def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) def stop(): Unit = sendSystemMessage(Terminate()) - def isTerminated: Boolean = false + def isTerminated: Boolean = locked { + val cell = self.underlying + if (cellIsReady(cell)) cell.isTerminated else false + } def parent: InternalActorRef = supervisor def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def getChildByName(name: String): Option[ChildRestartStats] = None + def tell(message: Any, sender: ActorRef): Unit = { val useSender = if (sender eq Actor.noSender) system.deadLetters else sender - if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + if (lock.tryLock(timeout.length, timeout.unit)) { try { - if (self.underlying eq this) queue enqueue Envelope(message, useSender, system) - else self.underlying.tell(message, useSender) - } finally { - lock.unlock() - } + val cell = self.underlying + if (cellIsReady(cell)) { + cell.tell(message, useSender) + } else if (!queue.offer(Envelope(message, useSender, system))) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure")) + system.deadLetters ! DeadLetter(message, useSender, self) + } + } finally lock.unlock() } else { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout")) system.deadLetters ! DeadLetter(message, useSender, self) } } - def sendSystemMessage(msg: SystemMessage): Unit = { - if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + + // FIXME: once we have guaranteed delivery of system messages, hook this in! + def sendSystemMessage(msg: SystemMessage): Unit = + if (lock.tryLock(timeout.length, timeout.unit)) { try { - if (self.underlying eq this) systemQueue enqueue msg - else self.underlying.sendSystemMessage(msg) - } finally { - lock.unlock() - } + val cell = self.underlying + if (cellIsReady(cell)) { + cell.sendSystemMessage(msg) + } else { + // systemMessages that are sent during replace need to jump to just after the last system message in the queue, so it's processed before other messages + val wasEnqueued = if (isBeingReplaced && !queue.isEmpty()) { + @tailrec def tryEnqueue(i: JListIterator[Any] = queue.listIterator(), insertIntoIndex: Int = -1): Boolean = + if (i.hasNext()) + tryEnqueue(i, + if (i.next().isInstanceOf[SystemMessage]) i.nextIndex() // update last sysmsg seen so far + else insertIntoIndex) // or just keep the last seen one + else if (insertIntoIndex == -1) queue.offer(msg) + else Try(queue.add(insertIntoIndex, msg)).isSuccess + tryEnqueue() + } else queue.offer(msg) + + if (!wasEnqueued) { + system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) + system.deadLetters ! DeadLetter(msg, self, self) + } + } + } finally lock.unlock() } else { - // FIXME: once we have guaranteed delivery of system messages, hook this in! system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout")) system.deadLetters ! DeadLetter(msg, self, self) } - } + def isLocal = true - def hasMessages: Boolean = { - lock.lock() - try { - if (self.underlying eq this) !queue.isEmpty - else self.underlying.hasMessages - } finally { - lock.unlock() - } + + private[this] final def cellIsReady(cell: Cell): Boolean = (cell ne this) && (cell ne null) + + def hasMessages: Boolean = locked { + val cell = self.underlying + if (cellIsReady(cell)) cell.hasMessages else !queue.isEmpty + } + + def numberOfMessages: Int = locked { + val cell = self.underlying + if (cellIsReady(cell)) cell.numberOfMessages else queue.size } - def numberOfMessages: Int = { + + private[this] final def locked[T](body: ⇒ T): T = { lock.lock() - try { - if (self.underlying eq this) queue.size - else self.underlying.numberOfMessages - } finally { - lock.unlock() - } + try body finally lock.unlock() } } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index d59b0b71da8..d701f22a3d5 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -53,19 +53,24 @@ private[akka] trait Children { this: ActorCell ⇒ } final def stop(actor: ActorRef): Unit = { - val started = actor match { - case r: RepointableRef ⇒ r.isStarted - case _ ⇒ true + if (childrenRefs.getByRef(actor).isDefined) { + @tailrec def shallDie(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) + } + + if (actor match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + }) shallDie(actor) } - if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor) actor.asInstanceOf[InternalActorRef].stop() } /* * low level CAS helpers */ - - @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = + @inline private final def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) @tailrec final def reserveChild(name: String): Boolean = { @@ -90,18 +95,6 @@ private[akka] trait Children { this: ActorCell ⇒ } } - @tailrec final protected def shallDie(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) - } - - @tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = { - val c = childrenRefs - val n = c.remove(ref) - if (swapChildrenRefs(c, n)) n - else removeChild(ref) - } - @tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = { childrenRefs match { case c: ChildrenContainer.TerminatingChildrenContainer ⇒ @@ -144,10 +137,18 @@ private[akka] trait Children { this: ActorCell ⇒ protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = { - childrenRefs match { + @tailrec def removeChild(ref: ActorRef): ChildrenContainer = { + val c = childrenRefs + val n = c.remove(ref) + if (swapChildrenRefs(c, n)) n else removeChild(ref) + } + + childrenRefs match { // The match must be performed BEFORE the removeChild case TerminatingChildrenContainer(_, _, reason) ⇒ - val newContainer = removeChild(child) - if (!newContainer.isInstanceOf[TerminatingChildrenContainer]) Some(reason) else None + removeChild(child) match { + case _: TerminatingChildrenContainer ⇒ None + case _ ⇒ Some(reason) + } case _ ⇒ removeChild(child) None diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index aefd2bcc556..469aac78c28 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -55,7 +55,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ if (sendSupervise) { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(akka.dispatch.Supervise(self, uid)) + parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false, uid)) parent ! NullMessage // read ScalaDoc of NullMessage to see why } this diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 36afc8a24cd..959cb416aec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -108,7 +108,7 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from /** * INTERNAL API */ -private[akka] case class Supervise(child: ActorRef, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 9ccf43fb584..a664c8b0d68 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -5,18 +5,20 @@ package akka.routing import language.implicitConversions import language.postfixOps -import akka.actor._ import scala.concurrent.duration._ +import akka.actor._ import akka.ConfigurationException import akka.pattern.pipe import com.typesafe.config.Config import scala.collection.JavaConverters.iterableAsScalaIterableConverter import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit +import akka.event.Logging.Warning import scala.concurrent.forkjoin.ThreadLocalRandom import akka.dispatch.Dispatchers import scala.annotation.tailrec import concurrent.ExecutionContext +import akka.event.Logging.Warning /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -34,11 +36,11 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _props.routerConfig.verifyConfig() - override def newCell(old: Cell): Cell = new RoutedActorCell(system, this, props, supervisor, old.asInstanceOf[UnstartedCell].uid) + override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false) } -private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef, _uid: Int) +private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef) extends ActorCell( _system, _ref, @@ -69,8 +71,6 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo r } - init(_uid, sendSupervise = false).start() - /* * end of construction */ @@ -92,7 +92,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo * `RouterConfig.createRoute` and `Resizer.resize` */ private[akka] def addRoutees(newRoutees: Iterable[ActorRef]): Unit = { - _routees = _routees ++ newRoutees + _routees ++= newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor newRoutees foreach watch } @@ -108,30 +108,27 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo } override def tell(message: Any, sender: ActorRef): Unit = { - resize() - + resize() // Mucho importante val s = if (sender eq null) system.deadLetters else sender - - val msg = message match { - case wrapped: RouterEnvelope ⇒ wrapped.message - case m ⇒ m - } - applyRoute(s, message) match { - case Destination(_, x) :: Nil if x == self ⇒ super.tell(message, s) - case refs ⇒ - refs foreach (p ⇒ - if (p.recipient == self) super.tell(msg, p.sender) - else p.recipient.!(msg)(p.sender)) + case Destination(_, x) :: Nil if x == self ⇒ + super.tell(message, s) + case refs ⇒ refs foreach { p ⇒ + val msg = message match { + case wrapped: RouterEnvelope ⇒ wrapped.message + case m ⇒ m + } + if (p.recipient == self) super.tell(msg, p.sender) + else p.recipient.!(msg)(p.sender) + } } } - def resize(): Unit = { + def resize(): Unit = for (r ← routerConfig.resizer) { if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) super.tell(Router.Resize, self) } - } } /**