From 4ad7ac00af18f69a4ef27dc7fd6ba18627033ff1 Mon Sep 17 00:00:00 2001 From: amercader Date: Thu, 2 Aug 2012 12:06:51 +0100 Subject: [PATCH] [#2788] Commit only once at the end of a search index rebuild Commiting changes in the Solr search index is a heavy task that takes significant time. We are currently commiting changes after each update, which is probably fine for individual updated (ie users editing or creating a dataset), but when rebuilding the whole index it is unnecessary. A single commit at the end of the process is needed, and that speeds the reindexing about a 85%. A flag has been added (`-e` or `--commit-each`) to allow the old behaviour (commiting after each edit). --- ckan/lib/cli.py | 21 ++++++++++++++++++--- ckan/lib/search/__init__.py | 12 +++++++++--- ckan/lib/search/index.py | 23 ++++++++++++++++++----- ckan/tests/lib/test_cli.py | 4 ++-- 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/ckan/lib/cli.py b/ckan/lib/cli.py index baaa4674ba9..11272281628 100644 --- a/ckan/lib/cli.py +++ b/ckan/lib/cli.py @@ -277,7 +277,7 @@ class SearchIndexCommand(CkanCommand): '''Creates a search index for all datasets Usage: - search-index [-i] [-o] [-r] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets) + search-index [-i] [-o] [-r] [-e] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets) search-index check - checks for datasets not indexed search-index show {dataset-name} - shows index of a dataset search-index clear [dataset-name] - clears the search index for the provided dataset or for the whole ckan instance @@ -301,6 +301,13 @@ def __init__(self,name): self.parser.add_option('-r', '--refresh', dest='refresh', action='store_true', default=False, help='Refresh current index (does not clear the existing one)') + self.parser.add_option('-e', '--commit-each', dest='commit_each', + action='store_true', default=False, help= +'''Perform a commit after indexing each dataset. This ensures that changes are +immediately available on the search, but slows significantly the process. +Default is false.''' + ) + def command(self): self._load_config() @@ -322,14 +329,22 @@ def command(self): print 'Command %s not recognized' % cmd def rebuild(self): - from ckan.lib.search import rebuild + from ckan.lib.search import rebuild, commit + + # BY default we don't commit after each request to Solr, as it is + # a really heavy operation and slows things a lot if len(self.args) > 1: rebuild(self.args[1]) else: rebuild(only_missing=self.options.only_missing, force=self.options.force, - refresh=self.options.refresh) + refresh=self.options.refresh, + defer_commit=(not self.options.commit_each)) + + if not self.options.commit_each: + commit() + def check(self): from ckan.lib.search import check diff --git a/ckan/lib/search/__init__.py b/ckan/lib/search/__init__.py index f003efd9350..dfb16e301f8 100644 --- a/ckan/lib/search/__init__.py +++ b/ckan/lib/search/__init__.py @@ -131,7 +131,7 @@ def notify(self, entity, operation): log.warn("Discarded Sync. indexing for: %s" % entity) -def rebuild(package_id=None, only_missing=False, force=False, refresh=False): +def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False): ''' Rebuilds the search index. @@ -175,12 +175,13 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False): for pkg_id in package_ids: try: - package_index.insert_dict( + package_index.update_dict( get_action('package_show')( {'model': model, 'ignore_auth': True, 'validate': False}, {'id': pkg_id} - ) + ), + defer_commit ) except Exception, e: log.error('Error while indexing dataset %s: %s' % @@ -195,6 +196,11 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False): log.info('Finished rebuilding search index.') +def commit(): + package_index = index_for(model.Package) + package_index.commit() + log.info('Commited pending changes on the search index') + def check(): from ckan import model package_query = query_for(model.Package) diff --git a/ckan/lib/search/index.py b/ckan/lib/search/index.py index 1d912eef1c7..dee7e27c7e8 100644 --- a/ckan/lib/search/index.py +++ b/ckan/lib/search/index.py @@ -92,10 +92,10 @@ class PackageSearchIndex(SearchIndex): def remove_dict(self, pkg_dict): self.delete_package(pkg_dict) - def update_dict(self, pkg_dict): - self.index_package(pkg_dict) + def update_dict(self, pkg_dict, defer_commit=False): + self.index_package(pkg_dict, defer_commit) - def index_package(self, pkg_dict): + def index_package(self, pkg_dict, defer_commit=False): if pkg_dict is None: return pkg_dict['data_dict'] = json.dumps(pkg_dict) @@ -222,14 +222,27 @@ def index_package(self, pkg_dict): # send to solr: try: conn = make_connection() - conn.add_many([pkg_dict], _commit=True) + commit = not defer_commit + conn.add_many([pkg_dict], _commit=commit) + except Exception, e: + log.exception(e) + raise SearchIndexError(e) + finally: + conn.close() + + commit_debug_msg = 'Not commited yet' if defer_commit else 'Commited' + log.debug('Updated index for %s [%s]' % (pkg_dict.get('name'), commit_debug_msg)) + + def commit(self): + try: + conn = make_connection() + conn.commit(wait_flush=False, wait_searcher=False) except Exception, e: log.exception(e) raise SearchIndexError(e) finally: conn.close() - log.debug("Updated index for %s" % pkg_dict.get('name')) def delete_package(self, pkg_dict): conn = make_connection() diff --git a/ckan/tests/lib/test_cli.py b/ckan/tests/lib/test_cli.py index 7bea16bb484..5aee50ace1a 100644 --- a/ckan/tests/lib/test_cli.py +++ b/ckan/tests/lib/test_cli.py @@ -80,7 +80,7 @@ def test_clear_and_rebuild_index(self): # Rebuild index self.search.args = () - self.search.options = FakeOptions(only_missing=False,force=False,refresh=False) + self.search.options = FakeOptions(only_missing=False,force=False,refresh=False,commit_each=False) self.search.rebuild() pkg_count = model.Session.query(model.Package).filter(model.Package.state==u'active').count() @@ -103,7 +103,7 @@ def test_clear_and_rebuild_only_one(self): # Rebuild index for annakarenina self.search.args = ('rebuild annakarenina').split() - self.search.options = FakeOptions(only_missing=False,force=False,refresh=False) + self.search.options = FakeOptions(only_missing=False,force=False,refresh=False,commit_each=False) self.search.rebuild() self.query.run({'q':'*:*'})