Permalink
Browse files

Pubsub is hopefully fixed (it was not converted to new error handling…

… technique). It works for me, but no tests, sorry.
  • Loading branch information...
1 parent e20f8a6 commit e15ddaaa9faac5b25447ad790b33e7782d5c35df @kmike kmike committed Apr 12, 2011
Showing with 19 additions and 21 deletions.
  1. +19 −21 brukva/client.py
View
@@ -407,7 +407,7 @@ def consume_multibulk(self, length, cmd_line, callback):
@async
@process
def consume_bulk(self, length, callback):
- with forward_error(callback):
+ with forward_error(callback):
data = yield async(self.connection.read)(length)
if isinstance(data, Exception):
raise data
@@ -779,9 +779,7 @@ def subscribe(self, channels, callbacks=None):
self.execute_command('SUBSCRIBE', callbacks, *channels)
def on_subscribed(self, result):
- (e, _) = result
- if not e:
- self.subscribed = True
+ self.subscribed = True
def unsubscribe(self, channels, callbacks=None):
callbacks = callbacks or []
@@ -791,31 +789,31 @@ def unsubscribe(self, channels, callbacks=None):
self.execute_command('UNSUBSCRIBE', callbacks, *channels)
def on_unsubscribed(self, result):
- (e, _) = result
- if not e:
- self.subscribed = False
+ self.subscribed = False
def publish(self, channel, message, callbacks=None):
self.execute_command('PUBLISH', callbacks, channel, message)
@process
def listen(self, callbacks=None):
- # 'LISTEN' is just for exception information, it is not actually sent anywhere
- callbacks = callbacks or []
- if not hasattr(callbacks, '__iter__'):
- callbacks = [callbacks]
+ # 'LISTEN' is just for receiving information, it is not actually sent anywhere
+ with forward_error(callbacks):
+ callbacks = callbacks or []
+ if not hasattr(callbacks, '__iter__'):
+ callbacks = [callbacks]
- yield self.connection.queue_wait()
- cmd_listen = CmdLine('LISTEN')
- while self.subscribed:
- data = yield async(self.connection.readline)()
- try:
- error, response = yield self.process_data(data, cmd_listen)
- result = self.format_reply(cmd_listen, response)
- except Exception, e:
- error, result = e, None
+ yield self.connection.queue_wait()
+ cmd_listen = CmdLine('LISTEN')
+ while self.subscribed:
+ data = yield async(self.connection.readline)()
+ if isinstance(data, Exception):
+ raise data
- self.call_callbacks(callbacks, (error, result) )
+ response = yield self.process_data(data, cmd_listen)
+ if isinstance(response, Exception):
+ raise response
+ result = self.format_reply(cmd_listen, response)
+ self.call_callbacks(callbacks, result)
### CAS
def watch(self, key, callbacks=None):

0 comments on commit e15ddaa

Please sign in to comment.