Skip to content

Commit

Permalink
Initial implementation of providing async calls via RQ
Browse files Browse the repository at this point in the history
  • Loading branch information
rossjones committed Oct 23, 2015
1 parent 1c79507 commit 6b19927
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 0 deletions.
101 changes: 101 additions & 0 deletions ckan/lib/task.py
@@ -0,0 +1,101 @@
from rq import Queue
from redis import Redis

from logging import getLogger
log = getLogger(__name__)

redis_conn = Redis()
queues = None


def init_queues():
'''
Attempt to initialise the three queues on first use. There's no need
to create these until we need them (we might not). This will explode
if redis is not reachable
'''
global queues
try:
queues = {
'low': Queue('low', connection=redis_conn),
'medium': Queue('medium', connection=redis_conn),
'high': Queue('high', connection=redis_conn)
}
except Exception, e:
log.exception(e)


def async(fn, arguments, priority='medium', timeout=30):
'''
Enqueue a task to be run in the background.
:param fn: A function to be executed in the background. This
should be imported by the caller, but the function itself should
avoid having dependencies on CKAN.
:type fn: function
:param arguments: A list of arguments to be passed to the function,
should be empty if there are no arguments.
:type arguments: list
:param priority: The priority of this task, low, medium or high. By
default this is medium.
:type priority: string
:param timeout: How long this should wait before considering
the job lost
:type: integer
'''
if not queues:
init_queues()

if priority not in queues.keys():
raise ValueError("priority is not a valid value")

job = queues[priority].enqueue_call(func=fn,
args=arguments, timeout=timeout)
log.info("Enqueued task: %r" % job)


def clear_tasks(queue_priority):
''' Empties the specified queue and returns the number of items
deleted. '''
if queues is None:
init_queues()

if queue_priority not in queues.keys():
raise ValueError("priority is not a valid value")

# We have to manually clear the queue unless we use rqinfo on the
# command line
counter = 0
redis_conn = Redis()
while True:
job_id = redis_conn.lpop("rq:queue:%s" % queue_priority)
if job_id is None:
break
redis_conn.delete("rq:job:" + job_id)
log.info("Deleted task: %s" % job_id)
counter += 1
return counter


def task_count(queue_priority=None):
'''
Returns the number of jobs in the queue specified, which should be low,
medium, or high. If no queue is specified, the size of all of the queues is
returned.
'''
if queues is None:
init_queues()

if queue_priority and queue_priority not in queues.keys():
raise ValueError("priority is not a valid value")

size = 0
if not queue_priority:
size = sum(len(q.job_ids) for q in queues.values())
else:
size = len(queues.get(queue_priority).job_ids)
return size
46 changes: 46 additions & 0 deletions ckan/tests/lib/test_tasks.py
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
import logging
from nose.tools import raises

import ckan.tests.helpers as helpers
import ckan.lib.task as task

log = logging.getLogger(__name__)


def fake_function():
return 42


class TestTasks(object):

def setup(self):
task.clear_tasks('low')
task.clear_tasks('medium')
task.clear_tasks('high')

def test_simple_async(self):
task.async(fake_function, [])
assert task.task_count('medium') == 1

@raises(ValueError)
def test_failing_async_bad_priority(self):
task.async(fake_function, [], priority="immediately")

def test_queue_size(self):
task.async(fake_function, [], priority='low')
task.async(fake_function, [], priority='low')
task.async(fake_function, [], priority='low')
assert task.task_count('low') == 3
task.clear_queue('low')
assert task.task_count('low') == 0

@raises(ValueError)
def test_queue_size_invalid(self):
assert task.task_count('immediately') == 0

def test_queue_size(self):
task.async(fake_function, [], priority='low')
task.async(fake_function, [], priority='medium')
task.async(fake_function, [], priority='high')
assert task.task_count() == 3
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -33,6 +33,7 @@ repoze.who-friendlyform==1.0.8
repoze.who==2.0
requests==2.7.0
routes==1.13
rq==0.5.6
simplejson==3.3.1 # via pylons HAND-FIXED FOR NOW #2681
six==1.10.0 # via pastescript, sqlalchemy-migrate
solrpy==0.9.5
Expand Down

0 comments on commit 6b19927

Please sign in to comment.