Permalink
Browse files

New task, sparts.tasks.queue.QueueTask

QueueTasks are pretty great for working on work from a queue.

They also provide a --{task}-workers option, that makes it easier to
have multiple threads operating on the queue.
  • Loading branch information...
1 parent 1793f54 commit 60545559e80c851fa3bd21c2f9e21c39b1ba7594 @fmoo committed Nov 4, 2012
Showing with 90 additions and 0 deletions.
  1. +1 −0 CHANGES.txt
  2. +36 −0 sparts/tasks/queue.py
  3. +10 −0 sparts/vtask.py
  4. +43 −0 tests/tasks/test_queue.py
View
@@ -4,6 +4,7 @@ CHANGES
0.1.3
-----
* task "workers" attribute to set number of worker threads
+* new task, sparts.tasks.queue.QueueTask
0.1.2
-----
View
@@ -0,0 +1,36 @@
+from ..vtask import VTask, ExecuteContext, TryLater
+from ..sparts import option
+from Queue import Queue, Empty
+
+class QueueTask(VTask):
+ WORKERS = 1
+ workers = option('workers', type=int, default=lambda cls: cls.WORKERS,
+ help='Number of threads to spawn to work on items from '
+ 'its queue. [%(default)s]')
+
+ def initTask(self):
+ super(QueueTask, self).initTask()
+ self.queue = Queue()
+
+ def _runloop(self):
+ while not self.service._stop:
+ try:
+ item = self.queue.get(timeout=0.600)
+ except Empty:
+ continue
+
+ if isinstance(item, ExecuteContext):
+ context = item
+ item = context.item
+ else:
+ context = ExecuteContext(item=item)
+ try:
+ self.execute(item, context)
+ except TryLater:
+ context.attempt += 1
+ self.queue.put(context)
+ finally:
+ self.queue.task_done()
+
+ def execute(self, item, context):
+ raise NotImplementedError()
View
@@ -64,3 +64,13 @@ def _addArguments(cls, ap):
class SkipTask(Exception):
pass
+
+
+class TryLater(Exception):
+ pass
+
+
+class ExecuteContext(object):
+ def __init__(self, attempt=1, item=None):
+ self.attempt = attempt
+ self.item = item
View
@@ -0,0 +1,43 @@
+from sparts.tasks.queue import QueueTask
+from sparts.vtask import TryLater
+from ..base import SingleTaskTestCase
+
+
+class MyTask(QueueTask):
+ counter = 0
+
+ def execute(self, item, context):
+ self.counter += 1
+
+class TestMyTask(SingleTaskTestCase):
+ TASK = MyTask
+
+ def test_execute_happens(self):
+ self.task.queue.put('foo')
+ self.task.queue.put('bar')
+ self.task.queue.put('baz')
+ self.task.queue.join()
+ self.assertEquals(self.task.counter, 3)
+
+
+class MyRetryTask(QueueTask):
+ completed = 0
+ retried = 0
+
+ def execute(self, item, context):
+ if context.attempt <= 10:
+ self.retried += 1
+ raise TryLater()
+ else:
+ self.completed += 1
+
+class TestRetries(SingleTaskTestCase):
+ TASK = MyRetryTask
+
+ def test_retries_completed(self):
+ self.task.queue.put('foo')
+ self.task.queue.put('bar')
+ self.task.queue.put('baz')
+ self.task.queue.join()
+ self.assertEquals(self.task.retried, 30)
+ self.assertEquals(self.task.completed, 3)

0 comments on commit 6054555

Please sign in to comment.