Skip to content

Commit

Permalink
[#2977] Lazy initialization of background job queue.
Browse files Browse the repository at this point in the history
Check Redis availability on server start but initialize queue only once
it is actually required.
  • Loading branch information
torfsen committed Sep 12, 2016
1 parent cc46efa commit 4f50fe3
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 37 deletions.
6 changes: 6 additions & 0 deletions ckan/config/environment.py
Expand Up @@ -16,6 +16,7 @@
import ckan.plugins as p
import ckan.lib.helpers as helpers
import ckan.lib.app_globals as app_globals
import ckan.lib.jobs as jobs
import ckan.lib.render as render
import ckan.lib.search as search
import ckan.logic as logic
Expand Down Expand Up @@ -93,6 +94,11 @@ def find_controller(self, controller):
for msg in msgs:
warnings.filterwarnings('ignore', msg, sqlalchemy.exc.SAWarning)

# Check Redis availability
if not jobs.is_available():
log.critical('Could not connect to Redis. The background job queue '
'will not be available.')

# load all CKAN plugins
p.load_all()

Expand Down
7 changes: 2 additions & 5 deletions ckan/lib/cli.py
Expand Up @@ -2579,8 +2579,6 @@ class JobsCommand(CkanCommand):

def command(self):
self._load_config()
from ckan.lib.jobs import init_queue
self.queue = init_queue()

if not self.args:
print(self.__doc__)
Expand All @@ -2601,9 +2599,8 @@ def command(self):
error(u'Unknown command "{}"'.format(cmd))

def worker(self):
    u'''
    Start a background job worker and block while it processes jobs.

    ``Worker`` is constructed without arguments, so it falls back to
    its default queue; this command no longer keeps its own
    ``self.queue`` attribute.
    '''
    # Imported locally, after ``_load_config()`` has run in ``command``.
    from ckan.lib.jobs import Worker
    Worker().work()

def list(self):
jobs = p.toolkit.get_action('job_list')({}, {})
Expand Down
104 changes: 77 additions & 27 deletions ckan/lib/jobs.py
Expand Up @@ -11,7 +11,7 @@
import logging

from pylons import config
from redis import Redis
from redis import ConnectionPool, Redis
from rq import Queue, Worker as RqWorker
from rq.connections import push_connection

Expand All @@ -22,26 +22,63 @@

REDIS_URL_DEFAULT_VALUE = u'redis://localhost:6379/0'

queue = None
# Redis connection pool. Do not use this directly, use ``connect_to_redis``
# instead.
_connection_pool = None

# RQ job queue. Do not use this directly, use ``get_queue`` instead.
_queue = None

def init_queue():

def connect_to_redis():
    u'''
    (Lazily) connect to Redis.

    A module-wide connection pool is created on the first call, using
    the URL from the configuration (or the default URL if none is
    set). Every call returns a new client object that shares that
    pool.

    The connection is set up but not actually established. The latter
    happens automatically once the connection is used.

    :returns: A lazy Redis connection.
    :rtype: ``redis.Redis``
    '''
    global _connection_pool
    if _connection_pool is not None:
        # Pool already exists: just hand out another client bound to it.
        return Redis(connection_pool=_connection_pool)

    redis_url = config.get(REDIS_URL_SETTING_NAME, REDIS_URL_DEFAULT_VALUE)
    log.debug(u'Using Redis at {}'.format(redis_url))
    _connection_pool = ConnectionPool.from_url(redis_url)
    return Redis(connection_pool=_connection_pool)


def is_available():
    u'''
    Check whether Redis is available.

    Sends a ``PING`` over a connection obtained from
    ``connect_to_redis``. Any failure is logged (with traceback) and
    reported as "not available" instead of being raised.

    :returns: The availability of Redis.
    :rtype: boolean
    '''
    redis_conn = connect_to_redis()
    try:
        return redis_conn.ping()
    except Exception:
        # Deliberately broad: this is a best-effort health check and any
        # failure simply means "not available" to the caller.
        log.exception(u'Redis is not available')
        return False


def get_queue():
    u'''
    Get the job queue.

    The job queue is initialized if that hasn't happened before. The
    queue is created on top of a connection from ``connect_to_redis``
    and then cached in the module-level ``_queue``, so later calls
    return the same object.

    :returns: The job queue.
    :rtype: ``rq.queue.Queue``
    '''
    global _queue
    if _queue is None:
        log.debug(u'Initializing the background job queue')
        redis_conn = connect_to_redis()
        _queue = Queue(connection=redis_conn)
        # Also register the connection on RQ's connection stack, see
        # https://github.com/nvie/rq/issues/479
        push_connection(redis_conn)
    return _queue


def enqueue(fn, args=None, title=None):
Expand All @@ -60,7 +97,7 @@ def enqueue(fn, args=None, title=None):
'''
if args is None:
args = []
job = queue.enqueue_call(func=fn, args=args)
job = get_queue().enqueue_call(func=fn, args=args)
job.meta[u'title'] = title
job.save()
if title:
Expand All @@ -82,10 +119,10 @@ def from_id(id):
:raises KeyError: if no job with that ID exists.
'''
for job in queue.jobs:
for job in get_queue().jobs:
if job.id == id:
return job
raise KeyError('No such job: "{}"'.format(id))
raise KeyError(u'No such job: "{}"'.format(id))


def dictize_job(job):
Expand All @@ -101,37 +138,50 @@ def dictize_job(job):
:rtype: dict
'''
return {
'id': job.id,
'title': job.meta.get('title'),
'created': job.created_at.isoformat(),
u'id': job.id,
u'title': job.meta.get(u'title'),
u'created': job.created_at.isoformat(),
}


def test_job(*args):
    u'''Test job.

    A test job for debugging purposes. Prints out any arguments it
    receives. Can be scheduled via ``paster jobs test``.

    :param args: Arbitrary positional arguments; they are printed as a
        single tuple.
    '''
    print(args)


class Worker(RqWorker):
    u'''
    CKAN-specific worker.

    Behaves like ``rq.worker.Worker`` but defaults to the CKAN
    background job queue and logs its lifecycle events (start, job
    execution, stop) via this module's logger.
    '''
    def __init__(self, queues=None, *args, **kwargs):
        u'''
        Constructor.

        Accepts the same arguments as the constructor of
        ``rq.worker.Worker``. However, ``queues`` defaults to the CKAN
        background job queue.
        '''
        if queues is None:
            queues = get_queue()
        super(Worker, self).__init__(queues, *args, **kwargs)

    def register_birth(self, *args, **kwargs):
        u'''Register the worker via the parent class, then log its start.'''
        result = super(Worker, self).register_birth(*args, **kwargs)
        log.info(u'Worker {} has started (PID: {})'.format(self.key, self.pid))
        return result

    def execute_job(self, job, *args, **kwargs):
        u'''Run ``job`` via the parent class, logging before and after.'''
        log.info(u'Worker {} starts executing job {}'.format(self.key, job.id))
        result = super(Worker, self).execute_job(job, *args, **kwargs)
        log.info(u'Worker {} has finished executing job {}'.format(
            self.key, job.id))
        return result

    def register_death(self, *args, **kwargs):
        u'''Deregister the worker via the parent class, then log its stop.'''
        result = super(Worker, self).register_death(*args, **kwargs)
        log.info(u'Worker {} has stopped'.format(self.key))
        return result
2 changes: 1 addition & 1 deletion ckan/logic/action/delete.py
Expand Up @@ -715,7 +715,7 @@ def job_clear(context, data_dict):
Does not affect jobs that are already being processed.
'''
_check_access('job_clear', context, data_dict)
jobs.queue.empty()
jobs.get_queue().empty()
log.warn('Cleared background job queue')


Expand Down
2 changes: 1 addition & 1 deletion ckan/logic/action/get.py
Expand Up @@ -3510,7 +3510,7 @@ def job_list(context, data_dict):
:rtype: list
'''
_check_access('job_list', context, data_dict)
return [jobs.dictize_job(job) for job in jobs.queue.jobs]
return [jobs.dictize_job(job) for job in jobs.get_queue().jobs]


def job_show(context, data_dict):
Expand Down
4 changes: 1 addition & 3 deletions ckan/websetup.py
Expand Up @@ -10,11 +10,9 @@
def setup_app(command, conf, vars):
    """Place any commands to setup ckan here.

    Loads the CKAN environment from ``conf`` and creates the database
    tables. The background job queue is not initialized here; the
    ``init_queue`` import and call that used to follow are dropped.
    """
    load_environment(conf.global_conf, conf.local_conf)

    # Imported only after the environment has been loaded.
    from ckan import model
    log.debug('Creating tables')
    model.repo.create_db()
    log.info('Creating tables: SUCCESS')

0 comments on commit 4f50fe3

Please sign in to comment.