From 59d4da1d25d98ab1bf4af737db7521a0e37bae99 Mon Sep 17 00:00:00 2001 From: Alexandre Fiori Date: Sat, 4 Sep 2010 16:58:03 -0300 Subject: [PATCH] auto reconnect support for queue clients --- demos/redis/myapp.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/demos/redis/myapp.py b/demos/redis/myapp.py index bdb6b920bd..a3e51caca5 100644 --- a/demos/redis/myapp.py +++ b/demos/redis/myapp.py @@ -24,7 +24,7 @@ import cyclone.web import cyclone.escape import cyclone.redis -import cyclone.redis.protocol +from cyclone.redis.protocol import SubscriberProtocol class textHandler(cyclone.web.RequestHandler): @@ -76,7 +76,7 @@ def get(self, channels): self.notifyFinish().addCallback(self.remove_peer) for channel in channels: - if "*" in channel: + if "*" in channel and self.settings.queue.current_connection is not None: self.settings.queue.current_connection.psubscribe(channel) else: self.settings.queue.current_connection.subscribe(channel) @@ -93,7 +93,7 @@ def remove_peer(self, ignore): except: pass else: - if not members: + if not members and self.settings.queue.current_connection is not None: if "*" in chan: self.settings.queue.current_connection.punsubscribe(chan) else: @@ -103,6 +103,10 @@ def remove_peer(self, ignore): @defer.inlineCallbacks def post(self, channel): message = self.get_argument("message") + + if self.settings.queue.current_connection is None: + raise cyclone.web.HTTPError(503) + try: n = yield self.settings.redis.publish(channel, message.encode("utf-8")) except Exception, e: @@ -113,7 +117,7 @@ def post(self, channel): self.finish("OK %d\r\n" % n) -class QueueProtocol(cyclone.redis.protocol.SubscriberProtocol): +class QueueProtocol(SubscriberProtocol): def messageReceived(self, pattern, channel, message): if pattern: peers = self.factory.peers[pattern] @@ -126,6 +130,11 @@ def messageReceived(self, pattern, channel, message): def connectionMade(self): self.factory.current_connection = self + for chan in self.factory.peers: + if "*" in chan: + self.psubscribe(chan) + else: + self.subscribe(chan) def connectionLost(self, reason): self.factory.current_connection = None