diff --git a/ckan/lib/cli.py b/ckan/lib/cli.py index 1fc8de336df..cb6731eb5b7 100644 --- a/ckan/lib/cli.py +++ b/ckan/lib/cli.py @@ -1,5 +1,6 @@ import collections import csv +import multiprocessing as mp import os import datetime import sys @@ -8,6 +9,7 @@ import ckan.include.rjsmin as rjsmin import ckan.include.rcssmin as rcssmin import ckan.lib.fanstatic_resources as fanstatic_resources +import sqlalchemy as sa import paste.script from paste.registry import Registry @@ -69,7 +71,7 @@ class CkanCommand(paste.script.command.Command): default_verbosity = 1 group_name = 'ckan' - def _load_config(self): + def _get_config(self): from paste.deploy import appconfig if not self.options.config: msg = 'No config file supplied' @@ -78,7 +80,10 @@ def _load_config(self): if not os.path.exists(self.filename): raise AssertionError('Config filename %r does not exist.' % self.filename) fileConfig(self.filename) - conf = appconfig('config:' + self.filename) + return appconfig('config:' + self.filename) + + def _load_config(self): + conf = self._get_config() assert 'ckan' not in dir() # otherwise loggers would be disabled # We have now loaded the config. Now we can import ckan for the # first time. @@ -308,11 +313,13 @@ def version(self): print Session.execute('select version from migrate_version;').fetchall() + class SearchIndexCommand(CkanCommand): '''Creates a search index for all datasets Usage: search-index [-i] [-o] [-r] [-e] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets) + search-index rebuild_fast - reindex using multiprocessing using all cores. 
This acts in the same way as rebuild -r [EXPERIMENTAL] search-index check - checks for datasets not indexed search-index show {dataset-name} - shows index of a dataset search-index clear [dataset-name] - clears the search index for the provided dataset or for the whole ckan instance @@ -344,14 +351,18 @@ def __init__(self,name): ) def command(self): - self._load_config() - if not self.args: # default to printing help print self.usage return cmd = self.args[0] + # Do not run load_config yet + if cmd == 'rebuild_fast': + self.rebuild_fast() + return + + self._load_config() if cmd == 'rebuild': self.rebuild() elif cmd == 'check': @@ -400,6 +411,44 @@ def clear(self): package_id =self.args[1] if len(self.args) > 1 else None clear(package_id) + def rebuild_fast(self): + ### Get our config but without starting pylons environment #### + conf = self._get_config() + + ### Get ids using own engine, otherwise multiprocess will balk + db_url = conf['sqlalchemy.url'] + engine = sa.create_engine(db_url) + package_ids = [] + result = engine.execute("select id from package where state = 'active';") + for row in result: + package_ids.append(row[0]) + + def start(ids): + ## load actual environment for each subprocess, so each has their own + ## sa session + self._load_config() + from ckan.lib.search import rebuild, commit + rebuild(package_ids=ids) + commit() + + def chunks(l, n): + """ Yield n successive chunks from l. + """ + newn = int(len(l) / n) + for i in xrange(0, n-1): + yield l[i*newn:i*newn+newn] + yield l[n*newn-newn:] + + for chunk in chunks(package_ids, mp.cpu_count()): + processes = [] + process = mp.Process(target=start, args=(chunk,)) + processes.append(process) + process.daemon = True + process.start() + + for process in processes: + process.join() + class Notification(CkanCommand): '''Send out modification notifications. 
diff --git a/ckan/lib/search/__init__.py b/ckan/lib/search/__init__.py index 4c2f473ce8e..d7afc157e3a 100644 --- a/ckan/lib/search/__init__.py +++ b/ckan/lib/search/__init__.py @@ -134,7 +134,7 @@ def notify(self, entity, operation): log.warn("Discarded Sync. indexing for: %s" % entity) -def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False): +def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False, package_ids=None): ''' Rebuilds the search index. @@ -155,6 +155,13 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False, def log.info('Indexing just package %r...', pkg_dict['name']) package_index.remove_dict(pkg_dict) package_index.insert_dict(pkg_dict) + elif package_ids: + for package_id in package_ids: + pkg_dict = logic.get_action('package_show')( + {'model': model, 'ignore_auth': True, 'validate': False}, + {'id': package_id}) + log.info('Indexing just package %r...', pkg_dict['name']) + package_index.update_dict(pkg_dict, True) else: package_ids = [r[0] for r in model.Session.query(model.Package.id). filter(model.Package.state == 'active').all()]