Merge pull request #52 from Koed00/dev
Adds pluggable brokers
Koed00 committed Sep 6, 2015
2 parents b0925af + cba7b60 commit 5bd150b
Showing 27 changed files with 753 additions and 247 deletions.
13 changes: 13 additions & 0 deletions .travis.yml
@@ -11,6 +11,19 @@ env:
  - DJANGO=1.8.4
  - DJANGO=1.7.10

sudo: false

addons:
  apt:
    packages:
      - tcl8.5

before_script:
  - git clone https://github.com/antirez/disque.git disque_server
  - "cd disque_server/src && make && PREFIX=../ make install && cd -"
  - "./disque_server/bin/disque-server &"
  - ./disque_server/bin/disque PING

install:
  - pip install -q django==$DJANGO
  - pip install -r requirements.txt
3 changes: 2 additions & 1 deletion MANIFEST.in
@@ -2,4 +2,5 @@ include LICENSE
include README.rst
include django_q/management/*.py
include django_q/management/commands/*.py
include django_q/migrations/*.py
include django_q/migrations/*.py
include django_q/brokers/*.py
13 changes: 9 additions & 4 deletions README.rst
@@ -20,20 +20,26 @@ Features
- Django Admin integration
- PaaS compatible with multiple instances
- Multi cluster monitor
- Redis broker
- Redis broker and Disque broker
- Python 2 and 3

Requirements
~~~~~~~~~~~~

- `Redis-py <https://github.com/andymccurdy/redis-py>`__
- `Django <https://www.djangoproject.com>`__ >= 1.7
- `Django-picklefield <https://github.com/gintas/django-picklefield>`__
- `Arrow <https://github.com/crsmithdev/arrow>`__
- `Blessed <https://github.com/jquast/blessed>`__

Tested with: Python 2.7 & 3.4. Django 1.7.10 & 1.8.4

Brokers
~~~~~~~
- `Redis <http://redis.io/>`__
- `Disque <https://github.com/antirez/disque>`__
- `Amazon SQS <https://aws.amazon.com/sqs/>`__ (TBA)
- `IronMQ <http://www.iron.io/mq/>`__ (TBA)


Installation
~~~~~~~~~~~~
@@ -54,8 +60,7 @@ Installation

$ python manage.py migrate

- Make sure you have a `Redis <http://redis.io/>`__ server running
somewhere
- Choose a message `broker <https://django-q.readthedocs.org/en/latest/brokers.html>`__, configure it, and install the appropriate client library (see the settings sketch below).
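
For illustration only (not part of this commit), broker selection happens through the ``Q_CLUSTER`` dictionary in the Django settings; the key names below mirror the ``Conf`` values (``REDIS``, ``DISQUE_NODES``, ``DISQUE_AUTH``) read by the new brokers package and are assumptions to check against the documentation::

    # settings.py -- illustrative sketch; key names are assumptions based on django_q.conf.Conf
    Q_CLUSTER = {
        'name': 'myproject',
        # Redis broker (the default): connection kwargs passed to redis.StrictRedis
        'redis': {'host': '127.0.0.1', 'port': 6379, 'db': 0},
        # Disque broker: a list of 'host:port' node strings switches brokers
        # 'disque_nodes': ['127.0.0.1:7711'],
        # 'disque_auth': 'secret',
    }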

Read the full documentation at `https://django-q.readthedocs.org <https://django-q.readthedocs.org>`__

2 changes: 1 addition & 1 deletion django_q/__init__.py
@@ -9,6 +9,6 @@
from .cluster import Cluster
from .status import Stat

VERSION = (0, 5, 3)
VERSION = (0, 6, 0)

default_app_config = 'django_q.apps.DjangoQConfig'
159 changes: 159 additions & 0 deletions django_q/brokers/__init__.py
@@ -0,0 +1,159 @@
from django_q.conf import Conf
from django.core.cache import caches, InvalidCacheBackendError


class Broker(object):
    def __init__(self, list_key=Conf.PREFIX):
        self.connection = self.get_connection(list_key)
        self.list_key = list_key
        self.cache = self.get_cache()

    def enqueue(self, task):
        """
        Puts a task onto the queue
        :type task: str
        :return: task id
        """
        pass

    def dequeue(self):
        """
        Gets a task from the queue
        :return: tuple with task id and task message
        """
        pass

    def queue_size(self):
        """
        :return: the number of tasks in the queue
        """
        pass

    def delete_queue(self):
        """
        Deletes the queue from the broker
        """
        pass

    def purge_queue(self):
        """
        Purges the queue of any tasks
        """
        pass

    def delete(self, task_id):
        """
        Deletes a task from the queue
        :param task_id: the id of the task
        """
        pass

    def acknowledge(self, task_id):
        """
        Acknowledges completion of the task and removes it from the queue.
        :param task_id: the id of the task
        """
        pass

    def fail(self, task_id):
        """
        Fails a task message
        :param task_id: the id of the task
        """
        pass

    def ping(self):
        """
        Checks whether the broker connection is available
        :rtype: bool
        """
        pass

    def info(self):
        """
        Shows the broker type
        """
        pass

    def set_stat(self, key, value, timeout):
        """
        Saves a cluster statistic to the cache provider
        :type key: str
        :type value: str
        :type timeout: int
        """
        if not self.cache:
            return
        key_list = self.cache.get(Conf.Q_STAT, [])
        if key not in key_list:
            key_list.append(key)
        self.cache.set(Conf.Q_STAT, key_list)
        return self.cache.set(key, value, timeout)

    def get_stat(self, key):
        """
        Gets a cluster statistic from the cache provider
        :type key: str
        :return: a cluster Stat
        """
        if not self.cache:
            return
        return self.cache.get(key)

    def get_stats(self, pattern):
        """
        Returns a list of all cluster stats from the cache provider
        :type pattern: str
        :return: a list of Stats
        """
        if not self.cache:
            return
        key_list = self.cache.get(Conf.Q_STAT)
        if not key_list or len(key_list) == 0:
            return []
        stats = []
        for key in key_list:
            stat = self.cache.get(key)
            if stat:
                stats.append(stat)
            else:
                key_list.remove(key)
                self.cache.set(Conf.Q_STAT, key_list)
        return stats

    @staticmethod
    def get_cache():
        """
        Gets the current cache provider
        :return: a cache provider
        """
        try:
            return caches[Conf.CACHE]
        except InvalidCacheBackendError:
            return None

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        """
        Gets a connection to the broker
        :param list_key: Optional queue name
        :return: a broker connection
        """
        return 0


def get_broker(list_key=Conf.PREFIX):
    """
    Gets the configured broker type
    :param list_key: optional queue name
    :type list_key: str
    :return: a broker instance
    """
    # disque
    if Conf.DISQUE_NODES:
        from django_q.brokers import disque
        return disque.Disque(list_key=list_key)
    # default to redis
    else:
        from django_q.brokers import redis_broker
        return redis_broker.Redis(list_key=list_key)
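
To make the contract concrete, here is a minimal usage sketch (not part of this commit), assuming a reachable broker and default settings; the calls follow the method signatures defined above.

# Illustrative sketch, not from this commit: exercising the broker API.
from django_q.brokers import get_broker

broker = get_broker()                    # Disque when DISQUE_NODES is set, otherwise Redis
if broker.ping():                        # check that the broker connection is available
    task_id = broker.enqueue('payload')  # put a (pickled) task message on the queue
    result = broker.dequeue()            # (task id, task message), or None when the queue is empty
    if result:
        ack_id, message = result
        broker.acknowledge(ack_id)       # remove it once processed (a no-op for the Redis list broker)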
61 changes: 61 additions & 0 deletions django_q/brokers/disque.py
@@ -0,0 +1,61 @@
import random
import redis
from django_q.brokers import Broker
from django_q.conf import Conf


class Disque(Broker):
    def enqueue(self, task):
        retry = Conf.RETRY if Conf.RETRY > 0 else '{} REPLICATE 1'.format(Conf.RETRY)
        return self.connection.execute_command(
            'ADDJOB {} {} 500 RETRY {}'.format(self.list_key, task, retry)).decode()

    def dequeue(self):
        task = self.connection.execute_command('GETJOB TIMEOUT 1000 FROM {}'.format(self.list_key))
        if task:
            return task[0][1].decode(), task[0][2].decode()

    def queue_size(self):
        return self.connection.execute_command('QLEN {}'.format(self.list_key))

    def acknowledge(self, task_id):
        return self.connection.execute_command('ACKJOB {}'.format(task_id))

    def ping(self):
        return self.connection.execute_command('HELLO')[0] > 0

    def delete(self, task_id):
        return self.connection.execute_command('DELJOB {}'.format(task_id))

    def fail(self, task_id):
        return self.delete(task_id)

    def delete_queue(self):
        jobs = self.connection.execute_command('JSCAN QUEUE {}'.format(self.list_key))[1]
        if jobs:
            job_ids = ' '.join(jid.decode() for jid in jobs)
            self.connection.execute_command('DELJOB {}'.format(job_ids))
        return len(jobs)

    def info(self):
        info = self.connection.info('server')
        return 'Disque {}'.format(info['disque_version'])

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        # randomize nodes
        random.shuffle(Conf.DISQUE_NODES)
        # find one that works
        for node in Conf.DISQUE_NODES:
            host, port = node.split(':')
            kwargs = {'host': host, 'port': port}
            if Conf.DISQUE_AUTH:
                kwargs['password'] = Conf.DISQUE_AUTH
            redis_client = redis.Redis(**kwargs)
            redis_client.decode_responses = True
            try:
                redis_client.execute_command('HELLO')
                return redis_client
            except redis.exceptions.ConnectionError:
                continue
        raise redis.exceptions.ConnectionError('Could not connect to any Disque nodes')
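
For orientation only (not part of this commit), the class above is a thin wrapper over Disque's wire protocol, which redis-py can speak directly; a rough by-hand equivalent of the enqueue/dequeue/acknowledge cycle, assuming a single local node on the default port 7711 and an illustrative queue named 'myqueue':

# Illustrative sketch, not from this commit: the raw commands the Disque broker issues.
import redis

client = redis.Redis(host='127.0.0.1', port=7711)
job_id = client.execute_command('ADDJOB', 'myqueue', 'payload', 500, 'RETRY', 60)
jobs = client.execute_command('GETJOB', 'TIMEOUT', 1000, 'FROM', 'myqueue')
for queue, jid, body in jobs or []:
    client.execute_command('ACKJOB', jid)  # acknowledge once the job has been processed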
60 changes: 60 additions & 0 deletions django_q/brokers/redis_broker.py
@@ -0,0 +1,60 @@
import redis
from django_q.brokers import Broker
from django_q.conf import Conf, logger

try:
    import django_redis
except ImportError:
    django_redis = None


class Redis(Broker):

    def __init__(self, list_key=Conf.PREFIX):
        super(Redis, self).__init__(list_key='django_q:{}:q'.format(list_key))

    def enqueue(self, task):
        return self.connection.rpush(self.list_key, task)

    def dequeue(self):
        task = self.connection.blpop(self.list_key, 1)
        if task:
            return None, task[1]

    def queue_size(self):
        return self.connection.llen(self.list_key)

    def delete_queue(self):
        return self.connection.delete(self.list_key)

    def purge_queue(self):
        return self.connection.ltrim(self.list_key, 1, 0)

    def ping(self):
        try:
            return self.connection.ping()
        except redis.ConnectionError as e:
            logger.error('Can not connect to Redis server.')
            raise e

    def info(self):
        info = self.connection.info('server')
        return 'Redis {}'.format(info['redis_version'])

    def set_stat(self, key, value, timeout):
        self.connection.set(key, value, timeout)

    def get_stat(self, key):
        if self.connection.exists(key):
            return self.connection.get(key)

    def get_stats(self, pattern):
        keys = self.connection.keys(pattern=pattern)
        if keys:
            return self.connection.mget(keys)

    @staticmethod
    def get_connection(list_key=Conf.PREFIX):
        if django_redis and Conf.DJANGO_REDIS:
            return django_redis.get_redis_connection(Conf.DJANGO_REDIS)
        return redis.StrictRedis(**Conf.REDIS)
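
Likewise (illustration only, not part of this commit), this broker rests on a plain Redis list, which is why dequeue returns no task id and acknowledge stays a no-op; a rough by-hand equivalent, assuming a local Redis server and an illustrative list key:

# Illustrative sketch, not from this commit: the list operations behind the Redis broker.
import redis

client = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
client.rpush('django_q:myproject:q', 'payload')   # enqueue appends to the list
item = client.blpop('django_q:myproject:q', 1)    # dequeue blocks for up to one second
if item:
    _, payload = item  # only (key, value) comes back: a plain list has no acknowledgement step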
