Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up search index, using multiple cores. #700

Merged
merged 4 commits into from
Jun 13, 2013
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 53 additions & 4 deletions ckan/lib/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import csv
import multiprocessing as mp
import os
import datetime
import sys
Expand All @@ -8,6 +9,7 @@
import ckan.include.rjsmin as rjsmin
import ckan.include.rcssmin as rcssmin
import ckan.lib.fanstatic_resources as fanstatic_resources
import sqlalchemy as sa

import paste.script
from paste.registry import Registry
Expand Down Expand Up @@ -69,7 +71,7 @@ class CkanCommand(paste.script.command.Command):
default_verbosity = 1
group_name = 'ckan'

def _load_config(self):
def _get_config(self):
from paste.deploy import appconfig
if not self.options.config:
msg = 'No config file supplied'
Expand All @@ -78,7 +80,10 @@ def _load_config(self):
if not os.path.exists(self.filename):
raise AssertionError('Config filename %r does not exist.' % self.filename)
fileConfig(self.filename)
conf = appconfig('config:' + self.filename)
return appconfig('config:' + self.filename)

def _load_config(self):
conf = self._get_config()
assert 'ckan' not in dir() # otherwise loggers would be disabled
# We have now loaded the config. Now we can import ckan for the
# first time.
Expand Down Expand Up @@ -308,11 +313,13 @@ def version(self):
print Session.execute('select version from migrate_version;').fetchall()



class SearchIndexCommand(CkanCommand):
'''Creates a search index for all datasets

Usage:
search-index [-i] [-o] [-r] [-e] rebuild [dataset-name] - reindex dataset-name if given, if not then rebuild full search index (all datasets)
search-index rebuild_fast - reindex using multiprocessing using all cores. This acts in the same way as rebuild -r [EXPERIMENTAL]
search-index check - checks for datasets not indexed
search-index show {dataset-name} - shows index of a dataset
search-index clear [dataset-name] - clears the search index for the provided dataset or for the whole ckan instance
Expand Down Expand Up @@ -344,14 +351,18 @@ def __init__(self,name):
)

def command(self):
self._load_config()

if not self.args:
# default to printing help
print self.usage
return

cmd = self.args[0]
# Do not run load_config yet
if cmd == 'rebuild_fast':
self.rebuild_fast()
return

self._load_config()
if cmd == 'rebuild':
self.rebuild()
elif cmd == 'check':
Expand Down Expand Up @@ -400,6 +411,44 @@ def clear(self):
package_id =self.args[1] if len(self.args) > 1 else None
clear(package_id)

def rebuild_fast(self):
### Get our config but without starting pylons environment ####
conf = self._get_config()

### Get ids using own engine, otherwise multiprocess will balk
db_url = conf['sqlalchemy.url']
engine = sa.create_engine(db_url)
package_ids = []
result = engine.execute("select id from package where state = 'active';")
for row in result:
package_ids.append(row[0])

def start(ids):
## load actual environment for each subprocess, so each has its own
## sa session
self._load_config()
from ckan.lib.search import rebuild, commit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from ckan.lib import search

rebuild(package_ids=ids)
commit()

def chunks(l, n):
    """Yield at most ``n`` successive, evenly sized chunks from ``l``.

    The chunks are contiguous slices that, concatenated in order, reproduce
    ``l``. Their sizes differ by at most one item: the first ``len(l) % n``
    chunks carry one extra item, instead of the whole remainder being
    appended to the last chunk.

    Empty chunks are never yielded, so fewer than ``n`` chunks are produced
    when ``l`` has fewer than ``n`` items (and none at all when ``l`` is
    empty). A caller that starts one worker per chunk therefore never starts
    a worker that has nothing to process.

    :param l: sliceable sequence to split (e.g. a list of package ids)
    :param n: maximum number of chunks; must be a positive integer
    :raises ValueError: if ``n`` is smaller than 1
    """
    if n < 1:
        raise ValueError('n must be a positive integer, got %r' % (n,))

    # Every chunk gets `base` items; the first `extra` chunks get one more.
    base, extra = divmod(len(l), n)
    begin = 0
    for i in range(n):
        end = begin + base + (1 if i < extra else 0)
        if end == begin:
            # base is 0 and the remainder is used up: only empty chunks
            # would follow, so stop here.
            break
        yield l[begin:end]
        begin = end

processes = []
for chunk in chunks(package_ids, mp.cpu_count()):
process = mp.Process(target=start, args=(chunk,))
processes.append(process)
process.daemon = True
process.start()

for process in processes:
process.join()

class Notification(CkanCommand):
'''Send out modification notifications.

Expand Down
9 changes: 8 additions & 1 deletion ckan/lib/search/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def notify(self, entity, operation):
log.warn("Discarded Sync. indexing for: %s" % entity)


def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False):
def rebuild(package_id=None, only_missing=False, force=False, refresh=False, defer_commit=False, package_ids=None):
'''
Rebuilds the search index.

Expand All @@ -155,6 +155,13 @@ def rebuild(package_id=None, only_missing=False, force=False, refresh=False, def
log.info('Indexing just package %r...', pkg_dict['name'])
package_index.remove_dict(pkg_dict)
package_index.insert_dict(pkg_dict)
elif package_ids:
for package_id in package_ids:
pkg_dict = logic.get_action('package_show')(
{'model': model, 'ignore_auth': True, 'validate': False},
{'id': package_id})
log.info('Indexing just package %r...', pkg_dict['name'])
package_index.update_dict(pkg_dict, True)
else:
package_ids = [r[0] for r in model.Session.query(model.Package.id).
filter(model.Package.state == 'active').all()]
Expand Down