Skip to content

Commit

Permalink
Pass active backend to index queryset calls (closes django-haystack#534)
Browse files Browse the repository at this point in the history
Now the Index index_queryset() and read_queryset() methods will be called with
the active backend name so they can optionally perform backend-specific
filtering.

This is extremely useful when using something like Solr cores to maintain
language specific backends, allowing an Index to select the appropriate
documents for each language::

    def index_queryset(self, using=None):
        return Post.objects.filter(language=using)

Changes:
    * clear_index, update_index and rebuild_index all default to processing
      *every* backend. ``--using`` may now be provided multiple times to select
      a subset of the configured backends.
    * Added examples to the Multiple Index documentation page
  • Loading branch information
acdha authored and floppya committed Mar 29, 2013
1 parent c44e206 commit 779634c
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 54 deletions.
2 changes: 1 addition & 1 deletion docs/autocomplete.rst
Expand Up @@ -42,7 +42,7 @@ Example (continuing from the tutorial)::
def get_model(self): def get_model(self):
return Note return Note


def index_queryset(self): def index_queryset(self, using=None):
"""Used when the entire index for model is updated.""" """Used when the entire index for model is updated."""
return Note.objects.filter(pub_date__lte=datetime.datetime.now()) return Note.objects.filter(pub_date__lte=datetime.datetime.now())


Expand Down
2 changes: 1 addition & 1 deletion docs/migration_from_1_to_2.rst
Expand Up @@ -155,7 +155,7 @@ A converted Haystack 2.X index should look like::
def get_model(self): def get_model(self):
return Note return Note


def index_queryset(self): def index_queryset(self, using=None):
"""Used when the entire index for model is updated.""" """Used when the entire index for model is updated."""
return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now()) return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now())


Expand Down
36 changes: 36 additions & 0 deletions docs/multiple_index.rst
Expand Up @@ -163,3 +163,39 @@ via the ``SearchQuerySet.using`` method::
Note that the models a ``SearchQuerySet`` is trying to pull from must all come Note that the models a ``SearchQuerySet`` is trying to pull from must all come
from the same index. Haystack is not able to combine search queries against from the same index. Haystack is not able to combine search queries against
different indexes. different indexes.


Custom Index Selection
======================

If a specific backend has been selected, the ``SearchIndex.index_queryset`` and
``SearchIndex.read_queryset`` will receive the backend name, giving indexes the
opportunity to customize the returned queryset.

For example, a site which uses separate indexes for recent items and older
content might define ``index_queryset`` to filter the items based on date::

def index_queryset(self, using=None):
qs = Note.objects.all()
archive_limit = datetime.datetime.now() - datetime.timedelta(days=90)

if using == "archive":
return qs.filter(pub_date__lte=archive_limit)
else:
return qs.filter(pub_date__gte=archive_limit)


Multi-lingual Content
---------------------

Most search engines require you to set the language at the index level. For
example, a multi-lingual site using Solr can use `multiple cores <http://wiki.apache.org/solr/CoreAdmin>`_ and corresponding Haystack
backends using the language name. Under this scenario, queries are simple::

sqs = SearchQuerySet.using(lang).auto_query(…)

During index updates, the Index's ``index_queryset`` method will need to filter
the items to avoid sending the wrong content to the search engine::

def index_queryset(self, using=None):
return Post.objects.filter(language=using)
8 changes: 4 additions & 4 deletions docs/searchindex_api.rst
Expand Up @@ -34,7 +34,7 @@ For the impatient::
def get_model(self): def get_model(self):
return Note return Note


def index_queryset(self): def index_queryset(self, using=None):
"Used when the entire index for model is updated." "Used when the entire index for model is updated."
return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now()) return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now())


Expand Down Expand Up @@ -386,7 +386,7 @@ This method is required & you must override it to return the correct class.
``index_queryset`` ``index_queryset``
------------------ ------------------


.. method:: SearchIndex.index_queryset(self) .. method:: SearchIndex.index_queryset(self, using=None)


Get the default QuerySet to index when doing a full update. Get the default QuerySet to index when doing a full update.


Expand All @@ -395,7 +395,7 @@ Subclasses can override this method to avoid indexing certain objects.
``read_queryset`` ``read_queryset``
----------------- -----------------


.. method:: SearchIndex.read_queryset(self) .. method:: SearchIndex.read_queryset(self, using=None)


Get the default QuerySet for read actions. Get the default QuerySet for read actions.


Expand Down Expand Up @@ -609,7 +609,7 @@ For the impatient::
fields = ['user', 'pub_date'] fields = ['user', 'pub_date']


# Note that regular ``SearchIndex`` methods apply. # Note that regular ``SearchIndex`` methods apply.
def index_queryset(self): def index_queryset(self, using=None):
"Used when the entire index for model is updated." "Used when the entire index for model is updated."
return Note.objects.filter(pub_date__lte=datetime.datetime.now()) return Note.objects.filter(pub_date__lte=datetime.datetime.now())


2 changes: 1 addition & 1 deletion docs/tutorial.rst
Expand Up @@ -221,7 +221,7 @@ Haystack to automatically pick it up. The ``NoteIndex`` should look like::
def get_model(self): def get_model(self):
return Note return Note


def index_queryset(self): def index_queryset(self, using=None):
"""Used when the entire index for model is updated.""" """Used when the entire index for model is updated."""
return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now()) return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now())


Expand Down
2 changes: 1 addition & 1 deletion example_project/regular_app/search_indexes.py
Expand Up @@ -19,7 +19,7 @@ class DogIndex(indexes.SearchIndex, indexes.Indexable):
def get_model(self): def get_model(self):
return Dog return Dog


def index_queryset(self): def index_queryset(self, using=None):
return self.get_model().objects.filter(public=True) return self.get_model().objects.filter(public=True)


def prepare_toys(self, obj): def prepare_toys(self, obj):
Expand Down
12 changes: 6 additions & 6 deletions haystack/indexes.py
Expand Up @@ -76,7 +76,7 @@ class NoteIndex(indexes.SearchIndex, indexes.Indexable):
def get_model(self): def get_model(self):
return Note return Note
def index_queryset(self): def index_queryset(self, using=None):
return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now()) return self.get_model().objects.filter(pub_date__lte=datetime.datetime.now())
""" """
Expand All @@ -102,24 +102,24 @@ def get_model(self):
""" """
raise NotImplementedError("You must provide a 'model' method for the '%r' index." % self) raise NotImplementedError("You must provide a 'model' method for the '%r' index." % self)


def index_queryset(self): def index_queryset(self, using=None):
""" """
Get the default QuerySet to index when doing a full update. Get the default QuerySet to index when doing a full update.
Subclasses can override this method to avoid indexing certain objects. Subclasses can override this method to avoid indexing certain objects.
""" """
return self.get_model()._default_manager.all() return self.get_model()._default_manager.all()


def read_queryset(self): def read_queryset(self, using=None):
""" """
Get the default QuerySet for read actions. Get the default QuerySet for read actions.
Subclasses can override this method to work with other managers. Subclasses can override this method to work with other managers.
Useful when working with default managers that filter some objects. Useful when working with default managers that filter some objects.
""" """
return self.index_queryset() return self.index_queryset(using=using)


def build_queryset(self, start_date=None, end_date=None): def build_queryset(self, using=None, start_date=None, end_date=None):
""" """
Get the default QuerySet to index when doing an index update. Get the default QuerySet to index when doing an index update.
Expand Down Expand Up @@ -154,7 +154,7 @@ def build_queryset(self, start_date=None, end_date=None):
warnings.warn("'SearchIndex.get_queryset' was deprecated in Haystack v2. Please rename the method 'index_queryset'.") warnings.warn("'SearchIndex.get_queryset' was deprecated in Haystack v2. Please rename the method 'index_queryset'.")
index_qs = self.get_queryset() index_qs = self.get_queryset()
else: else:
index_qs = self.index_queryset() index_qs = self.index_queryset(using=using)


if not hasattr(index_qs, 'filter'): if not hasattr(index_qs, 'filter'):
raise ImproperlyConfigured("The '%r' class must return a 'QuerySet' in the 'index_queryset' method." % self) raise ImproperlyConfigured("The '%r' class must return a 'QuerySet' in the 'index_queryset' method." % self)
Expand Down
34 changes: 20 additions & 14 deletions haystack/management/commands/clear_index.py
@@ -1,7 +1,7 @@
from optparse import make_option from optparse import make_option
import sys import sys

from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from haystack.constants import DEFAULT_ALIAS




class Command(BaseCommand): class Command(BaseCommand):
Expand All @@ -10,35 +10,41 @@ class Command(BaseCommand):
make_option('--noinput', action='store_false', dest='interactive', default=True, make_option('--noinput', action='store_false', dest='interactive', default=True,
help='If provided, no prompts will be issued to the user and the data will be wiped out.' help='If provided, no prompts will be issued to the user and the data will be wiped out.'
), ),
make_option("-u", "--using", action="store", type="string", dest="using", default=DEFAULT_ALIAS, make_option("-u", "--using", action="append", dest="using",
help='If provided, chooses a connection to work with.' default=[],
help='Update only the named backend (can be used multiple times). '
'By default all backends will be updated.'
), ),
) )
option_list = BaseCommand.option_list + base_options option_list = BaseCommand.option_list + base_options

def handle(self, **options): def handle(self, **options):
"""Clears out the search index completely.""" """Clears out the search index completely."""
from haystack import connections from haystack import connections
self.verbosity = int(options.get('verbosity', 1)) self.verbosity = int(options.get('verbosity', 1))
self.using = options.get('using')

using = options.get('using')
if not using:
using = connections.connections_info.keys()

if options.get('interactive', True): if options.get('interactive', True):
print print
print "WARNING: This will irreparably remove EVERYTHING from your search index in connection '%s'." % self.using print "WARNING: This will irreparably remove EVERYTHING from your search index in connection '%s'." % "', '".join(using)
print "Your choices after this are to restore from backups or rebuild via the `rebuild_index` command." print "Your choices after this are to restore from backups or rebuild via the `rebuild_index` command."

yes_or_no = raw_input("Are you sure you wish to continue? [y/N] ") yes_or_no = raw_input("Are you sure you wish to continue? [y/N] ")
print print

if not yes_or_no.lower().startswith('y'): if not yes_or_no.lower().startswith('y'):
print "No action taken." print "No action taken."
sys.exit() sys.exit()

if self.verbosity >= 1: if self.verbosity >= 1:
print "Removing all documents from your index because you said so." print "Removing all documents from your index because you said so."


backend = connections[self.using].get_backend() for backend_name in using:
backend.clear() backend = connections[backend_name].get_backend()

backend.clear()

if self.verbosity >= 1: if self.verbosity >= 1:
print "All documents removed." print "All documents removed."
48 changes: 32 additions & 16 deletions haystack/management/commands/update_index.py
@@ -1,7 +1,7 @@
from datetime import timedelta from datetime import timedelta
from optparse import make_option from optparse import make_option
import logging
import os import os
import warnings


from django import db from django import db
from django.conf import settings from django.conf import settings
Expand All @@ -11,7 +11,6 @@
from django.utils.encoding import smart_str from django.utils.encoding import smart_str


from haystack import connections as haystack_connections from haystack import connections as haystack_connections
from haystack.constants import DEFAULT_ALIAS
from haystack.query import SearchQuerySet from haystack.query import SearchQuerySet


try: try:
Expand Down Expand Up @@ -71,9 +70,9 @@ def do_update(backend, index, qs, start, end, total, verbosity=1):


if verbosity >= 2: if verbosity >= 2:
if hasattr(os, 'getppid') and os.getpid() == os.getppid(): if hasattr(os, 'getppid') and os.getpid() == os.getppid():
print " indexed %s - %d of %d." % (start+1, end, total) print " indexed %s - %d of %d." % (start + 1, end, total)
else: else:
print " indexed %s - %d of %d (by %s)." % (start+1, end, total, os.getpid()) print " indexed %s - %d of %d (by %s)." % (start + 1, end, total, os.getpid())


# FIXME: Get the right backend. # FIXME: Get the right backend.
backend.update(index, current_qs) backend.update(index, current_qs)
Expand Down Expand Up @@ -121,8 +120,10 @@ class Command(LabelCommand):
make_option('-r', '--remove', action='store_true', dest='remove', make_option('-r', '--remove', action='store_true', dest='remove',
default=False, help='Remove objects from the index that are no longer present in the database.' default=False, help='Remove objects from the index that are no longer present in the database.'
), ),
make_option("-u", "--using", action="store", type="string", dest="using", default=DEFAULT_ALIAS, make_option("-u", "--using", action="append", dest="using",
help='If provided, chooses a connection to work with.' default=[],
help='Update only the named backend (can be used multiple times). '
'By default all backends will be updated.'
), ),
make_option('-k', '--workers', action='store', dest='workers', make_option('-k', '--workers', action='store', dest='workers',
default=0, type='int', default=0, type='int',
Expand All @@ -137,9 +138,11 @@ def handle(self, *items, **options):
self.start_date = None self.start_date = None
self.end_date = None self.end_date = None
self.remove = options.get('remove', False) self.remove = options.get('remove', False)
self.using = options.get('using')
self.workers = int(options.get('workers', 0)) self.workers = int(options.get('workers', 0))
self.backend = haystack_connections[self.using].get_backend()
self.backends = options.get('using')
if not self.backends:
self.backends = haystack_connections.connections_info.keys()


age = options.get('age', DEFAULT_AGE) age = options.get('age', DEFAULT_AGE)
start_date = options.get('start_date') start_date = options.get('start_date')
Expand Down Expand Up @@ -202,9 +205,18 @@ def get_models(self, label):
return [get_model(app_label, model_name)] return [get_model(app_label, model_name)]


def handle_label(self, label, **options): def handle_label(self, label, **options):
for using in self.backends:
try:
self.update_backend(label, using)
except:
logging.exception("Error updating %s using %s ", label, using)
raise

def update_backend(self, label, using):
from haystack.exceptions import NotHandled from haystack.exceptions import NotHandled


unified_index = haystack_connections[self.using].get_unified_index() backend = haystack_connections[using].get_backend()
unified_index = haystack_connections[using].get_unified_index()


if self.workers > 0: if self.workers > 0:
import multiprocessing import multiprocessing
Expand All @@ -218,17 +230,21 @@ def handle_label(self, label, **options):
continue continue


if self.workers > 0: if self.workers > 0:
# workers resetting connections leads to references to models / connections getting stale and having their connection disconnected from under them. Resetting before the loop continues and it accesses the ORM makes it better. # workers resetting connections leads to references to models / connections getting
# stale and having their connection disconnected from under them. Resetting before
# the loop continues and it accesses the ORM makes it better.
db.close_connection() db.close_connection()


qs = index.build_queryset(start_date=self.start_date, end_date=self.end_date) qs = index.build_queryset(using=using, start_date=self.start_date,
end_date=self.end_date)

total = qs.count() total = qs.count()


if self.verbosity >= 1: if self.verbosity >= 1:
print "Indexing %d %s." % (total, smart_str(model._meta.verbose_name_plural)) print "Indexing %d %s." % (total, smart_str(model._meta.verbose_name_plural))


pks_seen = set([smart_str(pk) for pk in qs.values_list('pk', flat=True)]) pks_seen = set([smart_str(pk) for pk in qs.values_list('pk', flat=True)])
batch_size = self.batchsize or self.backend.batch_size batch_size = self.batchsize or backend.batch_size


if self.workers > 0: if self.workers > 0:
ghetto_queue = [] ghetto_queue = []
Expand All @@ -237,9 +253,9 @@ def handle_label(self, label, **options):
end = min(start + batch_size, total) end = min(start + batch_size, total)


if self.workers == 0: if self.workers == 0:
do_update(self.backend, index, qs, start, end, total, self.verbosity) do_update(backend, index, qs, start, end, total, self.verbosity)
else: else:
ghetto_queue.append(('do_update', model, start, end, total, self.using, self.start_date, self.end_date, self.verbosity)) ghetto_queue.append(('do_update', model, start, end, total, using, self.start_date, self.end_date, self.verbosity))


if self.workers > 0: if self.workers > 0:
pool = multiprocessing.Pool(self.workers) pool = multiprocessing.Pool(self.workers)
Expand All @@ -261,9 +277,9 @@ def handle_label(self, label, **options):
upper_bound = start + batch_size upper_bound = start + batch_size


if self.workers == 0: if self.workers == 0:
do_remove(self.backend, index, model, pks_seen, start, upper_bound) do_remove(backend, index, model, pks_seen, start, upper_bound)
else: else:
ghetto_queue.append(('do_remove', model, pks_seen, start, upper_bound, self.using, self.verbosity)) ghetto_queue.append(('do_remove', model, pks_seen, start, upper_bound, using, self.verbosity))


if self.workers > 0: if self.workers > 0:
pool = multiprocessing.Pool(self.workers) pool = multiprocessing.Pool(self.workers)
Expand Down
2 changes: 1 addition & 1 deletion haystack/query.py
Expand Up @@ -205,7 +205,7 @@ def post_process_results(self, results):
try: try:
ui = connections[self.query._using].get_unified_index() ui = connections[self.query._using].get_unified_index()
index = ui.get_index(model) index = ui.get_index(model)
objects = index.read_queryset() objects = index.read_queryset(using=self.query._using)
loaded_objects[model] = objects.in_bulk(models_pks[model]) loaded_objects[model] = objects.in_bulk(models_pks[model])
except NotHandled: except NotHandled:
self.log.warning("Model '%s.%s' not handled by the routers.", self.app_label, self.model_name) self.log.warning("Model '%s.%s' not handled by the routers.", self.app_label, self.model_name)
Expand Down
1 change: 1 addition & 0 deletions tests/core/tests/__init__.py
Expand Up @@ -14,3 +14,4 @@
from core.tests.templatetags import * from core.tests.templatetags import *
from core.tests.views import * from core.tests.views import *
from core.tests.utils import * from core.tests.utils import *
from core.tests.management_commands import *

0 comments on commit 779634c

Please sign in to comment.