Permalink
Browse files

First pass at Mongo Durable Mailboxes in a "Naive" approach. Some bug…

…s in Hammersmith currently surfacing that need resolving.
  • Loading branch information...
1 parent 0524d97 commit 73edd8e6d3a43eaac3319062c80189be6ab35742 @bwmcadams bwmcadams committed Jul 6, 2011
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+package akka.actor.mailbox
+
+import akka.actor.{Actor, ActorRef, NullChannel}
+import akka.config.Config.config
+import akka.dispatch._
+import akka.event.EventHandler
+import akka.AkkaException
+import akka.remote.MessageSerializer
+import akka.remote.protocol.RemoteProtocol.MessageProtocol
+
+import MailboxProtocol._
+
+import com.mongodb.async._
+
+import org.bson.util._
+import org.bson.io.{BasicOutputBuffer, OutputBuffer}
+import org.bson.types.ObjectId
+import java.io.{ByteArrayInputStream, InputStream}
+
+import org.bson._
+import org.bson.collection._
+
+object BSONSerializableMailbox extends SerializableBSONObject[MongoDurableMessage] with Logging {
+
+ protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = {
+ EventHandler.debug(this, "Serializing a durable message to MongoDB: %s".format(msg))
+ val msgData = MessageSerializer.serialize(msg.message.asInstanceOf[AnyRef])
+ EventHandler.debug(this, "Serialized Message: %s".format(msgData))
+
+ // TODO - Skip the whole map creation step for performance, fun, and profit! (Needs Salat)
+ val b = Map.newBuilder[String, Any]
+ b += "_id" -> msg._id
+ b += "ownerAddress" -> msg.ownerAddress
+
+ msg.channel match {
+ case a : ActorRef => { b += "senderAddress" -> a.address }
+ case _ =>
+ }
+ /**
+ * TODO - Figure out a way for custom serialization of the message instance
+ * TODO - Test if a serializer is registered for the message and if not, use toByteString
+ */
+ b += "message" -> new org.bson.types.Binary(0, msgData.toByteArray)
+ val doc = b.result
+ EventHandler.debug(this, "Serialized Document: %s".format(doc))
+ serializer.putObject(doc)
+ }
+
+ /*
+ * TODO - Implement some object pooling for the Encoders/decoders
+ */
+ def encode(msg: MongoDurableMessage, out: OutputBuffer) = {
+ implicit val serializer = new DefaultBSONSerializer
+ serializer.set(out)
+ serializeDurableMsg(msg)
+ serializer.done
+ }
+
+ def encode(msg: MongoDurableMessage): Array[Byte] = {
+ implicit val serializer = new DefaultBSONSerializer
+ val buf = new BasicOutputBuffer
+ serializer.set(buf)
+ serializeDurableMsg(msg)
+ val bytes = buf.toByteArray
+ serializer.done
+ bytes
+ }
+
+ def decode(in: InputStream): MongoDurableMessage = {
+ val deserializer = new DefaultBSONDeserializer
+ // TODO - Skip the whole doc step for performance, fun, and profit! (Needs Salat / custom Deser)
+ val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
+ EventHandler.debug(this, "Deserializing a durable message from MongoDB: %s".format(doc))
+ val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
+ val msg = MessageSerializer.deserialize(msgData)
+ val ownerAddress = doc.as[String]("ownerAddress")
+ val owner = Actor.registry.actorFor(ownerAddress).getOrElse(
+ throw new DurableMailboxException("No actor could be found for address [" + ownerAddress + "], could not deserialize message."))
+
+ val senderOption = if (doc.contains("senderAddress")) {
+ Actor.registry.actorFor(doc.as[String]("senderAddress"))
+ } else None
+
+ val sender = senderOption match {
+ case Some(ref) => ref
+ case None => NullChannel
+ }
+
+ MongoDurableMessage(ownerAddress, owner, msg, sender)
+ }
+
+ def checkObject(msg: MongoDurableMessage, isQuery: Boolean = false) = {} // object expected to be OK with this message type.
+
+ def checkKeys(msg: MongoDurableMessage) {} // keys expected to be OK with this message type.
+
+ /**
+ * Checks for an ID and generates one.
+ * Not all implementers will need this, but it gets invoked nonetheless
+ * as a signal to BSONDocument, etc implementations to verify an id is there
+ * and generate one if needed.
+ */
+ def checkID(msg: MongoDurableMessage) = {} // OID already generated in wrapper message
+
+ def _id(msg: MongoDurableMessage): Option[AnyRef] = Some(msg._id)
+}
+
+// vim: set ts=2 sw=2 sts=2 et:
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+package akka.actor.mailbox
+
+import akka.actor.ActorRef
+import akka.config.Config.config
+import akka.dispatch._
+import akka.event.EventHandler
+import akka.AkkaException
+
+import MailboxProtocol._
+
+import com.mongodb.async._
+import com.mongodb.async.futures.RequestFutures
+import org.bson.collection._
+
+class MongoBasedMailboxException(message: String) extends AkkaException(message)
+
+/**
+ * A "naive" durable mailbox which uses findAndRemove; it's possible if the actor crashes
+ * after consuming a message that the message could be lost.
+ *
+ * Does not use the Protobuf protocol, instead using a pure Mongo based serialization for sanity
+ * (and mongo-iness).
+ *
+ * TODO - Integrate Salat or a Salat-Based solution for the case classiness
+ *
+ * @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
+ */
+class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) {
+ // this implicit object provides the context for reading/writing things as MongoDurableMessage
+ implicit val mailboxBSONSer = BSONSerializableMailbox
+ implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
+
+ val mongoConfig = config.getList("akka.mailbox.actor.mailbox.mongodb")// need an explicit definition in akka-conf
+
+ @volatile private var db = connect() //review Is the Redis connection thread safe?
+ private val collName = "akka_mailbox.%s".format(name)
+
+ def enqueue(msg: MessageInvocation) = {
+ EventHandler.debug(this,
+ "\nENQUEUING message in mongodb-based mailbox [%s]".format(msg))
+ /* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */
+ val durableMessage = MongoDurableMessage(ownerAddress, msg.receiver, msg.message, msg.channel)
+ // todo - do we need to filter the actor name at all for safe collection naming?
+ val result = new DefaultPromise[Boolean](3000) // give the write 3 seconds to succeed ... should we wait infinitely (does akka allow it?)
+ db.insert(collName)(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] => wr match {
+ case Right((oid, wr)) => result.completeWithResult(true)
+ case Left(t) => result.completeWithException(t)
+ }})
+
+ result.get
+ }
+
+ def dequeue: MessageInvocation = withErrorHandling {
+ /**
+ * Retrieves first item in natural order (oldest first, assuming no modification/move)
+ * Waits 3 seconds for now for a message, else pops back out.
+ * TODO - How do we handle fetch, but sleep if nothing is in there cleanly?
+ * TODO - Should we have a specific query in place? Which way do we sort?
+ * TODO - Error handling version!
+ */
+ val msgInvocation = new DefaultPromise[MessageInvocation](3000)
+ db.findAndRemove(collName)(Document.empty) { msg: MongoDurableMessage =>
+ EventHandler.debug(this,
+ "\nDEQUEUING message in mongo-based mailbox [%s]".format(msg))
+ msgInvocation.completeWithResult(msg.messageInvocation())
+ EventHandler.debug(this,
+ "\nDEQUEUING messageInvocation in mongo-based mailbox [%s]".format(msgInvocation))
+ }
+ msgInvocation.get
+ }
+
+ def size: Int = {
+ val count = new DefaultPromise[Int](3000)
+ db.count(collName)()(count.completeWithResult)
+ count.get
+ }
+
+
+ def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive
+
+ private[akka] def connect() = {
+ EventHandler.debug(this,
+ "\nCONNECTING mongodb { config: [%s] } ".format(mongoConfig))
+ MongoConnection("localhost", 27017)("akka")
+ /*nodes match {
+ case Seq() =>
+ // no cluster defined
+ new RedisClient(
+ config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"),
+ config.getInt("akka.actor.mailbox.redis.port", 6379))
+
+ case s =>
+ // with cluster
+ import com.redis.cluster._
+ EventHandler.info(this, "Running on Redis cluster")
+ new RedisCluster(nodes: _*) {
+ val keyTag = Some(NoOpKeyTag)
+ }
+ }*/
+ }
+
+ private def withErrorHandling[T](body: => T): T = {
+ try {
+ body
+ } catch {
+ case e: Exception => {
+ db = connect()
+ body
+ }
+ case e => {
+ val error = new MongoBasedMailboxException("Could not connect to MongoDB server")
+ EventHandler.error(error, this, "Could not connect to MongoDB server")
+ throw error
+ }
+ }
+ }
+}
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+package akka.actor.mailbox
+
+import akka.actor.{ActorRef, UntypedChannel, NullChannel}
+import akka.config.Config.config
+import akka.dispatch._
+import akka.event.EventHandler
+import akka.AkkaException
+
+import MailboxProtocol._
+
+import com.mongodb.async._
+
+import org.bson.util._
+import org.bson.io.OutputBuffer
+import org.bson.types.ObjectId
+import java.io.InputStream
+
+import org.bson.collection._
+
+/**
+ * A container message for durable mailbox messages, which can be easily stuffed into
+ * and out of MongoDB.
+ *
+ * Does not use the Protobuf protocol, instead using a pure Mongo based serialization for sanity
+ * (and mongo-iness).
+ *
+ * This should eventually branch out into a more flat, compound solution for all remote actor stuff
+ * TODO - Integrate Salat or a Salat-Based solution for the case classiness
+ *
+ * @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
+ */
+case class MongoDurableMessage(val ownerAddress: String,
+ val receiver: ActorRef,
+ val message: Any,
+ val channel: UntypedChannel,
+ val _id: ObjectId = new ObjectId) {
+
+ def messageInvocation() = MessageInvocation(this.receiver, this.message, this.channel)
+}
+
+// vim: set ts=2 sw=2 sts=2 et:
@@ -0,0 +1,65 @@
+package akka.actor.mailbox
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
+
+import akka.actor._
+import akka.actor.Actor._
+import java.util.concurrent.CountDownLatch
+import akka.config.Supervision.Temporary
+import akka.dispatch.MessageDispatcher
+
+class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoNaiveDurableMailboxStorage)
+
+/*object DurableMongoMailboxSpecActorFactory {
+
+ class MongoMailboxTestActor extends Actor {
+ self.lifeCycle = Temporary
+ def receive = {
+ case "sum" => self.reply("sum")
+ }
+ }
+
+ def createMongoMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = {
+ val queueActor = localActorOf[MongoMailboxTestActor]
+ queueActor.dispatcher = dispatcher
+ queueActor.start
+ }
+}*/
+
+/*class MongoBasedMailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll {
+ import DurableMongoMailboxSpecActorFactory._
+
+ implicit val dispatcher = DurableDispatcher("mongodb", MongoNaiveDurableMailboxStorage, 1)
+
+ "A MongoDB based naive mailbox backed actor" should {
+ "should handle reply to ! for 1 message" in {
+ val latch = new CountDownLatch(1)
+ val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle Reply to !")
+ val sender = localActorOf(new Actor { def receive = { case "sum" => latch.countDown } }).start
+
+ queueActor.!("sum")(Some(sender))
+ latch.await(10, TimeUnit.SECONDS) must be (true)
+ }
+
+ "should handle reply to ! for multiple messages" in {
+ val latch = new CountDownLatch(5)
+ val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle reply to !")
+ val sender = localActorOf( new Actor { def receive = { case "sum" => latch.countDown } } ).start
+
+ queueActor.!("sum")(Some(sender))
+ queueActor.!("sum")(Some(sender))
+ queueActor.!("sum")(Some(sender))
+ queueActor.!("sum")(Some(sender))
+ queueActor.!("sum")(Some(sender))
+ latch.await(10, TimeUnit.SECONDS) must be (true)
+ }
+ }
+
+ override def beforeEach() {
+ registry.local.shutdownAll
+ }
+}*/

0 comments on commit 73edd8e

Please sign in to comment.