diff --git a/ckan/lib/cli.py b/ckan/lib/cli.py
index e24156144d4..0bc9c00d3f3 100644
--- a/ckan/lib/cli.py
+++ b/ckan/lib/cli.py
@@ -11,20 +11,23 @@
 import itertools
 import json
 import logging
+import urlparse
+from optparse import OptionConflictError
+
+import sqlalchemy as sa
+import routes
+import paste.script
+from paste.registry import Registry
+from paste.script.util.logging_config import fileConfig
+
 import ckan.logic as logic
 import ckan.model as model
 import ckan.include.rjsmin as rjsmin
 import ckan.include.rcssmin as rcssmin
 import ckan.lib.fanstatic_resources as fanstatic_resources
 import ckan.plugins as p
-import sqlalchemy as sa
-import urlparse
-import routes
 from ckan.common import config
 
-import paste.script
-from paste.registry import Registry
-from paste.script.util.logging_config import fileConfig
 
 #NB No CKAN imports are allowed until after the config file is loaded.
 # i.e. do the imports in methods, after _load_config is called.
@@ -2567,7 +2570,7 @@ class JobsCommand(CkanCommand):
 
     Usage:
 
-        paster jobs worker [QUEUES]
+        paster jobs worker [--burst] [QUEUES]
 
             Start a worker that fetches jobs from queues and executes
             them. If no queue names are given then the worker listens
@@ -2585,6 +2588,9 @@ class JobsCommand(CkanCommand):
 
                 paster jobs worker default my-custom-queue
 
+            If the `--burst` option is given then the worker will exit
+            as soon as all its queues are empty.
+
         paster jobs list [QUEUES]
 
             List currently enqueued jobs from the given queues. If no queue
@@ -2610,6 +2616,17 @@ class JobsCommand(CkanCommand):
     usage = __doc__
     min_args = 0
 
+
+    def __init__(self, *args, **kwargs):
+        super(JobsCommand, self).__init__(*args, **kwargs)
+        try:
+            self.parser.add_option(u'--burst', action='store_true',
+                                   default=False,
+                                   help=u'Start worker in burst mode.')
+        except OptionConflictError:
+            # Option has already been added in previous call
+            pass
+
     def command(self):
         self._load_config()
         try:
@@ -2632,7 +2649,7 @@ def command(self):
 
     def worker(self):
         from ckan.lib.jobs import Worker
-        Worker(self.args).work()
+        Worker(self.args).work(burst=self.options.burst)
 
     def list(self):
         data_dict = {
diff --git a/ckan/tests/helpers.py b/ckan/tests/helpers.py
index ff60fd21eaa..c0e7e2752f5 100644
--- a/ckan/tests/helpers.py
+++ b/ckan/tests/helpers.py
@@ -22,9 +22,12 @@
 
 import collections
 import contextlib
+import errno
 import functools
 import logging
+import os
 import re
+import tempfile
 
 import webtest
 import nose.tools
@@ -543,22 +546,88 @@ def wrapper(*args, **kwargs):
     return decorator
 
 
-class CapturingLogHandler(logging.Handler):
+@contextlib.contextmanager
+def recorded_logs(logger=None, level=logging.DEBUG,
+                  override_disabled=True, override_global_level=True):
     u'''
-    Log handler to check for expected logs.
+    Context manager for recording log messages.
+
+    :param logger: The logger to record messages from. Can either be a
+        :py:class:`logging.Logger` instance or a string with the
+        logger's name. Defaults to the root logger.
+
+    :param int level: Temporary log level for the target logger while
+        the context manager is active. Pass ``None`` if you don't want
+        the level to be changed. The level is automatically reset to its
+        original value when the context manager is left.
+
+    :param bool override_disabled: A logger can be disabled by setting
+        its ``disabled`` attribute. By default, this context manager
+        sets that attribute to ``False`` at the beginning of its
+        execution and resets it when the context manager is left. Set
+        ``override_disabled`` to ``False`` to keep the current value
+        of the attribute.
+
+    :param bool override_global_level: The ``logging.disable`` function
+        allows one to install a global minimum log level that takes
+        precedence over a logger's own level. By default, this context
+        manager makes sure that the global limit is at most ``level``,
+        and reduces it if necessary during its execution. Set
+        ``override_global_level`` to ``False`` to keep the global limit.
+
+    :returns: A recording log handler that listens to ``logger`` during
+        the execution of the context manager.
+    :rtype: :py:class:`RecordingLogHandler`
+
+    Example::
 
-    Automatically attaches itself to the root logger or to ``logger`` if
-    given. ``logger`` can be a string with the logger's name or an
-    actual ``logging.Logger`` instance.
+        import logging
+
+        logger = logging.getLogger(__name__)
+
+        with recorded_logs(logger) as logs:
+            logger.info(u'Hello, world!')
+
+        logs.assert_log(u'info', u'world')
     '''
-    def __init__(self, logger=None, *args, **kwargs):
-        super(CapturingLogHandler, self).__init__(*args, **kwargs)
+    if logger is None:
+        logger = logging.getLogger()
+    elif not isinstance(logger, logging.Logger):
+        logger = logging.getLogger(logger)
+    handler = RecordingLogHandler()
+    old_level = logger.level
+    manager_level = logger.manager.disable
+    disabled = logger.disabled
+    logger.addHandler(handler)
+    try:
+        if level is not None:
+            logger.setLevel(level)
+        if override_disabled:
+            logger.disabled = False
+        if override_global_level:
+            if (level is None) and (manager_level > old_level):
+                logger.manager.disable = old_level
+            elif (level is not None) and (manager_level > level):
+                logger.manager.disable = level
+        yield handler
+    finally:
+        logger.handlers.remove(handler)
+        logger.setLevel(old_level)
+        logger.disabled = disabled
+        logger.manager.disable = manager_level
+
+
+class RecordingLogHandler(logging.Handler):
+    u'''
+    Log handler that records log messages for later inspection.
+
+    You can inspect the recorded messages via the ``messages`` attribute
+    (a dict that maps log levels to lists of messages) or by using
+    ``assert_log``.
+    '''
+    def __init__(self, *args, **kwargs):
+        super(RecordingLogHandler, self).__init__(*args, **kwargs)
         self.clear()
-        if logger is None:
-            logger = logging.getLogger()
-        elif not isinstance(logger, logging.Logger):
-            logger = logging.getLogger(logger)
-        logger.addHandler(self)
 
     def emit(self, record):
         self.messages[record.levelname.lower()].append(record.getMessage())
@@ -577,10 +646,19 @@ def assert_log(self, level, pattern, msg=None):
 
         :raises AssertionError: If the expected message was not logged.
         '''
-        pattern = re.compile(pattern)
+        compiled_pattern = re.compile(pattern)
         for log_msg in self.messages[level]:
-            if pattern.search(log_msg):
+            if compiled_pattern.search(log_msg):
                 return
+        if not msg:
+            if self.messages[level]:
+                lines = u'\n    '.join(self.messages[level])
+                msg = (u'Pattern "{}" was not found in the log messages for '
+                       + u'level "{}":\n    {}').format(pattern, level, lines)
+            else:
+                msg = (u'Pattern "{}" was not found in the log messages for '
+                       + u'level "{}" (no messages were recorded for that '
+                       + u'level).').format(pattern, level)
         raise AssertionError(msg)
 
     def clear(self):
@@ -588,3 +666,27 @@ def clear(self):
         Clear all captured log messages.
         '''
         self.messages = collections.defaultdict(list)
+
+
+@contextlib.contextmanager
+def temp_file(*args, **kwargs):
+    u'''
+    Context manager that provides a temporary file.
+
+    The temporary file is named and open. It is automatically deleted
+    when the context manager is left if it still exists at that point.
+
+    Any arguments are passed on to
+    :py:func:`tempfile.NamedTemporaryFile`.
+    '''
+    kwargs['delete'] = False
+    f = tempfile.NamedTemporaryFile(*args, **kwargs)
+    try:
+        yield f
+    finally:
+        f.close()
+        try:
+            os.remove(f.name)
+        except OSError as e:
+            if e.errno != errno.ENOENT:
+                raise
diff --git a/ckan/tests/lib/test_cli.py b/ckan/tests/lib/test_cli.py
index c8ddd77222d..6a706cf3a8a 100644
--- a/ckan/tests/lib/test_cli.py
+++ b/ckan/tests/lib/test_cli.py
@@ -1,22 +1,77 @@
 # encoding: utf-8
 
+import datetime
 import logging
+import os
+import os.path
+from StringIO import StringIO
+import sys
 
-from nose.tools import assert_raises
+from nose.tools import (assert_raises, eq_ as eq, ok_ as ok, assert_in,
+                        assert_not_in, assert_not_equal as neq,
+                        assert_false as nok)
+from pylons import config
+from paste.script.command import run
 
-from ckan.lib.cli import UserCmd
+import ckan.lib.cli as cli
+import ckan.lib.jobs as jobs
 import ckan.tests.helpers as helpers
 
 log = logging.getLogger(__name__)
 
 
+def paster(*args, **kwargs):
+    '''
+    Call a paster command.
+
+    All arguments are parsed and passed on to the command. The
+    ``--config`` option is automatically appended.
+
+    By default, an ``AssertionError`` is raised if the command exits
+    with a non-zero return code or if anything is written to STDERR.
+    Pass ``fail_on_error=False`` to disable this behavior.
+
+    Example::
+
+        code, stdout, stderr = paster(u'jobs', u'list')
+        assert u'My Job Title' in stdout
+
+        code, stdout, stderr = paster(u'jobs', u'foobar',
+                                      fail_on_error=False)
+        assert code == 1
+        assert u'Unknown command' in stderr
+
+    Any ``SystemExit`` raised by the command is swallowed.
+
+    :returns: A tuple containing the return code, the content of
+        STDOUT, and the content of STDERR.
+    '''
+    fail_on_error = kwargs.pop(u'fail_on_error', True)
+    args = list(args) + [u'--config=' + config[u'__file__']]
+    sys.stdout, sys.stderr = StringIO(u''), StringIO(u'')
+    code = 0
+    try:
+        run(args)
+    except SystemExit as e:
+        code = e.code
+    finally:
+        stdout, stderr = sys.stdout.getvalue(), sys.stderr.getvalue()
+        sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
+    if code != 0 and fail_on_error:
+        raise AssertionError(u'Paster command exited with non-zero '
+                             + u'return code {}: {}'.format(code, stderr))
+    if stderr.strip() and fail_on_error:
+        raise AssertionError(u'Paster command wrote to STDERR: {}'.format(
+            stderr))
+    return code, stdout, stderr
+
+
 class TestUserAdd(object):
 
     '''Tests for UserCmd.add'''
 
     @classmethod
     def setup_class(cls):
-        cls.user_cmd = UserCmd('user-command')
+        cls.user_cmd = cli.UserCmd('user-command')
 
     def setup(self):
         helpers.reset_db()
@@ -70,3 +125,190 @@ def test_cli_user_add_unicode_fullname_system_exit(self):
             self.user_cmd.add()
         except SystemExit:
             assert False, "SystemExit exception shouldn't be raised"
+
+
+class TestJobsUnknown(helpers.RQTestBase):
+    '''
+    Test unknown sub-command for ``paster jobs``.
+    '''
+    def test_unknown_command(self):
+        '''
+        Test error handling for unknown ``paster jobs`` sub-command.
+        '''
+        code, stdout, stderr = paster(u'jobs', u'does-not-exist',
+                                      fail_on_error=False)
+        neq(code, 0)
+        assert_in(u'Unknown command', stderr)
+
+
+class TestJobsList(helpers.RQTestBase):
+    '''
+    Tests for ``paster jobs list``.
+    '''
+    def test_list_default_queue(self):
+        '''
+        Test output of ``jobs list`` for default queue.
+        '''
+        job = self.enqueue()
+        stdout = paster(u'jobs', u'list')[1]
+        fields = stdout.split()
+        eq(len(fields), 3)
+        dt = datetime.datetime.strptime(fields[0], u'%Y-%m-%dT%H:%M:%S')
+        now = datetime.datetime.utcnow()
+        ok(abs((now - dt).total_seconds()) < 10)
+        eq(fields[1], job.id)
+        eq(fields[2], jobs.DEFAULT_QUEUE_NAME)
+
+    def test_list_other_queue(self):
+        '''
+        Test output of ``jobs list`` for a non-default queue.
+        '''
+        job = self.enqueue(queue=u'my_queue')
+        stdout = paster(u'jobs', u'list')[1]
+        fields = stdout.split()
+        eq(len(fields), 3)
+        eq(fields[2], u'my_queue')
+
+    def test_list_title(self):
+        '''
+        Test title output of ``jobs list``.
+        '''
+        job = self.enqueue(title=u'My_Title')
+        stdout = paster(u'jobs', u'list')[1]
+        fields = stdout.split()
+        eq(len(fields), 4)
+        eq(fields[3], u'"My_Title"')
+
+    def test_list_filter(self):
+        '''
+        Test filtering by queues for ``jobs list``.
+        '''
+        job1 = self.enqueue(queue=u'q1')
+        job2 = self.enqueue(queue=u'q2')
+        job3 = self.enqueue(queue=u'q3')
+        stdout = paster(u'jobs', u'list', u'q1', u'q2')[1]
+        assert_in(u'q1', stdout)
+        assert_in(u'q2', stdout)
+        assert_not_in(u'q3', stdout)
+
+
+class TestJobsCancel(helpers.RQTestBase):
+    '''
+    Tests for ``paster jobs cancel``.
+    '''
+    def test_cancel_existing(self):
+        '''
+        Test ``jobs cancel`` for an existing job.
+        '''
+        job1 = self.enqueue()
+        job2 = self.enqueue()
+        stdout = paster(u'jobs', u'cancel', job1.id)[1]
+        all_jobs = self.all_jobs()
+        eq(len(all_jobs), 1)
+        eq(all_jobs[0].id, job2.id)
+        assert_in(job1.id, stdout)
+
+    def test_cancel_not_existing(self):
+        '''
+        Test ``jobs cancel`` for a non-existing job.
+        '''
+        code, stdout, stderr = paster(u'jobs', u'cancel', u'does-not-exist',
+                                      fail_on_error=False)
+        neq(code, 0)
+        assert_in(u'does-not-exist', stderr)
+
+
+class TestJobsClear(helpers.RQTestBase):
+    '''
+    Tests for ``paster jobs clear``.
+    '''
+    def test_clear_all_queues(self):
+        '''
+        Test clearing all queues via ``jobs clear``.
+        '''
+        self.enqueue()
+        self.enqueue()
+        self.enqueue(queue=u'q1')
+        self.enqueue(queue=u'q2')
+        stdout = paster(u'jobs', u'clear')[1]
+        assert_in(jobs.DEFAULT_QUEUE_NAME, stdout)
+        assert_in(u'q1', stdout)
+        assert_in(u'q2', stdout)
+        eq(self.all_jobs(), [])
+
+    def test_clear_specific_queues(self):
+        '''
+        Test clearing specific queues via ``jobs clear``.
+        '''
+        job1 = self.enqueue()
+        job2 = self.enqueue(queue=u'q1')
+        self.enqueue(queue=u'q2')
+        self.enqueue(queue=u'q2')
+        self.enqueue(queue=u'q3')
+        stdout = paster(u'jobs', u'clear', u'q2', u'q3')[1]
+        assert_in(u'q2', stdout)
+        assert_in(u'q3', stdout)
+        assert_not_in(jobs.DEFAULT_QUEUE_NAME, stdout)
+        assert_not_in(u'q1', stdout)
+        all_jobs = self.all_jobs()
+        eq(set(all_jobs), {job1, job2})
+
+
+class TestJobsTest(helpers.RQTestBase):
+    '''
+    Tests for ``paster jobs test``.
+    '''
+    def test_test_default_queue(self):
+        '''
+        Test ``jobs test`` for the default queue.
+        '''
+        stdout = paster(u'jobs', u'test')[1]
+        all_jobs = self.all_jobs()
+        eq(len(all_jobs), 1)
+        eq(jobs.remove_queue_name_prefix(all_jobs[0].origin),
+           jobs.DEFAULT_QUEUE_NAME)
+
+    def test_test_specific_queues(self):
+        '''
+        Test ``jobs test`` for specific queues.
+        '''
+        stdout = paster(u'jobs', u'test', u'q1', u'q2')[1]
+        all_jobs = self.all_jobs()
+        eq(len(all_jobs), 2)
+        eq({jobs.remove_queue_name_prefix(j.origin) for j in all_jobs},
+           {u'q1', u'q2'})
+
+
+class TestJobsWorker(helpers.RQTestBase):
+    '''
+    Tests for ``paster jobs worker``.
+    '''
+    # All tests of ``jobs worker`` must use the ``--burst`` option to
+    # make sure that the worker exits.
+
+    def test_worker_default_queue(self):
+        '''
+        Test ``jobs worker`` with the default queue.
+        '''
+        with helpers.temp_file() as f:
+            self.enqueue(os.remove, args=[f.name])
+            paster(u'jobs', u'worker', u'--burst')
+            all_jobs = self.all_jobs()
+            eq(all_jobs, [])
+            nok(os.path.isfile(f.name))
+
+    def test_worker_specific_queues(self):
+        '''
+        Test ``jobs worker`` with specific queues.
+        '''
+        with helpers.temp_file() as f:
+            with helpers.temp_file() as g:
+                job1 = self.enqueue()
+                job2 = self.enqueue(queue=u'q2')
+                self.enqueue(os.remove, args=[f.name], queue=u'q3')
+                self.enqueue(os.remove, args=[g.name], queue=u'q4')
+                paster(u'jobs', u'worker', u'--burst', u'q3', u'q4')
+                all_jobs = self.all_jobs()
+                eq(set(all_jobs), {job1, job2})
+                nok(os.path.isfile(f.name))
+                nok(os.path.isfile(g.name))
diff --git a/ckan/tests/lib/test_jobs.py b/ckan/tests/lib/test_jobs.py
index 95b7618b750..ed842ffa4e7 100644
--- a/ckan/tests/lib/test_jobs.py
+++ b/ckan/tests/lib/test_jobs.py
@@ -11,7 +11,7 @@
 import rq
 
 import ckan.lib.jobs as jobs
-from ckan.tests.helpers import CapturingLogHandler, changed_config, RQTestBase
+from ckan.tests.helpers import changed_config, recorded_logs, RQTestBase
 
 
 class TestQueueNamePrefixes(RQTestBase):
@@ -149,9 +149,9 @@ def test_worker_logging_lifecycle(self):
         '''
         queue = u'my_queue'
         job = self.enqueue(queue=queue)
-        logs = CapturingLogHandler(u'ckan.lib.jobs')
-        worker = jobs.Worker([queue])
-        worker.work(burst=True)
+        with recorded_logs(u'ckan.lib.jobs') as logs:
+            worker = jobs.Worker([queue])
+            worker.work(burst=True)
         messages = logs.messages[u'info']
         # We expect 4 log messages: Worker start, job start, job end,
         # worker end.
@@ -169,16 +169,16 @@ def test_worker_exception_logging(self):
         Test that exceptions in a job are logged.
         '''
         job = self.enqueue(failing_job)
-        logs = CapturingLogHandler(u'ckan.lib.jobs')
         worker = jobs.Worker()
 
         # Prevent worker from forking so that we can capture log
         # messages from within the job
         def execute_job(*args, **kwargs):
             return worker.perform_job(*args, **kwargs)
-        worker.execute_job = execute_job
-        worker.work(burst=True)
+        worker.execute_job = execute_job
+        with recorded_logs(u'ckan.lib.jobs') as logs:
+            worker.work(burst=True)
         logs.assert_log(u'error', u'JOB FAILURE')
 
     def test_worker_default_queue(self):
diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py
index 72783dd5dc9..6563cb926b5 100644
--- a/ckan/tests/logic/action/test_delete.py
+++ b/ckan/tests/logic/action/test_delete.py
@@ -514,8 +514,8 @@ def test_specific_queues(self):
         job2 = self.enqueue(queue=u'q1')
         job3 = self.enqueue(queue=u'q1')
         job4 = self.enqueue(queue=u'q2')
-        logs = helpers.CapturingLogHandler(u'ckan.logic')
-        queues = helpers.call_action(u'job_clear', queues=[u'q1', u'q2'])
+        with helpers.recorded_logs(u'ckan.logic') as logs:
+            queues = helpers.call_action(u'job_clear', queues=[u'q1', u'q2'])
         eq({u'q1', u'q2'}, set(queues))
         all_jobs = self.all_jobs()
         eq(len(all_jobs), 1)
@@ -532,8 +532,8 @@ def test_existing_job(self):
         '''
         job1 = self.enqueue(queue=u'q')
         job2 = self.enqueue(queue=u'q')
-        logs = helpers.CapturingLogHandler(u'ckan.logic')
-        helpers.call_action(u'job_cancel', id=job1.id)
+        with helpers.recorded_logs(u'ckan.logic') as logs:
+            helpers.call_action(u'job_cancel', id=job1.id)
         all_jobs = self.all_jobs()
         eq(len(all_jobs), 1)
         eq(all_jobs[0], job2)
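
Reviewer's note (illustration only, not part of the patch): a minimal sketch of how the helpers introduced above (``helpers.temp_file``, ``helpers.recorded_logs`` with its ``RecordingLogHandler``, and the burst-mode worker) are meant to compose in a test. It assumes an ``RQTestBase``-style test class with ``enqueue()`` and ``all_jobs()`` behaving as in the tests in this diff; the class name, test name and queue name are made up for the example::

    import os

    import ckan.lib.jobs as jobs
    import ckan.tests.helpers as helpers


    class TestBurstWorkerSketch(helpers.RQTestBase):
        def test_burst_worker_drains_queue(self):
            # temp_file() yields an open NamedTemporaryFile and removes it
            # afterwards only if the job has not already deleted it.
            with helpers.temp_file() as f:
                self.enqueue(os.remove, args=[f.name], queue=u'q')
                # recorded_logs() attaches a RecordingLogHandler to the named
                # logger for the duration of the block.
                with helpers.recorded_logs(u'ckan.lib.jobs') as logs:
                    # burst=True makes the worker exit once its queues are empty.
                    jobs.Worker([u'q']).work(burst=True)
                # The worker logs its lifecycle at INFO level (see
                # test_worker_logging_lifecycle above).
                assert logs.messages[u'info']
                assert self.all_jobs() == []
                assert not os.path.isfile(f.name)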