
Merge pull request #9 from gamechanger/queue_piping

introducing Queue Piping v1
2 parents 4a5d23b + 1eb88cb · commit 856e33dbae348ab2ce75d4e05aa5e5d5adb5da74 · dwoos committed Apr 9, 2012
Showing with 51 additions and 9 deletions.
  1. +51 −9 resched/queue.py
60 resched/queue.py
@@ -12,8 +12,9 @@ class Queue(RedisBacked):
>>> from redis import Redis
>>> from base import ContentType
>>> import time
+ >>> client = Redis('localhost')
- >>> q = Queue(Redis('localhost'), 'stuff', ContentType.JSON)
+ >>> q = Queue(client, 'stuff', ContentType.JSON)
>>> q.keep_entry_set = True
>>> q.clear()
>>> q.reclaim_tasks()
@@ -38,8 +39,8 @@ class Queue(RedisBacked):
>>> assert q.number_active_workers() == 1
>>> q.reclaim_tasks()
- >>> qa = Queue(Redis('localhost'), 'stuff2', ContentType.JSON, worker_id='a', work_ttl=1)
- >>> qb = Queue(Redis('localhost'), 'stuff2', ContentType.JSON, worker_id='b', work_ttl=1)
+ >>> qa = Queue(client, 'stuff2', ContentType.JSON, worker_id='a', work_ttl=1)
+ >>> qb = Queue(client, 'stuff2', ContentType.JSON, worker_id='b', work_ttl=1)
>>> qa.clear()
>>> qa.push({'hello': 'cruelworld'})
>>> qa.size()
@@ -70,10 +71,29 @@ class Queue(RedisBacked):
1
>>> qb.number_in_progress()
0
+ >>> first = Queue(client, 'abc')
+ >>> second = Queue(client, 'abc_errors')
+ >>> first.clear()
+ >>> second.clear()
+ >>> first.pipe(Queue.RESULT_ERROR, second)
+ >>> first.push('a', 'aaa')
+ >>> first.pop(return_key=True)
+ ('a', 'aaa')
+ >>> first.complete('a', Queue.RESULT_ERROR)
+ >>> first.size()
+ 0
+ >>> second.size()
+ 1
+ >>> second.peek()
+ 'a'
+ >>> second.pop()
+ 'aaa'
"""
FIFO = 'fifo'
FILO = LIFO = 'filo'
+ RESULT_ERROR = 'error' # useful for piping errors somewhere
+ RESULT_SUCCESS = 'success' # useful for piping successful completions somewhere
DEFAULT_WORK_TTL_SECONDS = 60
def __init__(self, redis_client, namespace, content_type=ContentType.STRING, **kwargs):
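Outside the doctest, the same round trip looks roughly like the sketch below. The `resched.queue` import path, the queue names, and the `handle()` worker function are illustrative assumptions, not part of this commit.

    from redis import Redis
    from resched.queue import Queue   # assumed import path for this module

    client = Redis('localhost')
    jobs = Queue(client, 'jobs')              # default ContentType.STRING
    errors = Queue(client, 'jobs_errors')
    jobs.pipe(Queue.RESULT_ERROR, errors)     # failed completions flow to `errors`

    jobs.push('job-1', 'some payload')
    key, payload = jobs.pop(return_key=True)  # -> ('job-1', 'some payload')
    try:
        handle(payload)                         # hypothetical worker function
        jobs.complete(key)                      # plain completion, nothing piped
    except Exception:
        jobs.complete(key, Queue.RESULT_ERROR)  # key + payload land on `errors`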
@@ -94,9 +114,8 @@ def __init__(self, redis_client, namespace, content_type=ContentType.STRING, **k
assert self.strategy in (self.FIFO, self.LIFO)
self.keep_entry_set = kwargs.get('track_entries', False)
self.work_ttl_seconds = kwargs.get('work_ttl', self.DEFAULT_WORK_TTL_SECONDS)
- self.pipes = kwargs.get('pipes', [])
- assert isinstance(self.pipes, (list, tuple))
- for result_code, queue in self.pipes:
+ self.pipes = dict(kwargs.get('pipes', []))
+ for result_code, queue in self.pipes.iteritems():
assert isinstance(result_code, basestring) and isinstance(queue, Queue)
self.QUEUE_LIST_KEY = 'queue.{ns}'.format(ns=namespace)
@@ -107,6 +126,15 @@ def __init__(self, redis_client, namespace, content_type=ContentType.STRING, **k
self.PAYLOADS = 'queue.{ns}.payload'.format(ns=namespace)
+ def pipe(self, result, queue):
+ """
+ @param result The result code (e.g. Queue.RESULT_ERROR) whose completions should be piped.
+ @param queue The resched.queue.Queue those completions are pushed to.
+ """
+ assert result and isinstance(result, basestring)
+ assert isinstance(queue, Queue)
+ self.pipes[result] = queue
+
def _working_list_key(self, worker_id=None):
worker_id = worker_id or self.worker_id
return 'queue.{ns}.working.{wid}'.format(ns=self.namespace, wid=worker_id)
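Because `__init__` now normalizes the `pipes` kwarg with `dict(...)`, pipes can also be wired up at construction time, either as a dict or as (result_code, queue) pairs; calling `pipe()` afterwards is equivalent. A minimal sketch, with illustrative queue names and the same assumed import path:

    from redis import Redis
    from resched.queue import Queue   # assumed import path

    client = Redis('localhost')
    errors = Queue(client, 'work_errors')
    done = Queue(client, 'work_done')

    # dict form: result code -> destination queue
    work = Queue(client, 'work', pipes={Queue.RESULT_ERROR: errors,
                                        Queue.RESULT_SUCCESS: done})

    # ...or a list of (result_code, queue) pairs, which dict() also accepts
    work = Queue(client, 'work', pipes=[(Queue.RESULT_ERROR, errors)])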
@@ -156,8 +184,8 @@ def number_of_entries(self):
def number_active_workers(self):
return self.server.scard(self.WORKER_SET_KEY)
- def push(self, value, payload=None):
- with self.server.pipeline() as pipe:
+ def push(self, value, payload=None, pipeline=None):
+ with (pipeline or self.server.pipeline()) as pipe:
value = self.pack(value)
payload = self.pack(payload)
if self.strategy == self.FIFO:
@@ -210,10 +238,24 @@ def peek(self):
self._on_activity()
return self.unpack(self.server.lindex(self.QUEUE_LIST_KEY, 0))
- def complete(self, value):
+ def complete(self, value, result=None):
+ """
+ Mark an item as complete.
+ This does the following:
+ * removes the item from the WORKING list for the current worker
+ * removes the item from the ENTRY set (if used)
+ * moves the key and payload to a RESULT QUEUE if specified
+ * removes the PAYLOAD from its collection.
+
+ @param value The key corresponding to a value that's in-progress.
+ @param result An optional result code; if a pipe is registered for it, the item is pushed to that pipe's queue.
+ """
self._on_activity()
with self.server.pipeline() as pipe:
value = self.pack(value)
+ if result and result in self.pipes:
+ payload = self.server.hget(self.PAYLOADS, value) or value
+ self.pipes[result].push(value, payload=payload, pipeline=pipe)
pipe.lrem(self.WORKING_LIST_KEY, value)
pipe.srem(self.ENTRY_SET_KEY, value)
pipe.hdel(self.PAYLOADS, value)
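A result code with no registered pipe falls through to a plain completion, as the `result in self.pipes` guard above shows. A small sketch under the same assumptions (illustrative names, assumed import path):

    from redis import Redis
    from resched.queue import Queue   # assumed import path

    client = Redis('localhost')
    tasks = Queue(client, 'tasks')
    errs = Queue(client, 'tasks_errors')
    tasks.clear()
    errs.clear()
    tasks.pipe(Queue.RESULT_ERROR, errs)   # only error results are piped

    tasks.push('k', 'payload')
    key, payload = tasks.pop(return_key=True)

    # RESULT_SUCCESS has no pipe registered here, so complete() just removes
    # the item from the working list, entry set and payload hash.
    tasks.complete(key, Queue.RESULT_SUCCESS)
    assert errs.size() == 0

    # Completing with Queue.RESULT_ERROR instead would first read the stored
    # payload, then push ('k', 'payload') onto `errs` on the same Redis
    # pipeline as the removal commands.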
