Skip to content

Commit

Permalink
auto reconnect support for queue clients
Browse files Browse the repository at this point in the history
  • Loading branch information
fiorix committed Sep 4, 2010
1 parent c0153a8 commit 59d4da1
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions demos/redis/myapp.py
Expand Up @@ -24,7 +24,7 @@
import cyclone.web
import cyclone.escape
import cyclone.redis
import cyclone.redis.protocol
from cyclone.redis.protocol import SubscriberProtocol


class textHandler(cyclone.web.RequestHandler):
Expand Down Expand Up @@ -76,7 +76,7 @@ def get(self, channels):
self.notifyFinish().addCallback(self.remove_peer)

for channel in channels:
if "*" in channel:
if "*" in channel and self.settings.queue.current_connection is not None:
self.settings.queue.current_connection.psubscribe(channel)
else:
self.settings.queue.current_connection.subscribe(channel)
Expand All @@ -93,7 +93,7 @@ def remove_peer(self, ignore):
except:
pass
else:
if not members:
if not members and self.settings.queue.current_connection is not None:
if "*" in chan:
self.settings.queue.current_connection.punsubscribe(chan)
else:
Expand All @@ -103,6 +103,10 @@ def remove_peer(self, ignore):
@defer.inlineCallbacks
def post(self, channel):
message = self.get_argument("message")

if self.settings.queue.current_connection is None:
raise cyclone.web.HTTPError(503)

try:
n = yield self.settings.redis.publish(channel, message.encode("utf-8"))
except Exception, e:
Expand All @@ -113,7 +117,7 @@ def post(self, channel):
self.finish("OK %d\r\n" % n)


class QueueProtocol(cyclone.redis.protocol.SubscriberProtocol):
class QueueProtocol(SubscriberProtocol):
def messageReceived(self, pattern, channel, message):
if pattern:
peers = self.factory.peers[pattern]
Expand All @@ -126,6 +130,11 @@ def messageReceived(self, pattern, channel, message):

def connectionMade(self):
self.factory.current_connection = self
for chan in self.factory.peers:
if "*" in chan:
self.psubscribe(chan)
else:
self.subscribe(chan)

def connectionLost(self, reason):
self.factory.current_connection = None
Expand Down

0 comments on commit 59d4da1

Please sign in to comment.