Skip to content

Commit

Permalink
Merge branch 'blpop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Ask Solem committed Jun 28, 2010
2 parents af5b4e9 + 33ae474 commit b428eb3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
7 changes: 6 additions & 1 deletion ghettoq/backends/pyredis.py
Expand Up @@ -34,7 +34,12 @@ def put(self, queue, message):
self.client.lpush(queue, message)

def get(self, queue):
return self.client.rpop(queue)
dest, item = self.client.brpop([queue], timeout=1)
return item

def get_many(self, queues, timeout=None):
dest, item = self.client.brpop(queues, timeout)
return item, dest

def purge(self, queue):
return self.client.delete(queue)
12 changes: 11 additions & 1 deletion ghettoq/messaging.py
Expand Up @@ -31,6 +31,13 @@ def __init__(self, backend, queues):
self.backend = backend
self.queue_names = list(queues)

self._get_many = getattr(self.backend, "get_many", None)
self.get = self._emulated
if self._get_many:
self.get = self._native

# attributes below are only used in emulation mode.

# queues could be a PriorityQueue as well to support
# priorities.
self.queues = map(self.backend.Queue, self.queue_names)
Expand All @@ -42,7 +49,10 @@ def __init__(self, backend, queues):
# tried all of them.
self.all = frozenset(self.queue_names)

def get(self):
def _native(self, timeout=None):
return self._get_many(self.queue_names, timeout=timeout)

def _emulated(self, timeout=None):
"""Get the next message avaiable in the queue.
:returns: The message and the name of the queue it came from as
Expand Down
4 changes: 3 additions & 1 deletion ghettoq/taproot.py
Expand Up @@ -78,6 +78,7 @@ class MultiBackend(BaseBackend):
default_port = None
type = None
interval = 1
polling = True
_prefetch_count = None

def __init__(self, connection, **kwargs):
Expand Down Expand Up @@ -116,7 +117,7 @@ def _poll(self, resource):
return resource.get()
except QueueEmpty:
pass
time.sleep(self.interval)
self.polling and time.sleep(self.interval)

def declare_consumer(self, queue, no_ack, callback, consumer_tag,
**kwargs):
Expand Down Expand Up @@ -214,6 +215,7 @@ def qos_manager(self):

class Redis(MultiBackend):
type = "Redis"
polling = False


class Database(MultiBackend):
Expand Down

0 comments on commit b428eb3

Please sign in to comment.