Skip to content

Commit

Permalink
fix issue #39 and issue #40 and pull request by kelaneren - add hook …
Browse files Browse the repository at this point in the history
…to handle error by consumer when redis server gives an error
  • Loading branch information
debasishg committed Sep 19, 2012
1 parent 0440424 commit 6f62a8e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
41 changes: 23 additions & 18 deletions src/main/scala/com/redis/PubSub.scala
Expand Up @@ -17,6 +17,7 @@ sealed trait PubSubMessage
case class S(channel: String, noSubscribed: Int) extends PubSubMessage
case class U(channel: String, noSubscribed: Int) extends PubSubMessage
case class M(origChannel: String, message: String) extends PubSubMessage
case class E(e: java.lang.Throwable) extends PubSubMessage

import Util._
trait PubSub { self: Redis =>
Expand All @@ -30,25 +31,29 @@ trait PubSub { self: Redis =>
}

def run {
whileTrue {
asList match {
case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
msgType match {
case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
case "unsubscribe" if (data.toInt == 0) =>
fn(U(channel, data.toInt))
break
case "punsubscribe" if (data.toInt == 0) =>
fn(U(channel, data.toInt))
break
case "unsubscribe" | "punsubscribe" =>
fn(U(channel, data.toInt))
case "message" | "pmessage" =>
fn(M(channel, data))
case x => throw new RuntimeException("unhandled message: " + x)
}
case _ => break
try {
whileTrue {
asList match {
case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
msgType match {
case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
case "unsubscribe" if (data.toInt == 0) =>
fn(U(channel, data.toInt))
break
case "punsubscribe" if (data.toInt == 0) =>
fn(U(channel, data.toInt))
break
case "unsubscribe" | "punsubscribe" =>
fn(U(channel, data.toInt))
case "message" | "pmessage" =>
fn(M(channel, data))
case x => throw new RuntimeException("unhandled message: " + x)
}
case _ => break
}
}
} catch {
case e => fn(E(e))
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/test/scala/com/redis/PubSubSpec.scala
Expand Up @@ -37,6 +37,8 @@ class PubSubSpec extends Spec
pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")

case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
Expand Down

0 comments on commit 6f62a8e

Please sign in to comment.