diff --git a/ckan/config/deployment.ini_tmpl b/ckan/config/deployment.ini_tmpl
index 73acd9bb6d7..ee59c86e329 100644
--- a/ckan/config/deployment.ini_tmpl
+++ b/ckan/config/deployment.ini_tmpl
@@ -79,6 +79,12 @@ ckan.site_id = default
 #solr_url = http://127.0.0.1:8983/solr
 
+## Redis Settings
+
+# URL to your Redis instance, including the database to be used.
+#ckan.redis.url = redis://localhost:6379/0
+
+
 ## CORS Settings
 
 # If cors.origin_allow_all is true, all origins are allowed.
diff --git a/ckan/config/environment.py b/ckan/config/environment.py
index 278e5fceb0b..fb5f7d2cdc9 100644
--- a/ckan/config/environment.py
+++ b/ckan/config/environment.py
@@ -16,6 +16,7 @@
 import ckan.plugins as p
 import ckan.lib.helpers as helpers
 import ckan.lib.app_globals as app_globals
+from ckan.lib.redis import is_redis_available
 import ckan.lib.render as render
 import ckan.lib.search as search
 import ckan.logic as logic
@@ -93,6 +94,10 @@ def find_controller(self, controller):
     for msg in msgs:
         warnings.filterwarnings('ignore', msg, sqlalchemy.exc.SAWarning)
 
+    # Check Redis availability
+    if not is_redis_available():
+        log.critical('Could not connect to Redis.')
+
     # load all CKAN plugins
     p.load_all()
 
@@ -105,6 +110,7 @@ def find_controller(self, controller):
         'sqlalchemy.url': 'CKAN_SQLALCHEMY_URL',
         'ckan.datastore.write_url': 'CKAN_DATASTORE_WRITE_URL',
         'ckan.datastore.read_url': 'CKAN_DATASTORE_READ_URL',
+        'ckan.redis.url': 'CKAN_REDIS_URL',
         'solr_url': 'CKAN_SOLR_URL',
         'ckan.site_id': 'CKAN_SITE_ID',
         'ckan.site_url': 'CKAN_SITE_URL',
diff --git a/ckan/config/resource_formats.json b/ckan/config/resource_formats.json
index fad65202015..d4749a99b1f 100644
--- a/ckan/config/resource_formats.json
+++ b/ckan/config/resource_formats.json
@@ -69,7 +69,7 @@
   ["TAR", "TAR Compressed File", "application/x-tar", []],
   ["PNG", "PNG Image File", "image/png", []],
   ["RSS", "RSS feed", "application/rss+xml", []],
-  ["GeoJSON", "Geographic JavaScript Object Notation", null, []],
+  ["GeoJSON", "Geographic JavaScript Object Notation", "application/geo+json", ["geojson"]],
   ["SHP", "Shapefile", null, ["esri shapefile"]],
   ["TORRENT", "Torrent", "application/x-bittorrent", ["bittorrent"]],
   ["ICS", "iCalendar", "text/calendar", ["ifb", "iCal"]],
diff --git a/ckan/config/solr/schema-1.2.xml b/ckan/config/solr/schema-1.2.xml
deleted file mode 100644
index 4f9b11a580b..00000000000
--- a/ckan/config/solr/schema-1.2.xml
+++ /dev/null
@@ -1,170 +0,0 @@
[170 deleted lines of Solr schema markup; the XML content was lost in extraction (only the stray values "id" and "text" survived). The commit drops this legacy schema file entirely.]
diff --git a/ckan/config/solr/schema-1.3.xml b/ckan/config/solr/schema-1.3.xml
deleted file mode 100644
index 89fea815356..00000000000
--- a/ckan/config/solr/schema-1.3.xml
+++ /dev/null
@@ -1,179 +0,0 @@
[179 deleted lines of Solr schema markup; XML content lost in extraction (only "index_id" and "text" survived). The legacy schema file is dropped entirely.]
diff --git a/ckan/config/solr/schema-1.4.xml b/ckan/config/solr/schema-1.4.xml
deleted file mode 100644
index 98cbf378dbe..00000000000
--- a/ckan/config/solr/schema-1.4.xml
+++ /dev/null
@@ -1,190 +0,0 @@
[190 deleted lines of Solr schema markup; XML content lost in extraction (only "index_id" and "text" survived). The legacy schema file is dropped entirely.]
diff --git a/ckan/config/solr/schema-2.0.xml b/ckan/config/solr/schema-2.0.xml
deleted file mode 100644
index b306bb5d8c7..00000000000
--- a/ckan/config/solr/schema-2.0.xml
+++ /dev/null
@@ -1,173 +0,0 @@
[173 deleted lines of Solr schema markup; XML content lost in extraction (only "index_id" and "text" survived). The legacy schema file is dropped entirely.]
diff --git a/ckan/config/solr/schema.xml b/ckan/config/solr/schema.xml
index e8893f70ff9..4578c164b10 100644
--- a/ckan/config/solr/schema.xml
+++ b/ckan/config/solr/schema.xml
@@ -24,7 +24,7 @@
[XML content lost in extraction. This one-line change evidently bumps the schema's version attribute; cf. SUPPORTED_SCHEMA_VERSIONS changing from '2.3' to '2.7' in ckan/lib/search/__init__.py below.]
@@ -112,6 +112,7 @@
[XML content lost in extraction; surviving comment fragment: "schema. In this case the version should be set to the next CKAN version number." The single added line is evidently the multi-valued "permission_labels" field that ckan/lib/search/index.py below starts populating.]
diff --git a/ckan/config/supervisor-ckan-worker.conf b/ckan/config/supervisor-ckan-worker.conf
new file mode 100644
index 00000000000..1cf2ffb460a
--- /dev/null
+++ b/ckan/config/supervisor-ckan-worker.conf
@@ -0,0 +1,43 @@
+; =======================================================
+; Supervisor configuration for CKAN background job worker
+; =======================================================
+
+; 1. Copy this file to /etc/supervisor/conf.d
+; 2. Make sure the paths below match your setup
+
+
+[program:ckan-worker]
+
+; Use the full paths to the virtualenv and your configuration file here.
+command=/usr/lib/ckan/default/bin/paster --plugin=ckan jobs worker --config=/etc/ckan/default/production.ini
+
+
+; User the worker runs as.
+user=www-data
+
+
+; Start just a single worker. Increase this number if you have many or
+; particularly long-running background jobs.
+numprocs=1
+process_name=%(program_name)s-%(process_num)02d
+
+
+; Log files.
+stdout_logfile=/var/log/ckan-worker.log
+stderr_logfile=/var/log/ckan-worker.log
+
+
+; Make sure that the worker is started on system start and automatically
+; restarted if it crashes unexpectedly.
+autostart=true
+autorestart=true
+
+
+; Number of seconds the process has to run before it is considered to have
+; started successfully.
+startsecs=10
+
+; Need to wait for currently executing tasks to finish at shutdown.
+; Increase this if you have very long-running tasks.
+stopwaitsecs = 600 + diff --git a/ckan/controllers/package.py b/ckan/controllers/package.py index 6b65a4514af..c89a999e9bc 100644 --- a/ckan/controllers/package.py +++ b/ckan/controllers/package.py @@ -30,7 +30,6 @@ render = base.render abort = base.abort -redirect = base.redirect NotFound = logic.NotFound NotAuthorized = logic.NotAuthorized @@ -575,8 +574,8 @@ def resource_edit(self, id, resource_id, data=None, errors=None, errors, error_summary) except NotAuthorized: abort(403, _('Unauthorized to edit this resource')) - redirect(h.url_for(controller='package', action='resource_read', - id=id, resource_id=resource_id)) + h.redirect_to(controller='package', action='resource_read', id=id, + resource_id=resource_id) context = {'model': model, 'session': model.Session, 'api_version': 3, 'for_edit': True, @@ -645,8 +644,7 @@ def new_resource(self, id, data=None, errors=None, error_summary=None): if not data_provided and save_action != "go-dataset-complete": if save_action == 'go-dataset': # go to final stage of adddataset - redirect(h.url_for(controller='package', - action='edit', id=id)) + h.redirect_to(controller='package', action='edit', id=id) # see if we have added any resources try: data_dict = get_action('package_show')(context, {'id': id}) @@ -661,8 +659,8 @@ def new_resource(self, id, data=None, errors=None, error_summary=None): # On new templates do not use flash message if g.legacy_templates: h.flash_error(msg) - redirect(h.url_for(controller='package', - action='new_resource', id=id)) + h.redirect_to(controller='package', + action='new_resource', id=id) else: errors = {} error_summary = {_('Error'): msg} @@ -673,8 +671,7 @@ def new_resource(self, id, data=None, errors=None, error_summary=None): get_action('package_update')( dict(context, allow_state_change=True), dict(data_dict, state='active')) - redirect(h.url_for(controller='package', - action='read', id=id)) + h.redirect_to(controller='package', action='read', id=id) data['package_id'] = id try: @@ -698,20 +695,17 @@ def new_resource(self, id, data=None, errors=None, error_summary=None): get_action('package_update')( dict(context, allow_state_change=True), dict(data_dict, state='active')) - redirect(h.url_for(controller='package', - action='read', id=id)) + h.redirect_to(controller='package', action='read', id=id) elif save_action == 'go-dataset': # go to first stage of add dataset - redirect(h.url_for(controller='package', - action='edit', id=id)) + h.redirect_to(controller='package', action='edit', id=id) elif save_action == 'go-dataset-complete': # go to first stage of add dataset - redirect(h.url_for(controller='package', - action='read', id=id)) + h.redirect_to(controller='package', action='read', id=id) else: # add more resources - redirect(h.url_for(controller='package', - action='new_resource', id=id)) + h.redirect_to(controller='package', action='new_resource', + id=id) # get resources for sidebar context = {'model': model, 'session': model.Session, @@ -906,7 +900,7 @@ def _save_new(self, context, package_type=None): url = h.url_for(controller='package', action='new_resource', id=pkg_dict['name']) - redirect(url) + h.redirect_to(url) # Make sure we don't index this dataset if request.params['save'] not in ['go-resource', 'go-metadata']: @@ -923,7 +917,7 @@ def _save_new(self, context, package_type=None): url = h.url_for(controller='package', action='new_resource', id=pkg_dict['name']) - redirect(url) + h.redirect_to(url) self._form_save_redirect(pkg_dict['name'], 'new', package_type=package_type) @@ -1010,7 +1004,7 @@ 
def _form_save_redirect(self, pkgname, action, package_type=None): id=pkgname) else: url = h.url_for('{0}_read'.format(package_type), id=pkgname) - redirect(url) + h.redirect_to(url) def delete(self, id): @@ -1165,7 +1159,7 @@ def resource_download(self, id, resource_id, filename=None): return app_iter elif not 'url' in rsc: abort(404, _('No download is available')) - redirect(rsc['url']) + h.redirect_to(rsc['url']) def follow(self, id): '''Start following this dataset.''' @@ -1264,8 +1258,7 @@ def groups(self, id): get_action('member_delete')(context, data_dict) except NotFound: abort(404, _('Group not found')) - redirect(h.url_for(controller='package', - action='groups', id=id)) + h.redirect_to(controller='package', action='groups', id=id) context['is_member'] = True users_groups = get_action('group_list_authz')(context, data_dict) @@ -1477,9 +1470,9 @@ def edit_view(self, id, resource_id, view_id=None): abort(403, _('Unauthorized to edit resource')) else: if not to_preview: - redirect(h.url_for(controller='package', - action='resource_views', - id=id, resource_id=resource_id)) + h.redirect_to(controller='package', + action='resource_views', + id=id, resource_id=resource_id) ## view_id exists only when updating if view_id: diff --git a/ckan/controllers/util.py b/ckan/controllers/util.py index 97c857b0ff5..4a41bfb0dc2 100644 --- a/ckan/controllers/util.py +++ b/ckan/controllers/util.py @@ -18,7 +18,7 @@ def redirect(self): base.abort(400, _('Missing Value') + ': url') if h.url_is_local(url): - return base.redirect(url) + return h.redirect_to(url) else: base.abort(403, _('Redirecting to external site is not allowed.')) diff --git a/ckan/lib/base.py b/ckan/lib/base.py index 3334a662599..d6e5ff2fb2d 100644 --- a/ckan/lib/base.py +++ b/ckan/lib/base.py @@ -11,7 +11,6 @@ from pylons import cache, session from pylons.controllers import WSGIController from pylons.controllers.util import abort as _abort -from pylons.controllers.util import redirect_to, redirect from pylons.decorators import jsonify from pylons.i18n import N_, gettext, ngettext from pylons.templating import cached_template, pylons_globals diff --git a/ckan/lib/celery_app.py b/ckan/lib/celery_app.py index 9f66e85f33b..047acfa1f01 100644 --- a/ckan/lib/celery_app.py +++ b/ckan/lib/celery_app.py @@ -1,5 +1,11 @@ # encoding: utf-8 +''' +Celery background tasks management. + +This module is DEPRECATED, use ``ckan.lib.jobs`` instead. 
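For extension authors migrating off Celery, the replacement API is the ``enqueue_job`` function that this patch exposes through the plugins toolkit (see ``ckan/plugins/toolkit.py`` further down). A minimal sketch of the new style — the ``send_report`` task and its argument are invented for illustration:

```python
# encoding: utf-8
import ckan.plugins.toolkit as toolkit


def send_report(recipient):
    # Runs in the background worker process, not in the web server.
    print(u'Sending report to {}'.format(recipient))


# Instead of registering a Celery task and calling send_task(), simply
# enqueue the function. `title` is optional and shows up in `paster jobs list`.
job = toolkit.enqueue_job(send_report, [u'admin@example.org'],
                          title=u'Send usage report')
```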
+'''
+
 import ConfigParser
 import os
 import logging
@@ -9,6 +15,8 @@
 
 log = logging.getLogger(__name__)
 
+log.warning('ckan.lib.celery_app is deprecated, use ckan.lib.jobs instead.')
+
 LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
 
 from celery import Celery
diff --git a/ckan/lib/cli.py b/ckan/lib/cli.py
index 5752edd80e1..888889a8fee 100644
--- a/ckan/lib/cli.py
+++ b/ckan/lib/cli.py
@@ -11,20 +11,23 @@
 import itertools
 import json
 import logging
+import urlparse
+from optparse import OptionConflictError
+
+import sqlalchemy as sa
+import routes
+import paste.script
+from paste.registry import Registry
+from paste.script.util.logging_config import fileConfig
+
 import ckan.logic as logic
 import ckan.model as model
 import ckan.include.rjsmin as rjsmin
 import ckan.include.rcssmin as rcssmin
 import ckan.lib.fanstatic_resources as fanstatic_resources
 import ckan.plugins as p
-import sqlalchemy as sa
-import urlparse
-import routes
 
 from ckan.common import config
 
-import paste.script
-from paste.registry import Registry
-from paste.script.util.logging_config import fileConfig
 
 #NB No CKAN imports are allowed until after the config file is loaded.
 # i.e. do the imports in methods, after _load_config is called.
@@ -43,6 +46,16 @@ def deprecation_warning(message=None):
     sys.stderr.write(u'\n')
 
 
+def error(msg):
+    '''
+    Print an error message to STDERR and exit with return code 1.
+    '''
+    sys.stderr.write(msg)
+    if not msg.endswith('\n'):
+        sys.stderr.write('\n')
+    sys.exit(1)
+
+
 def parse_db_config(config_key='sqlalchemy.url'):
     ''' Takes a config key for a database connection url and parses it into
     a dictionary. Expects a url like:
@@ -257,8 +270,7 @@ def command(self):
         elif cmd == 'migrate-filestore':
             self.migrate_filestore()
         else:
-            print 'Command %s not recognized' % cmd
-            sys.exit(1)
+            error('Command %s not recognized' % cmd)
 
     def _get_db_config(self):
         return parse_db_config()
@@ -610,9 +622,8 @@ def export_datasets(self, out_folder):
                 r = urllib2.urlopen(url).read()
             except urllib2.HTTPError, e:
                 if e.code == 404:
-                    print ('Please install ckanext-dcat and enable the ' +
-                           '`dcat` plugin to use the RDF serializations')
-                    sys.exit(1)
+                    error('Please install ckanext-dcat and enable the ' +
+                          '`dcat` plugin to use the RDF serializations')
             with open(fname, 'wb') as f:
                 f.write(r)
         except IOError, ioe:
@@ -801,16 +812,14 @@ def password_prompt(cls):
         password1 = getpass.getpass('Password: ')
         password2 = getpass.getpass('Confirm password: ')
         if password1 != password2:
-            print 'Passwords do not match'
-            sys.exit(1)
+            error('Passwords do not match')
         return password1
 
     def add(self):
         import ckan.model as model
         if len(self.args) < 2:
-            print 'Need name of the user.'
-            sys.exit(1)
+            error('Need name of the user.')
         username = self.args[1]
 
         # parse args into data_dict
@@ -842,8 +851,7 @@ def add(self):
             user_dict = logic.get_action('user_create')(context, data_dict)
             pprint(user_dict)
         except logic.ValidationError, e:
-            print e
-            sys.exit(1)
+            error(str(e))
 
     def remove(self):
         import ckan.model as model
@@ -939,7 +947,9 @@ def purge(self, dataset_ref):
 
 
 class Celery(CkanCommand):
-    '''Celery daemon
+    '''Celery daemon [DEPRECATED]
+
+    This command is DEPRECATED, use `paster jobs` instead.
Usage: celeryd - run the celery daemon @@ -965,10 +975,10 @@ def command(self): elif cmd == 'clean': self.clean() else: - print 'Command %s not recognized' % cmd - sys.exit(1) + error('Command %s not recognized' % cmd) def run_(self): + deprecation_warning(u'Use `paster jobs worker` instead.') default_ini = os.path.join(os.getcwd(), 'development.ini') if self.options.config: @@ -976,8 +986,7 @@ def run_(self): elif os.path.isfile(default_ini): os.environ['CKAN_CONFIG'] = default_ini else: - print 'No .ini specified and none was found in current directory' - sys.exit(1) + error('No .ini specified and none was found in current directory') from ckan.lib.celery_app import celery celery_args = [] @@ -986,6 +995,7 @@ def run_(self): celery.worker_main(argv=['celeryd', '--loglevel=INFO'] + celery_args) def view(self): + deprecation_warning(u'Use `paster jobs list` instead.') self._load_config() import ckan.model as model from kombu.transport.sqlalchemy.models import Message @@ -1000,6 +1010,7 @@ def view(self): print '%i: Invisible Sent:%s' % (message.id, message.sent_at) def clean(self): + deprecation_warning(u'Use `paster jobs clear` instead.') self._load_config() import ckan.model as model query = model.Session.execute("select * from kombu_message") @@ -1013,8 +1024,7 @@ def clean(self): print '%i of %i tasks deleted' % (tasks_initially - tasks_afterwards, tasks_initially) if tasks_afterwards: - print 'ERROR: Failed to delete all tasks' - sys.exit(1) + error('Failed to delete all tasks') model.repo.commit_and_remove() @@ -1094,15 +1104,13 @@ def command(self): self.update_all(engine, start_date) elif cmd == 'export': if len(self.args) <= 1: - print self.__class__.__doc__ - sys.exit(1) + error(self.__class__.__doc__) output_file = self.args[1] start_date = self.args[2] if len(self.args) > 2 else None self.update_all(engine, start_date) self.export_tracking(engine, output_file) else: - print self.__class__.__doc__ - sys.exit(1) + error(self.__class__.__doc__) def update_all(self, engine, start_date=None): if start_date: @@ -2250,11 +2258,9 @@ def _get_view_plugins(self, view_plugin_types, set(loaded_view_plugins)) if plugins_not_found: - msg = ('View plugin(s) not found : {0}. '.format(plugins_not_found) - + 'Have they been added to the `ckan.plugins` configuration' - + ' option?') - log.error(msg) - sys.exit(1) + error('View plugin(s) not found : {0}. 
'.format(plugins_not_found) + + 'Have they been added to the `ckan.plugins` configuration' + + ' option?') return loaded_view_plugins @@ -2338,8 +2344,7 @@ def _update_search_params(self, search_data_dict): try: user_search_params = json.loads(self.options.search_params) except ValueError, e: - log.error('Unable to parse JSON search parameters: {0}'.format(e)) - sys.exit(1) + error('Unable to parse JSON search parameters: {0}'.format(e)) if user_search_params.get('q'): search_data_dict['q'] = user_search_params['q'] @@ -2413,8 +2418,7 @@ def create_views(self, view_plugin_types=[]): query = self._search_datasets(page, loaded_view_plugins) if page == 1 and query['count'] == 0: - log.info('No datasets to create resource views on, exiting...') - sys.exit(1) + error('No datasets to create resource views on, exiting...') elif page == 1 and not self.options.assume_yes: @@ -2426,8 +2430,7 @@ def create_views(self, view_plugin_types=[]): loaded_view_plugins)) if confirm == 'no': - log.info('Command aborted by user') - sys.exit(1) + error('Command aborted by user') if query['results']: for dataset_dict in query['results']: @@ -2472,8 +2475,7 @@ def clear_views(self, view_plugin_types=[]): result = query_yes_no(msg, default='no') if result == 'no': - log.info('Command aborted by user') - sys.exit(1) + error('Command aborted by user') context = {'user': self.site_user['name']} logic.get_action('resource_view_clear')( @@ -2551,15 +2553,161 @@ def command(self): if options: for option in options: if '=' not in option: - sys.stderr.write( + error( 'An option does not have an equals sign: %r ' 'It should be \'key=value\'. If there are spaces ' 'you\'ll need to quote the option.\n' % option) - sys.exit(1) try: config_tool.config_edit_using_option_strings( config_filepath, options, self.options.section, edit=self.options.edit) except config_tool.ConfigToolError, e: - sys.stderr.write(e.message) - sys.exit(1) + error(e) + + +class JobsCommand(CkanCommand): + '''Manage background jobs + + Usage: + + paster jobs worker [--burst] [QUEUES] + + Start a worker that fetches jobs from queues and executes + them. If no queue names are given then the worker listens + to the default queue, this is equivalent to + + paster jobs worker default + + If queue names are given then the worker listens to those + queues and only those: + + paster jobs worker my-custom-queue + + Hence, if you want the worker to listen to the default queue + and some others then you must list the default queue explicitly: + + paster jobs worker default my-custom-queue + + If the `--burst` option is given then the worker will exit + as soon as all its queues are empty. + + paster jobs list [QUEUES] + + List currently enqueued jobs from the given queues. If no queue + names are given then the jobs from all queues are listed. + + paster jobs show ID + + Show details about a specific job. + + paster jobs cancel ID + + Cancel a specific job. Jobs can only be canceled while they are + enqueued. Once a worker has started executing a job it cannot + be aborted anymore. + + paster jobs clear [QUEUES] + + Cancel all jobs on the given queues. If no queue names are + given then ALL queues are cleared. + + paster jobs test [QUEUES] + + Enqueue a test job. If no queue names are given then the job is + added to the default queue. If queue names are given then a + separate test job is added to each of the queues. 
+ ''' + + summary = __doc__.split(u'\n')[0] + usage = __doc__ + min_args = 0 + + + def __init__(self, *args, **kwargs): + super(JobsCommand, self).__init__(*args, **kwargs) + try: + self.parser.add_option(u'--burst', action='store_true', + default=False, + help=u'Start worker in burst mode.') + except OptionConflictError: + # Option has already been added in previous call + pass + + def command(self): + self._load_config() + try: + cmd = self.args.pop(0) + except IndexError: + print(self.__doc__) + sys.exit(0) + if cmd == u'worker': + self.worker() + elif cmd == u'list': + self.list() + elif cmd == u'show': + self.show() + elif cmd == u'cancel': + self.cancel() + elif cmd == u'clear': + self.clear() + elif cmd == u'test': + self.test() + else: + error(u'Unknown command "{}"'.format(cmd)) + + def worker(self): + from ckan.lib.jobs import Worker + Worker(self.args).work(burst=self.options.burst) + + def list(self): + data_dict = { + u'queues': self.args, + } + jobs = p.toolkit.get_action(u'job_list')({}, data_dict) + for job in jobs: + if job[u'title'] is None: + job[u'title'] = '' + else: + job[u'title'] = u'"{}"'.format(job[u'title']) + print(u'{created} {id} {queue} {title}'.format(**job)) + + def show(self): + if not self.args: + error(u'You must specify a job ID') + id = self.args[0] + try: + job = p.toolkit.get_action(u'job_show')({}, {u'id': id}) + except logic.NotFound: + error(u'There is no job with ID "{}"'.format(id)) + print(u'ID: {}'.format(job[u'id'])) + if job[u'title'] is None: + title = u'None' + else: + title = u'"{}"'.format(job[u'title']) + print(u'Title: {}'.format(title)) + print(u'Created: {}'.format(job[u'created'])) + print(u'Queue: {}'.format(job[u'queue'])) + + def cancel(self): + if not self.args: + error(u'You must specify a job ID') + id = self.args[0] + try: + p.toolkit.get_action(u'job_cancel')({}, {u'id': id}) + except logic.NotFound: + error(u'There is no job with ID "{}"'.format(id)) + print(u'Cancelled job {}'.format(id)) + + def clear(self): + data_dict = { + u'queues': self.args, + } + queues = p.toolkit.get_action(u'job_clear')({}, data_dict) + queues = (u'"{}"'.format(q) for q in queues) + print(u'Cleared queue(s) {}'.format(u', '.join(queues))) + + def test(self): + from ckan.lib.jobs import DEFAULT_QUEUE_NAME, enqueue, test_job + for queue in (self.args or [DEFAULT_QUEUE_NAME]): + job = enqueue(test_job, [u'A test job'], title=u'A test job', queue=queue) + print(u'Added test job {} to queue "{}"'.format(job.id, queue)) diff --git a/ckan/lib/formatters.py b/ckan/lib/formatters.py index f731144c606..6c927a53399 100644 --- a/ckan/lib/formatters.py +++ b/ckan/lib/formatters.py @@ -99,12 +99,10 @@ def months_between(date1, date2): return months if not show_date: - now = datetime.datetime.utcnow() - if datetime_.tzinfo is not None: - now = now.replace(tzinfo=datetime_.tzinfo) - else: - now = now.replace(tzinfo=pytz.utc) + now = datetime.datetime.now(pytz.utc) + if datetime_.tzinfo is None: datetime_ = datetime_.replace(tzinfo=pytz.utc) + date_diff = now - datetime_ days = date_diff.days if days < 1 and now > datetime_: diff --git a/ckan/lib/helpers.py b/ckan/lib/helpers.py index fb90e5551c8..dabe561101b 100644 --- a/ckan/lib/helpers.py +++ b/ckan/lib/helpers.py @@ -26,8 +26,9 @@ from markdown import markdown from bleach import clean as clean_html from pylons import url as _pylons_default_url -from ckan.common import config -from routes import redirect_to as _redirect_to +from ckan.common import config, is_flask_request +from flask import redirect as 
_flask_redirect +from routes import redirect_to as _routes_redirect_to from routes import url_for as _routes_default_url_for import i18n @@ -140,7 +141,18 @@ def redirect_to(*args, **kw): ''' if are_there_flash_messages(): kw['__no_cache__'] = True - return _redirect_to(url_for(*args, **kw)) + + # Routes router doesn't like unicode args + uargs = map(lambda arg: str(arg) if isinstance(arg, unicode) else arg, + args) + _url = url_for(*uargs, **kw) + if _url.startswith('/'): + _url = str(config['ckan.site_url'].rstrip('/') + _url) + + if is_flask_request(): + return _flask_redirect(_url) + else: + return _routes_redirect_to(_url) @maintain.deprecated('h.url is deprecated please use h.url_for') diff --git a/ckan/lib/jobs.py b/ckan/lib/jobs.py new file mode 100644 index 00000000000..2f90ec57c77 --- /dev/null +++ b/ckan/lib/jobs.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python +# encoding: utf-8 + +u''' +Asynchronous background jobs. + +Note that most job management functions are not available from this +module but via the various ``job_*`` API functions. + +Internally, RQ queue names are prefixed with a string containing the +CKAN site ID to avoid key collisions when the same Redis database is +used for multiple CKAN instances. The functions of this module expect +unprefixed queue names (e.g. ``'default'``) unless noted otherwise. The +raw RQ objects (e.g. a queue returned by ``get_queue``) use the full, +prefixed names. Use the functions ``add_queue_name_prefix`` and +``remove_queue_name_prefix`` to manage queue name prefixes. + +.. versionadded:: 2.7 +''' + +import logging + +import rq +from rq.connections import push_connection +from rq.exceptions import NoSuchJobError +from rq.job import Job +from rq.utils import ensure_list + +from ckan.lib.redis import connect_to_redis +from ckan.common import config + + +log = logging.getLogger(__name__) + +DEFAULT_QUEUE_NAME = u'default' + +# RQ job queues. Do not use this directly, use ``get_queue`` instead. +_queues = {} + + +def _connect(): + u''' + Connect to Redis and tell RQ about it. + + Workaround for https://github.com/nvie/rq/issues/479. + ''' + conn = connect_to_redis() + push_connection(conn) + return conn + + +def _get_queue_name_prefix(): + u''' + Get the queue name prefix. + ''' + # This must be done at runtime since we need a loaded config + return u'ckan:{}:'.format(config[u'ckan.site_id']) + + +def add_queue_name_prefix(name): + u''' + Prefix a queue name. + + .. seealso:: :py:func:`remove_queue_name_prefix` + ''' + return _get_queue_name_prefix() + name + + +def remove_queue_name_prefix(name): + u''' + Remove a queue name's prefix. + + :raises ValueError: if the given name is not prefixed. + + .. seealso:: :py:func:`add_queue_name_prefix` + ''' + prefix = _get_queue_name_prefix() + if not name.startswith(prefix): + raise ValueError(u'Queue name "{}" is not prefixed.'.format(name)) + return name[len(prefix):] + + +def get_all_queues(): + u''' + Return all job queues currently in use. + + :returns: The queues. + :rtype: List of ``rq.queue.Queue`` instances + + .. seealso:: :py:func:`get_queue` + ''' + redis_conn = _connect() + prefix = _get_queue_name_prefix() + return [q for q in rq.Queue.all(connection=redis_conn) if + q.name.startswith(prefix)] + + +def get_queue(name=DEFAULT_QUEUE_NAME): + u''' + Get a job queue. + + The job queue is initialized if that hasn't happened before. + + :param string name: The name of the queue. If not given then the + default queue is returned. + + :returns: The job queue. + :rtype: ``rq.queue.Queue`` + + .. 
seealso:: :py:func:`get_all_queues` + ''' + global _queues + fullname = add_queue_name_prefix(name) + try: + return _queues[fullname] + except KeyError: + log.debug(u'Initializing background job queue "{}"'.format(name)) + redis_conn = _connect() + queue = _queues[fullname] = rq.Queue(fullname, connection=redis_conn) + return queue + + +def enqueue(fn, args=None, kwargs=None, title=None, queue=DEFAULT_QUEUE_NAME): + u''' + Enqueue a job to be run in the background. + + :param function fn: Function to be executed in the background + + :param list args: List of arguments to be passed to the function. + Pass an empty list if there are no arguments (default). + + :param dict kwargs: Dict of keyword arguments to be passed to the + function. Pass an empty dict if there are no keyword arguments + (default). + + :param string title: Optional human-readable title of the job. + + :param string queue: Name of the queue. If not given then the + default queue is used. + + :returns: The enqueued job. + :rtype: ``rq.job.Job`` + ''' + if args is None: + args = [] + if kwargs is None: + kwargs = {} + job = get_queue(queue).enqueue_call(func=fn, args=args, kwargs=kwargs) + job.meta[u'title'] = title + job.save() + msg = u'Added background job {}'.format(job.id) + if title: + msg = u'{} ("{}")'.format(msg, title) + msg = u'{} to queue "{}"'.format(msg, queue) + log.info(msg) + return job + + +def job_from_id(id): + u''' + Look up an enqueued job by its ID. + + :param string id: The ID of the job. + + :returns: The job. + :rtype: ``rq.job.Job`` + + :raises KeyError: if no job with that ID exists. + ''' + try: + return Job.fetch(id, connection=_connect()) + except NoSuchJobError: + raise KeyError(u'There is no job with ID "{}".'.format(id)) + + +def dictize_job(job): + u'''Convert a job to a dict. + + In contrast to ``rq.job.Job.to_dict`` this function includes only + the attributes that are relevant to our use case and promotes the + meta attributes that we use (e.g. ``title``). + + :param rq.job.Job job: The job to dictize. + + :returns: The dictized job. + :rtype: dict + ''' + return { + u'id': job.id, + u'title': job.meta.get(u'title'), + u'created': job.created_at.strftime(u'%Y-%m-%dT%H:%M:%S'), + u'queue': remove_queue_name_prefix(job.origin), + } + + +def test_job(*args): + u'''Test job. + + A test job for debugging purposes. Prints out any arguments it + receives. Can be scheduled via ``paster jobs test``. + ''' + print(args) + + +class Worker(rq.Worker): + u''' + CKAN-specific worker. + ''' + def __init__(self, queues=None, *args, **kwargs): + u''' + Constructor. + + Accepts the same arguments as the constructor of + ``rq.worker.Worker``. However, the behavior of the ``queues`` + parameter is different. + + :param queues: The job queue(s) to listen on. Can be a string + with the name of a single queue or a list of queue names. + If not given then the default queue is used. 
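Taken together, here is a minimal end-to-end sketch of the module's API (it assumes a loaded CKAN config, since queue names are prefixed with ``ckan.site_id`` at runtime; the ``priority`` queue name is invented for the example):

```python
from ckan.lib.jobs import Worker, enqueue, test_job

# Enqueue the module's built-in test job on a custom queue.
job = enqueue(test_job, [u'hello'], title=u'Demo job', queue=u'priority')

# A burst-mode worker drains its queues and then exits instead of
# blocking and waiting for new jobs -- handy in scripts and tests.
Worker([u'priority']).work(burst=True)
```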
+ ''' + queues = queues or [DEFAULT_QUEUE_NAME] + queues = [get_queue(q) for q in ensure_list(queues)] + rq.worker.logger.setLevel(logging.INFO) + super(Worker, self).__init__(queues, *args, **kwargs) + + def register_birth(self, *args, **kwargs): + result = super(Worker, self).register_birth(*args, **kwargs) + names = [remove_queue_name_prefix(n) for n in self.queue_names()] + names = u', '.join(u'"{}"'.format(n) for n in names) + log.info(u'Worker {} (PID {}) has started on queue(s) {} '.format( + self.key, self.pid, names)) + return result + + def execute_job(self, job, *args, **kwargs): + queue = remove_queue_name_prefix(job.origin) + log.info(u'Worker {} has started job {} from queue "{}"'.format( + self.key, job.id, queue)) + result = super(Worker, self).execute_job(job, *args, **kwargs) + log.info(u'Worker {} has finished job {} from queue "{}"'.format( + self.key, job.id, queue)) + return result + + def register_death(self, *args, **kwargs): + result = super(Worker, self).register_death(*args, **kwargs) + log.info(u'Worker {} (PID {}) has stopped'.format(self.key, self.pid)) + return result + + def handle_exception(self, job, *exc_info): + log.exception(u'Job {} on worker {} raised an exception: {}'.format( + job.id, self.key, exc_info[1])) + return super(Worker, self).handle_exception(job, *exc_info) diff --git a/ckan/lib/plugins.py b/ckan/lib/plugins.py index 0f0067bb901..dfb4df0c39f 100644 --- a/ckan/lib/plugins.py +++ b/ckan/lib/plugins.py @@ -243,6 +243,13 @@ def plugin_validate(plugin, context, data_dict, schema, action): return toolkit.navl_validate(data_dict, schema, context) +def get_permission_labels(): + '''Return the permission label plugin (or default implementation)''' + for plugin in plugins.PluginImplementations(plugins.IPermissionLabels): + return plugin + return DefaultPermissionLabels() + + class DefaultDatasetForm(object): '''The default implementation of :py:class:`~ckan.plugins.interfaces.IDatasetForm`. @@ -574,3 +581,32 @@ def i18n_domain(self): ckanext-{extension name}, hence your pot, po and mo files should be named ckanext-{extension name}.mo''' return 'ckanext-{name}'.format(name=self.name) + + +class DefaultPermissionLabels(object): + u''' + Default permissions for package_search/package_show: + - everyone can read public datasets "public" + - users can read their own drafts "creator-(user id)" + - users can read datasets belonging to their orgs "member-(org id)" + ''' + def get_dataset_labels(self, dataset_obj): + if dataset_obj.state == u'active' and not dataset_obj.private: + return [u'public'] + + if dataset_obj.owner_org: + return [u'member-%s' % dataset_obj.owner_org] + + return [u'creator-%s' % dataset_obj.creator_user_id] + + def get_user_dataset_labels(self, user_obj): + labels = [u'public'] + if not user_obj: + return labels + + labels.append(u'creator-%s' % user_obj.id) + + orgs = logic.get_action(u'organization_list_for_user')( + {u'user': user_obj.id}, {u'permission': u'read'}) + labels.extend(u'member-%s' % o[u'id'] for o in orgs) + return labels diff --git a/ckan/lib/redis.py b/ckan/lib/redis.py new file mode 100644 index 00000000000..0e1c5f80d9b --- /dev/null +++ b/ckan/lib/redis.py @@ -0,0 +1,64 @@ +# encoding: utf-8 + +u''' +Redis utilities. + +.. 
versionadded:: 2.7 +''' + +from __future__ import absolute_import + +import datetime +import logging + +from redis import ConnectionPool, Redis + +from ckan.common import config + + +log = logging.getLogger(__name__) + +REDIS_URL_SETTING_NAME = u'ckan.redis.url' + +REDIS_URL_DEFAULT_VALUE = u'redis://localhost:6379/0' + +# Redis connection pool. Do not use this directly, use ``connect_to_redis`` +# instead. +_connection_pool = None + + +def connect_to_redis(): + u''' + (Lazily) connect to Redis. + + The connection is set up but not actually established. The latter + happens automatically once the connection is used. + + :returns: A lazy Redis connection. + :rtype: ``redis.Redis`` + + .. seealso:: :py:func:`is_redis_available` + ''' + global _connection_pool + if _connection_pool is None: + url = config.get(REDIS_URL_SETTING_NAME, REDIS_URL_DEFAULT_VALUE) + log.debug(u'Using Redis at {}'.format(url)) + _connection_pool = ConnectionPool.from_url(url) + return Redis(connection_pool=_connection_pool) + + +def is_redis_available(): + u''' + Check whether Redis is available. + + :returns: The availability of Redis. + :rtype: boolean + + .. seealso:: :py:func:`connect_to_redis` + ''' + redis_conn = connect_to_redis() + try: + return redis_conn.ping() + except Exception: + log.exception(u'Redis is not available') + return False diff --git a/ckan/lib/search/__init__.py b/ckan/lib/search/__init__.py index 398158d6bc2..287a216ead0 100644 --- a/ckan/lib/search/__init__.py +++ b/ckan/lib/search/__init__.py @@ -31,7 +31,7 @@ def text_traceback(): return res -SUPPORTED_SCHEMA_VERSIONS = ['2.3'] +SUPPORTED_SCHEMA_VERSIONS = ['2.7'] DEFAULT_OPTIONS = { 'limit': 20, diff --git a/ckan/lib/search/index.py b/ckan/lib/search/index.py index 878a5fa3a8c..937dbefeeff 100644 --- a/ckan/lib/search/index.py +++ b/ckan/lib/search/index.py @@ -279,6 +279,13 @@ def index_package(self, pkg_dict, defer_commit=False): assert pkg_dict, 'Plugin must return non empty package dict on index' + # permission labels determine visibility in search, can't be set + # in original dataset or before_index plugins + labels = lib_plugins.get_permission_labels() + dataset = model.Package.get(pkg_dict['id']) + pkg_dict['permission_labels'] = labels.get_dataset_labels( + dataset) if dataset else [] # TestPackageSearchIndex-workaround + # send to solr: try: conn = make_connection() diff --git a/ckan/lib/search/query.py b/ckan/lib/search/query.py index c136b294ca4..6ec64588cab 100644 --- a/ckan/lib/search/query.py +++ b/ckan/lib/search/query.py @@ -280,12 +280,17 @@ def get_index(self,reference): return solr_response.docs[0] - def run(self, query): + def run(self, query, permission_labels=None, **kwargs): ''' Performs a dataset search using the given query. - @param query - dictionary with keys like: q, fq, sort, rows, facet - @return - dictionary with keys results and count + :param query: dictionary with keys like: q, fq, sort, rows, facet + :type query: dict + :param permission_labels: filter results to those that include at + least one of these labels. None to not filter (return everything) + :type permission_labels: list of unicode strings; or None + + :returns: dictionary with keys results and count May raise SearchQueryError or SearchError. 
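For example, a caller that wants results restricted by permission labels would invoke ``run`` roughly as follows (a sketch assuming a configured Solr; the label values are illustrative and would normally come from an ``IPermissionLabels`` implementation, as ``package_search`` does below):

```python
import ckan.model as model
from ckan.lib.search import query_for

query = query_for(model.Package)
# Only datasets carrying at least one of the given labels are returned.
result = query.run({u'q': u'water'},
                   permission_labels=[u'public', u'member-1234'])
print(result[u'count'])
```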
''' @@ -310,18 +315,23 @@ def run(self, query): rows_to_query = rows_to_return query['rows'] = rows_to_query + fq = [] + if 'fq' in query: + fq.append(query['fq']) + fq.extend(query.get('fq_list', [])) + # show only results from this CKAN instance - fq = query.get('fq', '') - if not '+site_id:' in fq: - fq += ' +site_id:"%s"' % config.get('ckan.site_id') + fq.append('+site_id:%s' % solr_literal(config.get('ckan.site_id'))) # filter for package status - if not '+state:' in fq: - fq += " +state:active" - query['fq'] = [fq] + if not '+state:' in query.get('fq', ''): + fq.append('+state:active') - fq_list = query.get('fq_list', []) - query['fq'].extend(fq_list) + # only return things we should be able to see + if permission_labels is not None: + fq.append('+permission_labels:(%s)' % ' OR '.join( + solr_literal(p) for p in permission_labels)) + query['fq'] = fq # faceting query['facet'] = query.get('facet', 'true') @@ -387,3 +397,13 @@ def run(self, query): self.facets[field] = dict(zip(values[0::2], values[1::2])) return {'results': self.results, 'count': self.count} + + +def solr_literal(t): + ''' + return a safe literal string for a solr query. Instead of escaping + each of + - && || ! ( ) { } [ ] ^ " ~ * ? : \ / we're just dropping + double quotes -- this method currently only used by tokens like site_id + and permission labels. + ''' + return u'"' + t.replace(u'"', u'') + u'"' diff --git a/ckan/logic/action/delete.py b/ckan/logic/action/delete.py index fe4300a2ef7..e4529d5ff0a 100644 --- a/ckan/logic/action/delete.py +++ b/ckan/logic/action/delete.py @@ -2,8 +2,11 @@ '''API functions for deleting data from CKAN.''' +import logging + import sqlalchemy as sqla +import ckan.lib.jobs as jobs import ckan.logic import ckan.logic.action import ckan.plugins as plugins @@ -12,6 +15,9 @@ from ckan.common import _ + +log = logging.getLogger('ckan.logic') + validate = ckan.lib.navl.dictization_functions.validate # Define some shortcuts @@ -701,3 +707,48 @@ def unfollow_group(context, data_dict): ckan.logic.schema.default_follow_group_schema()) _unfollow(context, data_dict, schema, context['model'].UserFollowingGroup) + + +@ckan.logic.validate(ckan.logic.schema.job_clear_schema) +def job_clear(context, data_dict): + '''Clear background job queues. + + Does not affect jobs that are already being processed. + + :param list queues: The queues to clear. If not given then ALL + queues are cleared. + + :returns: The cleared queues. + :rtype: list + + .. versionadded:: 2.7 + ''' + _check_access(u'job_clear', context, data_dict) + queues = data_dict.get(u'queues') + if queues: + queues = [jobs.get_queue(q) for q in queues] + else: + queues = jobs.get_all_queues() + names = [jobs.remove_queue_name_prefix(queue.name) for queue in queues] + for queue, name in zip(queues, names): + queue.empty() + log.info(u'Cleared background job queue "{}"'.format(name)) + return names + + +def job_cancel(context, data_dict): + '''Cancel a queued background job. + + Removes the job from the queue and deletes it. + + :param string id: The ID of the background job. + + .. 
versionadded:: 2.7 + ''' + _check_access(u'job_cancel', context, data_dict) + id = _get_or_bust(data_dict, u'id') + try: + jobs.job_from_id(id).delete() + log.info(u'Cancelled background job {}'.format(id)) + except KeyError: + raise NotFound diff --git a/ckan/logic/action/get.py b/ckan/logic/action/get.py index 8d375e0f593..5f583a496fa 100644 --- a/ckan/logic/action/get.py +++ b/ckan/logic/action/get.py @@ -17,6 +17,7 @@ import ckan.logic.action import ckan.logic.schema import ckan.lib.dictization.model_dictize as model_dictize +import ckan.lib.jobs as jobs import ckan.lib.navl.dictization_functions import ckan.model as model import ckan.model.misc as misc @@ -1113,6 +1114,8 @@ def resource_view_list(context, data_dict): def resource_status_show(context, data_dict): '''Return the statuses of a resource's tasks. + This function is DEPRECATED. + :param id: the id of the resource :type id: string @@ -1827,52 +1830,27 @@ def package_search(context, data_dict): # return a list of package ids data_dict['fl'] = 'id {0}'.format(data_source) - # we should remove any mention of capacity from the fq and - # instead set it to only retrieve public datasets - fq = data_dict.get('fq', '') - # Remove before these hit solr FIXME: whitelist instead include_private = asbool(data_dict.pop('include_private', False)) include_drafts = asbool(data_dict.pop('include_drafts', False)) - - capacity_fq = 'capacity:"public"' - if include_private and authz.is_sysadmin(user): - capacity_fq = None - elif include_private and user: - orgs = logic.get_action('organization_list_for_user')( - {'user': user}, {'permission': 'read'}) - if orgs: - capacity_fq = '({0} OR owner_org:({1}))'.format( - capacity_fq, - ' OR '.join(org['id'] for org in orgs)) - if include_drafts: - capacity_fq = '({0} OR creator_user_id:({1}))'.format( - capacity_fq, - authz.get_user_id_for_username(user)) - - if capacity_fq: - fq = ' '.join(p for p in fq.split() if 'capacity:' not in p) - data_dict['fq'] = fq + ' ' + capacity_fq - - fq = data_dict.get('fq', '') + data_dict.setdefault('fq', '') + if not include_private: + data_dict['fq'] += ' +capacity:public' if include_drafts: - user_id = authz.get_user_id_for_username(user, allow_none=True) - if authz.is_sysadmin(user): - data_dict['fq'] = fq + ' +state:(active OR draft)' - elif user_id: - # Query to return all active datasets, and all draft datasets - # for this user. - data_dict['fq'] = fq + \ - ' ((creator_user_id:{0} AND +state:(draft OR active))' \ - ' OR state:active)'.format(user_id) - elif not authz.is_sysadmin(user): - data_dict['fq'] = fq + ' +state:active' + data_dict['fq'] += ' +state:(active OR draft)' # Pop these ones as Solr does not need them extras = data_dict.pop('extras', None) + # enforce permission filter based on user + if context.get('ignore_auth') or (user and authz.is_sysadmin(user)): + labels = None + else: + labels = lib_plugins.get_permission_labels( + ).get_user_dataset_labels(context['auth_user_obj']) + query = search.query_for(model.Package) - query.run(data_dict) + query.run(data_dict, permission_labels=labels) # Add them back so extensions can use them on after_search data_dict['extras'] = extras @@ -3500,3 +3478,46 @@ def config_option_list(context, data_dict): schema = ckan.logic.schema.update_configuration_schema() return schema.keys() + + +@logic.validate(logic.schema.job_list_schema) +def job_list(context, data_dict): + '''List enqueued background jobs. + + :param list queues: Queues to list jobs from. If not given then the + jobs from all queues are listed. 
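Like any other action, the job actions can be driven from extension code as well as from the CLI. A sketch, using ``ignore_auth`` in the context because all of the job actions are sysadmin-only by default:

```python
import ckan.plugins.toolkit as toolkit

context = {u'ignore_auth': True}

# List everything currently enqueued, across all queues.
enqueued = toolkit.get_action(u'job_list')(context, {})
for job in enqueued:
    print(job[u'id'], job[u'queue'], job[u'title'])

if enqueued:
    # Inspect the first job, then cancel it while it is still enqueued.
    job_id = enqueued[0][u'id']
    print(toolkit.get_action(u'job_show')(context, {u'id': job_id}))
    toolkit.get_action(u'job_cancel')(context, {u'id': job_id})
```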
+ + :returns: The currently enqueued background jobs. + :rtype: list + + .. versionadded:: 2.7 + ''' + _check_access(u'job_list', context, data_dict) + dictized_jobs = [] + queues = data_dict.get(u'queues') + if queues: + queues = [jobs.get_queue(q) for q in queues] + else: + queues = jobs.get_all_queues() + for queue in queues: + for job in queue.jobs: + dictized_jobs.append(jobs.dictize_job(job)) + return dictized_jobs + + +def job_show(context, data_dict): + '''Show details for a background job. + + :param string id: The ID of the background job. + + :returns: Details about the background job. + :rtype: dict + + .. versionadded:: 2.7 + ''' + _check_access(u'job_show', context, data_dict) + id = _get_or_bust(data_dict, u'id') + try: + return jobs.dictize_job(jobs.job_from_id(id)) + except KeyError: + raise NotFound diff --git a/ckan/logic/auth/delete.py b/ckan/logic/auth/delete.py index ff1a52044bc..4efd48300ad 100644 --- a/ckan/logic/auth/delete.py +++ b/ckan/logic/auth/delete.py @@ -137,3 +137,13 @@ def organization_member_delete(context, data_dict): def member_delete(context, data_dict): return authz.is_authorized('member_create', context, data_dict) + + +def job_clear(context, data_dict): + '''Clear background jobs. Only sysadmins.''' + return {'success': False} + + +def job_cancel(context, data_dict): + '''Cancel a background job. Only sysadmins.''' + return {'success': False} diff --git a/ckan/logic/auth/get.py b/ckan/logic/auth/get.py index 011b19c2e84..8a12d7fdaf3 100644 --- a/ckan/logic/auth/get.py +++ b/ckan/logic/auth/get.py @@ -5,6 +5,7 @@ from ckan.lib.base import _ from ckan.logic.auth import (get_package_object, get_group_object, get_resource_object) +from ckan.lib.plugins import get_permission_labels def sysadmin(context, data_dict): @@ -112,22 +113,15 @@ def package_relationships_list(context, data_dict): def package_show(context, data_dict): user = context.get('user') package = get_package_object(context, data_dict) - # draft state indicates package is still in the creation process - # so we need to check we have creation rights. - if package.state.startswith('draft'): - auth = authz.is_authorized('package_update', - context, data_dict) - authorized = auth.get('success') - elif package.owner_org is None and package.state == 'active': - return {'success': True} - else: - # anyone can see a public package - if not package.private and package.state == 'active': - return {'success': True} - authorized = authz.has_user_permission_for_group_or_org( - package.owner_org, user, 'read') + labels = get_permission_labels() + user_labels = labels.get_user_dataset_labels(context['auth_user_obj']) + authorized = any( + dl in user_labels for dl in labels.get_dataset_labels(package)) + if not authorized: - return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)} + return { + 'success': False, + 'msg': _('User %s not authorized to read package %s') % (user, package.id)} else: return {'success': True} @@ -343,3 +337,13 @@ def config_option_show(context, data_dict): def config_option_list(context, data_dict): '''List runtime-editable configuration options. Only sysadmins.''' return {'success': False} + + +def job_list(context, data_dict): + '''List background jobs. Only sysadmins.''' + return {'success': False} + + +def job_show(context, data_dict): + '''Show background job. 
Only sysadmins.'''
+    return {'success': False}
diff --git a/ckan/logic/schema.py b/ckan/logic/schema.py
index 78ac4ed5a40..259c74cdc8c 100644
--- a/ckan/logic/schema.py
+++ b/ckan/logic/schema.py
@@ -701,3 +701,15 @@ def update_configuration_schema():
         schema = plugin.update_config_schema(schema)
 
     return schema
+
+
+def job_list_schema():
+    return {
+        u'queues': [ignore_missing, list_of_strings],
+    }
+
+
+def job_clear_schema():
+    return {
+        u'queues': [ignore_missing, list_of_strings],
+    }
diff --git a/ckan/migration/versions/085_adjust_activity_timestamps.py b/ckan/migration/versions/085_adjust_activity_timestamps.py
new file mode 100644
index 00000000000..ddcea2ece31
--- /dev/null
+++ b/ckan/migration/versions/085_adjust_activity_timestamps.py
@@ -0,0 +1,23 @@
+# encoding: utf-8
+
+import datetime
+
+
+def upgrade(migrate_engine):
+    u"""
+    This script assumes that the existing activity timestamps were
+    recorded in the server's locally configured time zone.
+    """
+    # Choose a fixed date (within DST) so the migration depends only on
+    # the server's time zone, not on the current daylight-saving state.
+    magic_timestamp = (datetime.datetime(2016, 6, 20) - datetime.datetime(1970, 1, 1)).total_seconds()
+
+    utc_date = datetime.datetime.utcfromtimestamp(magic_timestamp)
+    local_date = datetime.datetime.fromtimestamp(magic_timestamp)
+
+    if utc_date == local_date:
+        return
+
+    with migrate_engine.begin() as connection:
+        sql = u"update activity set timestamp = timestamp + (%s - %s);"
+        connection.execute(sql, utc_date, local_date)
diff --git a/ckan/model/activity.py b/ckan/model/activity.py
index cfd6d02e2a5..fdfa52660cc 100644
--- a/ckan/model/activity.py
+++ b/ckan/model/activity.py
@@ -40,7 +40,7 @@ class Activity(domain_object.DomainObject):
     def __init__(self, user_id, object_id, revision_id, activity_type,
                  data=None):
         self.id = _types.make_uuid()
-        self.timestamp = datetime.datetime.now()
+        self.timestamp = datetime.datetime.utcnow()
         self.user_id = user_id
         self.object_id = object_id
         self.revision_id = revision_id
diff --git a/ckan/plugins/interfaces.py b/ckan/plugins/interfaces.py
index 2651c394a87..aa4c3c4fce4 100644
--- a/ckan/plugins/interfaces.py
+++ b/ckan/plugins/interfaces.py
@@ -25,7 +25,8 @@
     'IFacets',
     'IAuthenticator',
     'ITranslation',
-    'IUploader'
+    'IUploader',
+    'IPermissionLabels',
 ]
 
 from inspect import isclass
@@ -1568,3 +1569,43 @@ def get_resource_uploader(self):
         :type id: string
     '''
+
+
+class IPermissionLabels(Interface):
+    '''
+    Extensions implementing this interface can override the permission
+    labels applied to datasets to precisely control which datasets are
+    visible to each user.
+
+    Implementations might want to consider mixing in
+    ``ckan.lib.plugins.DefaultPermissionLabels`` which provides
+    default behaviours for these methods.
+
+    See ``ckanext/example_ipermissionlabels`` for an example plugin.
+    '''
+
+    def get_dataset_labels(self, dataset_obj):
+        '''
+        Return a list of unicode strings to be stored in the search index
+        as the permission labels for a dataset dict.
+
+        :param dataset_obj: dataset details
+        :type dataset_obj: Package model object
+
+        :returns: permission labels
+        :rtype: list of unicode strings
+        '''
+
+    def get_user_dataset_labels(self, user_obj):
+        '''
+        Return the permission labels that give a user permission to view
+        a dataset. If any of the labels returned from this method match
+        any of the labels returned from :py:meth:`.get_dataset_labels`
+        then this user is permitted to view that dataset.
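To make the contract concrete, here is a sketch of a hypothetical plugin that hides embargoed datasets from everyone except a dedicated reviewer account. The ``under_embargo`` extra and the ``reviewer`` user name are invented for illustration; the default behaviour comes from the ``DefaultPermissionLabels`` mixin described above:

```python
import ckan.plugins as p
from ckan.lib.plugins import DefaultPermissionLabels


class EmbargoPlugin(p.SingletonPlugin, DefaultPermissionLabels):
    p.implements(p.IPermissionLabels)

    def get_dataset_labels(self, dataset_obj):
        # Replace the normal labels so 'public'/'member-*' no longer match.
        if dataset_obj.extras.get(u'under_embargo') == u'true':
            return [u'embargoed']
        return super(EmbargoPlugin, self).get_dataset_labels(dataset_obj)

    def get_user_dataset_labels(self, user_obj):
        labels = super(EmbargoPlugin, self).get_user_dataset_labels(user_obj)
        if user_obj and user_obj.name == u'reviewer':
            labels.append(u'embargoed')
        return labels
```

Since the labels live in the search index (see ``ckan/lib/search/index.py`` above), datasets must be reindexed — e.g. with ``paster search-index rebuild`` — whenever the label logic changes.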
+ + :param user_obj: user details + :type user_obj: User model object or None + + :returns: permission labels + :rtype: list of unicode strings + ''' diff --git a/ckan/plugins/toolkit.py b/ckan/plugins/toolkit.py index 899763806ea..ac475ed5811 100644 --- a/ckan/plugins/toolkit.py +++ b/ckan/plugins/toolkit.py @@ -96,6 +96,8 @@ class _Toolkit(object): 'auth_disallow_anonymous_access', # Helper not found error. 'HelperError', + # Enqueue background job + 'enqueue_job', # Fully defined in this file ## 'add_template_directory', @@ -134,6 +136,7 @@ def _initialize(self): CkanVersionException, HelperError ) + from ckan.lib.jobs import enqueue as enqueue_job from paste.deploy import converters import pylons @@ -271,6 +274,7 @@ def _initialize(self): t['check_ckan_version'] = self._check_ckan_version t['CkanVersionException'] = CkanVersionException t['HelperError'] = HelperError + t['enqueue_job'] = enqueue_job # check contents list correct errors = set(t).symmetric_difference(set(self.contents)) diff --git a/ckan/tests/controllers/test_tags.py b/ckan/tests/controllers/test_tags.py index 77d7e083311..46b4b255b6f 100644 --- a/ckan/tests/controllers/test_tags.py +++ b/ckan/tests/controllers/test_tags.py @@ -124,7 +124,7 @@ def test_tag_read_redirects_to_dataset_search(self): tag_url = url_for(controller='tag', action='read', id='find-me') tag_response = app.get(tag_url, status=302) assert_equal(tag_response.headers['Location'], - 'http://localhost/dataset?tags=find-me') + 'http://test.ckan.net/dataset?tags=find-me') def test_tag_read_not_found(self): '''Attempting access to non-existing tag returns a 404''' diff --git a/ckan/tests/controllers/test_util.py b/ckan/tests/controllers/test_util.py index adb02e03657..db76196e677 100644 --- a/ckan/tests/controllers/test_util.py +++ b/ckan/tests/controllers/test_util.py @@ -18,7 +18,7 @@ def test_redirect_ok(self): status=302, ) assert_equal(response.headers.get('Location'), - 'http://localhost/dataset') + 'http://test.ckan.net/dataset') def test_redirect_external(self): app = self._get_test_app() diff --git a/ckan/tests/helpers.py b/ckan/tests/helpers.py index bfda83a1a1e..5f8e61dba0a 100644 --- a/ckan/tests/helpers.py +++ b/ckan/tests/helpers.py @@ -19,12 +19,24 @@ This module is reserved for these very useful functions. ''' + +import collections +import contextlib +import errno +import functools +import logging +import os +import re + import webtest import nose.tools from nose.tools import assert_in, assert_not_in import mock +import rq from ckan.common import config +import ckan.lib.jobs as jobs +from ckan.lib.redis import connect_to_redis import ckan.lib.search as search import ckan.config.middleware import ckan.model as model @@ -207,6 +219,49 @@ def teardown_class(cls): config.update(cls._original_config) +class RQTestBase(object): + ''' + Base class for tests of RQ functionality. + ''' + def setup(self): + u''' + Delete all RQ queues and jobs. + ''' + # See https://github.com/nvie/rq/issues/731 + redis_conn = connect_to_redis() + for queue in rq.Queue.all(connection=redis_conn): + queue.empty() + redis_conn.srem(rq.Queue.redis_queues_keys, queue._key) + redis_conn.delete(queue._key) + + def all_jobs(self): + u''' + Get a list of all RQ jobs. + ''' + jobs = [] + redis_conn = connect_to_redis() + for queue in rq.Queue.all(connection=redis_conn): + jobs.extend(queue.jobs) + return jobs + + def enqueue(self, job=None, *args, **kwargs): + u''' + Enqueue a test job. 
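A sketch of how these base classes are meant to be used in a test module (the class name and assertion are illustrative):

```python
from ckan.tests import helpers


class TestMyBackgroundCode(helpers.FunctionalRQTestBase):

    def test_jobs_are_enqueued(self):
        # setup() has already emptied all queues, so we start from zero.
        self.enqueue(title=u'first job')
        self.enqueue(queue=u'my-queue')
        assert len(self.all_jobs()) == 2
```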
+ ''' + if job is None: + job = jobs.test_job + return jobs.enqueue(job, *args, **kwargs) + + +class FunctionalRQTestBase(FunctionalTestBase, RQTestBase): + ''' + Base class for functional tests of RQ functionality. + ''' + def setup(self): + FunctionalTestBase.setup(self) + RQTestBase.setup(self) + + def submit_and_follow(app, form, extra_environ=None, name=None, value=None, **args): ''' @@ -316,21 +371,41 @@ def test_ckan_site_title(self): :param value: the new config key's value, e.g. ``'My Test CKAN'`` :type value: string + + .. seealso:: The context manager :py:func:`changed_config` ''' def decorator(func): + @functools.wraps(func) def wrapper(*args, **kwargs): - _original_config = config.copy() - config[key] = value + with changed_config(key, value): + return func(*args, **kwargs) + return wrapper + return decorator - try: - return_value = func(*args, **kwargs) - finally: - config.clear() - config.update(_original_config) - return return_value - return nose.tools.make_decorator(func)(wrapper) - return decorator +@contextlib.contextmanager +def changed_config(key, value): + ''' + Context manager for temporarily changing a config value. + + Allows you to temporarily change the value of a CKAN configuration + option. The original value is restored once the context manager is + left. + + Usage:: + + with changed_config(u'ckan.site_title', u'My Test CKAN'): + assert config[u'ckan.site_title'] == u'My Test CKAN' + + .. seealso:: The decorator :py:func:`change_config` + ''' + _original_config = config.copy() + config[key] = value + try: + yield + finally: + config.clear() + config.update(_original_config) def mock_auth(auth_function_path): @@ -468,3 +543,128 @@ def wrapper(*args, **kwargs): return return_value return nose.tools.make_decorator(func)(wrapper) return decorator + + +@contextlib.contextmanager +def recorded_logs(logger=None, level=logging.DEBUG, + override_disabled=True, override_global_level=True): + u''' + Context manager for recording log messages. + + :param logger: The logger to record messages from. Can either be a + :py:class:`logging.Logger` instance or a string with the + logger's name. Defaults to the root logger. + + :param int level: Temporary log level for the target logger while + the context manager is active. Pass ``None`` if you don't want + the level to be changed. The level is automatically reset to its + original value when the context manager is left. + + :param bool override_disabled: A logger can be disabled by setting + its ``disabled`` attribute. By default, this context manager + sets that attribute to ``False`` at the beginning of its + execution and resets it when the context manager is left. Set + ``override_disabled`` to ``False`` to keep the current value + of the attribute. + + :param bool override_global_level: The ``logging.disable`` function + allows one to install a global minimum log level that takes + precedence over a logger's own level. By default, this context + manager makes sure that the global limit is at most ``level``, + and reduces it if necessary during its execution. Set + ``override_global_level`` to ``False`` to keep the global limit. + + :returns: A recording log handler that listens to ``logger`` during + the execution of the context manager. 
+ :rtype: :py:class:`RecordingLogHandler` + + Example:: + + import logging + + logger = logging.getLogger(__name__) + + with recorded_logs(logger) as logs: + logger.info(u'Hello, world!') + + logs.assert_log(u'info', u'world') + ''' + if logger is None: + logger = logging.getLogger() + elif not isinstance(logger, logging.Logger): + logger = logging.getLogger(logger) + handler = RecordingLogHandler() + old_level = logger.level + manager_level = logger.manager.disable + disabled = logger.disabled + logger.addHandler(handler) + try: + if level is not None: + logger.setLevel(level) + if override_disabled: + logger.disabled = False + if override_global_level: + if (level is None) and (manager_level > old_level): + logger.manager.disable = old_level + elif (level is not None) and (manager_level > level): + logger.manager.disable = level + yield handler + finally: + logger.handlers.remove(handler) + logger.setLevel(old_level) + logger.disabled = disabled + logger.manager.disable = manager_level + + +class RecordingLogHandler(logging.Handler): + u''' + Log handler that records log messages for later inspection. + + You can inspect the recorded messages via the ``messages`` attribute + (a dict that maps log levels to lists of messages) or by using + ``assert_log``. + + This class is rarely useful on its own, instead use + :py:func:`recorded_logs` to temporarily record log messages. + ''' + def __init__(self, *args, **kwargs): + super(RecordingLogHandler, self).__init__(*args, **kwargs) + self.clear() + + def emit(self, record): + self.messages[record.levelname.lower()].append(record.getMessage()) + + def assert_log(self, level, pattern, msg=None): + u''' + Assert that a certain message has been logged. + + :param string pattern: A regex which the message has to match. + The match is done using ``re.search``. + + :param string level: The message level (``'debug'``, ...). + + :param string msg: Optional failure message in case the expected + log message was not logged. + + :raises AssertionError: If the expected message was not logged. + ''' + compiled_pattern = re.compile(pattern) + for log_msg in self.messages[level]: + if compiled_pattern.search(log_msg): + return + if not msg: + if self.messages[level]: + lines = u'\n '.join(self.messages[level]) + msg = (u'Pattern "{}" was not found in the log messages for ' + + u'level "{}":\n {}').format(pattern, level, lines) + else: + msg = (u'Pattern "{}" was not found in the log messages for ' + + u'level "{}" (no messages were recorded for that ' + + u'level).').format(pattern, level) + raise AssertionError(msg) + + def clear(self): + u''' + Clear all captured log messages. 
+ ''' + self.messages = collections.defaultdict(list) diff --git a/ckan/tests/legacy/functional/test_user.py b/ckan/tests/legacy/functional/test_user.py index f61b8be35c7..d432e0789ac 100644 --- a/ckan/tests/legacy/functional/test_user.py +++ b/ckan/tests/legacy/functional/test_user.py @@ -1,6 +1,6 @@ # encoding: utf-8 -from routes import url_for +from ckan.lib.helpers import url_for from nose.tools import assert_equal from ckan.common import config import hashlib diff --git a/ckan/tests/lib/test_cli.py b/ckan/tests/lib/test_cli.py index c8ddd77222d..e94664687d4 100644 --- a/ckan/tests/lib/test_cli.py +++ b/ckan/tests/lib/test_cli.py @@ -1,22 +1,78 @@ # encoding: utf-8 +import datetime import logging +import os +import os.path +from StringIO import StringIO +import sys +import tempfile -from nose.tools import assert_raises +from nose.tools import (assert_raises, eq_ as eq, ok_ as ok, assert_in, + assert_not_in, assert_not_equal as neq, assert_false as nok) +from paste.script.command import run -from ckan.lib.cli import UserCmd +import ckan.lib.cli as cli +import ckan.lib.jobs as jobs import ckan.tests.helpers as helpers +from ckan.common import config log = logging.getLogger(__name__) +def paster(*args, **kwargs): + ''' + Call a paster command. + + All arguments are parsed and passed on to the command. The + ``--config`` option is automatically appended. + + By default, an ``AssertionError`` is raised if the command exits + with a non-zero return code or if anything is written to STDERR. + Pass ``fail_on_error=False`` to disable this behavior. + + Example:: + + code, stdout, stderr = paster(u'jobs', u'list') + assert u'My Job Title' in stdout + + code, stdout, stderr = paster(u'jobs', u'foobar', + fail_on_error=False) + assert code == 1 + assert u'Unknown command' in stderr + + Any ``SystemExit`` raised by the command is swallowed. + + :returns: A tuple containing the return code, the content of + STDOUT, and the content of STDERR. + ''' + fail_on_error = kwargs.pop(u'fail_on_error', True) + args = list(args) + [u'--config=' + config[u'__file__']] + sys.stdout, sys.stderr = StringIO(u''), StringIO(u'') + code = 0 + try: + run(args) + except SystemExit as e: + code = e.code + finally: + stdout, stderr = sys.stdout.getvalue(), sys.stderr.getvalue() + sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__ + if code != 0 and fail_on_error: + raise AssertionError(u'Paster command exited with non-zero ' + + u'return code {}: {}'.format(code, stderr)) + if stderr.strip() and fail_on_error: + raise AssertionError(u'Paster command wrote to STDERR: {}'.format( + stderr)) + return code, stdout, stderr + + class TestUserAdd(object): '''Tests for UserCmd.add''' @classmethod def setup_class(cls): - cls.user_cmd = UserCmd('user-command') + cls.user_cmd = cli.UserCmd('user-command') def setup(self): helpers.reset_db() @@ -70,3 +126,220 @@ def test_cli_user_add_unicode_fullname_system_exit(self): self.user_cmd.add() except SystemExit: assert False, "SystemExit exception shouldn't be raised" + + +class TestJobsUnknown(helpers.RQTestBase): + ''' + Test unknown sub-command for ``paster jobs``. + ''' + def test_unknown_command(self): + ''' + Test error handling for unknown ``paster jobs`` sub-command. + ''' + code, stdout, stderr = paster(u'jobs', u'does-not-exist', + fail_on_error=False) + neq(code, 0) + assert_in(u'Unknown command', stderr) + + +class TestJobsList(helpers.RQTestBase): + ''' + Tests for ``paster jobs list``. 
+ ''' + def test_list_default_queue(self): + ''' + Test output of ``jobs list`` for default queue. + ''' + job = self.enqueue() + stdout = paster(u'jobs', u'list')[1] + fields = stdout.split() + eq(len(fields), 3) + dt = datetime.datetime.strptime(fields[0], u'%Y-%m-%dT%H:%M:%S') + now = datetime.datetime.utcnow() + ok(abs((now - dt).total_seconds()) < 10) + eq(fields[1], job.id) + eq(fields[2], jobs.DEFAULT_QUEUE_NAME) + + def test_list_other_queue(self): + ''' + Test output of ``jobs list`` for non-default queue. + ''' + job = self.enqueue(queue=u'my_queue') + stdout = paster(u'jobs', u'list')[1] + fields = stdout.split() + eq(len(fields), 3) + eq(fields[2], u'my_queue') + + def test_list_title(self): + ''' + Test title output of ``jobs list``. + ''' + job = self.enqueue(title=u'My_Title') + stdout = paster(u'jobs', u'list')[1] + fields = stdout.split() + eq(len(fields), 4) + eq(fields[3], u'"My_Title"') + + def test_list_filter(self): + ''' + Test filtering by queues for ``jobs list``. + ''' + job1 = self.enqueue(queue=u'q1') + job2 = self.enqueue(queue=u'q2') + job3 = self.enqueue(queue=u'q3') + stdout = paster(u'jobs', u'list', u'q1', u'q2')[1] + assert_in(u'q1', stdout) + assert_in(u'q2', stdout) + assert_not_in(u'q3', stdout) + + +class TestJobShow(helpers.RQTestBase): + ''' + Tests for ``paster jobs show``. + ''' + def test_show_existing(self): + ''' + Test ``jobs show`` for an existing job. + ''' + job = self.enqueue(queue=u'my_queue', title=u'My Title') + stdout = paster(u'jobs', u'show', job.id)[1] + assert_in(job.id, stdout) + assert_in(jobs.remove_queue_name_prefix(job.origin), stdout) + + def test_show_missing_id(self): + ''' + Test ``jobs show`` with a missing ID. + ''' + code, stdout, stderr = paster(u'jobs', u'show', fail_on_error=False) + neq(code, 0) + ok(stderr) + + +class TestJobsCancel(helpers.RQTestBase): + ''' + Tests for ``paster jobs cancel``. + ''' + def test_cancel_existing(self): + ''' + Test ``jobs cancel`` for an existing job. + ''' + job1 = self.enqueue() + job2 = self.enqueue() + stdout = paster(u'jobs', u'cancel', job1.id)[1] + all_jobs = self.all_jobs() + eq(len(all_jobs), 1) + eq(all_jobs[0].id, job2.id) + assert_in(job1.id, stdout) + + def test_cancel_not_existing(self): + ''' + Test ``jobs cancel`` for a non-existing job. + ''' + code, stdout, stderr = paster(u'jobs', u'cancel', u'does-not-exist', + fail_on_error=False) + neq(code, 0) + assert_in(u'does-not-exist', stderr) + + def test_cancel_missing_id(self): + ''' + Test ``jobs cancel`` with a missing ID. + ''' + code, stdout, stderr = paster(u'jobs', u'cancel', fail_on_error=False) + neq(code, 0) + ok(stderr) + + +class TestJobsClear(helpers.RQTestBase): + ''' + Tests for ``paster jobs clear``. + ''' + def test_clear_all_queues(self): + ''' + Test clearing all queues via ``jobs clear``. + ''' + self.enqueue() + self.enqueue() + self.enqueue(queue=u'q1') + self.enqueue(queue=u'q2') + stdout = paster(u'jobs', u'clear')[1] + assert_in(jobs.DEFAULT_QUEUE_NAME, stdout) + assert_in(u'q1', stdout) + assert_in(u'q2', stdout) + eq(self.all_jobs(), []) + + def test_clear_specific_queues(self): + ''' + Test clearing specific queues via ``jobs clear``.
+ ''' + job1 = self.enqueue() + job2 = self.enqueue(queue=u'q1') + self.enqueue(queue=u'q2') + self.enqueue(queue=u'q2') + self.enqueue(queue=u'q3') + stdout = paster(u'jobs', u'clear', u'q2', u'q3')[1] + assert_in(u'q2', stdout) + assert_in(u'q3', stdout) + assert_not_in(jobs.DEFAULT_QUEUE_NAME, stdout) + assert_not_in(u'q1', stdout) + all_jobs = self.all_jobs() + eq(set(all_jobs), {job1, job2}) + + +class TestJobsTest(helpers.RQTestBase): + ''' + Tests for ``paster jobs test``. + ''' + def test_test_default_queue(self): + ''' + Test ``jobs test`` for the default queue. + ''' + stdout = paster(u'jobs', u'test')[1] + all_jobs = self.all_jobs() + eq(len(all_jobs), 1) + eq(jobs.remove_queue_name_prefix(all_jobs[0].origin), + jobs.DEFAULT_QUEUE_NAME) + + def test_test_specific_queues(self): + ''' + Test ``jobs test`` for specific queues. + ''' + stdout = paster(u'jobs', u'test', u'q1', u'q2')[1] + all_jobs = self.all_jobs() + eq(len(all_jobs), 2) + eq({jobs.remove_queue_name_prefix(j.origin) for j in all_jobs}, + {u'q1', u'q2'}) + + +class TestJobsWorker(helpers.RQTestBase): + ''' + Tests for ``paster jobs worker``. + ''' + # All tests of ``jobs worker`` must use the ``--burst`` option to + # make sure that the worker exits. + + def test_worker_default_queue(self): + ''' + Test ``jobs worker`` with the default queue. + ''' + with tempfile.NamedTemporaryFile(delete=False) as f: + self.enqueue(os.remove, args=[f.name]) + paster(u'jobs', u'worker', u'--burst') + all_jobs = self.all_jobs() + eq(all_jobs, []) + nok(os.path.isfile(f.name)) + + def test_worker_specific_queues(self): + ''' + Test ``jobs worker`` with specific queues. + ''' + with tempfile.NamedTemporaryFile(delete=False) as f: + with tempfile.NamedTemporaryFile(delete=False) as g: + job1 = self.enqueue() + job2 = self.enqueue(queue=u'q2') + self.enqueue(os.remove, args=[f.name], queue=u'q3') + self.enqueue(os.remove, args=[g.name], queue=u'q4') + paster(u'jobs', u'worker', u'--burst', u'q3', u'q4') + all_jobs = self.all_jobs() + eq(set(all_jobs), {job1, job2}) + nok(os.path.isfile(f.name)) + nok(os.path.isfile(g.name)) diff --git a/ckan/tests/lib/test_jobs.py b/ckan/tests/lib/test_jobs.py new file mode 100644 index 00000000000..b69e7ab3bd2 --- /dev/null +++ b/ckan/tests/lib/test_jobs.py @@ -0,0 +1,201 @@ +# encoding: utf-8 + +u''' +Tests for ``ckan.lib.jobs``. 
+''' + +import datetime + +from nose.tools import ok_, assert_equal, raises +import rq + +import ckan.lib.jobs as jobs +from ckan.tests.helpers import changed_config, recorded_logs, RQTestBase +from ckan.common import config + + +class TestQueueNamePrefixes(RQTestBase): + + def test_queue_name_prefix_contains_site_id(self): + prefix = jobs.add_queue_name_prefix(u'') + ok_(config[u'ckan.site_id'] in prefix) + + def test_queue_name_removal_with_prefix(self): + plain = u'foobar' + prefixed = jobs.add_queue_name_prefix(plain) + assert_equal(jobs.remove_queue_name_prefix(prefixed), plain) + + @raises(ValueError) + def test_queue_name_removal_without_prefix(self): + jobs.remove_queue_name_prefix(u'foobar') + + +class TestEnqueue(RQTestBase): + + def test_enqueue_return_value(self): + job = self.enqueue() + ok_(isinstance(job, rq.job.Job)) + + def test_enqueue_args(self): + self.enqueue() + self.enqueue(args=[1, 2]) + all_jobs = self.all_jobs() + assert_equal(len(all_jobs), 2) + assert_equal(len(all_jobs[0].args), 0) + assert_equal(all_jobs[1].args, [1, 2]) + + def test_enqueue_kwargs(self): + self.enqueue() + self.enqueue(kwargs={u'foo': 1}) + all_jobs = self.all_jobs() + assert_equal(len(all_jobs), 2) + assert_equal(len(all_jobs[0].kwargs), 0) + assert_equal(all_jobs[1].kwargs, {u'foo': 1}) + + def test_enqueue_title(self): + self.enqueue() + self.enqueue(title=u'Title') + all_jobs = self.all_jobs() + assert_equal(len(all_jobs), 2) + assert_equal(all_jobs[0].meta[u'title'], None) + assert_equal(all_jobs[1].meta[u'title'], u'Title') + + def test_enqueue_queue(self): + self.enqueue() + self.enqueue(queue=u'my_queue') + all_jobs = self.all_jobs() + assert_equal(len(all_jobs), 2) + assert_equal(all_jobs[0].origin, + jobs.add_queue_name_prefix(jobs.DEFAULT_QUEUE_NAME)) + assert_equal(all_jobs[1].origin, + jobs.add_queue_name_prefix(u'my_queue')) + + +class TestGetAllQueues(RQTestBase): + + def test_foreign_queues_are_ignored(self): + u''' + Test that foreign RQ-queues are ignored. + ''' + # Create queues for this CKAN instance + self.enqueue(queue=u'q1') + self.enqueue(queue=u'q2') + # Create queue for another CKAN instance + with changed_config(u'ckan.site_id', u'some-other-ckan-instance'): + self.enqueue(queue=u'q2') + # Create queue not related to CKAN + rq.Queue(u'q4').enqueue_call(jobs.test_job) + all_queues = jobs.get_all_queues() + names = {jobs.remove_queue_name_prefix(q.name) for q in all_queues} + assert_equal(names, {u'q1', u'q2'}) + + +class TestGetQueue(RQTestBase): + + def test_get_queue_default_queue(self): + u''' + Test that the default queue is returned if no queue is given. + ''' + q = jobs.get_queue() + assert_equal(jobs.remove_queue_name_prefix(q.name), + jobs.DEFAULT_QUEUE_NAME) + + def test_get_queue_other_queue(self): + u''' + Test that a different queue can be given. 
+ ''' + q = jobs.get_queue(u'my_queue') + assert_equal(jobs.remove_queue_name_prefix(q.name), u'my_queue') + + +class TestJobFromID(RQTestBase): + + def test_job_from_id_existing(self): + job = self.enqueue() + assert_equal(jobs.job_from_id(job.id), job) + job = self.enqueue(queue=u'my_queue') + assert_equal(jobs.job_from_id(job.id), job) + + @raises(KeyError) + def test_job_from_id_not_existing(self): + jobs.job_from_id(u'does-not-exist') + + +class TestDictizeJob(RQTestBase): + + def test_dictize_job(self): + job = self.enqueue(title=u'Title', queue=u'my_queue') + d = jobs.dictize_job(job) + assert_equal(d[u'id'], job.id) + assert_equal(d[u'title'], u'Title') + assert_equal(d[u'queue'], u'my_queue') + dt = datetime.datetime.strptime(d[u'created'], u'%Y-%m-%dT%H:%M:%S') + now = datetime.datetime.utcnow() + ok_(abs((now - dt).total_seconds()) < 10) + + +def failing_job(): + u''' + A background job that fails. + ''' + raise RuntimeError(u'JOB FAILURE') + + +class TestWorker(RQTestBase): + + def test_worker_logging_lifecycle(self): + u''' + Test that a worker's lifecycle is logged. + ''' + queue = u'my_queue' + job = self.enqueue(queue=queue) + with recorded_logs(u'ckan.lib.jobs') as logs: + worker = jobs.Worker([queue]) + worker.work(burst=True) + messages = logs.messages[u'info'] + # We expect 4 log messages: Worker start, job start, job end, + # worker end. + assert_equal(len(messages), 4) + ok_(worker.key in messages[0]) + ok_(queue in messages[0]) + ok_(worker.key in messages[1]) + ok_(job.id in messages[1]) + ok_(worker.key in messages[2]) + ok_(job.id in messages[2]) + ok_(worker.key in messages[3]) + + def test_worker_exception_logging(self): + u''' + Test that exceptions in a job are logged. + ''' + job = self.enqueue(failing_job) + worker = jobs.Worker() + + # Prevent worker from forking so that we can capture log + # messages from within the job + def execute_job(*args, **kwargs): + return worker.perform_job(*args, **kwargs) + + worker.execute_job = execute_job + with recorded_logs(u'ckan.lib.jobs') as logs: + worker.work(burst=True) + logs.assert_log(u'error', u'JOB FAILURE') + + def test_worker_default_queue(self): + self.enqueue() + self.enqueue(queue=u'my_queue') + jobs.Worker().work(burst=True) + all_jobs = self.all_jobs() + assert_equal(len(all_jobs), 1) + assert_equal(jobs.remove_queue_name_prefix(all_jobs[0].origin), + u'my_queue') + + def test_worker_multiple_queues(self): + self.enqueue() + self.enqueue(queue=u'queue1') + self.enqueue(queue=u'queue2') + jobs.Worker([u'queue1', u'queue2']).work(burst=True) + all_jobs = self.all_jobs() + assert_equal(len(all_jobs), 1) + assert_equal(jobs.remove_queue_name_prefix(all_jobs[0].origin), + jobs.DEFAULT_QUEUE_NAME) diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index ad9eba3ab90..6563cb926b5 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -1,5 +1,7 @@ # encoding: utf-8 +import re + import nose.tools import ckan.tests.helpers as helpers @@ -7,10 +9,15 @@ import ckan.logic as logic import ckan.model as model import ckan.plugins as p +import ckan.lib.jobs as jobs import ckan.lib.search as search + assert_equals = nose.tools.assert_equals assert_raises = nose.tools.assert_raises +eq = nose.tools.eq_ +ok = nose.tools.ok_ +raises = nose.tools.raises class TestDelete: @@ -482,3 +489,57 @@ def test_missing_id_returns_error(self): def test_bad_id_returns_404(self): assert_raises(logic.NotFound, helpers.call_action, 'dataset_purge', id='123')
+ + +class TestJobClear(helpers.FunctionalRQTestBase): + + def test_all_queues(self): + ''' + Test clearing all queues. + ''' + self.enqueue() + self.enqueue(queue=u'q') + self.enqueue(queue=u'q') + self.enqueue(queue=u'q') + queues = helpers.call_action(u'job_clear') + eq({jobs.DEFAULT_QUEUE_NAME, u'q'}, set(queues)) + all_jobs = self.all_jobs() + eq(len(all_jobs), 0) + + def test_specific_queues(self): + ''' + Test clearing specific queues. + ''' + job1 = self.enqueue() + job2 = self.enqueue(queue=u'q1') + job3 = self.enqueue(queue=u'q1') + job4 = self.enqueue(queue=u'q2') + with helpers.recorded_logs(u'ckan.logic') as logs: + queues = helpers.call_action(u'job_clear', queues=[u'q1', u'q2']) + eq({u'q1', u'q2'}, set(queues)) + all_jobs = self.all_jobs() + eq(len(all_jobs), 1) + eq(all_jobs[0], job1) + logs.assert_log(u'info', u'q1') + logs.assert_log(u'info', u'q2') + + +class TestJobCancel(helpers.FunctionalRQTestBase): + + def test_existing_job(self): + ''' + Test cancelling an existing job. + ''' + job1 = self.enqueue(queue=u'q') + job2 = self.enqueue(queue=u'q') + with helpers.recorded_logs(u'ckan.logic') as logs: + helpers.call_action(u'job_cancel', id=job1.id) + all_jobs = self.all_jobs() + eq(len(all_jobs), 1) + eq(all_jobs[0], job2) + assert_raises(KeyError, jobs.job_from_id, job1.id) + logs.assert_log(u'info', re.escape(job1.id)) + + @raises(logic.NotFound) + def test_not_existing_job(self): + helpers.call_action(u'job_cancel', id=u'does-not-exist') diff --git a/ckan/tests/logic/action/test_get.py b/ckan/tests/logic/action/test_get.py index 4aa1051edee..d0649cf3415 100644 --- a/ckan/tests/logic/action/test_get.py +++ b/ckan/tests/logic/action/test_get.py @@ -1,5 +1,7 @@ # encoding: utf-8 +import datetime + import nose.tools import ckan.logic as logic @@ -11,6 +13,7 @@ eq = nose.tools.eq_ +ok = nose.tools.ok_ assert_raises = nose.tools.assert_raises @@ -961,10 +964,10 @@ def test_package_search_with_fq_excludes_private(self): factories.Dataset(user=user, private=True, owner_org=org['name']) fq = "capacity:private" - results = helpers.call_action('package_search', fq=fq)['results'] + results = logic.get_action('package_search')( + {}, {'fq': fq})['results'] - eq(len(results), 1) - eq(results[0]['name'], dataset['name']) + eq(len(results), 0) def test_package_search_with_fq_excludes_drafts(self): ''' @@ -997,7 +1000,8 @@ def test_package_search_with_include_drafts_option_excludes_drafts_for_anon_user draft_dataset = factories.Dataset(user=user, state='draft') factories.Dataset(user=user, private=True, owner_org=org['name']) - results = helpers.call_action('package_search', include_drafts=True)['results'] + results = logic.get_action('package_search')( + {}, {'include_drafts': True})['results'] eq(len(results), 1) nose.tools.assert_not_equals(results[0]['name'], draft_dataset['name']) @@ -1018,8 +1022,8 @@ def test_package_search_with_include_drafts_option_includes_drafts_for_sysadmin( other_draft_dataset = factories.Dataset(user=other_user, state='draft') factories.Dataset(user=user, private=True, owner_org=org['name']) - results = helpers.call_action('package_search', include_drafts=True, - context={'user': sysadmin['name']})['results'] + results = logic.get_action('package_search')( + {'user': sysadmin['name']}, {'include_drafts': True})['results'] eq(len(results), 3) names = [r['name'] for r in results] @@ -1042,8 +1046,8 @@ def test_package_search_with_include_drafts_false_option_doesnot_include_drafts_ other_draft_dataset = factories.Dataset(user=other_user, state='draft') 
factories.Dataset(user=user, private=True, owner_org=org['name']) - results = helpers.call_action('package_search', include_drafts=False, - context={'user': sysadmin['name']})['results'] + results = logic.get_action('package_search')( + {'user': sysadmin['name']}, {'include_drafts': False})['results'] eq(len(results), 1) names = [r['name'] for r in results] @@ -1066,8 +1070,8 @@ def test_package_search_with_include_drafts_option_includes_drafts_for_user(self other_draft_dataset = factories.Dataset(user=other_user, state='draft', name="other-draft-dataset") factories.Dataset(user=user, private=True, owner_org=org['name'], name="private-dataset") - results = helpers.call_action('package_search', include_drafts=True, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, {'include_drafts': True})['results'] eq(len(results), 3) names = [r['name'] for r in results] @@ -1092,8 +1096,8 @@ def test_package_search_with_fq_for_create_user_id_will_include_datasets_for_oth factories.Dataset(user=user, private=True, owner_org=org['name'], name="private-dataset") fq = "creator_user_id:{0}".format(other_user['id']) - results = helpers.call_action('package_search', fq=fq, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, {'fq': fq})['results'] eq(len(results), 1) names = [r['name'] for r in results] @@ -1118,8 +1122,9 @@ def test_package_search_with_fq_for_create_user_id_will_not_include_drafts_for_o factories.Dataset(user=user, private=True, owner_org=org['name'], name="private-dataset") fq = "(creator_user_id:{0} AND +state:draft)".format(other_user['id']) - results = helpers.call_action('package_search', fq=fq, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, + {'fq': fq, 'include_drafts': True})['results'] eq(len(results), 0) @@ -1139,8 +1144,9 @@ def test_package_search_with_fq_for_creator_user_id_and_drafts_and_include_draft factories.Dataset(user=user, private=True, owner_org=org['name'], name="private-dataset") fq = "(creator_user_id:{0} AND +state:draft)".format(other_user['id']) - results = helpers.call_action('package_search', fq=fq, include_drafts=True, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, + {'fq': fq, 'include_drafts': True})['results'] eq(len(results), 0) @@ -1160,8 +1166,9 @@ def test_package_search_with_fq_for_creator_user_id_and_include_drafts_option_wi factories.Dataset(user=user, private=True, owner_org=org['name'], name="private-dataset") fq = "creator_user_id:{0}".format(other_user['id']) - results = helpers.call_action('package_search', fq=fq, include_drafts=True, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, + {'fq': fq, 'include_drafts': True})['results'] names = [r['name'] for r in results] eq(len(results), 1) @@ -1184,8 +1191,9 @@ def test_package_search_with_fq_for_create_user_id_will_include_drafts_for_other factories.Dataset(user=user, private=True, owner_org=org['name'], name="private-dataset") fq = "(creator_user_id:{0} AND +state:draft)".format(user['id']) - results = helpers.call_action('package_search', fq=fq, - context={'user': sysadmin['name']})['results'] + results = logic.get_action('package_search')( + {'user': sysadmin['name']}, + {'fq': fq})['results'] names = [r['name'] for r in results] eq(len(results), 1) @@ 
-1203,10 +1211,8 @@ def test_package_search_private_with_include_private(self): factories.Dataset(user=user, state='draft') private_dataset = factories.Dataset(user=user, private=True, owner_org=org['name']) - results = helpers.call_action( - 'package_search', - include_private=True, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, {'include_private': True})['results'] eq([r['name'] for r in results], [private_dataset['name']]) @@ -1217,10 +1223,8 @@ def test_package_search_private_with_include_private_wont_show_other_orgs_privat org2 = factories.Organization(user=user2) private_dataset = factories.Dataset(user=user2, private=True, owner_org=org2['name']) - results = helpers.call_action( - 'package_search', - include_private=True, - context={'user': user['name']})['results'] + results = logic.get_action('package_search')( + {'user': user['name']}, {'include_private': True})['results'] eq([r['name'] for r in results], []) @@ -1230,10 +1234,8 @@ def test_package_search_private_with_include_private_syadmin(self): org = factories.Organization(user=user) private_dataset = factories.Dataset(user=user, private=True, owner_org=org['name']) - results = helpers.call_action( - 'package_search', - include_private=True, - context={'user': sysadmin['name']})['results'] + results = logic.get_action('package_search')( + {'user': sysadmin['name']}, {'include_private': True})['results'] eq([r['name'] for r in results], [private_dataset['name']]) @@ -2087,3 +2089,55 @@ def test_followee_list_with_q(self): eq(len(followee_list), 1) eq(followee_list[0]['display_name'], 'Environment') + + +class TestJobList(helpers.FunctionalRQTestBase): + + def test_all_queues(self): + ''' + Test getting jobs from all queues. + ''' + job1 = self.enqueue() + job2 = self.enqueue() + job3 = self.enqueue(queue=u'my_queue') + jobs = helpers.call_action(u'job_list') + eq(len(jobs), 3) + eq({job[u'id'] for job in jobs}, {job1.id, job2.id, job3.id}) + + def test_specific_queues(self): + ''' + Test getting jobs from specific queues. + ''' + job1 = self.enqueue() + job2 = self.enqueue(queue=u'q2') + job3 = self.enqueue(queue=u'q3') + job4 = self.enqueue(queue=u'q3') + jobs = helpers.call_action(u'job_list', queues=[u'q2']) + eq(len(jobs), 1) + eq(jobs[0][u'id'], job2.id) + jobs = helpers.call_action(u'job_list', queues=[u'q2', u'q3']) + eq(len(jobs), 3) + eq({job[u'id'] for job in jobs}, {job2.id, job3.id, job4.id}) + + +class TestJobShow(helpers.FunctionalRQTestBase): + + def test_existing_job(self): + ''' + Test showing an existing job. + ''' + job = self.enqueue(queue=u'my_queue', title=u'Title') + d = helpers.call_action(u'job_show', id=job.id) + eq(d[u'id'], job.id) + eq(d[u'title'], u'Title') + eq(d[u'queue'], u'my_queue') + dt = datetime.datetime.strptime(d[u'created'], u'%Y-%m-%dT%H:%M:%S') + now = datetime.datetime.utcnow() + ok(abs((now - dt).total_seconds()) < 10) + + @nose.tools.raises(logic.NotFound) + def test_not_existing_job(self): + ''' + Test showing a non-existing job.
+ ''' + helpers.call_action(u'job_show', id=u'does-not-exist') diff --git a/ckan/tests/test_coding_standards.py b/ckan/tests/test_coding_standards.py index d5129476756..679ca764859 100644 --- a/ckan/tests/test_coding_standards.py +++ b/ckan/tests/test_coding_standards.py @@ -219,8 +219,10 @@ def find_unprefixed_string_literals(filename): break except ValueError: continue - first_char = lines[lineno][col_offset] - if first_char not in u'ub': # Don't allow capital U and B either + leading = lines[lineno][col_offset - 1:col_offset + 1] + if leading[:-1] == u'[': # data['id'] is unambiguous, ignore these + continue + if leading[-1:] not in u'ub': # Don't allow capital U and B either problems.append((lineno + 1, col_offset + 1)) return sorted(problems) diff --git a/ckanext/datapusher/plugin.py b/ckanext/datapusher/plugin.py index ea9a5fc8bb4..a7db0f66aa7 100644 --- a/ckanext/datapusher/plugin.py +++ b/ckanext/datapusher/plugin.py @@ -41,11 +41,11 @@ def resource_data(self, id, resource_id): except logic.ValidationError: pass - base.redirect(core_helpers.url_for( + core_helpers.redirect_to( controller='ckanext.datapusher.plugin:ResourceDataController', action='resource_data', id=id, - resource_id=resource_id) + resource_id=resource_id ) try: diff --git a/ckanext/example_igroupform/tests/test_controllers.py b/ckanext/example_igroupform/tests/test_controllers.py index be598a13aae..6f64973ff5a 100644 --- a/ckanext/example_igroupform/tests/test_controllers.py +++ b/ckanext/example_igroupform/tests/test_controllers.py @@ -136,7 +136,7 @@ def test_save(self): response = submit_and_follow(app, form, env, 'save') # check correct redirect assert_equal(response.req.url, - 'http://localhost/%s/saved' % custom_group_type) + 'http://test.ckan.net/%s/saved' % custom_group_type) # check saved ok group = model.Group.by_name(u'saved') assert_equal(group.title, u'') @@ -172,7 +172,7 @@ def test_save(self): response = submit_and_follow(app, form, env, 'save') # check correct redirect assert_equal(response.req.url, - 'http://localhost/%s/saved' % group_type) + 'http://test.ckan.net/%s/saved' % group_type) # check saved ok group = model.Group.by_name(u'saved') assert_equal(group.title, u'') diff --git a/ckanext/example_ipermissionlabels/__init__.py b/ckanext/example_ipermissionlabels/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ckanext/example_ipermissionlabels/plugin.py b/ckanext/example_ipermissionlabels/plugin.py new file mode 100644 index 00000000000..7987fe85b5b --- /dev/null +++ b/ckanext/example_ipermissionlabels/plugin.py @@ -0,0 +1,42 @@ +# encoding: utf-8 + +from ckan import plugins +from ckan.lib.plugins import DefaultPermissionLabels +from ckan.plugins.toolkit import get_action + + +class ExampleIPermissionLabelsPlugin( + plugins.SingletonPlugin, DefaultPermissionLabels): + u''' + Example permission labels plugin that makes datasets whose + notes field starts with "Proposed:" visible only to their + creator and Admin users in the organization assigned to the + dataset. 
+ ''' + plugins.implements(plugins.IPermissionLabels) + + def get_dataset_labels(self, dataset_obj): + u''' + Use creator-*, admin-* labels for proposed datasets + ''' + if dataset_obj.notes.startswith(u'Proposed:'): + labels = [u'creator-%s' % dataset_obj.creator_user_id] + if dataset_obj.owner_org: + return labels + [u'admin-%s' % dataset_obj.owner_org] + return labels + + return super(ExampleIPermissionLabelsPlugin, self).get_dataset_labels( + dataset_obj) + + def get_user_dataset_labels(self, user_obj): + u''' + Include admin-* labels for users in addition to default labels + creator-*, member-* and public + ''' + labels = super(ExampleIPermissionLabelsPlugin, self + ).get_user_dataset_labels(user_obj) + if user_obj: + orgs = get_action(u'organization_list_for_user')( + {u'user': user_obj.id}, {u'permission': u'admin'}) + labels.extend(u'admin-%s' % o['id'] for o in orgs) + return labels diff --git a/ckanext/example_ipermissionlabels/tests/__init__.py b/ckanext/example_ipermissionlabels/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ckanext/example_ipermissionlabels/tests/test_example_ipermissionlabels.py b/ckanext/example_ipermissionlabels/tests/test_example_ipermissionlabels.py new file mode 100644 index 00000000000..77a1572dedb --- /dev/null +++ b/ckanext/example_ipermissionlabels/tests/test_example_ipermissionlabels.py @@ -0,0 +1,112 @@ +# encoding: utf-8 + +''' +Tests for the ckanext.example_ipermissionlabels extension +''' +from nose.tools import assert_raises, assert_equal + +import ckan.plugins +from ckan.plugins.toolkit import get_action, NotAuthorized +from ckan.tests.helpers import FunctionalTestBase, call_auth +from ckan.tests import factories +from ckan import model + + +class TestExampleIPermissionLabels(FunctionalTestBase): + @classmethod + def setup_class(cls): + # Test code should use CKAN's plugins.load() function to load plugins + # to be tested. 
+ ckan.plugins.load(u'example_ipermissionlabels') + + @classmethod + def teardown_class(cls): + ckan.plugins.unload(u'example_ipermissionlabels') + + def test_normal_dataset_permissions_are_normal(self): + user = factories.User() + user2 = factories.User() + user3 = factories.User() + org = factories.Organization(user=user) + org2 = factories.Organization( + user=user2, + users=[{u'name': user3['id'], u'capacity': u'member'}]) + + dataset = factories.Dataset( + name=u'd1', user=user, private=True, owner_org=org['id']) + dataset2 = factories.Dataset( + name=u'd2', user=user2, private=True, owner_org=org2['id']) + + results = get_action(u'package_search')( + {u'user': user['name']}, {u'include_private': True})['results'] + names = [r['name'] for r in results] + assert_equal(names, [u'd1']) + + results = get_action(u'package_search')( + {u'user': user3['name']}, {u'include_private': True})['results'] + names = [r['name'] for r in results] + assert_equal(names, [u'd2']) + + def test_proposed_overrides_public(self): + user = factories.User() + dataset = factories.Dataset( + name=u'd1', notes=u'Proposed:', user=user) + + results = get_action(u'package_search')( + {}, {u'include_private': True})['results'] + names = [r['name'] for r in results] + assert_equal(names, []) + + assert_raises( + NotAuthorized, call_auth, u'package_show', + {u'user': u'', u'model': model}, id=u'd1') + + def test_proposed_dataset_visible_to_creator(self): + user = factories.User() + dataset = factories.Dataset( + name=u'd1', notes=u'Proposed:', user=user) + + results = get_action(u'package_search')( + {u'user': user['name']}, {u'include_private': True})['results'] + names = [r['name'] for r in results] + assert_equal(names, [u'd1']) + + ret = call_auth(u'package_show', + {u'user': user['name'], u'model': model}, id=u'd1') + assert ret + + def test_proposed_dataset_visible_to_org_admin(self): + user = factories.User() + user2 = factories.User() + org = factories.Organization( + user=user2, + users=[{u'name': user['id'], u'capacity': u'editor'}]) + dataset = factories.Dataset( + name=u'd1', notes=u'Proposed:', user=user, owner_org=org['id']) + + results = get_action(u'package_search')( + {u'user': user2[u'name']}, {u'include_private': True})['results'] + names = [r['name'] for r in results] + assert_equal(names, [u'd1']) + + ret = call_auth(u'package_show', + {u'user': user2['name'], u'model': model}, id=u'd1') + assert ret + + def test_proposed_dataset_invisible_to_another_editor(self): + user = factories.User() + user2 = factories.User() + org = factories.Organization( + user=user2, + users=[{u'name': user['id'], u'capacity': u'editor'}]) + dataset = factories.Dataset( + name=u'd1', notes=u'Proposed:', user=user2, owner_org=org['id']) + + results = get_action(u'package_search')( + {u'user': user['name']}, {u'include_private': True})['results'] + names = [r['name'] for r in results] + assert_equal(names, []) + + assert_raises( + NotAuthorized, call_auth, u'package_show', + {u'user': user['name'], u'model': model}, id=u'd1') diff --git a/doc/api/index.rst b/doc/api/index.rst index fac6ede30ec..c5b2bce8848 100644 --- a/doc/api/index.rst +++ b/doc/api/index.rst @@ -1,3 +1,5 @@ +.. 
_action api: + ========= API guide ========= diff --git a/doc/contributing/test.rst b/doc/contributing/test.rst index 0258c2be0ef..bcbf0adcac6 100644 --- a/doc/contributing/test.rst +++ b/doc/contributing/test.rst @@ -60,6 +60,8 @@ Create test databases: This database connection is specified in the ``test-core.ini`` file by the ``sqlalchemy.url`` parameter. +You should also make sure that the :ref:`Redis database ` +configured in ``test-core.ini`` is different from your production database. ~~~~~~~~~~~~~ Run the tests diff --git a/doc/extensions/best-practices.rst b/doc/extensions/best-practices.rst index 422dc50bec2..dcc27ff5608 100644 --- a/doc/extensions/best-practices.rst +++ b/doc/extensions/best-practices.rst @@ -10,6 +10,8 @@ Follow CKAN's coding standards See :doc:`/contributing/index`. +.. _use the plugins toolkit: + ------------------------------------------------- Use the plugins toolkit instead of importing CKAN ------------------------------------------------- @@ -37,17 +39,67 @@ Implement each plugin class in a separate Python module This keeps CKAN's plugin loading order simple, see :ref:`ckan.plugins`. -.. _extension config setting names best practice: + +.. _avoid name clashes: + +------------------ +Avoid name clashes +------------------ +Many of the names you pick for your identifiers and files must be unique in +relation to the names used by core CKAN and other extensions. To avoid +conflicts, you should prefix any public name that your extension introduces with +the name of your extension. For example: + +* The names of *configuration settings* introduced by your extension should + have the form ``my_extension.my_config_setting``. + +* The names of *templates and template snippets* introduced by your extension + should begin with the name of your extension:: + + snippets/my_extension_useful_snippet.html + + If you add a lot of templates you can also put them into a separate + folder named after your extension instead. + +* The names of *template helper functions* introduced by your extension should + begin with the name of your extension. For example: + + .. literalinclude:: /../ckanext/example_theme/v08_custom_helper_function/plugin.py + :pyobject: ExampleThemePlugin.get_helpers + +* The names of *JavaScript modules* introduced by your extension should begin + with the name of your extension. For example + ``fanstatic/example_theme_popover.js``: + + .. literalinclude:: /../ckanext/example_theme/v16_initialize_a_javascript_module/fanstatic/example_theme_popover.js + +* The names of *API action functions* introduced by your extension should begin + with the name of your extension. For example + ``my_extension_foobarize_everything``. + +* The names of *background job queues* introduced by your extension should + begin with the name of your extension. For example + ``my_extension:super-special-job-queue``. + +In some situations, a resource may even be shared between multiple CKAN +*instances*, which requires an even higher degree of uniqueness for the +corresponding names. In that case, you should also prefix your identifiers with +the CKAN site ID, which is available via + +:: + + try: + # CKAN 2.7 and later + from ckan.common import config + except ImportError: + # CKAN 2.6 and earlier + from pylons import config ------------------------------------------------------------------- -Names of config settings should include the name of the extension ------------------------------------------------------------------- + site_id = config[u'ckan.site_id'] -Names of config settings provided by extensions should include the name -of the extension, to avoid conflicting with core config settings or with -config settings from other extensions. For example:: +Currently this only affects the :ref:`Redis database `: - ckan.my_extension.show_most_popular_groups = True +* All *keys in the Redis database* created by your extension should be prefixed + with both the CKAN site ID and your extension's name.
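For illustration, a minimal sketch of building such a doubly-prefixed Redis key (``my_extension`` and the ``counter`` suffix are made-up names; ``connect_to_redis`` is the helper introduced by this patch)::

    from ckan.common import config
    from ckan.lib.redis import connect_to_redis

    # Prefix the key with both the CKAN site ID and the extension name
    key = u'{}:my_extension:counter'.format(config[u'ckan.site_id'])

    redis_conn = connect_to_redis()
    redis_conn.set(key, 0)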
------------------------------------- diff --git a/doc/extensions/custom-config-settings.rst b/doc/extensions/custom-config-settings.rst index 81c350fdc1c..f9d0ef0589c 100644 --- a/doc/extensions/custom-config-settings.rst +++ b/doc/extensions/custom-config-settings.rst @@ -32,7 +32,7 @@ will be allowed to create groups. Names of config settings provided by extensions should include the name of the extension, to avoid conflicting with core config settings or with config settings from other extensions. - See :ref:`extension config setting names best practice`. + See :ref:`avoid name clashes`. .. note:: diff --git a/doc/extensions/plugins-toolkit.rst b/doc/extensions/plugins-toolkit.rst index fea2fc57421..905a11954df 100644 --- a/doc/extensions/plugins-toolkit.rst +++ b/doc/extensions/plugins-toolkit.rst @@ -1,3 +1,5 @@ +.. py:module:: ckan.plugins.toolkit + ------------------------- Plugins toolkit reference ------------------------- diff --git a/doc/maintaining/authorization.rst b/doc/maintaining/authorization.rst index acf9adf0265..342eec63b0c 100644 --- a/doc/maintaining/authorization.rst +++ b/doc/maintaining/authorization.rst @@ -86,5 +86,14 @@ Extensions CKAN extensions can implement custom authorization rules by overriding the authorization functions that CKAN uses. This is done by implementing the -:py:class:`~ckan.plugins.interfaces.IAuthFunctions` plugin interface. To get -started with writing CKAN extensions, see :doc:`/extensions/index`. +:py:class:`~ckan.plugins.interfaces.IAuthFunctions` plugin interface. + +Dataset visibility is determined by permission labels stored in the +search index. +Implement the :py:class:`~ckan.plugins.interfaces.IPermissionLabels` +plugin interface, then :ref:`rebuild your search index ` +to change your dataset visibility rules. There is +no need to override the ``package_show`` auth function; it will inherit +these changes automatically. + +To get started with writing CKAN extensions, see :doc:`/extensions/index`.
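A minimal sketch of such a plugin, modelled on the ``example_ipermissionlabels`` extension added in this patch (the class name and the ``Draft:`` convention are made up for illustration)::

    import ckan.plugins as plugins
    from ckan.lib.plugins import DefaultPermissionLabels

    class MyLabelsPlugin(plugins.SingletonPlugin, DefaultPermissionLabels):
        plugins.implements(plugins.IPermissionLabels)

        def get_dataset_labels(self, dataset_obj):
            # Hide datasets whose notes start with "Draft:" from everyone
            # except their creator.
            if dataset_obj.notes.startswith(u'Draft:'):
                return [u'creator-%s' % dataset_obj.creator_user_id]
            # Fall back to the default labels for all other datasets.
            return super(MyLabelsPlugin, self).get_dataset_labels(
                dataset_obj)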
diff --git a/doc/maintaining/background-tasks.rst b/doc/maintaining/background-tasks.rst index db43cdc4570..8c68c380dca 100644 --- a/doc/maintaining/background-tasks.rst +++ b/doc/maintaining/background-tasks.rst @@ -1,175 +1,356 @@ -================ -Background tasks -================ +.. _background jobs: -.. version-added: 1.5.1 - -CKAN allows you to create tasks that run in the 'background', that is -asynchronously and without blocking the main application (these tasks can also -be automatically retried in the case of transient failures). Such tasks can be +=============== +Background jobs +=============== +CKAN allows you to create jobs that run in the 'background', i.e. +asynchronously and without blocking the main application. Such jobs can be created in :doc:`Extensions ` or in core CKAN. -Background tasks can be essential to providing certain kinds of functionality, +Background jobs can be essential to providing certain kinds of functionality, for example: -* Creating webhooks that notify other services when certain changes occur (for +* Creating web-hooks that notify other services when certain changes occur (for example a dataset is updated) + * Performing processing or validation on data (as done by the Archiver and DataStorer Extensions) +Basically, any piece of work that takes too long to perform while the main +application is waiting is a good candidate for a background job. + +.. note:: + + The current background job system is based on RQ_ and was introduced in + CKAN 2.7. See :ref:`background jobs migration` for details on how to + migrate your jobs from the previous system introduced in CKAN 1.5. + + .. _RQ: http://python-rq.org + + +.. _background jobs writing: + +Writing and enqueuing background jobs +===================================== + +.. note:: + + This section is only relevant for developers working on CKAN or an + extension. + +The core of a background job is a regular Python function. For example, here's +a very simple job function that logs a message:: + + import logging + + def log_job(msg, level=logging.INFO, logger=u'ckan'): + u''' + Background job to log a message. + ''' + logger = logging.getLogger(logger) + logger.log(level, msg) + + +And that's it. Your job function can use all the usual Python features. Just +keep in mind that your function will be run in a separate process by a +:ref:`worker `, so your function should not depend on +the current state of global variables, etc. Ideally your job function should +receive all the information it needs via its arguments. + +In addition, the module that contains your job function must be importable by +the worker, which must also be able to get the function from its module. This +means that nested functions, lambdas and instance methods cannot be used as job +functions. While class methods of top-level classes can be used, it's best to +stick to ordinary module-level functions. + +.. note:: + + Background jobs do not support return values (since they run asynchronously + there is no place to return those values to). If your job function produces + a result then it needs to store that result, for example in a file or in + CKAN's database. + +Once you have a job function, all you need to do is use +``ckan.lib.jobs.enqueue`` to create an actual job out of it:: + + import ckan.lib.jobs as jobs + + jobs.enqueue(log_job, [u'My log message']) + +This will place a job on the :ref:`job queue ` where it +can be picked up and executed by a worker. + +.. note:: + + Extensions should use :py:func:`ckan.plugins.toolkit.enqueue_job` instead. + It's the same function but accessing it via :py:mod:`ckan.plugins.toolkit` + :ref:`decouples your code from CKAN's internal structure `.
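For an extension, that call might look like this (a minimal sketch; ``ckanext.my_extension`` and ``my_job`` are made-up names)::

    import ckan.plugins.toolkit as toolkit

    from ckanext.my_extension.jobs import my_job

    # Same semantics as ckan.lib.jobs.enqueue, but decoupled from CKAN's
    # internal structure.
    toolkit.enqueue_job(my_job, [u'some argument'])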
You can also pass keyword arguments +in a dict as the third argument:: + + jobs.enqueue(log_job, [u'My log message'], {u'logger': u'ckanext.foo'}) + +You can also give the job a title which can be useful for identifying it when +:ref:`managing the job queue `:: + + jobs.enqueue(log_job, [u'My log message'], title=u'My log job') + + +.. _background jobs workers: + +Running background jobs +======================= +Jobs are placed on the :ref:`job queue `, from which +they can be retrieved and executed. Since jobs are designed to run +asynchronously that happens in a separate process called a *worker*. + +After it has been started, a worker listens on the queue until a job is +enqueued. The worker then removes the job from the queue and executes it. +Afterwards the worker waits again for the next job to be enqueued. + +.. note:: + + Executed jobs are discarded. In particular, no information about past jobs + is kept. + +Workers can be started using the :ref:`paster jobs worker` command:: + + paster --plugin=ckan jobs worker --config=/etc/ckan/default/development.ini + +The worker process will run indefinitely (you can stop it using ``CTRL+C``). + +.. note:: + + You can run multiple workers if your setup uses many or particularly long + background jobs. + + +.. _background jobs supervisor: + +Using Supervisor +^^^^^^^^^^^^^^^^ +In a production setting, the worker should be run in a more robust way. One +possibility is to use Supervisor_. + +.. _Supervisor: http://supervisord.org/ + +First install Supervisor:: + + sudo apt-get install supervisor + +Next copy the configuration file template:: + + sudo cp /usr/lib/ckan/default/src/ckan/ckan/config/supervisor-ckan-worker.conf /etc/supervisor/conf.d + +Open ``/etc/supervisor/conf.d/supervisor-ckan-worker.conf`` in your favourite +text editor and make sure all the settings suit your needs. If you installed +CKAN in a non-default location (somewhere other than ``/usr/lib/ckan/default``) +then you will need to update the paths in the config file (see the comments in +the file for details). + +Restart Supervisor:: + + sudo service supervisor restart + +The worker should now be running. To check its status, use + +:: + + sudo supervisorctl status + +You can restart the worker via + +:: + + sudo supervisorctl restart ckan-worker:* + +To test that background jobs are processed correctly you can enqueue a test job +via + +:: + + paster --plugin=ckan jobs test -c /etc/ckan/default/production.ini + +The worker's log (``/var/log/ckan-worker.log``) should then show how the job +was processed by the worker. + +In case you run into problems, make sure to check the logs of Supervisor and +the worker:: + + cat /var/log/supervisor/supervisord.log + cat /var/log/ckan-worker.log + + + +.. _background jobs management: + +Managing background jobs +======================== +Once they are enqueued, background jobs can be managed via +:ref:`paster ` and the :ref:`web API `. -Enabling background tasks -========================= +List enqueues jobs +^^^^^^^^^^^^^^^^^^ +* :ref:`paster jobs list ` +* :py:func:`ckan.logic.action.get.job_list` -To manage and run background tasks requires a job queue and CKAN uses celery_ -(plus the CKAN database) for this purpose. Thus, to use background tasks you -need to install and run celery_. 
+Show details about a job +^^^^^^^^^^^^^^^^^^^^^^^^ +* :ref:`paster jobs show ` +* :py:func:`ckan.logic.action.get.job_show` -Installation of celery_ will normally be taken care of by whichever component -or extension utilizes it so we skip that here. +Cancel a job +^^^^^^^^^^^^ +A job that hasn't been processed yet can be canceled via -.. _celery: http://celeryproject.org/ +* :ref:`paster jobs cancel ` +* :py:func:`ckan.logic.action.delete.job_cancel` -To run the celery daemon you have two options: +Clear all enqueued jobs +^^^^^^^^^^^^^^^^^^^^^^^ +* :ref:`paster jobs clear ` +* :py:func:`ckan.logic.action.delete.job_clear` -1. In development setup you can just use paster. This can be done as simply - as:: +Logging +^^^^^^^ +Information about enqueued and processed background jobs is automatically +logged to the CKAN logs. You may need to update your logging configuration to +record messages at the *INFO* level for the messages to be stored. - paster celeryd .. _background jobs queues: - This only works if you have a ``development.ini`` file in ckan root. +Background job queues +===================== +By default, all functionality related to background jobs uses a single job +queue that is specific to the current CKAN instance. However, in some +situations it is useful to have more than one queue. For example, you might +want to distinguish between short, urgent jobs and longer, less urgent ones. +The urgent jobs should be processed even if a long and less urgent job is +already running. -2. In production, the daemon should be run with a different ini file and be run - as an init script. The simplest way to do this is to install supervisor:: +For such scenarios, the job system supports multiple queues. To use a different +queue, all you have to do is pass the (arbitrary) queue name. For example, to +enqueue a job on a non-default queue:: - apt-get install supervisor + jobs.enqueue(log_job, [u"I'm from a different queue!"], + queue=u'my-own-queue') - Using this file as a template and copy to ``/etc/supservisor/conf.d``:: +Similarly, to start a worker that only listens to the queue you just posted a +job to:: - https://github.com/ckan/ckan/blob/master/ckan/config/celery-supervisor.conf + paster --plugin=ckan jobs worker my-own-queue --config=/etc/ckan/default/development.ini - Alternatively, you can run:: +See the documentation of the various functions and commands for details on how +to use non-standard queues. - paster celeryd --config=/path/to/file.ini +.. note:: -Writing background tasks -========================== + If you create a custom queue in your extension then you should prefix the + queue name using your extension's name. See :ref:`avoid name clashes`. -These instructions should show you how to write an background task and how to -call it from inside CKAN or another extension using celery. + Queue names are automatically prefixed with the CKAN site ID internally, + so multiple parallel CKAN instances are not a problem. -Examples -------- + +.. _background jobs migration: -Here are some existing real examples of writing CKAN tasks: +Migrating from CKAN's previous background job system +==================================================== +Before version 2.7 (starting from 1.5), CKAN offered a different background job +system built around Celery_. That system is still available but deprecated and +will be removed in future versions of CKAN. You should therefore update your +code to use the new system described above.
-* https://github.com/ckan/ckanext-archiver -* https://github.com/ckan/ckanext-qa -* https://github.com/ckan/ckanext-datastorer +.. _Celery: http://celeryproject.org/ -Setup ------ +Migrating existing job functions is easy. In the old system, a job function +would look like this:: -An entry point is required inside the ``setup.py`` for your extension, and so -you should add something resembling the following that points to a function in -a module. In this case the function is called task_imports in the -``ckanext.NAME.celery_import`` module:: + @celery.task(name=u'my_extension.echofunction') + def echo(message): + print message - entry_points = """ - [ckan.celery_task] - tasks = ckanext.NAME.celery_import:task_imports - """ +As :ref:`described above `, under the new system the +same function would be simply written as -The function, in this case ``task_imports`` should be a function that returns -fully qualified module paths to modules that contain the defined task (see the -next section). In this case we will put all of our tasks in a file called -``tasks.py`` and so ``task_imports`` should be in a file called -``ckanext/NAME/celery_import.py``:: +:: - def task_imports(): - return ['ckanext.NAME.tasks'] + def echo(message): + print message -This returns an iterable of all of the places to look to find tasks, in this -example we are only putting them in one place. +There is no need for a special decorator. In the new system there is also no +need for registering your tasks via ``setup.py``. +Migrating the code that enqueues a task is also easy. Previously it would look +like this:: -Implementing the tasks ----------------------- + celery.send_task(u'my_extension.echofunction', args=[u'Hello World'], + task_id=str(uuid.uuid4())) -The most straightforward way of defining tasks in our ``tasks.py`` module, is -to use the decorators provided by celery. These decorators make it easy to just -define a function and then give it a name and make it accessible to celery. -Make sure you import celery from ckan.lib.celery_app:: +With the new system, it looks as follows:: - from ckan.lib.celery_app import celery + import ckan.lib.jobs as jobs -Implement your function, specifying the arguments you wish it to take. For our -sample we will use a simple echo task that will print out its argument to the -console:: + jobs.enqueue(ckanext.my_extension.plugin.echo, [u'Hello World']) - def echo( message ): - print message +As you can see, the new system does not use strings to identify job functions +but uses the functions directly instead. There is also no need for creating a +job ID, that will be done automatically for you. -Next it is important to decorate your function with the celery task decorator. -You should give the task a name, which is used later on when calling the task:: - @celery.task(name = "NAME.echofunction") - def echo( message ): - print message +Supporting both systems at once +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Not all CKAN installations will immediately update to CKAN 2.7. It might +therefore make sense for you to support both the new and the old job system. +That way you are ready when the old system is removed but can continue to +support older CKAN installations. -That's it, your function is ready to be run asynchronously outside of the main -execution of the CKAN app. 
Next you should make sure you run ``python setup.py -develop`` in your extensions folder and then go to your CKAN installation -folder (normally pyenv/src/ckan/) to run the following command:: +Such a setup might look as follows. First split your Celery-based job +functions into the job itself and its Celery handler. That is, change - paster celeryd +:: -Once you have done this your task name ``NAME.echofunction`` should appear in -the list of tasks loaded. If it is there then you are all set and ready to go. -If not then you should try the following to try and resolve the problem: + @celery.task(name=u'my_extension.echofunction') + def echo(message): + print message -1. Make sure the entry point is defined correctly in your ``setup.py`` and that - you have executed ``python setup.py develop`` -2. Check that your task_imports function returns an iterable with valid module - names in -3. Ensure that the decorator marks the functions (if there is more than one - decorator, make sure the celery.task is the first one - which means it will - execute last). -4. If none of the above helps, go into #ckan on irc.freenode.net where there - should be people who can help you resolve your issue. +to -Calling the task ----------------- +:: -Now that the task is defined, and has been loaded by celery it is ready to be -called. To call a background task you need to know only the name of the task, -and the arguments that it expects as well as providing it a task id.:: + def echo(message): + print message - import uuid - from ckan.lib.celery_app import celery - celery.send_task("NAME.echofunction", args=["Hello World"], task_id=str(uuid.uuid4())) + @celery.task(name=u'my_extension.echofunction') + def echo_celery(*args, **kwargs): + echo(*args, **kwargs) -After executing this code you should see the message printed in the console -where you ran ``paster celeryd``. +That way, you can call ``echo`` using the new system and use the name for +Celery. +Then use the new system if it is available and fall back to Celery otherwise:: -Retrying on errors ------------------- + def compat_enqueue(name, fn, args=None): + u''' + Enqueue a background job using Celery or RQ. + ''' + try: + # Try to use RQ + from ckan.lib.jobs import enqueue + enqueue(fn, args=args) + except ImportError: + # Fallback to Celery + import uuid + from ckan.lib.celery_app import celery + celery.send_task(name, args=args, task_id=str(uuid.uuid4())) -Should your task fail to complete because of a transient error, it is possible -to ask celery to retry the task, after some period of time. The default wait -before retrying is three minutes, but you can optionally specify this in the -call to retry via the countdown parameter, and you can also specify the -exception that triggered the failure. For our example the call to retry would -look like the following - note that it calls the function name, not the task -name given in the decorator:: +Use that function as follows for enqueuing a job:: - try: - ... some work that may fail, http request? - except Exception, e: - # Retry again in 2 minutes - echo.retry(args=(message), exc=e, countdown=120, max_retries=10) + compat_enqueue(u'my_extension.echofunction', + ckanext.my_extension.plugin.echo, + [u'Hello World']) -If you don't want to wait a period of time you can use the eta datetime -parameter to specify an explicit time to run the task (i.e. 
9AM tomorrow)
diff --git a/doc/maintaining/configuration.rst b/doc/maintaining/configuration.rst
index 52910fdb1c6..35596ba442c 100644
--- a/doc/maintaining/configuration.rst
+++ b/doc/maintaining/configuration.rst
@@ -695,6 +695,25 @@ Default value: ``None``
 List of the extra resource fields that would be used when searching.

+Redis Settings
+---------------
+
+.. _ckan_redis_url:
+
+ckan.redis.url
+^^^^^^^^^^^^^^
+
+Example::
+
+   ckan.redis.url = redis://localhost:7000/1
+
+Default value: ``redis://localhost:6379/0``
+
+URL to your Redis instance, including the database to be used.
+
+.. versionadded:: 2.7
+
+
 CORS Settings
 -------------
diff --git a/doc/maintaining/installing/deployment.rst b/doc/maintaining/installing/deployment.rst
index d99decffcd3..8f7bd183031 100644
--- a/doc/maintaining/installing/deployment.rst
+++ b/doc/maintaining/installing/deployment.rst
@@ -112,7 +112,7 @@ CKAN to run in).
 6. Create the Apache config file
 --------------------------------

-Create your site's Apache config file at ``|apache_config_file|``, with the
+Create your site's Apache config file at |apache_config_file|, with the
 following contents:

 .. parsed-literal::
@@ -203,7 +203,7 @@ Open ``/etc/apache2/ports.conf``. We need to replace the default port 80 with th
 8. Create the Nginx config file
 -------------------------------

-Create your site's Nginx config file at ``|nginx_config_file|``, with the
+Create your site's Nginx config file at |nginx_config_file|, with the
 following contents:

 .. parsed-literal::
@@ -248,6 +248,17 @@ You should now be able to visit your server in a web browser and see your new
 CKAN instance.

+----------------------------------------
+10. Set up a worker for background jobs
+----------------------------------------
+CKAN uses asynchronous :ref:`background jobs` for long-running tasks. These
+jobs are executed by a separate process called a :ref:`worker `.
+
+To run the worker in a robust way, :ref:`install and configure Supervisor
+`.
+
+
 ---------------
 Troubleshooting
 ---------------
diff --git a/doc/maintaining/installing/install-from-package.rst b/doc/maintaining/installing/install-from-package.rst
index 37125251238..43ebfd43ba2 100644
--- a/doc/maintaining/installing/install-from-package.rst
+++ b/doc/maintaining/installing/install-from-package.rst
@@ -29,7 +29,7 @@ CKAN:
 #. Install the Ubuntu packages that CKAN requires (and 'git', to enable you
    to install CKAN extensions)::

-       sudo apt-get install -y nginx apache2 libapache2-mod-wsgi libpq5 git-core
+       sudo apt-get install -y nginx apache2 libapache2-mod-wsgi libpq5 redis-server git-core

 #. Download the CKAN package:
diff --git a/doc/maintaining/installing/install-from-source.rst b/doc/maintaining/installing/install-from-source.rst
index 4bcc247ca26..dcb94d7c7ba 100644
--- a/doc/maintaining/installing/install-from-source.rst
+++ b/doc/maintaining/installing/install-from-source.rst
@@ -23,7 +23,7 @@ work on CKAN.
 If you're using a Debian-based operating system (such as Ubuntu) install the
 required packages with this command::

-    sudo apt-get install python-dev postgresql libpq-dev python-pip python-virtualenv git-core solr-jetty openjdk-6-jdk
+    sudo apt-get install python-dev postgresql libpq-dev python-pip python-virtualenv git-core solr-jetty openjdk-6-jdk redis-server

 If you're not using a Debian-based operating system, find the best way to
 install the following packages on your operating system (see
@@ -42,6 +42,7 @@ Git `A distributed version control system `_
 Jetty `An HTTP server `_ (used for Solr).
OpenJDK 6 JDK `The Java Development Kit `_
+Redis `An in-memory data structure store `_
 ===================== ===============================================
diff --git a/doc/maintaining/paster.rst b/doc/maintaining/paster.rst
index 3ec72316443..88ac3b769e9 100644
--- a/doc/maintaining/paster.rst
+++ b/doc/maintaining/paster.rst
@@ -1,3 +1,5 @@
+.. _paster:
+
 ======================
 Command Line Interface
 ======================
@@ -169,7 +171,6 @@ Paster Commands Reference
 The following paster commands are supported by CKAN:

 ================= ============================================================
-celeryd           Control celery daemon.
 check-po-files    Check po files for common mistakes
 color             Create or remove a color scheme.
 create-test-data  Create test data in the database.
@@ -177,6 +178,7 @@ dataset           Manage datasets.
 datastore         Perform commands to set up the datastore.
 db                Perform various tasks on the database.
 front-end-build   Creates and minifies css and JavaScript files
+jobs              Manage background jobs
 less              Compile all root less documents into their CSS counterparts
 minify            Create minified versions of the given Javascript and CSS files.
 notify            Send out modification notifications.
@@ -192,18 +194,6 @@ user              Manage users.
 ================= ============================================================

-celeryd: Control celery daemon
-==============================
-
-Usage::
-
-    celeryd       - run the celery daemon
-    celeryd run concurrency - run the celery daemon with
-                              argument 'concurrency'
-    celeryd view  - view all tasks in the queue
-    celeryd clean - delete all tasks in the queue
-
-
 check-po-files: Check po files for common mistakes
 ==================================================
@@ -268,6 +258,114 @@ Usage::

     front-end-build

+.. _paster jobs:
+
+jobs: Manage background jobs
+============================
+
+The ``jobs`` command can be used to manage :ref:`background jobs`.
+
+.. versionadded:: 2.7
+
+
+.. _paster jobs worker:
+
+Run a background job worker
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+::
+
+    paster jobs worker [--burst] [QUEUES]
+
+Starts a worker that fetches jobs from the :ref:`job queues ` and
+executes them. If no queue names are given then it listens to the default
+queue. This is equivalent to
+
+::
+
+    paster jobs worker default
+
+If queue names are given then the worker listens to those queues and only
+those::
+
+    paster jobs worker my-custom-queue another-special-queue
+
+Hence, if you want the worker to listen to the default queue and some others
+then you must list the default queue explicitly::
+
+    paster jobs worker default my-custom-queue
+
+If the ``--burst`` option is given then the worker will exit as soon as all
+its queues are empty. Otherwise it will wait indefinitely until a new job is
+enqueued (this is the default).
+
+.. note::
+
+    In a production setting you should :ref:`use a more robust way of running
+    background workers `.
+
+
+.. _paster jobs list:
+
+List enqueued jobs
+^^^^^^^^^^^^^^^^^^
+::
+
+    paster jobs list [QUEUES]
+
+Lists the currently enqueued jobs from the given :ref:`job queues `.
+If no queue names are given then the jobs from all queues are listed.
+
+
+.. _paster jobs show:
+
+Show details about a job
+^^^^^^^^^^^^^^^^^^^^^^^^
+::
+
+    paster jobs show ID
+
+Shows details about the enqueued job with the given ID.
+
+
+.. _paster jobs cancel:
+
+Cancel a job
+^^^^^^^^^^^^
+::
+
+    paster jobs cancel ID
+
+Cancels the enqueued job with the given ID. Jobs can only be canceled while
+they are enqueued.
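+
+You can obtain a job's ID from the output of ``paster jobs list`` or, when
+enqueuing from code, from the return value of ``ckan.lib.jobs.enqueue``. The
+following sketch assumes that the return value is the underlying RQ job
+object (true for the current implementation, but verify this for your CKAN
+version)::
+
+    import ckan.lib.jobs as jobs
+    import ckanext.my_extension.plugin as plugin
+
+    job = jobs.enqueue(plugin.echo, [u'Hello World'])
+    print job.id  # usable with ``paster jobs show`` and ``paster jobs cancel``
+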
Once a worker has started executing a job it cannot be +aborted anymore. + + +.. _paster jobs clear: + +Clear job queues +^^^^^^^^^^^^^^^^ +:: + + paster jobs clear [QUEUES] + +Cancels all jobs on the given :ref:`job queues `. If no +queues are given then *all* queues are cleared. + + +.. _paster jobs test: + +Enqueue a test job +^^^^^^^^^^^^^^^^^^ +:: + + paster jobs test [QUEUES] + +Enqueues a test job. If no :ref:`job queues ` are given +then the job is added to the default queue. If queue names are given then a +separate test job is added to each of the queues. + + .. _less: less: Compile all root less documents into their CSS counterparts diff --git a/doc/theming/best-practices.rst b/doc/theming/best-practices.rst index 91b9bc5f44e..9126b165d29 100644 --- a/doc/theming/best-practices.rst +++ b/doc/theming/best-practices.rst @@ -50,42 +50,12 @@ All user-visible strings should be internationalized, see :doc:`/contributing/string-i18n`. ------------------------------------------------------------------ -Helper function names should begin with the name of the extension ------------------------------------------------------------------ +------------------ +Avoid name clashes +------------------ -Namespacing helper functions in this way avoids accidentally overriding, or -being overriden by, a core helper function, or a helper function from another -extension. For example: +See :ref:`avoid name clashes`. -.. literalinclude:: /../ckanext/example_theme/v08_custom_helper_function/plugin.py - :pyobject: ExampleThemePlugin.get_helpers - - -.. _snippet filenames best practice: - -------------------------------------------------------------- -Snippet filenames should begin with the name of the extension -------------------------------------------------------------- - -Namespacing snippets in this way avoids accidentally overriding, or being -overridden by, a core snippet, or a snippet from another extension. -For example:: - - snippets/example_theme_most_popular_groups.html - - -.. _javascript module names best practice: - ----------------------------------------------------------------------- -|javascript| modules names should begin with the name of the extension ----------------------------------------------------------------------- - -Namespacing |javascript| modules in this way avoids accidentally overriding, or -being overridden by, a core module, or a module from another extension. For -example: ``fanstatic/example_theme_popover.js``: - -.. literalinclude:: /../ckanext/example_theme/v16_initialize_a_javascript_module/fanstatic/example_theme_popover.js .. _javascript module docstrings best practice: diff --git a/doc/theming/javascript.rst b/doc/theming/javascript.rst index a1feaef4c0f..15ed997ea19 100644 --- a/doc/theming/javascript.rst +++ b/doc/theming/javascript.rst @@ -88,8 +88,7 @@ To get CKAN to call some custom JavaScript code, we need to: .. note:: |javascript| module names should begin with the name of the extension, - to avoid conflicting with other modules. - See :ref:`javascript module names best practice`. + to avoid conflicting with other modules. See :ref:`avoid name clashes`. .. note:: diff --git a/doc/theming/templates.rst b/doc/theming/templates.rst index 91ef55e1ab7..479fc1124c0 100644 --- a/doc/theming/templates.rst +++ b/doc/theming/templates.rst @@ -657,8 +657,7 @@ should see the most popular groups rendered differently: To avoid unintended conflicts, we recommend that snippet filenames begin with the name of the extension they belong to, e.g. 
- ``snippets/example_theme_*.html``. - See :ref:`snippet filenames best practice`. + ``snippets/example_theme_*.html``. See :ref:`avoid name clashes`. .. note:: @@ -754,8 +753,7 @@ the most popular groups on the front page. First, add a new helper function to Names of config settings provided by extensions should include the name of the extension, to avoid conflicting with core config settings or with - config settings from other extensions. - See :ref:`extension config setting names best practice`. + config settings from other extensions. See :ref:`avoid name clashes`. Now we can call this helper function from our ``layout1.html`` template: diff --git a/requirements.in b/requirements.in index b3ff4ca7fce..fed7a94fda3 100644 --- a/requirements.in +++ b/requirements.in @@ -22,6 +22,7 @@ repoze.who-friendlyform==1.0.8 repoze.who==2.3 requests==2.10.0 Routes==1.13 +rq==0.6.0 sqlalchemy-migrate==0.10.0 SQLAlchemy==0.9.6 sqlparse==0.1.19 diff --git a/requirements.txt b/requirements.txt index 7e12c29998a..9810f299ab4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,61 +1,64 @@ # # This file is autogenerated by pip-compile -# Make changes in requirements.in, then run this to update: +# To update, run: # -# pip-compile requirements.in +# pip-compile --output-file requirements.txt requirements.in # argparse==1.4.0 # via ofs Babel==2.3.4 -Beaker==1.8.0 +Beaker==1.8.0 # via pylons bleach==1.4.3 +click==6.6 # via rq decorator==4.0.6 # via pylons, sqlalchemy-migrate fanstatic==0.12 Flask==0.10.1 -FormEncode==1.3.0 +FormEncode==1.3.0 # via pylons +funcsigs==1.0.2 # via beaker html5lib==0.9999999 # via bleach itsdangerous==0.24 # via flask -Jinja2==2.8 -Mako==1.0.4 +Jinja2==2.8 # via flask +Mako==1.0.4 # via pylons Markdown==2.6.6 -MarkupSafe==0.23 +MarkupSafe==0.23 # via jinja2, mako, webhelpers nose==1.3.7 # via pylons ofs==0.4.2 ordereddict==1.1 Pairtree==0.7.1-T passlib==1.6.5 paste==1.7.5.1 -PasteDeploy==1.5.2 -PasteScript==2.0.2 -pbr==0.11.1 # via sqlalchemy-migrate +PasteDeploy==1.5.2 # via pastescript, pylons +PasteScript==2.0.2 # via pylons +pbr==1.10.0 # via sqlalchemy-migrate psycopg2==2.4.5 -pysolr==3.5.0 -Pygments==2.1.3 +Pygments==2.1.3 # via weberror Pylons==0.9.7 +pysolr==3.5.0 python-dateutil==1.5 pytz==2016.4 pyutilib.component.core==4.6.4 +redis==2.10.5 # via rq repoze.lru==0.6 # via routes repoze.who-friendlyform==1.0.8 repoze.who==2.3 requests==2.10.0 -Routes==1.13 +Routes==1.13 # via pylons +rq==0.6.0 simplejson==3.8.2 # via pylons -six==1.10.0 # via bleach, html5lib, pastescript, sqlalchemy-migrate +six==1.10.0 # via bleach, html5lib, pastescript, pyutilib.component.core, sqlalchemy-migrate sqlalchemy-migrate==0.10.0 -SQLAlchemy==0.9.6 +SQLAlchemy==0.9.6 # via sqlalchemy-migrate sqlparse==0.1.19 -Tempita==0.5.2 +Tempita==0.5.2 # via pylons, sqlalchemy-migrate, weberror tzlocal==1.2.2 unicodecsv==0.14.1 vdm==0.13 -WebError==0.13.1 -WebHelpers==1.3 -WebOb==1.0.8 -WebTest==1.4.3 -Werkzeug==0.11.10 +WebError==0.13.1 # via pylons +WebHelpers==1.3 # via pylons +WebOb==1.0.8 # via fanstatic, pylons, repoze.who, repoze.who-friendlyform, weberror, webtest +WebTest==1.4.3 # via pylons +Werkzeug==0.11.10 # via flask zope.interface==4.2.0 # The following packages are commented out because they are # considered to be unsafe in a requirements file: -# pip # via pbr # setuptools # via repoze.who, zope.interface diff --git a/setup.py b/setup.py index a07f9aff729..c633d2ea217 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ 'front-end-build = ckan.lib.cli:FrontEndBuildCommand', 
'views = ckan.lib.cli:ViewsCommand', 'config-tool = ckan.lib.cli:ConfigToolCommand', + 'jobs = ckan.lib.cli:JobsCommand', ], 'console_scripts': [ 'ckan-admin = bin.ckan_admin:Command', @@ -142,6 +143,7 @@ 'example_iconfigurer_v1 = ckanext.example_iconfigurer.plugin_v1:ExampleIConfigurerPlugin', 'example_iconfigurer_v2 = ckanext.example_iconfigurer.plugin_v2:ExampleIConfigurerPlugin', 'example_iuploader = ckanext.example_iuploader.plugin:ExampleIUploader', + 'example_ipermissionlabels = ckanext.example_ipermissionlabels.plugin:ExampleIPermissionLabelsPlugin', ], 'ckan.system_plugins': [ 'domain_object_mods = ckan.model.modification:DomainObjectModificationExtension', diff --git a/test-core.ini b/test-core.ini index 5687a2f0867..97426d8f17d 100644 --- a/test-core.ini +++ b/test-core.ini @@ -31,6 +31,9 @@ ckan.datapusher.formats = csv xls xlsx tsv application/csv application/vnd.ms-ex ## Solr support solr_url = http://127.0.0.1:8983/solr/ckan +# Redis URL. Use a separate Redis database for testing. +ckan.redis.url = redis://localhost:6379/1 + ckan.auth.user_create_organizations = true ckan.auth.user_create_groups = true ckan.auth.create_user_via_api = false
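+
+# When running the tests repeatedly it can help to start from a clean job
+# queue. One way to do that (an example, not something CKAN requires) is to
+# flush only the test database:
+#
+#   redis-cli -n 1 flushdb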