Skip to content

Commit

Permalink
Fix compile errors and failing tests after upgrade to akka-actor 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
krasserm committed May 23, 2011
1 parent 1b5f8b8 commit e9e6b63
Show file tree
Hide file tree
Showing 14 changed files with 40 additions and 44 deletions.
5 changes: 4 additions & 1 deletion akka-camel/src/main/scala/akka/camel/Consumer.scala
Expand Up @@ -141,7 +141,10 @@ private[camel] object Consumer {
*/
def withConsumer[T](actorRef: ActorRef)(f: Consumer => T): Option[T] = {
if (!actorRef.actor.isInstanceOf[Consumer]) None
else if (actorRef.homeAddress.isDefined) None

// TODO: check if this is needed at all
//else if (actorRef.homeAddress.isDefined) None

else Some(f(actorRef.actor.asInstanceOf[Consumer]))
}
}
4 changes: 2 additions & 2 deletions akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala
Expand Up @@ -20,8 +20,8 @@ import akka.event.EventHandler
*/
private[camel] class ConsumerPublishRequestor extends PublishRequestor {
def receiveActorRegistryEvent = {
case ActorRegistered(actor) => for (event <- ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
case ActorUnregistered(actor) => for (event <- ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
case ActorRegistered(_, actor) => for (event <- ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event)
case ActorUnregistered(_, actor) => for (event <- ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event)
}
}

Expand Down
Expand Up @@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor {
* @author Martin Krasser
*/
private[camel] object PublishRequestor {
def pastActorRegisteredEvents = for (actor <- Actor.registry.actors) yield ActorRegistered(actor)
def pastActorRegisteredEvents = for (actor <- Actor.registry.local.actors) yield ActorRegistered(actor.address, actor)
}

/**
Expand Down
Expand Up @@ -206,12 +206,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn
case null => uuid
}

private def targetById(id: String) = Actor.registry.actorsFor(id) match {
case actors if actors.length == 0 => None
case actors => Some(actors(0))
}

private def targetByUuid(uuid: Uuid) = Actor.registry.actorFor(uuid)
private def targetById(id: String) = Actor.registry.local.actorFor(id)
private def targetByUuid(uuid: Uuid) = Actor.registry.local.actorFor(uuid)
}

/**
Expand Down Expand Up @@ -268,6 +264,8 @@ private[akka] object AsyncCallbackAdapter {
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
import akka.camel.Consumer._

val address = exchange.getExchangeId

def start = {
_status = ActorRefInternals.RUNNING
this
Expand Down Expand Up @@ -304,7 +302,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
def remoteAddress: Option[InetSocketAddress] = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): ActorRef = unsupported
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported
def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
Expand Down
Expand Up @@ -25,7 +25,7 @@ public static void setUpBeforeClass() {
@AfterClass
public static void tearDownAfterClass() {
stopCamelService();
registry().shutdownAll();
registry().local().shutdownAll();
}

@Test
Expand Down
Expand Up @@ -12,7 +12,7 @@ class CamelServiceManagerTest extends WordSpec with BeforeAndAfterAll with MustM

override def afterAll = {
CamelServiceManager.stopCamelService
Actor.registry.shutdownAll
Actor.registry.local.shutdownAll
}

"A CamelServiceManager" when {
Expand Down
Expand Up @@ -28,20 +28,20 @@ class ConsumerPublishRequestorTest extends JUnitSuite {

@After def tearDown = {
Actor.registry.removeListener(requestor);
Actor.registry.shutdownAll
Actor.registry.local.shutdownAll
}

@Test def shouldReceiveOneConsumerRegisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorRegistered(consumer)
requestor ! ActorRegistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
}

@Test def shouldReceiveOneConsumerUnregisteredEvent = {
val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get
requestor ! ActorUnregistered(consumer)
requestor ! ActorUnregistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher !! GetRetainedMessage) ===
Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
Expand Down
4 changes: 2 additions & 2 deletions akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala
Expand Up @@ -24,7 +24,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher
var service: CamelService = _

override protected def beforeAll = {
registry.shutdownAll
registry.local.shutdownAll
service = CamelServiceFactory.createCamelService
// register test consumer before registering the publish requestor
// and before starting the CamelService (registry is scanned for consumers)
Expand All @@ -39,7 +39,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher

override protected def afterAll = {
service.stop
registry.shutdownAll
registry.local.shutdownAll
}

"A responding consumer" when {
Expand Down
Expand Up @@ -6,21 +6,21 @@ import org.apache.camel.component.mock.MockEndpoint
import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}

import akka.actor.Actor._
import akka.actor.{ActorRef, Actor, ActorRegistry}
import akka.actor.{ActorRef, Actor}

class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
import ProducerFeatureTest._

override protected def beforeAll = {
Actor.registry.shutdownAll
Actor.registry.local.shutdownAll
CamelContextManager.init
CamelContextManager.mandatoryContext.addRoutes(new TestRoute)
CamelContextManager.start
}

override protected def afterAll = {
CamelContextManager.stop
Actor.registry.shutdownAll
Actor.registry.local.shutdownAll
}

override protected def afterEach = {
Expand Down
Expand Up @@ -11,15 +11,15 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
import UntypedProducerFeatureTest._

override protected def beforeAll = {
registry.shutdownAll
registry.local.shutdownAll
CamelContextManager.init
CamelContextManager.mandatoryContext.addRoutes(new TestRoute)
CamelContextManager.start
}

override protected def afterAll = {
CamelContextManager.stop
registry.shutdownAll
registry.local.shutdownAll
}

override protected def afterEach = {
Expand Down
Expand Up @@ -16,7 +16,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
import ActorComponentFeatureTest._

override protected def beforeAll = {
Actor.registry.shutdownAll
Actor.registry.local.shutdownAll
CamelContextManager.init
CamelContextManager.mandatoryContext.addRoutes(new TestRoute)
CamelContextManager.start
Expand All @@ -25,7 +25,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
override protected def afterAll = CamelContextManager.stop

override protected def afterEach = {
Actor.registry.shutdownAll
Actor.registry.local.shutdownAll
mockEndpoint.reset
}

Expand Down Expand Up @@ -72,19 +72,19 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
scenario("one-way communication") {
val actor = actorOf[Tester1].start
val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get
mandatoryTemplate.sendBody("actor:%s" format actor.id, "Martin")
mandatoryTemplate.sendBody("actor:%s" format actor.address, "Martin")
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message]
assert(reply.body === "Martin")
}

scenario("two-way communication") {
val actor = actorOf[Tester2].start
assert(mandatoryTemplate.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
assert(mandatoryTemplate.requestBody("actor:%s" format actor.address, "Martin") === "Hello Martin")
}

scenario("two-way communication via a custom route") {
val actor = actorOf[CustomIdActor].start
val actor = actorOf[CustomIdActor]("custom-id").start
assert(mandatoryTemplate.requestBody("direct:custom-id-test-1", "Martin") === "Received Martin")
assert(mandatoryTemplate.requestBody("direct:custom-id-test-2", "Martin") === "Received Martin")
}
Expand All @@ -95,7 +95,6 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with

object ActorComponentFeatureTest {
class CustomIdActor extends Actor {
self.id = "custom-id"
protected def receive = {
case msg: Message => self.reply("Received %s" format msg.body)
}
Expand Down
Expand Up @@ -17,7 +17,7 @@ import akka.camel.CamelTestSupport._
class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
import ActorProducerTest._

@After def tearDown = registry.shutdownAll
@After def tearDown = registry.local.shutdownAll

@Test def shouldSendMessageToActorWithSyncProcessor = {
val actor = actorOf[Tester1].start
Expand Down Expand Up @@ -106,20 +106,18 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}

@Test def shouldDynamicallyRouteMessageToActorWithDefaultId = {
val actor1 = actorOf[Tester1]
val actor2 = actorOf[Tester1]
actor1.id = "x"
actor2.id = "y"
val actor1 = actorOf[Tester1]("x")
val actor2 = actorOf[Tester1]("y")
actor1.start
actor2.start
val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
val endpoint = actorEndpoint("actor:id:%s" format actor1.id)
val endpoint = actorEndpoint("actor:id:%s" format actor1.address)
val exchange1 = endpoint.createExchange(ExchangePattern.InOnly)
val exchange2 = endpoint.createExchange(ExchangePattern.InOnly)
exchange1.getIn.setBody("Test1")
exchange2.getIn.setBody("Test2")
exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.id)
exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.address)
actorProducer(endpoint).process(exchange1)
actorProducer(endpoint).process(exchange2)
assert(latch1.await(5, TimeUnit.SECONDS))
Expand All @@ -131,10 +129,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
}

@Test def shouldDynamicallyRouteMessageToActorWithoutDefaultId = {
val actor1 = actorOf[Tester1]
val actor2 = actorOf[Tester1]
actor1.id = "x"
actor2.id = "y"
val actor1 = actorOf[Tester1]("x")
val actor2 = actorOf[Tester1]("y")
actor1.start
actor2.start
val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get
Expand All @@ -144,8 +140,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
val exchange2 = endpoint.createExchange(ExchangePattern.InOnly)
exchange1.getIn.setBody("Test1")
exchange2.getIn.setBody("Test2")
exchange1.getIn.setHeader(ActorComponent.ActorIdentifier, actor1.id)
exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.id)
exchange1.getIn.setHeader(ActorComponent.ActorIdentifier, actor1.address)
exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.address)
actorProducer(endpoint).process(exchange1)
actorProducer(endpoint).process(exchange2)
assert(latch1.await(5, TimeUnit.SECONDS))
Expand Down
2 changes: 1 addition & 1 deletion akka-spring/src/test/resources/akka-test.conf
Expand Up @@ -34,7 +34,7 @@ akka {
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down

default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable
# - ExecutorBasedEventDriven
# - ExecutorBasedEventDrivenWorkStealing
# - GlobalExecutorBasedEventDriven
Expand Down
2 changes: 1 addition & 1 deletion config/akka-reference.conf
Expand Up @@ -35,7 +35,7 @@ akka {
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down

default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable
# - ExecutorBasedEventDriven
# - ExecutorBasedEventDrivenWorkStealing
# - GlobalExecutorBasedEventDriven
Expand Down

0 comments on commit e9e6b63

Please sign in to comment.