From ee0348798dfe4eb788dcb0153120c61dc552f9f9 Mon Sep 17 00:00:00 2001 From: Konstantin Sivakov Date: Tue, 25 Sep 2018 11:29:27 +0200 Subject: [PATCH] Adding rebuild fast, revert the workaround for the translator object --- ckan/cli/search_index.py | 53 ++++++++++++++++++++++++++++++++++++---- ckan/common.py | 7 +++--- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/ckan/cli/search_index.py b/ckan/cli/search_index.py index cfa26ca9b6a..12f1ea64307 100644 --- a/ckan/cli/search_index.py +++ b/ckan/cli/search_index.py @@ -3,6 +3,7 @@ import os import click +import sqlalchemy as sa from flask import Flask, current_app from werkzeug.serving import run_simple @@ -33,13 +34,15 @@ def search_index(): @click.pass_context def rebuild(ctx, verbose, force, refresh, only_missing, quiet, commit_each): u''' Rebuild search index ''' + flask_app = ctx.obj.app.apps['flask_app']._wsgi_app from ckan.lib.search import rebuild, commit try: - rebuild(only_missing=only_missing, - force=force, - refresh=refresh, - defer_commit=(not commit_each), - quiet=quiet) + with flask_app.test_request_context(): + rebuild(only_missing=only_missing, + force=force, + refresh=refresh, + defer_commit=(not commit_each), + quiet=quiet) except Exception as e: click.echo(e, err=True) if not commit_each: @@ -73,3 +76,43 @@ def clear(dataset_name): clear(dataset_name) else: clear_all() + + +@search_index.command(name=u'rebuild-fast', + short_help=u'Reindex with multiprocessing') +@click.help_option(u'-h', u'--help') +@click.pass_context +def rebuild_fast(ctx): + conf = ctx.obj.config + db_url = conf['sqlalchemy.url'] + engine = sa.create_engine(db_url) + package_ids = [] + result = engine.execute(u"select id from package where state = 'active';") + for row in result: + package_ids.append(row[0]) + + def start(ids): + ## load actual enviroment for each subprocess, so each have thier own + ## sa session + self._load_config() + from ckan.lib.search import rebuild, commit + rebuild(package_ids=ids) + commit() + + def chunks(l, n): + u""" Yield n successive chunks from l. + u""" + newn = int(len(l) / n) + for i in xrange(0, n-1): + yield l[i*newn:i*newn+newn] + yield l[n*newn-newn:] + + processes = [] + for chunk in chunks(package_ids, mp.cpu_count()): + process = mp.Process(target=start, args=(chunk,)) + processes.append(process) + process.daemon = True + process.start() + + for process in processes: + process.join() diff --git a/ckan/common.py b/ckan/common.py index 5bf5163e44a..4dd1a0854e1 100644 --- a/ckan/common.py +++ b/ckan/common.py @@ -68,9 +68,10 @@ def streaming_response( def ugettext(*args, **kwargs): - # this is on purpose, as we have a problem to check - # for pylons or flask request - return flask_ugettext(*args, **kwargs) + if is_flask_request(): + return flask_ugettext(*args, **kwargs) + else: + return pylons_ugettext(*args, **kwargs) _ = ugettext