From e9e6b632cb1eec6e399e4ab932740c0bc9b31447 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Mon, 23 May 2011 11:59:49 +0200 Subject: [PATCH] Fix compile errors and failing tests after upgrade to akka-actor 2.0 --- .../src/main/scala/akka/camel/Consumer.scala | 5 ++++- .../scala/akka/camel/ConsumerPublisher.scala | 4 ++-- .../scala/akka/camel/PublisherRequestor.scala | 2 +- .../akka/camel/component/ActorComponent.scala | 12 +++++----- .../java/akka/camel/ConsumerJavaTestBase.java | 2 +- .../akka/camel/CamelServiceManagerTest.scala | 2 +- .../camel/ConsumerPublishRequestorTest.scala | 6 ++--- .../scala/akka/camel/ConsumerScalaTest.scala | 4 ++-- .../akka/camel/ProducerFeatureTest.scala | 6 ++--- .../camel/UntypedProducerFeatureTest.scala | 4 ++-- .../component/ActorComponentFeatureTest.scala | 11 +++++----- .../camel/component/ActorProducerTest.scala | 22 ++++++++----------- akka-spring/src/test/resources/akka-test.conf | 2 +- config/akka-reference.conf | 2 +- 14 files changed, 40 insertions(+), 44 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index b248c21..b1b4795 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -141,7 +141,10 @@ private[camel] object Consumer { */ def withConsumer[T](actorRef: ActorRef)(f: Consumer => T): Option[T] = { if (!actorRef.actor.isInstanceOf[Consumer]) None - else if (actorRef.homeAddress.isDefined) None + + // TODO: check if this is needed at all + //else if (actorRef.homeAddress.isDefined) None + else Some(f(actorRef.actor.asInstanceOf[Consumer])) } } diff --git a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala index d82c561..a556694 100644 --- a/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/akka/camel/ConsumerPublisher.scala @@ -20,8 +20,8 @@ import akka.event.EventHandler */ private[camel] class ConsumerPublishRequestor extends PublishRequestor { def receiveActorRegistryEvent = { - case ActorRegistered(actor) => for (event <- ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event) - case ActorUnregistered(actor) => for (event <- ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event) + case ActorRegistered(_, actor) => for (event <- ConsumerActorRegistered.eventFor(actor)) deliverCurrentEvent(event) + case ActorUnregistered(_, actor) => for (event <- ConsumerActorUnregistered.eventFor(actor)) deliverCurrentEvent(event) } } diff --git a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala index 34d0fd7..2c061ff 100644 --- a/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala +++ b/akka-camel/src/main/scala/akka/camel/PublisherRequestor.scala @@ -54,7 +54,7 @@ private[camel] abstract class PublishRequestor extends Actor { * @author Martin Krasser */ private[camel] object PublishRequestor { - def pastActorRegisteredEvents = for (actor <- Actor.registry.actors) yield ActorRegistered(actor) + def pastActorRegisteredEvents = for (actor <- Actor.registry.local.actors) yield ActorRegistered(actor.address, actor) } /** diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index a08f3fd..b4777a9 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -206,12 +206,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn case null => uuid } - private def targetById(id: String) = Actor.registry.actorsFor(id) match { - case actors if actors.length == 0 => None - case actors => Some(actors(0)) - } - - private def targetByUuid(uuid: Uuid) = Actor.registry.actorFor(uuid) + private def targetById(id: String) = Actor.registry.local.actorFor(id) + private def targetByUuid(uuid: Uuid) = Actor.registry.local.actorFor(uuid) } /** @@ -268,6 +264,8 @@ private[akka] object AsyncCallbackAdapter { private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef { import akka.camel.Consumer._ + val address = exchange.getExchangeId + def start = { _status = ActorRefInternals.RUNNING this @@ -304,7 +302,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def remoteAddress: Option[InetSocketAddress] = unsupported def link(actorRef: ActorRef): Unit = unsupported def unlink(actorRef: ActorRef): Unit = unsupported - def startLink(actorRef: ActorRef): Unit = unsupported + def startLink(actorRef: ActorRef): ActorRef = unsupported def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = unsupported def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index 6a3fd50..929f72c 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -25,7 +25,7 @@ public static void setUpBeforeClass() { @AfterClass public static void tearDownAfterClass() { stopCamelService(); - registry().shutdownAll(); + registry().local().shutdownAll(); } @Test diff --git a/akka-camel/src/test/scala/akka/camel/CamelServiceManagerTest.scala b/akka-camel/src/test/scala/akka/camel/CamelServiceManagerTest.scala index 3da38f7..f06c853 100644 --- a/akka-camel/src/test/scala/akka/camel/CamelServiceManagerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/CamelServiceManagerTest.scala @@ -12,7 +12,7 @@ class CamelServiceManagerTest extends WordSpec with BeforeAndAfterAll with MustM override def afterAll = { CamelServiceManager.stopCamelService - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } "A CamelServiceManager" when { diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala index c6de349..6da3a6d 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala @@ -28,12 +28,12 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @After def tearDown = { Actor.registry.removeListener(requestor); - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } @Test def shouldReceiveOneConsumerRegisteredEvent = { val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get - requestor ! ActorRegistered(consumer) + requestor ! ActorRegistered(consumer.address, consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher !! GetRetainedMessage) === Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer]))) @@ -41,7 +41,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerUnregisteredEvent = { val latch = (publisher !! SetExpectedTestMessageCount(1)).as[CountDownLatch].get - requestor ! ActorUnregistered(consumer) + requestor ! ActorUnregistered(consumer.address, consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher !! GetRetainedMessage) === Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer]))) diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index 862b691..2c0fa0a 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -24,7 +24,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher var service: CamelService = _ override protected def beforeAll = { - registry.shutdownAll + registry.local.shutdownAll service = CamelServiceFactory.createCamelService // register test consumer before registering the publish requestor // and before starting the CamelService (registry is scanned for consumers) @@ -39,7 +39,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher override protected def afterAll = { service.stop - registry.shutdownAll + registry.local.shutdownAll } "A responding consumer" when { diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 813e8fa..6bb2495 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -6,13 +6,13 @@ import org.apache.camel.component.mock.MockEndpoint import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} import akka.actor.Actor._ -import akka.actor.{ActorRef, Actor, ActorRegistry} +import akka.actor.{ActorRef, Actor} class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen { import ProducerFeatureTest._ override protected def beforeAll = { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll CamelContextManager.init CamelContextManager.mandatoryContext.addRoutes(new TestRoute) CamelContextManager.start @@ -20,7 +20,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before override protected def afterAll = { CamelContextManager.stop - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } override protected def afterEach = { diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala index 76f710e..10920da 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala @@ -11,7 +11,7 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with import UntypedProducerFeatureTest._ override protected def beforeAll = { - registry.shutdownAll + registry.local.shutdownAll CamelContextManager.init CamelContextManager.mandatoryContext.addRoutes(new TestRoute) CamelContextManager.start @@ -19,7 +19,7 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with override protected def afterAll = { CamelContextManager.stop - registry.shutdownAll + registry.local.shutdownAll } override protected def afterEach = { diff --git a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala index 4795aab..21ba3a3 100644 --- a/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/component/ActorComponentFeatureTest.scala @@ -16,7 +16,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with import ActorComponentFeatureTest._ override protected def beforeAll = { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll CamelContextManager.init CamelContextManager.mandatoryContext.addRoutes(new TestRoute) CamelContextManager.start @@ -25,7 +25,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with override protected def afterAll = CamelContextManager.stop override protected def afterEach = { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll mockEndpoint.reset } @@ -72,7 +72,7 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with scenario("one-way communication") { val actor = actorOf[Tester1].start val latch = (actor !! SetExpectedMessageCount(1)).as[CountDownLatch].get - mandatoryTemplate.sendBody("actor:%s" format actor.id, "Martin") + mandatoryTemplate.sendBody("actor:%s" format actor.address, "Martin") assert(latch.await(5000, TimeUnit.MILLISECONDS)) val reply = (actor !! GetRetainedMessage).get.asInstanceOf[Message] assert(reply.body === "Martin") @@ -80,11 +80,11 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with scenario("two-way communication") { val actor = actorOf[Tester2].start - assert(mandatoryTemplate.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin") + assert(mandatoryTemplate.requestBody("actor:%s" format actor.address, "Martin") === "Hello Martin") } scenario("two-way communication via a custom route") { - val actor = actorOf[CustomIdActor].start + val actor = actorOf[CustomIdActor]("custom-id").start assert(mandatoryTemplate.requestBody("direct:custom-id-test-1", "Martin") === "Received Martin") assert(mandatoryTemplate.requestBody("direct:custom-id-test-2", "Martin") === "Received Martin") } @@ -95,7 +95,6 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with object ActorComponentFeatureTest { class CustomIdActor extends Actor { - self.id = "custom-id" protected def receive = { case msg: Message => self.reply("Received %s" format msg.body) } diff --git a/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala index e9a4071..b72079f 100644 --- a/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/component/ActorProducerTest.scala @@ -17,7 +17,7 @@ import akka.camel.CamelTestSupport._ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { import ActorProducerTest._ - @After def tearDown = registry.shutdownAll + @After def tearDown = registry.local.shutdownAll @Test def shouldSendMessageToActorWithSyncProcessor = { val actor = actorOf[Tester1].start @@ -106,20 +106,18 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { } @Test def shouldDynamicallyRouteMessageToActorWithDefaultId = { - val actor1 = actorOf[Tester1] - val actor2 = actorOf[Tester1] - actor1.id = "x" - actor2.id = "y" + val actor1 = actorOf[Tester1]("x") + val actor2 = actorOf[Tester1]("y") actor1.start actor2.start val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get val latch2 = (actor2 !! SetExpectedMessageCount(1)).as[CountDownLatch].get - val endpoint = actorEndpoint("actor:id:%s" format actor1.id) + val endpoint = actorEndpoint("actor:id:%s" format actor1.address) val exchange1 = endpoint.createExchange(ExchangePattern.InOnly) val exchange2 = endpoint.createExchange(ExchangePattern.InOnly) exchange1.getIn.setBody("Test1") exchange2.getIn.setBody("Test2") - exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.id) + exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.address) actorProducer(endpoint).process(exchange1) actorProducer(endpoint).process(exchange2) assert(latch1.await(5, TimeUnit.SECONDS)) @@ -131,10 +129,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { } @Test def shouldDynamicallyRouteMessageToActorWithoutDefaultId = { - val actor1 = actorOf[Tester1] - val actor2 = actorOf[Tester1] - actor1.id = "x" - actor2.id = "y" + val actor1 = actorOf[Tester1]("x") + val actor2 = actorOf[Tester1]("y") actor1.start actor2.start val latch1 = (actor1 !! SetExpectedMessageCount(1)).as[CountDownLatch].get @@ -144,8 +140,8 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll { val exchange2 = endpoint.createExchange(ExchangePattern.InOnly) exchange1.getIn.setBody("Test1") exchange2.getIn.setBody("Test2") - exchange1.getIn.setHeader(ActorComponent.ActorIdentifier, actor1.id) - exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.id) + exchange1.getIn.setHeader(ActorComponent.ActorIdentifier, actor1.address) + exchange2.getIn.setHeader(ActorComponent.ActorIdentifier, actor2.address) actorProducer(endpoint).process(exchange1) actorProducer(endpoint).process(exchange2) assert(latch1.await(5, TimeUnit.SECONDS)) diff --git a/akka-spring/src/test/resources/akka-test.conf b/akka-spring/src/test/resources/akka-test.conf index 4deb00f..bb9c0e3 100644 --- a/akka-spring/src/test/resources/akka-test.conf +++ b/akka-spring/src/test/resources/akka-test.conf @@ -34,7 +34,7 @@ akka { dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down default-dispatcher { - type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable + type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable # - ExecutorBasedEventDriven # - ExecutorBasedEventDrivenWorkStealing # - GlobalExecutorBasedEventDriven diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 8001ec8..f1f1452 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -35,7 +35,7 @@ akka { dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down default-dispatcher { - type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable + type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable # - ExecutorBasedEventDriven # - ExecutorBasedEventDrivenWorkStealing # - GlobalExecutorBasedEventDriven