Permalink
Browse files

Merge branch 'master' of https://github.com/kmike/brukva into kmike-m…

…aster
  • Loading branch information...
evilkost committed Apr 12, 2011
2 parents 7f71b7b + e15ddaa commit abc1e87845b9ec3517ca90feb44b1d0581b6ab67
Showing with 20 additions and 22 deletions.
  1. +20 −22 brukva/client.py
  2. 0 setup.py
View
@@ -23,7 +23,7 @@ def forward_error(callbacks, cleanup=None):
log.error(e)
if isinstance(callbacks, Iterable):
for cb in callbacks:
- callbacks(e)
+ cb(e)
else:
callbacks(e)
finally:
@@ -407,7 +407,7 @@ def consume_multibulk(self, length, cmd_line, callback):
@async
@process
def consume_bulk(self, length, callback):
- with forward_error(callback):
+ with forward_error(callback):
data = yield async(self.connection.read)(length)
if isinstance(data, Exception):
raise data
@@ -779,9 +779,7 @@ def subscribe(self, channels, callbacks=None):
self.execute_command('SUBSCRIBE', callbacks, *channels)
def on_subscribed(self, result):
- (e, _) = result
- if not e:
- self.subscribed = True
+ self.subscribed = True
def unsubscribe(self, channels, callbacks=None):
callbacks = callbacks or []
@@ -791,31 +789,31 @@ def unsubscribe(self, channels, callbacks=None):
self.execute_command('UNSUBSCRIBE', callbacks, *channels)
def on_unsubscribed(self, result):
- (e, _) = result
- if not e:
- self.subscribed = False
+ self.subscribed = False
def publish(self, channel, message, callbacks=None):
self.execute_command('PUBLISH', callbacks, channel, message)
@process
def listen(self, callbacks=None):
- # 'LISTEN' is just for exception information, it is not actually sent anywhere
- callbacks = callbacks or []
- if not hasattr(callbacks, '__iter__'):
- callbacks = [callbacks]
+ # 'LISTEN' is just for receiving information, it is not actually sent anywhere
+ with forward_error(callbacks):
+ callbacks = callbacks or []
+ if not hasattr(callbacks, '__iter__'):
+ callbacks = [callbacks]
- yield self.connection.queue_wait()
- cmd_listen = CmdLine('LISTEN')
- while self.subscribed:
- data = yield async(self.connection.readline)()
- try:
- error, response = yield self.process_data(data, cmd_listen)
- result = self.format_reply(cmd_listen, response)
- except Exception, e:
- error, result = e, None
+ yield self.connection.queue_wait()
+ cmd_listen = CmdLine('LISTEN')
+ while self.subscribed:
+ data = yield async(self.connection.readline)()
+ if isinstance(data, Exception):
+ raise data
- self.call_callbacks(callbacks, (error, result) )
+ response = yield self.process_data(data, cmd_listen)
+ if isinstance(response, Exception):
+ raise response
+ result = self.format_reply(cmd_listen, response)
+ self.call_callbacks(callbacks, result)
### CAS
def watch(self, key, callbacks=None):
View
0 setup.py 100644 → 100755
No changes.

0 comments on commit abc1e87

Please sign in to comment.