Permalink
Browse files

Initial checkin of code for simple queue

  • Loading branch information...
0 parents commit 3be590d24c02adef4ba3e5b2101d9908ba8e16ef @coleifer committed Nov 3, 2011
19 LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2010 Charles Leifer
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
@@ -0,0 +1,3 @@
+include LICENSE.txt
+include MANIFEST.in
+include README.rst
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+import sys
+import unittest
+
+from skew import tests
+
+def runtests(*test_args):
+ suite = unittest.TestLoader().loadTestsFromModule(tests)
+ result = unittest.TextTestRunner(verbosity=2).run(suite)
+ if result.failures:
+ sys.exit(1)
+ elif result.errors:
+ sys.exit(2)
+ sys.exit(0)
+
+if __name__ == '__main__':
+ runtests(*sys.argv[1:])
@@ -0,0 +1,28 @@
+import os
+from setuptools import setup, find_packages
+
+
+setup(
+ name='skew',
+ version="0.1.0",
+ description='a simple queue for python',
+ author='Charles Leifer',
+ author_email='coleifer@gmail.com',
+ url='http://github.com/coleifer/skew/tree/master',
+ packages=find_packages(),
+ package_data = {
+ 'skew': [
+ ],
+ },
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'Environment :: Web Environment',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: MIT License',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python',
+ 'Framework :: Django',
+ ],
+ test_suite='runtests.runtests',
+ scripts = ['skew/bin/skew_consumer.py'],
+)
No changes.
No changes.
@@ -0,0 +1,61 @@
+class BaseQueue(object):
+ """
+ Base implementation for a Queue, all backends should subclass
+ """
+ blocking = False
+
+ def __init__(self, name, connection):
+ """
+ Initialize the Queue - this happens once when the module is loaded
+ """
+ self.name = name
+ self.connection = connection
+
+ def write(self, data):
+ """
+ Push 'data' onto the queue
+ """
+ raise NotImplementedError
+
+ def read(self):
+ """
+ Pop 'data' from the queue, returning None if no data is available --
+ an empty queue should not raise an Exception!
+ """
+ raise NotImplementedError
+
+ def flush(self):
+ """
+ Delete everything from the queue
+ """
+ raise NotImplementedError
+
+ def __len__(self):
+ """
+ Used primarily in tests, but return the number of items in the queue
+ """
+ raise NotImplementedError
+
+
+class BaseResultStore(object):
+ """
+ Base implementation for a result store
+ """
+ def __init__(self, name, connection):
+ """
+ Initialize the Queue - this happens once when the module is loaded
+ """
+ self.name = name
+ self.connection = connection
+
+ def put(self, task_id, value):
+ """
+ Store the result of a task
+ """
+ raise NotImplementedError
+
+ def get(self, task_id):
+ """
+ Retrieve a task's result from the backend
+ """
+ raise NotImplementedError
@@ -0,0 +1,34 @@
+from skew.backends.base import BaseQueue, BaseResultStore
+
+
+class DummyQueue(BaseQueue):
+ def __init__(self, *args, **kwargs):
+ super(DummyQueue, self).__init__(*args, **kwargs)
+ self._queue = []
+
+ def write(self, data):
+ self._queue.insert(0, data)
+
+ def read(self):
+ try:
+ return self._queue.pop()
+ except IndexError:
+ return None
+
+ def flush(self):
+ self._queue = []
+
+ def __len__(self):
+ return len(self._queue)
+
+
+class DummyResultStore(BaseResultStore):
+ def __init__(self, name, conn):
+ super(DummyResultStore, self).__init__(name, conn)
+ self._results = {}
+
+ def put(self, task_id, value):
+ self._results[task_id] = value
+
+ def get(self, task_id):
+ return self._results.pop(task_id, None)
@@ -0,0 +1,73 @@
+import re
+import redis
+
+from skew.backends.base import BaseQueue, BaseResultStore
+
+
+class RedisQueue(BaseQueue):
+ """
+ A simple Queue that uses the redis to store messages
+ """
+ def __init__(self, name, connection):
+ """
+ QUEUE_CONNECTION = 'host:port:database' or defaults to localhost:6379:0
+ """
+ super(RedisQueue, self).__init__(name, connection)
+
+ if not connection:
+ connection = 'localhost:6379:0'
+
+ self.queue_name = 'skew.redis.%s' % re.sub('[^a-z0-9]', '', name)
+ host, port, db = connection.split(':')
+
+ self.conn = redis.Redis(
+ host=host, port=int(port), db=int(db)
+ )
+
+ def write(self, data):
+ self.conn.lpush(self.queue_name, data)
+
+ def read(self):
+ return self.conn.rpop(self.queue_name)
+
+ def flush(self):
+ self.conn.delete(self.queue_name)
+
+ def __len__(self):
+ return self.conn.llen(self.queue_name)
+
+
+class RedisBlockingQueue(RedisQueue):
+ """
+ Use the blocking right pop, should result in messages getting
+ executed close to immediately by the consumer as opposed to
+ being polled for
+ """
+ blocking = True
+
+ def read(self):
+ return self.conn.brpop(self.queue_name)
+
+
+class RedisResultStore(BaseResultStore):
+ def __init__(self, name, connection):
+ super(RedisResultStore, self).__init__(name, connection)
+
+ if not connection:
+ connection = 'localhost:6379:0'
+
+ self.storage_name = 'skew.redis.results.%s' % re.sub('[^a-z0-9]', '', name)
+ host, port, db = connection.split(':')
+
+ self.conn = redis.Redis(
+ host=host, port=int(port), db=int(db)
+ )
+
+ def put(self, task_id, value):
+ self.conn.hset(self.storage_name, task_id, value)
+
+ def get(self, task_id):
+ val = self.conn.hget(self.storage_name, task_id)
+ if val:
+ self.conn.hdel(self.storage_name, task_id)
+ return val
No changes.
@@ -0,0 +1,17 @@
+import logging
+
+from skew.backends.dummy import DummyQueue
+
+
+class BaseConfiguration(object):
+ QUEUE = None
+ RESULT_STORE = None
+ PERIODIC = False
+ THREADS = 1
+
+ LOGFILE = '/tmp/skew.log'
+ LOGLEVEL = logging.INFO
+
+ BACKOFF = 1.15
+ INITIAL_DELAY = .1
+ MAX_DELAY = 10
Oops, something went wrong.

0 comments on commit 3be590d

Please sign in to comment.