Skip to content

Commit

Permalink
Merge pull request #905 from akka/wip-2.1-2675-∂π
Browse files Browse the repository at this point in the history
Wip 2.1 2675 ∂π
  • Loading branch information
viktorklang committed Nov 26, 2012
2 parents c04c3cf + 5ce0cb8 commit d88d328
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 175 deletions.
58 changes: 57 additions & 1 deletion akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala
Expand Up @@ -14,6 +14,10 @@ import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLat
import akka.util.Timeout
import scala.concurrent.Future
import akka.pattern.ask
import akka.dispatch._
import com.typesafe.config.Config
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue, TimeUnit }
import akka.util.Switch

class JavaExtensionSpec extends JavaExtension with JUnitSuite

Expand Down Expand Up @@ -67,10 +71,57 @@ object ActorSystemSpec {
}
}

case class FastActor(latch: TestLatch, testActor: ActorRef) extends Actor {
val ref1 = context.actorOf(Props.empty)
val ref2 = context.actorFor(ref1.path.toString)
testActor ! ref2.getClass
latch.countDown()

def receive = {
case _
}
}

class SlowDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(_config, _prerequisites) {
private val instance = new Dispatcher(
prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
val doneIt = new Switch
override protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint)
doneIt.switchOn {
TestKit.awaitCond(mbox.actor.actor != null, 1.second)
mbox.actor.actor match {
case FastActor(latch, _) Await.ready(latch, 1.second)
}
}
ret
}
}

/**
* Returns the same dispatcher instance for each invocation
*/
override def dispatcher(): MessageDispatcher = instance
}

val config = s"""
akka.extensions = ["akka.actor.TestExtension"]
slow {
type="${classOf[SlowDispatcher].getName}"
}"""

}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension"]""") with ImplicitSender {
class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSender {

import ActorSystemSpec.FastActor

"An ActorSystem" must {

Expand Down Expand Up @@ -168,6 +219,11 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done")
}

"find actors that just have been created" in {
system.actorOf(Props(new FastActor(TestLatch(), testActor)).withDispatcher("slow"))
expectMsgType[Class[_]] must be(classOf[LocalActorRef])
}

"reliable deny creation of actors while shutting down" in {
val system = ActorSystem()
import system.dispatcher
Expand Down
47 changes: 26 additions & 21 deletions akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
Expand Up @@ -5,7 +5,6 @@ package akka.routing

import language.postfixOps

import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import scala.collection.mutable.LinkedList
import akka.testkit._
Expand All @@ -18,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger

object RoutingSpec {

Expand Down Expand Up @@ -101,41 +101,45 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}

"be able to send their routees" in {
class TheActor extends Actor {
val routee1 = context.actorOf(Props[TestActor], "routee1")
val routee2 = context.actorOf(Props[TestActor], "routee2")
val routee3 = context.actorOf(Props[TestActor], "routee3")
val router = context.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(
routees = List(routee1, routee2, routee3),
within = 5 seconds)))

case class TestRun(id: String, names: Iterable[String], actors: Int)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "doIt" router ! CurrentRoutees
case routees: RouterRoutees testActor forward routees
case TestRun(id, names, actors)
val routerProps = Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(
routees = names map { context.actorOf(Props(new TestActor), _) },
within = 5 seconds))

1 to actors foreach { i context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) }
}
}
}))

val theActor = system.actorOf(Props(new TheActor), "theActor")
theActor ! "doIt"
val routees = expectMsgPF() {
case RouterRoutees(routees) routees.toSet
}
val actors = 15
val names = 1 to 20 map { "routee" + _ } toList

routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3"))
actor ! TestRun("test", names, actors)

1 to actors foreach { _
val routees = expectMsgType[RouterRoutees].routees
routees.map(_.path.name) must be === names
}
expectNoMsg(500.millis)
}

"use configured nr-of-instances when FromConfig" in {
val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1")
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size must be(3)
watch(router)
system.stop(router)
expectMsgType[Terminated]
}

"use configured nr-of-instances when router is specified" in {
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2")
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size must be(3)
system.stop(router)
}

Expand All @@ -150,7 +154,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3")
Await.ready(latch, remaining)
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size must be(3)
system.stop(router)
}

Expand Down
2 changes: 2 additions & 0 deletions akka-actor/src/main/java/akka/actor/AbstractActorRef.java
Expand Up @@ -8,10 +8,12 @@

final class AbstractActorRef {
final static long cellOffset;
final static long lookupOffset;

static {
try {
cellOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_cellDoNotCallMeDirectly"));
lookupOffset = Unsafe.instance.objectFieldOffset(RepointableActorRef.class.getDeclaredField("_lookupDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
Expand Down
23 changes: 14 additions & 9 deletions akka-actor/src/main/scala/akka/actor/ActorCell.scala
Expand Up @@ -208,6 +208,11 @@ private[akka] trait Cell {
* The system internals where this Cell lives.
*/
def systemImpl: ActorSystemImpl
/**
* Start the cell: enqueued message must not be processed before this has
* been called. The usual action is to attach the mailbox to a dispatcher.
*/
def start(): this.type
/**
* Recursively suspend this actor and all its children. Must not throw exceptions.
*/
Expand Down Expand Up @@ -361,10 +366,10 @@ private[akka] class ActorCell(
case null faultResume(inRespToFailure)
case w: WaitingForChildren w.enqueue(message)
}
case Terminate() terminate()
case Supervise(child, uid) supervise(child, uid)
case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning
case Terminate() terminate()
case Supervise(child, async, uid) supervise(child, async, uid)
case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, "error while processing " + message)
Expand Down Expand Up @@ -492,21 +497,21 @@ private[akka] class ActorCell(
}
}

private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) {
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
initChild(child) match {
case Some(crs)
crs.uid = uid
handleSupervise(child)
handleSupervise(child, async)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
}
}

// future extension point
protected def handleSupervise(child: ActorRef): Unit = child match {
case r: RepointableActorRef r.activate()
case _
protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {
case r: RepointableActorRef if async r.point()
case _
}

final protected def clearActorFields(actorInstance: Actor): Unit = {
Expand Down
7 changes: 3 additions & 4 deletions akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
Expand Up @@ -391,7 +391,7 @@ class LocalActorRefProvider(

override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match {
case Supervise(_, _) // TODO register child in some map to keep track of it and enable shutdown after all dead
case Supervise(_, _, _) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(_) stop()
case _ log.error(this + " received unexpected system message [" + message + "]")
}
Expand Down Expand Up @@ -595,14 +595,13 @@ class LocalActorRefProvider(
if (settings.DebugRouterMisconfiguration && deployer.lookup(path).isDefined)
log.warning("Configuration says that {} should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.")

if (async) new RepointableActorRef(system, props, supervisor, path).initialize()
if (async) new RepointableActorRef(system, props, supervisor, path).initialize(async)
else new LocalActorRef(system, props, supervisor, path)
case router
val lookup = if (lookupDeploy) deployer.lookup(path) else None
val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router))
val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) b withFallback a)
val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize()
if (async) ref else ref.activate()
new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async)
}
}

Expand Down

0 comments on commit d88d328

Please sign in to comment.