From 551e46ca423b78309d4f753f195e2da67a191a8c Mon Sep 17 00:00:00 2001 From: Tyler Kennedy Date: Thu, 16 Jun 2016 10:12:14 -0400 Subject: [PATCH] Started removing existing search. Start of SolrIndexer, NoIndexer, PostgresIndexer. --- ckan/config/environment.py | 10 - ckan/config/solr/README.txt | 12 - ckan/config/solr/schema-1.2.xml | 170 ------------- ckan/config/solr/schema-1.3.xml | 179 -------------- ckan/config/solr/schema-1.4.xml | 190 --------------- ckan/config/solr/schema-2.0.xml | 173 -------------- ckan/config/solr/schema.xml | 187 --------------- ckan/controllers/home.py | 71 +++--- ckan/controllers/package.py | 171 ++++++------- ckan/lib/dictization/model_dictize.py | 1 + ckan/lib/search/__init__.py | 329 ++------------------------ ckan/lib/search/common.py | 85 ------- ckan/logic/action/get.py | 246 ++++--------------- ckan/model/modification.py | 12 - ckan/plugins/core.py | 7 - ckan/plugins/interfaces.py | 36 +++ ckanext/search/__init__.py | 48 ++++ ckanext/search/solr.py | 105 ++++++++ setup.py | 10 +- 19 files changed, 373 insertions(+), 1669 deletions(-) delete mode 100644 ckan/config/solr/README.txt delete mode 100644 ckan/config/solr/schema-1.2.xml delete mode 100644 ckan/config/solr/schema-1.3.xml delete mode 100644 ckan/config/solr/schema-1.4.xml delete mode 100644 ckan/config/solr/schema-2.0.xml delete mode 100644 ckan/config/solr/schema.xml create mode 100644 ckanext/search/__init__.py create mode 100644 ckanext/search/solr.py diff --git a/ckan/config/environment.py b/ckan/config/environment.py index 131a794f18c..6351b4a022e 100644 --- a/ckan/config/environment.py +++ b/ckan/config/environment.py @@ -15,7 +15,6 @@ import ckan.lib.helpers as helpers import ckan.lib.app_globals as app_globals import ckan.lib.render as render -import ckan.lib.search as search import ckan.logic as logic import ckan.authz as authz import ckan.lib.jinja_extensions as jinja_extensions @@ -168,15 +167,6 @@ def update_config(): favicon = config.get('ckan.favicon', 
'/base/images/ckan.ico') config['ckan.favicon'] = favicon - # Init SOLR settings and check if the schema is compatible - # from ckan.lib.search import SolrSettings, check_solr_schema_version - - # lib.search is imported here as we need the config enabled and parsed - search.SolrSettings.init(config.get('solr_url'), - config.get('solr_user'), - config.get('solr_password')) - search.check_solr_schema_version() - routes_map = routing.make_map() config['routes.map'] = routes_map # The RoutesMiddleware needs its mapper updating if it exists diff --git a/ckan/config/solr/README.txt b/ckan/config/solr/README.txt deleted file mode 100644 index d8fcff40f42..00000000000 --- a/ckan/config/solr/README.txt +++ /dev/null @@ -1,12 +0,0 @@ -CKAN Solr schema -================ - -This folder contains the Solr schema file used by CKAN (schema.xml). - -Starting from 2.2 this is the only file that should be used by users and -modified by devs. The rest of files (schema-{version}.xml) are kept for -backwards compatibility purposes and should not be used, as they might be -removed in future versions. - -When upgrading CKAN, always check the CHANGELOG on each release to see if -you need to update the schema file and reindex your datasets. 
diff --git a/ckan/config/solr/schema-1.2.xml b/ckan/config/solr/schema-1.2.xml deleted file mode 100644 index 4f9b11a580b..00000000000 --- a/ckan/config/solr/schema-1.2.xml +++ /dev/null @@ -1,170 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -id -text - - - - - - - - - - - - - - - - - - - - diff --git a/ckan/config/solr/schema-1.3.xml b/ckan/config/solr/schema-1.3.xml deleted file mode 100644 index 89fea815356..00000000000 --- a/ckan/config/solr/schema-1.3.xml +++ /dev/null @@ -1,179 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -index_id -text - - - - - - - - - - - - - - - - - - - - diff --git a/ckan/config/solr/schema-1.4.xml b/ckan/config/solr/schema-1.4.xml deleted file mode 100644 index 98cbf378dbe..00000000000 --- a/ckan/config/solr/schema-1.4.xml +++ /dev/null @@ -1,190 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -index_id -text - - - - - - - - - - - - - - - - - - - - - diff --git a/ckan/config/solr/schema-2.0.xml b/ckan/config/solr/schema-2.0.xml deleted file mode 100644 index b306bb5d8c7..00000000000 --- a/ckan/config/solr/schema-2.0.xml +++ /dev/null @@ -1,173 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -index_id -text - - - - - - - - - - - - - - - - - - - - - diff 
--git a/ckan/config/solr/schema.xml b/ckan/config/solr/schema.xml deleted file mode 100644 index e8893f70ff9..00000000000 --- a/ckan/config/solr/schema.xml +++ /dev/null @@ -1,187 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -index_id -text - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/ckan/controllers/home.py b/ckan/controllers/home.py index 3f76427c36c..5437c963e4e 100644 --- a/ckan/controllers/home.py +++ b/ckan/controllers/home.py @@ -1,9 +1,10 @@ -from pylons import config, cache +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from pylons import cache import sqlalchemy.exc import ckan.logic as logic import ckan.lib.maintain as maintain -import ckan.lib.search as search import ckan.lib.base as base import ckan.model as model import ckan.lib.helpers as h @@ -38,41 +39,37 @@ def __before__(self, action, **env): raise def index(self): - try: - # package search - context = {'model': model, 'session': model.Session, - 'user': c.user, 'auth_user_obj': c.userobj} - data_dict = { - 'q': '*:*', - 'facet.field': g.facets, - 'rows': 4, - 'start': 0, - 'sort': 'views_recent desc', - 'fq': 'capacity:"public"' - } - query = logic.get_action('package_search')( - context, data_dict) - c.search_facets = query['search_facets'] - c.package_count = query['count'] - c.datasets = query['results'] - - c.facets = query['facets'] - maintain.deprecate_context_item( - 'facets', - 'Use `c.search_facets` instead.') - - c.search_facets = query['search_facets'] - - c.facet_titles = { - 'organization': _('Organizations'), - 'groups': _('Groups'), - 'tags': _('Tags'), - 'res_format': _('Formats'), - 'license': _('Licenses'), - } - - except search.SearchError: - c.package_count = 0 + # package search + context = {'model': model, 'session': 
model.Session, + 'user': c.user, 'auth_user_obj': c.userobj} + data_dict = { + 'q': '*:*', + 'facet.field': g.facets, + 'rows': 4, + 'start': 0, + 'sort': 'views_recent desc', + 'fq': 'capacity:"public"' + } + query = logic.get_action('package_search')( + context, data_dict) + c.search_facets = query['search_facets'] + c.package_count = query['count'] + c.datasets = query['results'] + + c.facets = query['facets'] + maintain.deprecate_context_item( + 'facets', + 'Use `c.search_facets` instead.') + + c.search_facets = query['search_facets'] + + c.facet_titles = { + 'organization': _('Organizations'), + 'groups': _('Groups'), + 'tags': _('Tags'), + 'res_format': _('Formats'), + 'license': _('Licenses'), + } if c.userobj and not c.userobj.email: url = h.url_for(controller='user', action='edit') diff --git a/ckan/controllers/package.py b/ckan/controllers/package.py index 0064500eb35..dd8846a3d74 100644 --- a/ckan/controllers/package.py +++ b/ckan/controllers/package.py @@ -130,13 +130,14 @@ def _guess_package_type(self, expecting_name=False): return pt def search(self): - from ckan.lib.search import SearchError, SearchQueryError - package_type = self._guess_package_type() try: - context = {'model': model, 'user': c.user, - 'auth_user_obj': c.userobj} + context = { + 'model': model, + 'user': c.user, + 'auth_user_obj': c.userobj + } check_access('site_read', context) except NotAuthorized: abort(403, _('Not authorized to see this page')) @@ -199,101 +200,85 @@ def pager_url(q=None, page=None): c.search_url_params = urlencode(_encode_params(params_nopage)) - try: - c.fields = [] - # c.fields_grouped will contain a dict of params containing - # a list of values eg {'tags':['tag1', 'tag2']} - c.fields_grouped = {} - search_extras = {} - fq = '' - for (param, value) in request.params.items(): - if param not in ['q', 'page', 'sort'] \ - and len(value) and not param.startswith('_'): - if not param.startswith('ext_'): - c.fields.append((param, value)) - fq += ' %s:"%s"' % 
(param, value) - if param not in c.fields_grouped: - c.fields_grouped[param] = [value] - else: - c.fields_grouped[param].append(value) + c.fields = [] + # c.fields_grouped will contain a dict of params containing + # a list of values eg {'tags':['tag1', 'tag2']} + c.fields_grouped = {} + search_extras = {} + fq = '' + for (param, value) in request.params.items(): + if param not in ['q', 'page', 'sort'] \ + and len(value) and not param.startswith('_'): + if not param.startswith('ext_'): + c.fields.append((param, value)) + fq += ' %s:"%s"' % (param, value) + if param not in c.fields_grouped: + c.fields_grouped[param] = [value] else: - search_extras[param] = value + c.fields_grouped[param].append(value) + else: + search_extras[param] = value - context = {'model': model, 'session': model.Session, - 'user': c.user, 'for_view': True, - 'auth_user_obj': c.userobj} + context = {'model': model, 'session': model.Session, + 'user': c.user, 'for_view': True, + 'auth_user_obj': c.userobj} - if package_type and package_type != 'dataset': - # Only show datasets of this particular type - fq += ' +dataset_type:{type}'.format(type=package_type) - else: - # Unless changed via config options, don't show non standard - # dataset types on the default search page - if not asbool( - config.get('ckan.search.show_all_types', 'False')): - fq += ' +dataset_type:dataset' - - facets = OrderedDict() - - default_facet_titles = { - 'organization': _('Organizations'), - 'groups': _('Groups'), - 'tags': _('Tags'), - 'res_format': _('Formats'), - 'license_id': _('Licenses'), - } - - for facet in g.facets: - if facet in default_facet_titles: - facets[facet] = default_facet_titles[facet] - else: - facets[facet] = facet - - # Facet titles - for plugin in p.PluginImplementations(p.IFacets): - facets = plugin.dataset_facets(facets, package_type) - - c.facet_titles = facets - - data_dict = { - 'q': q, - 'fq': fq.strip(), - 'facet.field': facets.keys(), - 'rows': limit, - 'start': (page - 1) * limit, - 
'sort': sort_by, - 'extras': search_extras + if package_type and package_type != 'dataset': + # Only show datasets of this particular type + fq += ' +dataset_type:{type}'.format(type=package_type) + else: + # Unless changed via config options, don't show non standard + # dataset types on the default search page + if not asbool( + config.get('ckan.search.show_all_types', 'False')): + fq += ' +dataset_type:dataset' + + facets = OrderedDict() + + default_facet_titles = { + 'organization': _('Organizations'), + 'groups': _('Groups'), + 'tags': _('Tags'), + 'res_format': _('Formats'), + 'license_id': _('Licenses'), } - query = get_action('package_search')(context, data_dict) - c.sort_by_selected = query['sort'] + for facet in g.facets: + if facet in default_facet_titles: + facets[facet] = default_facet_titles[facet] + else: + facets[facet] = facet + + # Facet titles + for plugin in p.PluginImplementations(p.IFacets): + facets = plugin.dataset_facets(facets, package_type) + + c.facet_titles = facets + + data_dict = { + 'q': q, + 'fq': fq.strip(), + 'facet.field': facets.keys(), + 'rows': limit, + 'start': (page - 1) * limit, + 'sort': sort_by, + 'extras': search_extras + } + + query = get_action('package_search')(context, data_dict) + c.sort_by_selected = query['sort'] + + c.page = h.Page( + collection=query['results'], + page=page, + url=pager_url, + item_count=query['count'], + items_per_page=limit + ) + c.facets = query['facets'] + c.search_facets = query['search_facets'] + c.page.items = query['results'] - c.page = h.Page( - collection=query['results'], - page=page, - url=pager_url, - item_count=query['count'], - items_per_page=limit - ) - c.facets = query['facets'] - c.search_facets = query['search_facets'] - c.page.items = query['results'] - except SearchQueryError, se: - # User's search parameters are invalid, in such a way that is not - # achievable with the web interface, so return a proper error to - # discourage spiders which are the main cause of this. 
- log.info('Dataset search query rejected: %r', se.args) - abort(400, _('Invalid search query: {error_message}') - .format(error_message=str(se))) - except SearchError, se: - # May be bad input from the user, but may also be more serious like - # bad code causing a SOLR syntax error, or a problem connecting to - # SOLR - log.error('Dataset search error: %r', se.args) - c.query_error = True - c.facets = {} - c.search_facets = {} - c.page = h.Page(collection=[]) c.search_facets_limits = {} for facet in c.search_facets.keys(): try: diff --git a/ckan/lib/dictization/model_dictize.py b/ckan/lib/dictization/model_dictize.py index 47b62f77825..36aad0b2d20 100644 --- a/ckan/lib/dictization/model_dictize.py +++ b/ckan/lib/dictization/model_dictize.py @@ -328,6 +328,7 @@ def _get_members(context, group, member_type): def get_group_dataset_counts(): '''For all public groups, return their dataset counts, as a SOLR facet''' + return 0 query = search.PackageSearchQuery() q = {'q': '+capacity:public', 'fl': 'groups', 'facet.field': ['groups', 'owner_org'], diff --git a/ckan/lib/search/__init__.py b/ckan/lib/search/__init__.py index 182d2e6d1bb..61cb921d8e6 100644 --- a/ckan/lib/search/__init__.py +++ b/ckan/lib/search/__init__.py @@ -1,318 +1,23 @@ -import logging -import sys -import cgitb -import warnings -import xml.dom.minidom -import urllib2 +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from ckan import plugins -from pylons import config -from paste.deploy.converters import asbool -import ckan.model as model -import ckan.plugins as p -import ckan.logic as logic +def index(package): + for plugin in plugins.PluginImplementations(plugins.IIndexer): + return plugin.index(package) -from common import (SearchIndexError, SearchError, SearchQueryError, - make_connection, is_available, SolrSettings) -from index import PackageSearchIndex, NoopSearchIndex -from query import (TagSearchQuery, ResourceSearchQuery, PackageSearchQuery, - QueryOptions, convert_legacy_parameters_to_solr) 
-log = logging.getLogger(__name__) +def query(search_query, facets=None, limit=1000, sort=None): + for plugin in plugins.PluginImplementations(plugins.IIndexer): + return plugin.query( + search_query, + facets=facets, + limit=limit, + sort=sort + ) - -def text_traceback(): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - res = 'the original traceback:'.join( - cgitb.text(sys.exc_info()).split('the original traceback:')[1:] - ).strip() - return res - -SIMPLE_SEARCH = asbool(config.get('ckan.simple_search', False)) - -SUPPORTED_SCHEMA_VERSIONS = ['2.3'] - -DEFAULT_OPTIONS = { - 'limit': 20, - 'offset': 0, - # about presenting the results - 'order_by': 'rank', - 'return_objects': False, - 'ref_entity_with_attr': 'name', - 'all_fields': False, - 'search_tags': True, - 'callback': None, # simply passed through -} - -_INDICES = { - 'package': PackageSearchIndex -} - -_QUERIES = { - 'tag': TagSearchQuery, - 'resource': ResourceSearchQuery, - 'package': PackageSearchQuery -} - -SOLR_SCHEMA_FILE_OFFSET = '/admin/file/?file=schema.xml' - -if SIMPLE_SEARCH: - import sql as sql - _INDICES['package'] = NoopSearchIndex - _QUERIES['package'] = sql.PackageSearchQuery - - -def _normalize_type(_type): - if isinstance(_type, model.domain_object.DomainObject): - _type = _type.__class__ - if isinstance(_type, type): - _type = _type.__name__ - return _type.strip().lower() - - -def index_for(_type): - """ Get a SearchIndex instance sub-class suitable for - the specified type. """ - try: - _type_n = _normalize_type(_type) - return _INDICES[_type_n]() - except KeyError, ke: - log.warn("Unknown search type: %s" % _type) - return NoopSearchIndex() - - -def query_for(_type): - """ Get a SearchQuery instance sub-class suitable for the specified - type. 
""" - try: - _type_n = _normalize_type(_type) - return _QUERIES[_type_n]() - except KeyError, ke: - raise SearchError("Unknown search type: %s" % _type) - - -def dispatch_by_operation(entity_type, entity, operation): - """Call the appropriate index method for a given notification.""" - try: - index = index_for(entity_type) - if operation == model.domain_object.DomainObjectOperation.new: - index.insert_dict(entity) - elif operation == model.domain_object.DomainObjectOperation.changed: - index.update_dict(entity) - elif operation == model.domain_object.DomainObjectOperation.deleted: - index.remove_dict(entity) - else: - log.warn("Unknown operation: %s" % operation) - except Exception, ex: - log.exception(ex) - # we really need to know about any exceptions, so reraise - # (see #1172) - raise - - -class SynchronousSearchPlugin(p.SingletonPlugin): - """Update the search index automatically.""" - p.implements(p.IDomainObjectModification, inherit=True) - - def notify(self, entity, operation): - if not isinstance(entity, model.Package): - return - if operation != model.domain_object.DomainObjectOperation.deleted: - dispatch_by_operation( - entity.__class__.__name__, - logic.get_action('package_show')( - {'model': model, 'ignore_auth': True, 'validate': False, - 'use_cache': False}, - {'id': entity.id}), - operation - ) - elif operation == model.domain_object.DomainObjectOperation.deleted: - dispatch_by_operation(entity.__class__.__name__, - {'id': entity.id}, operation) - else: - log.warn("Discarded Sync. indexing for: %s" % entity) - - -def rebuild(package_id=None, only_missing=False, force=False, refresh=False, - defer_commit=False, package_ids=None, quiet=False): - ''' - Rebuilds the search index. - - If a dataset id is provided, only this dataset will be reindexed. - When reindexing all datasets, if only_missing is True, only the - datasets not already indexed will be processed. 
If force equals - True, if an exception is found, the exception will be logged, but - the process will carry on. - ''' - log.info("Rebuilding search index...") - - package_index = index_for(model.Package) - context = {'model': model, 'ignore_auth': True, 'validate': False, - 'use_cache': False} - - if package_id: - pkg_dict = logic.get_action('package_show')(context, - {'id': package_id}) - log.info('Indexing just package %r...', pkg_dict['name']) - package_index.remove_dict(pkg_dict) - package_index.insert_dict(pkg_dict) - elif package_ids: - for package_id in package_ids: - pkg_dict = logic.get_action('package_show')(context, - {'id': package_id}) - log.info('Indexing just package %r...', pkg_dict['name']) - package_index.update_dict(pkg_dict, True) - else: - package_ids = [r[0] for r in model.Session.query(model.Package.id). - filter(model.Package.state != 'deleted').all()] - if only_missing: - log.info('Indexing only missing packages...') - package_query = query_for(model.Package) - indexed_pkg_ids = set(package_query.get_all_entity_ids( - max_results=len(package_ids))) - # Packages not indexed - package_ids = set(package_ids) - indexed_pkg_ids - - if len(package_ids) == 0: - log.info('All datasets are already indexed') - return - else: - log.info('Rebuilding the whole index...') - # When refreshing, the index is not previously cleared - if not refresh: - package_index.clear() - - total_packages = len(package_ids) - for counter, pkg_id in enumerate(package_ids): - if not quiet: - sys.stdout.write( - "\rIndexing dataset {0}/{1}".format( - counter +1, total_packages) - ) - sys.stdout.flush() - try: - package_index.update_dict( - logic.get_action('package_show')(context, - {'id': pkg_id} - ), - defer_commit - ) - except Exception, e: - log.error(u'Error while indexing dataset %s: %s' % - (pkg_id, repr(e))) - if force: - log.error(text_traceback()) - continue - else: - raise - - model.Session.commit() - log.info('Finished rebuilding search index.') - - -def 
commit(): - package_index = index_for(model.Package) - package_index.commit() - log.info('Commited pending changes on the search index') - -def check(): - package_query = query_for(model.Package) - - log.debug("Checking packages search index...") - pkgs_q = model.Session.query(model.Package).filter_by( - state=model.State.ACTIVE) - pkgs = set([pkg.id for pkg in pkgs_q]) - indexed_pkgs = set(package_query.get_all_entity_ids(max_results=len(pkgs))) - pkgs_not_indexed = pkgs - indexed_pkgs - print 'Packages not indexed = %i out of %i' % (len(pkgs_not_indexed), - len(pkgs)) - for pkg_id in pkgs_not_indexed: - pkg = model.Session.query(model.Package).get(pkg_id) - print pkg.revision.timestamp.strftime('%Y-%m-%d'), pkg.name - - -def show(package_reference): - package_query = query_for(model.Package) - - return package_query.get_index(package_reference) - - -def clear(package_reference): - package_index = index_for(model.Package) - log.debug("Clearing search index for dataset %s..." % - package_reference) - package_index.delete_package({'id': package_reference}) - - -def clear_all(): - if not SIMPLE_SEARCH: - package_index = index_for(model.Package) - log.debug("Clearing search index...") - package_index.clear() - - -def check_solr_schema_version(schema_file=None): - ''' - Checks if the schema version of the SOLR server is compatible - with this CKAN version. - - The schema will be retrieved from the SOLR server, using the - offset defined in SOLR_SCHEMA_FILE_OFFSET - ('/admin/file/?file=schema.xml'). The schema_file parameter - allows to override this pointing to different schema file, but - it should only be used for testing purposes. - - If the CKAN instance is configured to not use SOLR or the SOLR - server is not available, the function will return False, as the - version check does not apply. If the SOLR server is available, - a SearchError exception will be thrown if the version could not - be extracted or it is not included in the supported versions list. 
- - :schema_file: Absolute path to an alternative schema file. Should - be only used for testing purposes (Default is None) - ''' - - - if SIMPLE_SEARCH: - # Not using the SOLR search backend - return False - - if not is_available(): - # Something is wrong with the SOLR server - log.warn('Problems were found while connecting to the SOLR server') - return False - - # Try to get the schema XML file to extract the version - if not schema_file: - solr_url, solr_user, solr_password = SolrSettings.get() - - http_auth = None - if solr_user is not None and solr_password is not None: - http_auth = solr_user + ':' + solr_password - http_auth = 'Basic ' + http_auth.encode('base64').strip() - - url = solr_url.strip('/') + SOLR_SCHEMA_FILE_OFFSET - - req = urllib2.Request(url=url) - if http_auth: - req.add_header('Authorization', http_auth) - - res = urllib2.urlopen(req) - else: - url = 'file://%s' % schema_file - res = urllib2.urlopen(url) - - tree = xml.dom.minidom.parseString(res.read()) - - version = tree.documentElement.getAttribute('version') - if not len(version): - raise SearchError('Could not extract version info from the SOLR' - ' schema, using file: \n%s' % url) - - if not version in SUPPORTED_SCHEMA_VERSIONS: - raise SearchError('SOLR schema version not supported: %s. 
Supported' - ' versions are [%s]' - % (version, ', '.join(SUPPORTED_SCHEMA_VERSIONS))) - return True +def reindex(cursor): + for plugin in plugins.PluginImplementations(plugins.IIndexer): + return plugin.reindex(cursor) diff --git a/ckan/lib/search/common.py b/ckan/lib/search/common.py index 98a0dba141a..e69de29bb2d 100644 --- a/ckan/lib/search/common.py +++ b/ckan/lib/search/common.py @@ -1,85 +0,0 @@ -import datetime -import logging -import re -import pysolr -import simplejson -log = logging.getLogger(__name__) - - -class SearchIndexError(Exception): - pass - - -class SearchError(Exception): - pass - - -class SearchQueryError(SearchError): - pass - -DEFAULT_SOLR_URL = 'http://127.0.0.1:8983/solr' - - -class SolrSettings(object): - _is_initialised = False - _url = None - _user = None - _password = None - - @classmethod - def init(cls, url, user=None, password=None): - if url is not None: - cls._url = url - cls._user = user - cls._password = password - else: - cls._url = DEFAULT_SOLR_URL - cls._is_initialised = True - - @classmethod - def get(cls): - if not cls._is_initialised: - raise SearchIndexError('SOLR URL not initialised') - if not cls._url: - raise SearchIndexError('SOLR URL is blank') - return (cls._url, cls._user, cls._password) - - -def is_available(): - """ - Return true if we can successfully connect to Solr. 
- """ - try: - conn = make_connection() - conn.search(q="*:*", rows=1) - except Exception, e: - log.exception(e) - return False - return True - - -def make_connection(decode_dates=True): - solr_url, solr_user, solr_password = SolrSettings.get() - if decode_dates: - decoder = simplejson.JSONDecoder(object_hook=solr_datetime_decoder) - return pysolr.Solr(solr_url, decoder=decoder) - else: - return pysolr.Solr(solr_url) - - -def solr_datetime_decoder(d): - for k, v in d.items(): - if isinstance(v, basestring): - possible_datetime = re.search(pysolr.DATETIME_REGEX, v) - if possible_datetime: - date_values = possible_datetime.groupdict() - for dk, dv in date_values.items(): - date_values[dk] = int(dv) - - d[k] = datetime.datetime(date_values['year'], - date_values['month'], - date_values['day'], - date_values['hour'], - date_values['minute'], - date_values['second']) - return d diff --git a/ckan/logic/action/get.py b/ckan/logic/action/get.py index e57e4a1819d..cfbbc3c203c 100644 --- a/ckan/logic/action/get.py +++ b/ckan/logic/action/get.py @@ -2,9 +2,7 @@ import uuid import logging -import json import datetime -import socket from pylons import config import sqlalchemy @@ -86,7 +84,7 @@ def _activity_stream_get_filtered_users(): def _package_list_with_resources(context, package_revision_list): package_list = [] for package in package_revision_list: - result_dict = model_dictize.package_dictize(package,context) + result_dict = model_dictize.package_dictize(package, context) package_list.append(result_dict) return package_list @@ -138,7 +136,7 @@ def package_list(context, data_dict): if offset: query = query.offset(offset) - ## Returns the first field in each result record + # Returns the first field in each result record return [r[0] for r in query.execute()] @@ -162,12 +160,11 @@ def current_package_list_with_resources(context, data_dict): :rtype: list of dictionaries ''' - model = context["model"] limit = data_dict.get('limit') offset = data_dict.get('offset', 0) user = 
context['user'] - if not 'offset' in data_dict and 'page' in data_dict: + if 'offset' not in data_dict and 'page' in data_dict: log.warning('"page" parameter is deprecated. ' 'Use the "offset" parameter instead') page = data_dict['page'] @@ -591,11 +588,12 @@ def organization_list_for_user(context, data_dict): '''Return the organizations that the user has a given permission for. By default this returns the list of organizations that the currently - authorized user can edit, i.e. the list of organizations that the user is an - admin of. + authorized user can edit, i.e. the list of organizations that the user is + an admin of. Specifically it returns the list of organizations that the currently - authorized user has a given permission (for example: "manage_group") against. + authorized user has a given permission (for example: "manage_group") + against. When a user becomes a member of an organization in CKAN they're given a "capacity" (sometimes called a "role"), for example "member", "editor" or @@ -851,7 +849,7 @@ def user_list(context, data_dict): # Filter deleted users query = query.filter(model.User.state != model.State.DELETED) - ## hack for pagination + # hack for pagination if context.get('return_query'): return query @@ -878,7 +876,7 @@ def package_relationships_list(context, data_dict): :rtype: list of dictionaries ''' - ##TODO needs to work with dictization layer + # TODO needs to work with dictization layer model = context['model'] api = context.get('api_version') @@ -909,8 +907,9 @@ def package_relationships_list(context, data_dict): % (id, rel, id2)) relationship_dicts = [ - rel.as_dict(pkg1, ref_package_by=ref_package_by) - for rel in relationships] + rrel.as_dict(pkg1, ref_package_by=ref_package_by) + for rrel in relationships + ] return relationship_dicts @@ -947,29 +946,6 @@ def package_show(context, data_dict): include_tracking = asbool(data_dict.get('include_tracking', False)) package_dict = None - use_cache = (context.get('use_cache', True) - and 
not 'revision_id' in context - and not 'revision_date' in context) - if use_cache: - try: - search_result = search.show(name_or_id) - except (search.SearchError, socket.error): - pass - else: - use_validated_cache = 'schema' not in context - if use_validated_cache and 'validated_data_dict' in search_result: - package_json = search_result['validated_data_dict'] - package_dict = json.loads(package_json) - package_dict_validated = True - else: - package_dict = json.loads(search_result['data_dict']) - package_dict_validated = False - metadata_modified = pkg.metadata_modified.isoformat() - search_metadata_modified = search_result['metadata_modified'] - # solr stores less precice datetime, - # truncate to 22 charactors to get good enough match - if metadata_modified[:22] != search_metadata_modified[:22]: - package_dict = None if not package_dict: package_dict = model_dictize.package_dictize(pkg, context) @@ -1046,8 +1022,13 @@ def resource_show(context, data_dict): pkg_dict = logic.get_action('package_show')( dict(context), - {'id': resource.package.id, - 'include_tracking': asbool(data_dict.get('include_tracking', False))}) + { + 'id': resource.package.id, + 'include_tracking': asbool( + data_dict.get('include_tracking', False) + ) + } + ) for resource_dict in pkg_dict['resources']: if resource_dict['id'] == id: @@ -1100,7 +1081,7 @@ def resource_view_list(context, data_dict): context['resource'] = resource _check_access('resource_view_list', context, data_dict) q = model.Session.query(model.ResourceView).filter_by(resource_id=id) - ## only show views when there is the correct plugin enabled + # only show views when there is the correct plugin enabled resource_views = [ resource_view for resource_view in q.order_by(model.ResourceView.order).all() @@ -1123,6 +1104,7 @@ def resource_status_show(context, data_dict): try: import ckan.lib.celery_app as celery_app + assert celery_app except ImportError: return {'message': 'queue is not installed on this instance'} @@ -1645,10 
+1627,13 @@ def organization_autocomplete(context, data_dict): _check_access('organization_autocomplete', context, data_dict) q = data_dict['q'] - limit = data_dict.get('limit', 20) model = context['model'] - query = model.Group.search_by_name_or_title(q, group_type=None, is_org=True) + query = model.Group.search_by_name_or_title( + q, + group_type=None, + is_org=True + ) organization_list = [] for organization in query.all(): @@ -1777,164 +1762,33 @@ def package_search(context, data_dict): query cannot be changed. CKAN always returns the matched datasets as dictionary objects. ''' - # sometimes context['schema'] is None - schema = (context.get('schema') or - logic.schema.default_package_search_schema()) - data_dict, errors = _validate(data_dict, schema, context) - # put the extras back into the data_dict so that the search can - # report needless parameters - data_dict.update(data_dict.get('__extras', {})) - data_dict.pop('__extras', None) - if errors: - raise ValidationError(errors) - - model = context['model'] - session = context['session'] - user = context.get('user') - + # Ensure the user/agent can call package_search. 
_check_access('package_search', context, data_dict) - # Move ext_ params to extras and remove them from the root of the search - # params, so they don't cause and error - data_dict['extras'] = data_dict.get('extras', {}) - for key in [key for key in data_dict.keys() if key.startswith('ext_')]: - data_dict['extras'][key] = data_dict.pop(key) - - # check if some extension needs to modify the search params - for item in plugins.PluginImplementations(plugins.IPackageController): - data_dict = item.before_search(data_dict) - - # the extension may have decided that it is not necessary to perform - # the query - abort = data_dict.get('abort_search', False) - - if data_dict.get('sort') in (None, 'rank'): - data_dict['sort'] = 'score desc, metadata_modified desc' - - results = [] - if not abort: - if asbool(data_dict.get('use_default_schema')): - data_source = 'data_dict' - else: - data_source = 'validated_data_dict' - data_dict.pop('use_default_schema', None) - # return a list of package ids - data_dict['fl'] = 'id {0}'.format(data_source) - - # If this query hasn't come from a controller that has set this flag - # then we should remove any mention of capacity from the fq and - # instead set it to only retrieve public datasets - fq = data_dict.get('fq', '') - if not context.get('ignore_capacity_check', False): - fq = ' '.join(p for p in fq.split(' ') - if 'capacity:' not in p) - data_dict['fq'] = fq + ' capacity:"public"' - - # Solr doesn't need 'include_drafts`, so pop it. - include_drafts = data_dict.pop('include_drafts', False) - fq = data_dict.get('fq', '') - if include_drafts: - user_id = authz.get_user_id_for_username(user, allow_none=True) - if authz.is_sysadmin(user): - data_dict['fq'] = fq + ' +state:(active OR draft)' - elif user_id: - # Query to return all active datasets, and all draft datasets - # for this user. 
- data_dict['fq'] = fq + \ - ' ((creator_user_id:{0} AND +state:(draft OR active))' \ - ' OR state:active)'.format(user_id) - elif not authz.is_sysadmin(user): - data_dict['fq'] = fq + ' +state:active' - - # Pop these ones as Solr does not need them - extras = data_dict.pop('extras', None) - - query = search.query_for(model.Package) - query.run(data_dict) - - # Add them back so extensions can use them on after_search - data_dict['extras'] = extras - - for package in query.results: - # get the package object - package_dict = package.get(data_source) - ## use data in search index if there - if package_dict: - # the package_dict still needs translating when being viewed - package_dict = json.loads(package_dict) - if context.get('for_view'): - for item in plugins.PluginImplementations( - plugins.IPackageController): - package_dict = item.before_view(package_dict) - results.append(package_dict) - else: - log.error('No package_dict is coming from solr for package ' - 'id %s', package['id']) - - count = query.count - facets = query.facets - else: - count = 0 - facets = {} - results = [] - - search_results = { - 'count': count, - 'facets': facets, - 'results': results, - 'sort': data_dict['sort'] - } - - # create a lookup table of group name to title for all the groups and - # organizations in the current search's facets. - group_names = [] - for field_name in ('groups', 'organization'): - group_names.extend(facets.get(field_name, {}).keys()) - - groups = (session.query(model.Group.name, model.Group.title) - .filter(model.Group.name.in_(group_names)) - .all() - if group_names else []) - group_titles_by_name = dict(groups) - - # Transform facets into a more useful data structure. 
- restructured_facets = {} - for key, value in facets.items(): - restructured_facets[key] = { - 'title': key, - 'items': [] - } - for key_, value_ in value.items(): - new_facet_dict = {} - new_facet_dict['name'] = key_ - if key in ('groups', 'organization'): - display_name = group_titles_by_name.get(key_, key_) - display_name = display_name if display_name and display_name.strip() else key_ - new_facet_dict['display_name'] = display_name - elif key == 'license_id': - license = model.Package.get_license_register().get(key_) - if license: - new_facet_dict['display_name'] = license.title - else: - new_facet_dict['display_name'] = key_ - else: - new_facet_dict['display_name'] = key_ - new_facet_dict['count'] = value_ - restructured_facets[key]['items'].append(new_facet_dict) - search_results['search_facets'] = restructured_facets - - # check if some extension needs to modify the search results - for item in plugins.PluginImplementations(plugins.IPackageController): - search_results = item.after_search(search_results, data_dict) - - # After extensions have had a chance to modify the facets, sort them by - # display name. - for facet in search_results['search_facets']: - search_results['search_facets'][facet]['items'] = sorted( - search_results['search_facets'][facet]['items'], - key=lambda facet: facet['display_name'], reverse=True) + # Load and validate the schema. + schema = ( + context.get('schema') or + logic.schema.default_package_search_schema() + ) + data_dict, errors = _validate(data_dict, schema, context) - return search_results + # The new-style sort arguments should be a list of tuples in the form + # (, ) where order is either desc or asc. + sort = data_dict.get('sort') + if sort and isinstance(sort, basestring): + sort = [ + (k.strip(), o.strip()) for k, o in ( + pair.strip().split(' ') for pair in sort.split(',') + ) + ] + + # Run the actual search and return the results. 
+ return search.query( + data_dict.get('q', '*'), + facets=data_dict.get('facet.field'), + limit=min(1000, int(data_dict.get('rows', 1000))), + sort=sort + ) @logic.validate(logic.schema.default_resource_search_schema) diff --git a/ckan/model/modification.py b/ckan/model/modification.py index a7be9c37f51..0598ffe0f80 100644 --- a/ckan/model/modification.py +++ b/ckan/model/modification.py @@ -1,7 +1,5 @@ import logging -from ckan.lib.search import SearchIndexError - import ckan.plugins as plugins import domain_object import package as _package @@ -84,11 +82,6 @@ def notify(self, entity, operation): plugins.IDomainObjectModification): try: observer.notify(entity, operation) - except SearchIndexError, search_error: - log.exception(search_error) - # Reraise, since it's pretty crucial to ckan if it can't index - # a dataset - raise except Exception, ex: log.exception(ex) # Don't reraise other exceptions since they are generally of @@ -99,11 +92,6 @@ def notify_after_commit(self, entity, operation): plugins.IDomainObjectModification): try: observer.notify_after_commit(entity, operation) - except SearchIndexError, search_error: - log.exception(search_error) - # Reraise, since it's pretty crucial to ckan if it can't index - # a dataset - raise except Exception, ex: log.exception(ex) # Don't reraise other exceptions since they are generally of diff --git a/ckan/plugins/core.py b/ckan/plugins/core.py index 864805590d0..bdeec5f6888 100644 --- a/ckan/plugins/core.py +++ b/ckan/plugins/core.py @@ -9,7 +9,6 @@ from pyutilib.component.core import ExtensionPoint as PluginImplementations from pyutilib.component.core import SingletonPlugin as _pca_SingletonPlugin from pyutilib.component.core import Plugin as _pca_Plugin -from paste.deploy.converters import asbool import interfaces @@ -124,12 +123,6 @@ def load_all(config): unload_all() plugins = config.get('ckan.plugins', '').split() + find_system_plugins() - # Add the synchronous search plugin, unless already loaded or - # 
explicitly disabled - if 'synchronous_search' not in plugins and \ - asbool(config.get('ckan.search.automatic_indexing', True)): - log.debug('Loading the synchronous search plugin') - plugins.append('synchronous_search') load(*plugins) diff --git a/ckan/plugins/interfaces.py b/ckan/plugins/interfaces.py index e29d390f132..e193cc9d3e7 100644 --- a/ckan/plugins/interfaces.py +++ b/ckan/plugins/interfaces.py @@ -23,6 +23,7 @@ 'IFacets', 'IAuthenticator', 'ITranslation', + 'IIndexer', 'IUploader' ] @@ -1566,3 +1567,38 @@ def get_resource_uploader(self): :type id: string ''' + +class IIndexer(Interface): + """ + Extensions implementing this interface can implement custom indexing and + search methods, such as implementing support for ElasticSearch, Sphinx, + etc... + """ + def index(self, package): + """ + Indexes the provided package. + """ + + def query(self, search_query, facets=None, limit=1000, sort=None): + """ + Search for `search_query`, optionally limited by `facets`, and return the + results. + + :param search_query: The query to search on. + :param facets: The facets to filter on. + :param limit: The maximum number of results to return + :param sort: A list of tuples in the form `(<field>, <order>)`. + """ + + def reindex(self, cursor): + """ + Reindex all of the packages returned by `cursor`, a package + iterator. + """ + for package in cursor: + self.index(package) + + def remove(self, package): + """ + Remove the given package from the search index. + """ diff --git a/ckanext/search/__init__.py b/ckanext/search/__init__.py new file mode 100644 index 00000000000..f5da42858f6 --- /dev/null +++ b/ckanext/search/__init__.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from ckan import plugins + + +class NoIndexer(plugins.SingletonPlugin): + """ + A search indexer that simply does nothing. Documents will not be indexed + and search results will always return nothing.
+ """ + plugins.implements(plugins.IIndexer) + + def index(self, package): + return True + + def query(self, *args, **kwargs): + return { + 'results': [], + 'count': 0, + 'search_facets': {}, + 'facets': {}, + 'sort': 'score' + } + + def reindex(self, cursor): + return + + def remove(self, package): + return + + +class PostgresIndexer(plugins.SingletonPlugin): + """ + A search indexer that utilizes PostgreSQL FTS. + """ + plugins.implements(plugins.IIndexer) + + def index(self, package): + pass + + def query(self, search_query, facets=None): + return {'results': [], 'count': 0} + + def reindex(self, cursor): + return + + def remove(self, package): + return diff --git a/ckanext/search/solr.py b/ckanext/search/solr.py new file mode 100644 index 00000000000..b3953d4c1dc --- /dev/null +++ b/ckanext/search/solr.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import json +import datetime + +import pysolr +from pylons import config +from ckan import plugins + + +def solr_datetime_decoder(d): + for k, v in d.items(): + if not isinstance(v, basestring): + continue + + possible_datetime = pysolr.DATETIME_REGEX.search(v) + if possible_datetime: + date_values = possible_datetime.groupdict() + + for dk, dv in date_values.iteritems(): + date_values[dk] = int(dv) + + d[k] = datetime.datetime( + date_values['year'], + date_values['month'], + date_values['day'], + date_values['hour'], + date_values['minute'], + date_values['second'] + ) + + return d + + +class SolrIndexer(plugins.SingletonPlugin): + """ + A search indexer that utilizes Solr. + """ + plugins.implements(plugins.IIndexer, inherit=True) + + def index(self, package): + pass + + def query(self, search_query, facets=None, limit=1000, sort=None, + include_inactive=False): + solr = self._get_connection() + + # By default we want to return all packages. 
+ if search_query in (None, ''): + search_query = '*:*' + + # By default we want to sort by search score, with a secondary sort + # to order by last-updated. + if sort is None: + sort = [('score', 'desc'), ('metadata_modified', 'desc')] + + # LEGACY: Multiple CKAN instances can live on the same Solr core. + # Filter the results to only show the current site results. + fq = ['+site_id:{0}'.format(config.get('ckan.site_id'))] + + if not include_inactive: + fq.append('+state:active') + + response = solr.search(search_query, **{ + 'rows': limit, + 'sort': ', '.join(' '.join(pair) for pair in sort), + 'facet': 'true', + 'fq': fq + }) + + # LEGACY: Take all the top-level extras and move them into an "extras" + # dict. + for result in response.docs: + result['extras'] = extras = {} + + for k in result.keys(): + if k.startswith('extras_'): + extras[k[7:]] = result.pop(k) + + return { + 'results': response.docs, + 'count': response.hits, + 'sort': sort, + 'facets': {}, + 'search_facets': {} + } + + def remove(self, package): + self._get_connection().delete( + q='id:{package_id}'.format( + package_id=package.id + ), + fq=[ + '+site_id:{0}'.format(config.get('ckan.site_id')) + ] + ) + + def _get_connection(self): + return pysolr.Solr( + config.get('solr_url'), + # Use a custom decoder that will "fix" datetimes stored in solr.
+ decoder=json.JSONDecoder( + object_hook=solr_datetime_decoder + ) + ) diff --git a/setup.py b/setup.py index f6d55e0d122..5c284cc16ea 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,6 @@ 'sysadmin = ckan.lib.cli:Sysadmin', 'user = ckan.lib.cli:UserCmd', 'dataset = ckan.lib.cli:DatasetCmd', - 'search-index = ckan.lib.cli:SearchIndexCommand', 'ratings = ckan.lib.cli:Ratings', 'notify = ckan.lib.cli:Notification', 'celeryd = ckan.lib.cli:Celery', @@ -60,12 +59,7 @@ 'group = ckan.forms.group:get_group_fieldset', 'package_group = ckan.forms.group:get_package_group_fieldset', ], - 'ckan.search': [ - 'sql = ckan.lib.search.sql:SqlSearchBackend', - 'solr = ckan.lib.search.solr_backend:SolrSearchBackend', - ], 'ckan.plugins': [ - 'synchronous_search = ckan.lib.search:SynchronousSearchPlugin', 'stats = ckanext.stats.plugin:StatsPlugin', 'publisher_form = ckanext.publisher_form.forms:PublisherForm', 'publisher_dataset_form = ckanext.publisher_form.forms:PublisherDatasetForm', @@ -86,6 +80,10 @@ 'recline_map_view = ckanext.reclineview.plugin:ReclineMapView', 'image_view = ckanext.imageview.plugin:ImageView', 'webpage_view = ckanext.webpageview.plugin:WebPageView', + # Search indexer plugins. + 'no_indexer = ckanext.search:NoIndexer', + 'postgres_indexer = ckanext.search:PostgresIndexer', + 'solr_indexer = ckanext.search.solr:SolrIndexer', # FIXME: Remove deprecated resource previews below. You should use the # versions as *_view instead. 'text_preview = ckanext.textview.plugin:TextView',