Permalink
Browse files

more pubsub

  • Loading branch information...
1 parent ff4dd79 commit d3728a974bccc17a8523c238f447cae0e9bd67c0 @derekjw committed Aug 17, 2011
Showing with 29 additions and 11 deletions.
  1. +8 −2 src/main/scala/Actors.scala
  2. +3 −0 src/main/scala/Protocol.scala
  3. +3 −0 src/main/scala/PubSub.scala
  4. +15 −9 src/test/scala/PubSubSpec.scala
@@ -163,12 +163,18 @@ private[redis] final class RedisClientWorker(ioManager: ActorRef, host: String,
loopC {
val result = readResult
result match {
+ case RedisMulti(Some(List(RedisBulk(Some(Protocol.message)), RedisBulk(Some(channel)), RedisBulk(Some(message))))) =>
+ listener ! pubsub.Message(channel, message)
+ case RedisMulti(Some(List(RedisBulk(Some(Protocol.pmessage)), RedisBulk(Some(pattern)), RedisBulk(Some(channel)), RedisBulk(Some(message))))) =>
+ listener ! pubsub.PMessage(pattern, channel, message)
case RedisMulti(Some(List(RedisBulk(Some(Protocol.subscribe)), RedisBulk(Some(channel)), RedisInteger(count)))) =>
listener ! pubsub.Subscribed(channel, count)
case RedisMulti(Some(List(RedisBulk(Some(Protocol.unsubscribe)), RedisBulk(Some(channel)), RedisInteger(count)))) =>
listener ! pubsub.Unsubscribed(channel, count)
- case RedisMulti(Some(List(RedisBulk(Some(Protocol.message)), RedisBulk(Some(channel)), RedisBulk(Some(message))))) =>
- listener ! pubsub.Message(channel, message)
+ case RedisMulti(Some(List(RedisBulk(Some(Protocol.psubscribe)), RedisBulk(Some(pattern)), RedisInteger(count)))) =>
+ listener ! pubsub.PSubscribed(pattern, count)
+ case RedisMulti(Some(List(RedisBulk(Some(Protocol.punsubscribe)), RedisBulk(Some(pattern)), RedisInteger(count)))) =>
+ listener ! pubsub.PUnsubscribed(pattern, count)
case other =>
throw RedisProtocolException("Unexpected response")
()
@@ -79,9 +79,12 @@ private[redis] object Protocol {
val OBJECT = ByteString("OBJECT")
val PERSIST = ByteString("PERSIST")
val PING = ByteString("PING")
+ val pmessage = ByteString("pmessage")
val PSUBSCRIBE = ByteString("PSUBSCRIBE")
+ val psubscribe = ByteString("psubscribe")
val PUBLISH = ByteString("PUBLISH")
val PUNSUBSCRIBE = ByteString("PUNSUBSCRIBE")
+ val punsubscribe = ByteString("punsubscribe")
val QUIT = ByteString("QUIT")
val RANDOMKEY = ByteString("RANDOMKEY")
val RENAME = ByteString("RENAME")
@@ -8,6 +8,9 @@ sealed trait PubSubMessage
case class Subscribed(channel: ByteString, count: Long) extends PubSubMessage
case class Unsubscribed(channel: ByteString, count: Long) extends PubSubMessage
case class Message(channel: ByteString, content: ByteString) extends PubSubMessage
+case class PSubscribed(pattern: ByteString, count: Long) extends PubSubMessage
+case class PUnsubscribed(pattern: ByteString, count: Long) extends PubSubMessage
+case class PMessage(pattern: ByteString, channel: ByteString, content: ByteString) extends PubSubMessage
object Subscribe {
def apply[A: Store](channel: A): Subscribe = new Subscribe(List(Store(channel)))
@@ -3,7 +3,7 @@ package net.fyrie.redis
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
-import akka.dispatch.Future
+import akka.dispatch.{ Future, PromiseStream }
import akka.testkit.{ filterEvents, EventFilter }
import akka.actor.{ Actor, ActorRef }
import Actor.actorOf
@@ -14,7 +14,7 @@ class PubSubSpec extends Spec
with ShouldMatchers
with RedisTestServer {
- class TestSubscriber extends Actor {
+ class TestSubscriber(ps: PromiseStream[String]) extends Actor {
var client: ActorRef = _
override def preStart {
client = r.subscriber(self)
@@ -25,20 +25,26 @@ class PubSubSpec extends Spec
case msg: Unsubscribe => client ! msg
case msg: PSubscribe => client ! msg
case msg: PUnsubscribe => client ! msg
- case Subscribed(channel, count) => println("[" + Parse[String](channel) + "] Subscribed")
- case Unsubscribed(channel, count) => println("[" + Parse[String](channel) + "] Unsubscribed")
- case Message(channel, bytes) => println("[" + Parse[String](channel) + "] Message: " + Parse[String](bytes))
+ case Subscribed(channel, count) => ps enqueue ("[" + Parse[String](channel) + "] Subscribed")
+ case Unsubscribed(channel, count) => ps enqueue ("[" + Parse[String](channel) + "] Unsubscribed")
+ case Message(channel, bytes) => ps enqueue ("[" + Parse[String](channel) + "] Message: " + Parse[String](bytes))
+ case PSubscribed(pattern, count) => ps enqueue ("[" + Parse[String](pattern) + "] Subscribed")
+ case PUnsubscribed(pattern, count) => ps enqueue ("[" + Parse[String](pattern) + "] Unsubscribed")
+ case PMessage(pattern, channel, bytes) => ps enqueue ("[" + Parse[String](pattern) + "][" + Parse[String](channel) + "] Message: " + Parse[String](bytes))
}
}
describe("pubsub") {
it("should work") {
- val s = actorOf(new TestSubscriber).start
- s ! Subscribe(Set("Test Channel", "Test Channel 2"))
- Thread.sleep(1000)
+ val ps = PromiseStream[String]()
+ val s = actorOf(new TestSubscriber(ps)).start
+ s ! Subscribe(Seq("Test Channel", "Test Channel 2"))
+ ps.dequeue.get should be("[Test Channel] Subscribed")
+ ps.dequeue.get should be("[Test Channel 2] Subscribed")
r.publish("Test Channel", "Hello!")
r.publish("Test Channel", "World!")
- Thread.sleep(1000)
+ ps.dequeue.get should be("[Test Channel] Message: Hello!")
+ ps.dequeue.get should be("[Test Channel] Message: World!")
s.stop
}
}

0 comments on commit d3728a9

Please sign in to comment.