Browse files

Merge pull request #429 from akka/wip-935-durable-mailbox-tests-√

Wip 935 durable mailbox tests √
  • Loading branch information...
2 parents b443649 + 573892d commit e20252e09459d867fcecf69f72839b3391227e6e @viktorklang viktorklang committed May 2, 2012
View
4 .../akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala
@@ -1,5 +1,7 @@
package akka.actor.mailbox
+import akka.dispatch.Mailbox
+
object BeanstalkBasedMailboxSpec {
val config = """
Beanstalkd-dispatcher {
@@ -25,6 +27,6 @@ class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", Beansta
}
override def atTermination(): Unit = beanstalkd.destroy()
-
+ def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[BeanstalkBasedMessageQueue]
}
View
3 ...-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala
@@ -2,6 +2,7 @@ package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import com.typesafe.config.ConfigFactory
+import akka.dispatch.Mailbox
object FileBasedMailboxSpec {
val config = """
@@ -24,6 +25,8 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp
}
}
+ def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
+
def clean {
FileUtils.deleteDirectory(new java.io.File(queuePath))
}
View
74 ...ailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala
@@ -3,26 +3,27 @@
*/
package akka.actor.mailbox
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.PoisonPill
-import akka.actor.Props
-import akka.dispatch.Await
import akka.testkit.AkkaSpec
import akka.testkit.TestLatch
import akka.util.duration._
import java.io.InputStream
import scala.annotation.tailrec
import com.typesafe.config.Config
+import akka.actor._
+import akka.dispatch.{ Mailbox, Await }
object DurableMailboxSpecActorFactory {
class MailboxTestActor extends Actor {
- def receive = { case "sum" sender ! "sum" }
+ def receive = { case x sender ! x }
}
- class Sender(latch: TestLatch) extends Actor {
- def receive = { case "sum" latch.countDown() }
+ class AccumulatorActor extends Actor {
+ var num = 0l
+ def receive = {
+ case x: Int num += x
+ case "sum" sender ! num
+ }
}
}
@@ -54,32 +55,53 @@ abstract class DurableMailboxSpec(val backendName: String, config: String) exten
if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result)
}
- def createMailboxTestActor(id: String): ActorRef =
- system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher"))
+ def createMailboxTestActor(props: Props = Props[MailboxTestActor], id: String = ""): ActorRef = id match {
+ case null | "" system.actorOf(props.withDispatcher(backendName + "-dispatcher"))
+ case some system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some)
+ }
+
+ def isDurableMailbox(m: Mailbox): Boolean
"A " + backendName + " based mailbox backed actor" must {
- "handle reply to ! for 1 message" in {
- val latch = new TestLatch(1)
- val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
- val sender = system.actorOf(Props(new Sender(latch)))
+ "get a new, unique, durable mailbox" in {
+ val a1, a2 = createMailboxTestActor()
+ isDurableMailbox(a1.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
+ isDurableMailbox(a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
+ (a1.asInstanceOf[LocalActorRef].underlying.mailbox ne a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
+ }
+
+ "deliver messages at most once" in {
+ val queueActor = createMailboxTestActor()
+ implicit val sender = testActor
+
+ val msgs = 1 to 100 map { x "foo" + x }
+
+ msgs foreach { m queueActor ! m }
- queueActor.!("sum")(sender)
- Await.ready(latch, 10 seconds)
- queueActor ! PoisonPill
- sender ! PoisonPill
+ msgs foreach { m expectMsg(m) }
+
+ expectNoMsg()
}
- "handle reply to ! for multiple messages" in {
- val latch = new TestLatch(5)
- val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
- val sender = system.actorOf(Props(new Sender(latch)))
+ "support having multiple actors at the same time" in {
+ val actors = Vector.fill(3)(createMailboxTestActor(Props[AccumulatorActor]))
+
+ actors foreach { a isDurableMailbox(a.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) }
+
+ val msgs = 1 to 3
- for (i 1 to 10) queueActor.!("sum")(sender)
+ val expectedResult: Long = msgs.sum
+
+ for (a actors; m msgs) a ! m
+
+ for (a actors) {
+ implicit val sender = testActor
+ a ! "sum"
+ expectMsg(expectedResult)
+ }
- Await.ready(latch, 10 seconds)
- queueActor ! PoisonPill
- sender ! PoisonPill
+ expectNoMsg()
}
}
View
56 ...ailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala
@@ -7,7 +7,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
-import akka.dispatch.MessageDispatcher
+import akka.dispatch.{ Mailbox, MessageDispatcher }
object MongoBasedMailboxSpec {
val config = """
@@ -27,6 +27,8 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail
lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start()
lazy val mongo = MongoConnection("localhost", 27123)("akka")
+ def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[MongoBasedMessageQueue]
+
override def atStartup(): Unit = {
// start MongoDB daemon
new java.io.File("mongoDB").mkdir()
@@ -41,54 +43,4 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail
}
override def atTermination(): Unit = mongod.destroy()
-
-}
-
-/*object DurableMongoMailboxSpecActorFactory {
-
- class MongoMailboxTestActor extends Actor {
- def receive = {
- case "sum" => reply("sum")
- }
- }
-
- def createMongoMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = {
- val queueActor = actorOf(Props[MongoMailboxTestActor]
- queueActor.dispatcher = dispatcher
- queueActor
- }
-}*/
-
-/*class MongoBasedMailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll {
- import DurableMongoMailboxSpecActorFactory._
-
- implicit val dispatcher = DurableDispatcher("mongodb", MongoNaiveDurableMailboxStorage, 1)
-
- "A MongoDB based naive mailbox backed actor" should {
- "should handle reply to ! for 1 message" in {
- val latch = new CountDownLatch(1)
- val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle Reply to !")
- val sender = actorOf(Props(new Actor { def receive = { case "sum" => latch.countDown } })
-
- queueActor.!("sum")(Some(sender))
- latch.await(10, TimeUnit.SECONDS) must be (true)
- }
-
- "should handle reply to ! for multiple messages" in {
- val latch = new CountDownLatch(5)
- val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle reply to !")
- val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } )
-
- queueActor.!("sum")(Some(sender))
- queueActor.!("sum")(Some(sender))
- queueActor.!("sum")(Some(sender))
- queueActor.!("sum")(Some(sender))
- queueActor.!("sum")(Some(sender))
- latch.await(10, TimeUnit.SECONDS) must be (true)
- }
- }
-
- override def beforeEach() {
- registry.local.shutdownAll
- }
-}*/
+}
View
4 ...ailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala
@@ -1,5 +1,7 @@
package akka.actor.mailbox
+import akka.dispatch.Mailbox
+
object RedisBasedMailboxSpec {
val config = """
Redis-dispatcher {
@@ -40,5 +42,5 @@ class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailbo
}
override def atTermination(): Unit = redisServer.destroy()
-
+ def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[RedisBasedMessageQueue]
}
View
4 .../akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala
@@ -3,10 +3,10 @@ package akka.actor.mailbox
import akka.actor.Actor
import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
-import akka.dispatch.MessageDispatcher
import akka.actor.ActorRef
import com.typesafe.config.ConfigFactory
import akka.util.duration._
+import akka.dispatch.{ Mailbox, MessageDispatcher }
object ZooKeeperBasedMailboxSpec {
val config = """
@@ -31,7 +31,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe
}
var zkServer: ZkServer = _
-
+ def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[ZooKeeperBasedMessageQueue]
override def atStartup() {
zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath)
super.atStartup()

0 comments on commit e20252e

Please sign in to comment.