Permalink
Browse files

initial camel integration (early-access, see also http://doc.akkasour…

  • Loading branch information...
1 parent 1093451 commit bb202d476e78315a116e363aebf58f78c4f67d7d @krasserm krasserm committed Feb 25, 2010
View
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>akka-camel</artifactId>
+ <name>Akka Camel Module</name>
+
+ <packaging>jar</packaging>
+
+ <parent>
+ <artifactId>akka</artifactId>
+ <groupId>se.scalablesolutions.akka</groupId>
+ <version>0.7-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <!-- Core deps -->
+ <dependency>
+ <artifactId>akka-core</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jetty</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+
+ <!-- For Testing -->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
@@ -0,0 +1 @@
+class=se.scalablesolutions.akka.camel.component.ActorComponent
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package se.scalablesolutions.akka.camel
+
+import se.scalablesolutions.akka.actor.Actor
+
+/**
+ * Mixed in by Actor subclasses to be Camel endpoint consumers.
+ *
+ * @author Martin Krasser
+ */
+trait CamelConsumer {
+
+ self: Actor =>
+
+ /**
+ * Returns the Camel endpoint URI to consume messages from.
+ */
+ def endpointUri: String
+
+}
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package se.scalablesolutions.akka.camel.component
+
+import java.lang.{RuntimeException, String}
+import java.util.{Map => JavaMap}
+
+import org.apache.camel.{Exchange, Consumer, Processor}
+import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
+
+import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
+
+/**
+ * Camel component for interacting with actors.
+ *
+ * @see se.scalablesolutions.akka.camel.component.ActorEndpoint
+ * @see se.scalablesolutions.akka.camel.component.ActorProducer
+ *
+ * @author Martin Krasser
+ */
+class ActorComponent extends DefaultComponent {
+
+ def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = {
+ val idAndUuid = idAndUuidPair(remaining)
+ new ActorEndpoint(uri, this, idAndUuid._1, idAndUuid._2)
+ }
+
+ private def idAndUuidPair(remaining: String): Tuple2[String, Option[String]] = {
+ remaining split "/" toList match {
+ case id :: Nil => (id, None)
+ case id :: uuid :: Nil => (id, Some(uuid))
+ case _ => throw new IllegalArgumentException(
+ "invalid path format: %s - should be <actorid>[/<actoruuid>]" format remaining)
+ }
+ }
+
+}
+
+/**
+ * Camel endpoint for interacting with actors. An actor can be addressed by its
+ * <code>Actor.id</code> or by an <code>Actor.id</code> - <code>Actor.uuid</code>
+ * combination. The URI format is <code>actor://<actorid>[/<actoruuid>]</code>.
+ *
+ * @see se.scalablesolutions.akka.camel.component.ActorComponent
+ * @see se.scalablesolutions.akka.camel.component.ActorProducer
+
+ * @author Martin Krasser
+ */
+class ActorEndpoint(uri: String, comp: ActorComponent, val id: String, val uuid: Option[String]) extends DefaultEndpoint(uri, comp) {
+
+ // TODO: clarify uuid details
+ // - do they change after persist/restore
+ // - what about remote actors and uuids
+
+ /**
+ * @throws UnsupportedOperationException
+ */
+ def createConsumer(processor: Processor): Consumer =
+ throw new UnsupportedOperationException("actor consumer not supported yet")
+
+ def createProducer: ActorProducer = new ActorProducer(this)
+
+ def isSingleton: Boolean = true
+
+}
+
+/**
+ * Sends the in-message of an exchange to an actor. If the exchange pattern is out-capable,
+ * the producer waits for a reply (using the !! operator), otherwise the ! operator is used
+ * for sending the message. Asynchronous communication is not implemented yet but will be
+ * added for Camel components that support the Camel Async API (like the jetty component that
+ * makes use of Jetty continuations).
+ *
+ * @see se.scalablesolutions.akka.camel.component.ActorComponent
+ * @see se.scalablesolutions.akka.camel.component.ActorEndpoint
+ *
+ * @author Martin Krasser
+ */
+class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
+
+ implicit val sender = Some(Sender)
+
+ def process(exchange: Exchange) {
+ val actor = target getOrElse (throw new ActorNotRegisteredException(ep.id, ep.uuid))
+ if (exchange.getPattern.isOutCapable)
+ processInOut(exchange, actor)
+ else
+ processInOnly(exchange, actor)
+ }
+
+ override def start {
+ super.start
+ }
+
+ protected def receive = {
+ throw new UnsupportedOperationException
+ }
+
+ protected def processInOnly(exchange: Exchange, actor: Actor) {
+ actor ! exchange.getIn
+ }
+
+ protected def processInOut(exchange: Exchange, actor: Actor) {
+ val outmsg = exchange.getOut
+ // TODO: make timeout configurable
+ // TODO: send immutable message
+ // TODO: support asynchronous communication
+ // - jetty component: jetty continuations
+ // - file component: completion callbacks
+ val result: Any = actor !! exchange.getIn
+
+ result match {
+ case Some((body, headers:Map[String, Any])) => {
+ outmsg.setBody(body)
+ for (header <- headers)
+ outmsg.getHeaders.put(header._1, header._2.asInstanceOf[AnyRef])
+ }
+ case Some(body) => outmsg.setBody(body)
+ }
+ }
+
+ private def target: Option[Actor] = {
+ ActorRegistry.actorsFor(ep.id) match {
+ case actor :: Nil if targetMatchesUuid(actor) => Some(actor)
+ case Nil => None
+ case actors => actors find (targetMatchesUuid _)
+ }
+ }
+
+ private def targetMatchesUuid(target: Actor): Boolean =
+ // if ep.uuid is not defined always return true
+ target.uuid == (ep.uuid getOrElse target.uuid)
+
+}
+
+/**
+ * Generic message sender used by ActorProducer.
+ *
+ * @author Martin Krasser
+ */
+private[component] object Sender extends Actor {
+
+ start
+
+ /**
+ * Ignores any message.
+ */
+ protected def receive = {
+ case _ => { /* ignore any reply */ }
+ }
+
+}
+
+/**
+ * Thrown to indicate that an actor referenced by an endpoint URI cannot be
+ * found in the ActorRegistry.
+ *
+ * @author Martin Krasser
+ */
+class ActorNotRegisteredException(name: String, uuid: Option[String]) extends RuntimeException {
+
+ override def getMessage = "actor(id=%s,uuid=%s) not registered" format (name, uuid getOrElse "<none>")
+
+}
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package se.scalablesolutions.akka.camel.service
+
+import org.apache.camel.CamelContext
+import org.apache.camel.impl.DefaultCamelContext
+
+/**
+ * Manages the CamelContext used by CamelService.
+ *
+ * @author Martin Krasser
+ */
+object CamelContextManager {
+
+ /**
+ * The CamelContext used by CamelService. Can be modified by applications prior to
+ * loading the CamelService.
+ */
+ var context: CamelContext = new DefaultCamelContext
+
+}
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package se.scalablesolutions.akka.camel.service
+
+import java.io.InputStream
+
+import org.apache.camel.builder.RouteBuilder
+
+import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
+import se.scalablesolutions.akka.annotation.consume
+import se.scalablesolutions.akka.camel.CamelConsumer
+import se.scalablesolutions.akka.util.{Bootable, Logging}
+
+/**
+ * Started by the Kernel to expose actors as Camel endpoints.
+ *
+ * @see CamelRouteBuilder
+ *
+ * @author Martin Krasser
+ */
+trait CamelService extends Bootable with Logging {
+
+ import CamelContextManager.context
+
+ abstract override def onLoad = {
+ super.onLoad
+ context.addRoutes(new CamelRouteBuilder)
+ context.setStreamCaching(true)
+ context.start
+ log.info("Camel context started")
+ }
+
+ abstract override def onUnload = {
+ super.onUnload
+ context.stop
+ log.info("Camel context stopped")
+ }
+
+}
+
+/**
+ * Generic route builder that searches the registry for actors that are
+ * either annotated with @se.scalablesolutions.akka.annotation.consume or
+ * mixed in se.scalablesolutions.akka.camel.CamelConsumer and exposes them
+ * as Camel endpoints.
+ *
+ * @author Martin Krasser
+ */
+class CamelRouteBuilder extends RouteBuilder with Logging {
+
+ def configure = {
+ val actors = ActorRegistry.actors
+
+ //
+ // TODO: resolve/clarify issues with ActorRegistry
+ // - custom Actor.id ignored
+ // - actor de-registration issues
+ // - multiple registration with same id/uuid possible
+ //
+
+ // TODO: avoid redundant registrations
+ actors.filter(isConsumeAnnotated _).foreach { actor: Actor =>
+ val fromUri = actor.getClass.getAnnotation(classOf[consume]).value()
+ configure(fromUri, "actor://%s" format actor.id)
+ log.debug("registered actor (id=%s) for consuming messages from %s "
+ format (actor.id, fromUri))
+ }
+
+ // TODO: avoid redundant registrations
+ actors.filter(isConsumerInstance _).foreach { actor: Actor =>
+ val fromUri = actor.asInstanceOf[CamelConsumer].endpointUri
+ configure(fromUri, "actor://%s/%s" format (actor.id, actor.uuid))
+ log.debug("registered actor (id=%s, uuid=%s) for consuming messages from %s "
+ format (actor.id, actor.uuid, fromUri))
+ }
+ }
+
+ private def configure(fromUri: String, toUri: String) {
+ val schema = fromUri take fromUri.indexOf(":") // e.g. "http" from "http://whatever/..."
+ bodyConversions.get(schema) match {
+ case Some(clazz) => from(fromUri).convertBodyTo(clazz).to(toUri)
+ case None => from(fromUri).to(toUri)
+ }
+ }
+
+ // TODO: make conversions configurable
+ private def bodyConversions = Map(
+ "file" -> classOf[InputStream]
+ )
+
+ private def isConsumeAnnotated(actor: Actor) =
+ actor.getClass.getAnnotation(classOf[consume]) ne null
+
+ private def isConsumerInstance(actor: Actor) =
+ actor.isInstanceOf[CamelConsumer]
+
+}
Oops, something went wrong.

0 comments on commit bb202d4

Please sign in to comment.