Permalink
Browse files

Processed review wip-camel pull request 344

  • Loading branch information...
1 parent 4d6511c commit f74616f828d3e31724d768dd86ce05af85d97ade @RayRoestenburg RayRoestenburg committed Mar 1, 2012
Showing with 544 additions and 1,992 deletions.
  1. +0 −34 akka-camel-typed/src/main/java/akka/camel/consume.java
  2. +0 −1 akka-camel-typed/src/main/resources/META-INF/services/org/apache/camel/component/typed-actor
  3. +0 −55 akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala
  4. +0 −59 akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala
  5. +0 −139 akka-camel-typed/src/main/scala/akka/camel/TypedConsumerPublisher.scala
  6. +0 −85 akka-camel-typed/src/main/scala/akka/camel/component/TypedActorComponent.scala
  7. +0 −11 akka-camel-typed/src/test/java/akka/camel/SampleErrorHandlingTypedConsumer.java
  8. +0 −12 akka-camel-typed/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java
  9. +0 −12 akka-camel-typed/src/test/java/akka/camel/SampleRemoteTypedConsumer.java
  10. +0 −12 akka-camel-typed/src/test/java/akka/camel/SampleRemoteTypedConsumerImpl.java
  11. +0 −14 akka-camel-typed/src/test/java/akka/camel/SampleRouteDefinitionHandler.java
  12. +0 −9 akka-camel-typed/src/test/java/akka/camel/SampleTypedActor.java
  13. +0 −14 akka-camel-typed/src/test/java/akka/camel/SampleTypedActorImpl.java
  14. +0 −20 akka-camel-typed/src/test/java/akka/camel/SampleTypedConsumer.java
  15. +0 −28 akka-camel-typed/src/test/java/akka/camel/SampleTypedConsumerImpl.java
  16. +0 −13 akka-camel-typed/src/test/java/akka/camel/SampleTypedSingleConsumer.java
  17. +0 −11 akka-camel-typed/src/test/java/akka/camel/SampleTypedSingleConsumerImpl.java
  18. +0 −52 akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java
  19. +0 −66 akka-camel-typed/src/test/scala/akka/camel/TypedCamelTestSupport.scala
  20. +0 −5 akka-camel-typed/src/test/scala/akka/camel/TypedConsumerJavaTest.scala
  21. +0 −104 akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala
  22. +0 −99 akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala
  23. +0 −113 akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala
  24. +27 −6 akka-camel/src/main/scala/akka/camel/Activation.scala
  25. +5 −26 akka-camel/src/main/scala/akka/camel/Camel.scala
  26. +44 −46 akka-camel/src/main/scala/akka/camel/{Message.scala → CamelMessage.scala}
  27. +14 −14 akka-camel/src/main/scala/akka/camel/Consumer.scala
  28. +12 −17 akka-camel/src/main/scala/akka/camel/Producer.scala
  29. +42 −13 akka-camel/src/main/scala/akka/camel/internal/ActivationTracker.scala
  30. +20 −20 akka-camel/src/main/scala/akka/camel/internal/CamelExchangeAdapter.scala
  31. +60 −25 akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala
  32. +9 −3 akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala
  33. +17 −18 akka-camel/src/main/scala/akka/camel/internal/ProducerRegistry.scala
  34. +0 −53 akka-camel/src/main/scala/akka/camel/internal/Try.scala
  35. +93 −71 akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala
  36. +14 −2 akka-camel/src/main/scala/akka/camel/javaapi/UntypedConsumer.scala
  37. +4 −11 akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala
  38. +3 −2 akka-camel/src/main/scala/akka/camelexamples/ExamplesSupport.scala
  39. +0 −1 akka-camel/src/main/scala/akka/package.scala
  40. +0 −79 akka-camel/src/on-hold/test/scala/akka/camel/CamelTestSupport.scala
  41. +0 −130 akka-camel/src/on-hold/test/scala/akka/camel/component/ActorComponentFeatureTest.scala
  42. +0 −243 akka-camel/src/on-hold/test/scala/akka/camel/component/ActorProducerTest.scala
  43. +14 −15 akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java
  44. +2 −2 akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java
  45. +3 −3 akka-camel/src/test/java/akka/camel/SampleUntypedConsumer.java
  46. +2 −2 akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java
  47. +6 −5 akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala
  48. +8 −4 akka-camel/src/test/scala/akka/camel/CamelExchangeAdapterTest.scala
  49. +5 −6 akka-camel/src/test/scala/akka/camel/{MessageTest.scala → CamelMessageTest.scala}
  50. +35 −23 akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala
  51. +3 −3 akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala
  52. +55 −65 akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala
  53. +2 −2 akka-camel/src/test/scala/akka/camel/TestSupport.scala
  54. +7 −7 akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala
  55. +0 −63 akka-camel/src/test/scala/akka/camel/internal/TryTest.scala
  56. +38 −44 akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala
@@ -1,34 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.camel;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation used by implementations of {@link akka.actor.TypedActor}
- * (on method-level) to define consumer endpoints.
- *
- * @author Martin Krasser
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.METHOD})
-public @interface consume {
-
- /**
- * Consumer endpoint URI
- */
- public abstract String value();
-
- /**
- * Route definition handler class for customizing route to annotated method.
- * The handler class must have a default constructor.
- */
- public abstract Class<? extends RouteDefinitionHandler> routeDefinitionHandler()
- default RouteDefinitionIdentity.class;
-
-}
@@ -1 +0,0 @@
-class=akka.camel.component.TypedActorComponent
@@ -1,55 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.camel
-
-import org.apache.camel.CamelContext
-
-import akka.actor.Actor._
-import akka.actor._
-import akka.camel.component.TypedActorComponent
-
-/**
- * Module that adds typed consumer actor support to akka-camel. It is automatically
- * detected by CamelService if added to the classpath.
- *
- * @author Martin Krasser
- */
-private[camel] object TypedCamel {
- private var consumerPublisher: ActorRef = _
- private var publishRequestor: ActorRef = _
-
- /**
- * Adds the <code>TypedActorComponent</code> to <code>context</code>.
- */
- def onCamelContextInit(context: CamelContext) {
- context.addComponent(TypedActorComponent.InternalSchema, new TypedActorComponent)
- }
-
- /**
- * Configures a <code>TypedConsumerPublishRequestor</code> and a <code>TypedConsumerPublisher</code>
- * and re-uses the <code>activationTracker</code> of <code>service</code>.
- */
- def onCamelServiceStart(service: CamelService) {
- consumerPublisher = new LocalActorRef(Props(new TypedConsumerPublisher(service.activationTracker)), Props.randomName, true)
- publishRequestor = new LocalActorRef(Props(new TypedConsumerPublishRequestor), Props.randomName, true)
-
- registerPublishRequestor
-
- for (event PublishRequestor.pastActorRegisteredEvents) publishRequestor ! event
- publishRequestor ! InitPublishRequestor(consumerPublisher)
- }
-
- /**
- * Stops the configured Configures <code>TypedConsumerPublishRequestor</code> and
- * <code>TypedConsumerPublisher</code>.
- */
- def onCamelServiceStop(service: CamelService) {
- unregisterPublishRequestor
- consumerPublisher.stop
- }
-
- private def registerPublishRequestor: Unit = registry.addListener(publishRequestor)
- private def unregisterPublishRequestor: Unit = registry.removeListener(publishRequestor)
-}
@@ -1,59 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.camel
-
-import java.lang.reflect.Method
-import java.lang.reflect.Proxy._
-
-import akka.actor.{ LocalActorRef, TypedActor, ActorRef }
-import akka.actor.TypedActor._
-
-/**
- * @author Martin Krasser
- */
-private[camel] object TypedConsumer {
-
- /**
- * Applies a function <code>f</code> to <code>actorRef</code> if <code>actorRef</code>
- * references a typed consumer actor. A valid reference to a typed consumer actor is a
- * local actor reference with a target actor that implements <code>TypedActor</code> and
- * has at least one of its methods annotated with <code>@consume</code> (on interface or
- * implementation class). For each <code>@consume</code>-annotated method, <code>f</code>
- * is called with the corresponding <code>method</code> instance and the return value is
- * added to a list which is then returned by this method.
- */
- def withTypedConsumer[T](actorRef: ActorRef, typedActor: Option[AnyRef])(f: (AnyRef, Method) T): List[T] = {
- typedActor match {
- case None Nil
- case Some(tc) {
- withConsumeAnnotatedMethodsOnInterfaces(tc, f) ++
- withConsumeAnnotatedMethodsonImplClass(tc, actorRef, f)
- }
- }
- }
-
- private implicit def class2ProxyClass(c: Class[_]) = new ProxyClass(c)
-
- private def withConsumeAnnotatedMethodsOnInterfaces[T](tc: AnyRef, f: (AnyRef, Method) T): List[T] = for {
- i tc.getClass.allInterfaces
- m i.getDeclaredMethods.toList
- if (m.isAnnotationPresent(classOf[consume]))
- } yield f(tc, m)
-
- private def withConsumeAnnotatedMethodsonImplClass[T](tc: AnyRef, actorRef: ActorRef, f: (AnyRef, Method) T): List[T] = actorRef match {
- case l: LocalActorRef
- val implClass = l.underlyingActorInstance.asInstanceOf[TypedActor.TypedActor[AnyRef, AnyRef]].me.getClass
- for (m implClass.getDeclaredMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(tc, m)
- case _ Nil
- }
-
- private class ProxyClass(c: Class[_]) {
- def allInterfaces: List[Class[_]] = allInterfaces(c.getInterfaces.toList)
- def allInterfaces(is: List[Class[_]]): List[Class[_]] = is match {
- case Nil Nil
- case x :: xs x :: allInterfaces(x.getInterfaces.toList) ::: allInterfaces(xs)
- }
- }
-}
@@ -1,139 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.camel
-
-import java.lang.reflect.Method
-
-import akka.actor._
-import akka.camel.component.TypedActorComponent
-import akka.event.EventHandler
-
-/**
- * Concrete publish requestor that requests publication of typed consumer actor methods on
- * <code>TypedActorRegistered</code> events and unpublication of typed consumer actor methods on
- * <code>TypedActorUnregistered</code> events.
- *
- * @author Martin Krasser
- */
-private[camel] class TypedConsumerPublishRequestor extends PublishRequestor {
- def receiveActorRegistryEvent = {
- case TypedActorRegistered(_, actor, typedActor) for (event ConsumerMethodRegistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event)
- case TypedActorUnregistered(_, actor, typedActor) for (event ConsumerMethodUnregistered.eventsFor(actor, Option(typedActor))) deliverCurrentEvent(event)
- case _ ()
- }
-}
-
-/**
- * Publishes a typed consumer actor method on <code>ConsumerMethodRegistered</code> events and
- * unpublishes a typed consumer actor method on <code>ConsumerMethodUnregistered</code> events.
- * Publications are tracked by sending an <code>activationTracker</code> an <code>EndpointActivated</code>
- * event, unpublications are tracked by sending an <code>EndpointActivated</code> event.
- *
- * @author Martin Krasser
- */
-private[camel] class TypedConsumerPublisher(activationTracker: ActorRef) extends Actor {
- import TypedConsumerPublisher._
-
- def receive = {
- case mr: ConsumerMethodRegistered {
- handleConsumerMethodRegistered(mr)
- activationTracker ! EndpointActivated
- }
- case mu: ConsumerMethodUnregistered {
- handleConsumerMethodUnregistered(mu)
- activationTracker ! EndpointDeactivated
- }
- case _ { /* ignore */ }
- }
-}
-
-/**
- * @author Martin Krasser
- */
-private[camel] object TypedConsumerPublisher {
- /**
- * Creates a route to a typed actor method.
- */
- def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
- CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event))
- EventHandler.info(this, "published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
- }
-
- /**
- * Stops the route to the already un-registered typed consumer actor method.
- */
- def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
- CamelContextManager.mandatoryContext.stopRoute(event.methodUuid)
- EventHandler.info(this, "unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
- }
-}
-
-/**
- * Builder of a route to a typed consumer actor method.
- *
- * @author Martin Krasser
- */
-private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.methodUuid) {
- protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
- protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, event.methodUuid, event.methodName)
-}
-
-/**
- * A typed consumer method (un)registration event.
- */
-private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
- val actorRef: ActorRef
- val typedActor: AnyRef
- val method: Method
-
- val methodName = method.getName
- val methodUuid = "%s_%s" format (uuid, methodName)
-
- lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance
- lazy val consumeAnnotation = method.getAnnotation(classOf[consume])
- lazy val endpointUri = consumeAnnotation.value
-}
-
-/**
- * Event indicating that a typed consumer actor has been registered at the actor registry. For
- * each <code>@consume</code> annotated typed actor method a separate event is created.
- */
-private[camel] case class ConsumerMethodRegistered(actorRef: ActorRef, typedActor: AnyRef, method: Method) extends ConsumerMethodEvent
-
-/**
- * Event indicating that a typed consumer actor has been unregistered from the actor registry. For
- * each <code>@consume</code> annotated typed actor method a separate event is created.
- */
-private[camel] case class ConsumerMethodUnregistered(actorRef: ActorRef, typedActor: AnyRef, method: Method) extends ConsumerMethodEvent
-
-/**
- * @author Martin Krasser
- */
-private[camel] object ConsumerMethodRegistered {
- /**
- * Creates a list of ConsumerMethodRegistered event messages for a typed consumer actor or an empty
- * list if <code>actorRef</code> doesn't reference a typed consumer actor.
- */
- def eventsFor(actorRef: ActorRef, typedActor: Option[AnyRef]): List[ConsumerMethodRegistered] = {
- TypedConsumer.withTypedConsumer(actorRef, typedActor) { (tc, m)
- ConsumerMethodRegistered(actorRef, tc, m)
- }
- }
-}
-
-/**
- * @author Martin Krasser
- */
-private[camel] object ConsumerMethodUnregistered {
- /**
- * Creates a list of ConsumerMethodUnregistered event messages for a typed consumer actor or an empty
- * list if <code>actorRef</code> doesn't reference a typed consumer actor.
- */
- def eventsFor(actorRef: ActorRef, typedActor: Option[AnyRef]): List[ConsumerMethodUnregistered] = {
- TypedConsumer.withTypedConsumer(actorRef, typedActor) { (tc, m)
- ConsumerMethodUnregistered(actorRef, tc, m)
- }
- }
-}
@@ -1,85 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package akka.camel.component
-
-import java.util.Map
-import java.util.concurrent.ConcurrentHashMap
-
-import org.apache.camel.CamelContext
-import org.apache.camel.component.bean._
-
-import akka.actor._
-
-/**
- * @author Martin Krasser
- */
-object TypedActorComponent {
- /**
- * Default schema name for typed actor endpoint URIs.
- */
- val InternalSchema = "typed-actor-internal"
-}
-
-/**
- * Camel component for exchanging messages with typed actors. This component
- * tries to obtain the typed actor from <code>Actor.registry</code> if the
- * schema is <code>TypedActorComponent.InternalSchema</code>. If the schema
- * name is <code>typed-actor</code> this component tries to obtain the typed
- * actor from the CamelContext's registry.
- *
- * @see org.apache.camel.component.bean.BeanComponent
- *
- * @author Martin Krasser
- */
-class TypedActorComponent extends BeanComponent {
- val typedActorRegistry = new ConcurrentHashMap[String, AnyRef]
-
- /**
- * Creates an <code>org.apache.camel.component.bean.BeanEndpoint</code> with a custom
- * bean holder that uses <code>Actor.registry</code> for getting access to typed actors
- * (beans).
- *
- * @see akka.camel.component.TypedActorHolder
- */
- override def createEndpoint(uri: String, remaining: String, parameters: Map[String, AnyRef]) = {
- val endpoint = new BeanEndpoint(uri, this)
- endpoint.setBeanName(remaining)
- endpoint.setBeanHolder(createBeanHolder(uri, remaining))
- setProperties(endpoint.getProcessor, parameters)
- endpoint
- }
-
- private def createBeanHolder(uri: String, beanName: String) =
- new TypedActorHolder(uri, getCamelContext, beanName).createCacheHolder
-}
-
-/**
- * <code>org.apache.camel.component.bean.BeanHolder</code> implementation that uses
- * <code>Actor.registry</code> for getting access to typed actors.
- *
- * @author Martin Krasser
- */
-class TypedActorHolder(uri: String, context: CamelContext, name: String)
- extends RegistryBean(context, name) {
-
- /**
- * Returns an <code>akka.camel.component.BeanInfo</code> instance.
- */
- override def getBeanInfo: BeanInfo =
- new BeanInfo(getContext, getBean.getClass, getParameterMappingStrategy)
-
- /**
- * Obtains a typed actor from <code>Actor.registry</code> if the schema is
- * <code>TypedActorComponent.InternalSchema</code>. If the schema name is
- * <code>typed-actor</code> this method obtains the typed actor from the
- * CamelContext's registry.
- *
- * @return a typed actor or <code>null</code>.
- */
- override def getBean: AnyRef = {
- val internal = uri.startsWith(TypedActorComponent.InternalSchema)
- if (internal) Actor.registry.local.typedActorFor(uuidFrom(getName)) getOrElse null else super.getBean
- }
-}
@@ -1,11 +0,0 @@
-package akka.camel;
-
-/**
- * @author Martin Krasser
- */
-public interface SampleErrorHandlingTypedConsumer {
-
- @consume(value="direct:error-handler-test-java-typed", routeDefinitionHandler=SampleRouteDefinitionHandler.class)
- String willFail(String s);
-
-}
Oops, something went wrong.

0 comments on commit f74616f

Please sign in to comment.