Skip to content
This repository
Browse code

[client] support for P[UN]SUBSCRIBE, thanks to https://github.com/ani…

  • Loading branch information...
commit 20db682ed191c784aedbee5d2de8defdd03774fc 1 parent 7956020
Valentin Gologuzov authored

Showing 1 changed file with 33 additions and 8 deletions. Show diff stats Hide diff stats

  1. +33 8 brukva/client.py
41 brukva/client.py
@@ -83,10 +83,20 @@ def execution_context(callbacks, error_wrapper=None):
83 83 return ExecutionContext(callbacks, error_wrapper)
84 84
85 85 class Message(object):
86   - def __init__(self, kind, channel, body):
87   - self.kind = kind
88   - self.channel = channel
89   - self.body = body
  86 + ''' Wrapper Message object.
  87 + kind = command
  88 + channel = channel from which the message was received
  89 + pattern = subscription pattern
  90 + body = message body
  91 + '''
  92 + def __init__(self, *args):
  93 + if len(args) == 3:
  94 + (self.kind, self.channel, self.body) = args
  95 + self.pattern = self.channel
  96 + elif len(args) == 4:
  97 + (self.kind, self.channel, self.pattern, self.body) = args
  98 + else:
  99 + raise ValueError('Invalid number of arguments')
90 100
91 101 class CmdLine(object):
92 102 def __init__(self, cmd, *args, **kwargs):
@@ -275,7 +285,9 @@ def reply_ttl(r, *args, **kwargs):
275 285
276 286 PUB_SUB_COMMANDS = set([
277 287 'SUBSCRIBE',
  288 + 'PSUBSCRIBE',
278 289 'UNSUBSCRIBE',
  290 + 'PUNSUBSCRIBE',
279 291 'LISTEN',
280 292 ])
281 293
@@ -315,7 +327,8 @@ def __init__(self, host='localhost', port=6379, password=None,
315 327 reply_dict_from_pairs),
316 328 string_keys_to_dict('HGET',
317 329 reply_str),
318   - string_keys_to_dict('SUBSCRIBE UNSUBSCRIBE LISTEN',
  330 + string_keys_to_dict('SUBSCRIBE UNSUBSCRIBE LISTEN '
  331 + 'PSUBSCRIBE UNSUBSCRIBE',
319 332 reply_pubsub_message),
320 333 string_keys_to_dict('ZRANK ZREVRANK',
321 334 reply_int),
@@ -854,6 +867,12 @@ def hvals(self, key, callbacks=None):
854 867
855 868 ### PUBSUB
856 869 def subscribe(self, channels, callbacks=None):
  870 + self._subscribe('SUBSCRIBE', channels, callbacks)
  871 +
  872 + def psubscribe(self, channels, callbacks=None):
  873 + self._subscribe('PSUBSCRIBE', channels, callbacks)
  874 +
  875 + def _subscribe(self, cmd, channels, callbacks=None):
857 876 callbacks = callbacks or []
858 877 if not isinstance(callbacks, Iterable):
859 878 callbacks = [callbacks]
@@ -861,19 +880,25 @@ def subscribe(self, channels, callbacks=None):
861 880 channels = [channels]
862 881 if not self.subscribed:
863 882 callbacks = list(callbacks) + [self.on_subscribed]
864   - self.execute_command('SUBSCRIBE', callbacks, *channels)
  883 + self.execute_command(cmd, callbacks, *channels)
865 884
866 885 def on_subscribed(self, result):
867 886 self.subscribed = True
868 887
869 888 def unsubscribe(self, channels, callbacks=None):
  889 + self._unsubscribe('UNSUBSCRIBE', channels, callbacks)
  890 +
  891 + def punsubscribe(self, channels, callbacks=None):
  892 + self._unsubscribe('UNSUBSCRIBE', channels, callbacks)
  893 +
  894 + def _unsubscribe(self, cmd, channels, callbacks=None):
870 895 callbacks = callbacks or []
871 896 if not isinstance(callbacks, Iterable):
872 897 callbacks = [callbacks]
873 898 if isinstance(channels, basestring):
874 899 channels = [channels]
875 900 callbacks = list(callbacks)
876   - self.execute_command('UNSUBSCRIBE', callbacks, *channels)
  901 + self.execute_command(cmd, callbacks, *channels)
877 902
878 903 def on_unsubscribed(self, *args, **kwargs):
879 904 self.subscribed = False
@@ -908,7 +933,7 @@ def error_wrapper(e):
908 933
909 934 result = self.format_reply(cmd_listen, response)
910 935
911   - if result.kind != 'message':
  936 + if result.kind not in ('message', 'pmessage'):
912 937 waiting_stack = self._waiting_callbacks[result.kind.upper()]
913 938 if len(waiting_stack) > 0:
914 939 ctx.safe_call(waiting_stack.pop(0), result)

0 comments on commit 20db682

Please sign in to comment.
Something went wrong with that request. Please try again.