diff --git a/ckan/lib/task.py b/ckan/lib/task.py new file mode 100644 index 00000000000..6a2242861b8 --- /dev/null +++ b/ckan/lib/task.py @@ -0,0 +1,101 @@ +from rq import Queue +from redis import Redis + +from logging import getLogger +log = getLogger(__name__) + +redis_conn = Redis() +queues = None + + +def init_queues(): + ''' + Attempt to initialise the three queues on first use. There's no need + to create these until we need them (we might not). This will explode + if redis is not reachable + ''' + global queues + try: + queues = { + 'low': Queue('low', connection=redis_conn), + 'medium': Queue('medium', connection=redis_conn), + 'high': Queue('high', connection=redis_conn) + } + except Exception, e: + log.exception(e) + + +def async(fn, arguments, priority='medium', timeout=30): + ''' + Enqueue a task to be run in the background. + + :param fn: A function to be executed in the background. This + should be imported by the caller, but the function itself should + avoid having dependencies on CKAN. + :type fn: function + + :param arguments: A list of arguments to be passed to the function, + should be empty if there are no arguments. + :type arguments: list + + :param priority: The priority of this task, low, medium or high. By + default this is medium. + :type priority: string + + :param timeout: How long this should wait before considering + the job lost + :type: integer + + ''' + if not queues: + init_queues() + + if priority not in queues.keys(): + raise ValueError("priority is not a valid value") + + job = queues[priority].enqueue_call(func=fn, + args=arguments, timeout=timeout) + log.info("Enqueued task: %r" % job) + + +def clear_tasks(queue_priority): + ''' Empties the specified queue and returns the number of items + deleted. ''' + if queues is None: + init_queues() + + if queue_priority not in queues.keys(): + raise ValueError("priority is not a valid value") + + # We have to manually clear the queue unless we use rqinfo on the + # command line + counter = 0 + redis_conn = Redis() + while True: + job_id = redis_conn.lpop("rq:queue:%s" % queue_priority) + if job_id is None: + break + redis_conn.delete("rq:job:" + job_id) + log.info("Deleted task: %s" % job_id) + counter += 1 + return counter + + +def task_count(queue_priority=None): + ''' + Returns the number of jobs in the queue specified, which should be low, + medium, or high. If no queue is specified, the size of all of the queues is + returned. + ''' + if queues is None: + init_queues() + + if queue_priority and queue_priority not in queues.keys(): + raise ValueError("priority is not a valid value") + + size = 0 + if not queue_priority: + size = sum(len(q.job_ids) for q in queues.values()) + else: + size = len(queues.get(queue_priority).job_ids) + return size diff --git a/ckan/tests/lib/test_tasks.py b/ckan/tests/lib/test_tasks.py new file mode 100644 index 00000000000..0ef5513b615 --- /dev/null +++ b/ckan/tests/lib/test_tasks.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +import logging +from nose.tools import raises + +import ckan.tests.helpers as helpers +import ckan.lib.task as task + +log = logging.getLogger(__name__) + + +def fake_function(): + return 42 + + +class TestTasks(object): + + def setup(self): + task.clear_tasks('low') + task.clear_tasks('medium') + task.clear_tasks('high') + + def test_simple_async(self): + task.async(fake_function, []) + assert task.task_count('medium') == 1 + + @raises(ValueError) + def test_failing_async_bad_priority(self): + task.async(fake_function, [], priority="immediately") + + def test_queue_size(self): + task.async(fake_function, [], priority='low') + task.async(fake_function, [], priority='low') + task.async(fake_function, [], priority='low') + assert task.task_count('low') == 3 + task.clear_queue('low') + assert task.task_count('low') == 0 + + @raises(ValueError) + def test_queue_size_invalid(self): + assert task.task_count('immediately') == 0 + + def test_queue_size(self): + task.async(fake_function, [], priority='low') + task.async(fake_function, [], priority='medium') + task.async(fake_function, [], priority='high') + assert task.task_count() == 3 diff --git a/requirements.txt b/requirements.txt index 0a8f6310ce5..47d70dcce51 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,6 +33,7 @@ repoze.who-friendlyform==1.0.8 repoze.who==2.0 requests==2.7.0 routes==1.13 +rq==0.5.6 simplejson==3.3.1 # via pylons HAND-FIXED FOR NOW #2681 six==1.10.0 # via pastescript, sqlalchemy-migrate solrpy==0.9.5