diff --git a/README.rst b/README.rst
index 8dc42b8c..b4d2ba51 100644
--- a/README.rst
+++ b/README.rst
@@ -131,10 +131,12 @@ It required to defined ``Document`` class in ``documents.py`` in your app direc
 
         # Ignore auto updating of Elasticsearch when a model is saved
         # or deleted:
        # ignore_signals = True
+
        # Don't perform an index refresh after every update (overrides global setting):
        # auto_refresh = False
+
        # Paginate the django queryset used to populate the index with the specified size
-        # (by default there is no pagination)
+        # (by default it uses the database driver's default setting)
        # queryset_pagination = 5000
 
@@ -551,6 +553,15 @@ Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``.
 You could, for instance, make a ``CelerySignalProcessor`` which would add
 update jobs to the queue to for delayed processing.
 
+ELASTICSEARCH_DSL_PARALLEL
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default: ``False``
+
+Run indexing (populate and rebuild) in parallel, using Elasticsearch's ``parallel_bulk()`` helper.
+Note that some databases (e.g. SQLite) do not play well with this option.
+
+
 Testing
 -------
 
diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py
index 753e6fc7..719ec9c1 100644
--- a/django_elasticsearch_dsl/documents.py
+++ b/django_elasticsearch_dsl/documents.py
@@ -1,9 +1,13 @@
 from __future__ import unicode_literals
 
-from django.core.paginator import Paginator
+from collections import deque
+from copy import deepcopy
+from functools import partial
+
+from django import VERSION as DJANGO_VERSION
 from django.db import models
 from django.utils.six import iteritems
-from elasticsearch.helpers import bulk
+from elasticsearch.helpers import bulk, parallel_bulk
 from elasticsearch_dsl import Document as DSLDocument
 
 from .exceptions import ModelFieldNotMappedError
@@ -45,11 +49,12 @@
     models.URLField: TextField,
 }
 
-
 class DocType(DSLDocument):
+    _prepared_fields = []
     def __init__(self, related_instance_to_ignore=None, **kwargs):
         super(DocType, self).__init__(**kwargs)
         self._related_instance_to_ignore = related_instance_to_ignore
+        self._prepared_fields = self.init_prepare()
 
     def __eq__(self, other):
         return id(self) == id(other)
@@ -70,39 +75,56 @@ def get_queryset(self):
         """
         Return the queryset that should be indexed by this doc type.
         """
-        primary_key_field_name = self.django.model._meta.pk.name
-        return self.django.model._default_manager.all().order_by(primary_key_field_name)
+        return self.django.model._default_manager.all()
 
-    def prepare(self, instance):
+    def get_indexing_queryset(self):
         """
-        Take a model instance, and turn it into a dict that can be serialized
-        based on the fields defined on this DocType subclass
+        Build a queryset iterator to be used for indexing.
+        """
+        qs = self.get_queryset()
+        kwargs = {}
+        if DJANGO_VERSION >= (2,) and self.django.queryset_pagination:
+            kwargs = {'chunk_size': self.django.queryset_pagination}
+        return qs.iterator(**kwargs)
+
+    def init_prepare(self):
+        """
+        Initialise the data model preparers once. Extracts the preparers from
+        the model and generates a list of callables so that this work is not
+        repeated for every object instance.
""" - data = {} - for name, field in iteritems(self._fields): + index_fields = getattr(self, '_fields', {}) + fields = [] + for name, field in iteritems(index_fields): if not isinstance(field, DEDField): continue - if field._path == []: + if not field._path: field._path = [name] prep_func = getattr(self, 'prepare_%s_with_related' % name, None) if prep_func: - field_value = prep_func( - instance, - related_to_ignore=self._related_instance_to_ignore - ) + fn = partial(prep_func, related_to_ignore=self._related_instance_to_ignore) else: prep_func = getattr(self, 'prepare_%s' % name, None) if prep_func: - field_value = prep_func(instance) + fn = prep_func else: - field_value = field.get_value_from_instance( - instance, self._related_instance_to_ignore - ) + fn = partial(field.get_value_from_instance, field_value_to_ignore=self._related_instance_to_ignore) - data[name] = field_value + fields.append((name, field, fn)) + return fields + + def prepare(self, instance): + """ + Take a model instance, and turn it into a dict that can be serialized + based on the fields defined on this DocType subclass + """ + data = { + name: prep_func(instance) + for name, field, prep_func in self._prepared_fields + } return data @classmethod @@ -124,6 +146,17 @@ def to_field(cls, field_name, model_field): def bulk(self, actions, **kwargs): return bulk(client=self._get_connection(), actions=actions, **kwargs) + def parallel_bulk(self, actions, **kwargs): + if self.django.queryset_pagination and 'chunk_size' not in kwargs: + kwargs['chunk_size'] = self.django.queryset_pagination + bulk_actions = parallel_bulk(client=self._get_connection(), actions=actions, **kwargs) + # As the `parallel_bulk` is lazy, we need to get it into `deque` to run it instantly + # See https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/2 + deque(bulk_actions, maxlen=0) + # Fake return value to emulate bulk() since we don't have a result yet, + # the result is currently not used upstream anyway. 
+        return (1, [])
+
     def _prepare_action(self, object_instance, action):
         return {
             '_op_type': action,
@@ -135,18 +168,18 @@ def _prepare_action(self, object_instance, action):
         }
 
     def _get_actions(self, object_list, action):
-        if self.django.queryset_pagination is not None:
-            paginator = Paginator(
-                object_list, self.django.queryset_pagination
-            )
-            for page in paginator.page_range:
-                for object_instance in paginator.page(page).object_list:
-                    yield self._prepare_action(object_instance, action)
+        for object_instance in object_list:
+            yield self._prepare_action(object_instance, action)
+
+    def _bulk(self, *args, **kwargs):
+        """Helper for switching between normal and parallel bulk operations."""
+        parallel = kwargs.pop('parallel', False)
+        if parallel:
+            return self.parallel_bulk(*args, **kwargs)
         else:
-            for object_instance in object_list:
-                yield self._prepare_action(object_instance, action)
+            return self.bulk(*args, **kwargs)
 
-    def update(self, thing, refresh=None, action='index', **kwargs):
+    def update(self, thing, refresh=None, action='index', parallel=False, **kwargs):
         """
         Update each document in ES for a model, iterable of models or queryset
         """
@@ -160,8 +193,10 @@ def update(self, thing, refresh=None, action='index', **kwargs):
         else:
             object_list = thing
 
-        return self.bulk(
-            self._get_actions(object_list, action), **kwargs
+        return self._bulk(
+            self._get_actions(object_list, action),
+            parallel=parallel,
+            **kwargs
         )
diff --git a/django_elasticsearch_dsl/management/commands/search_index.py b/django_elasticsearch_dsl/management/commands/search_index.py
index c73a87cd..8898928a 100644
--- a/django_elasticsearch_dsl/management/commands/search_index.py
+++ b/django_elasticsearch_dsl/management/commands/search_index.py
@@ -1,4 +1,6 @@
 from __future__ import unicode_literals, absolute_import
+
+from django.conf import settings
 from django.core.management.base import BaseCommand, CommandError
 from django.utils.six.moves import input
 from ...registries import registry
@@ -49,6 +51,26 @@ def add_arguments(self, parser):
             dest='force',
             help="Force operations without asking"
         )
+        parser.add_argument(
+            '--parallel',
+            action='store_true',
+            dest='parallel',
+            help='Run populate/rebuild update multi-threaded'
+        )
+        parser.add_argument(
+            '--no-parallel',
+            action='store_false',
+            dest='parallel',
+            help='Run populate/rebuild update single-threaded'
+        )
+        parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False))
+        parser.add_argument(
+            '--no-count',
+            action='store_false',
+            default=True,
+            dest='count',
+            help='Do not include a total count in the summary log line'
+        )
 
     def _get_models(self, args):
         """
@@ -84,12 +106,15 @@ def _create(self, models, options):
             index.create()
 
     def _populate(self, models, options):
+        parallel = options['parallel']
         for doc in registry.get_documents(models):
-            qs = doc().get_queryset()
-            self.stdout.write("Indexing {} '{}' objects".format(
-                qs.count(), doc.django.model.__name__)
+            self.stdout.write("Indexing {} '{}' objects {}".format(
+                doc().get_queryset().count() if options['count'] else "all",
+                doc.django.model.__name__,
+                "(parallel)" if parallel else "")
             )
-            doc().update(qs)
+            qs = doc().get_indexing_queryset()
+            doc().update(qs, parallel=parallel)
 
     def _delete(self, models, options):
         index_names = [index._name for index in registry.get_indices(models)]
diff --git a/django_elasticsearch_dsl/test/testcases.py b/django_elasticsearch_dsl/test/testcases.py
index 83ea3401..ba29aaf3 100644
--- a/django_elasticsearch_dsl/test/testcases.py
+++ b/django_elasticsearch_dsl/test/testcases.py
@@ -19,11 +19,11 @@ def setUp(self):
 
     def tearDown(self):
         pattern = re.compile(self._index_suffixe + '$')
-        for doc in registry.get_documents():
-            doc._index._name = pattern.sub('', doc._index._name)
-
         for index in registry.get_indices():
             index.delete(ignore=[404, 400])
             index._name = pattern.sub('', index._name)
 
+        for doc in registry.get_documents():
+            doc._index._name = pattern.sub('', doc._index._name)
+
         super(ESTestCase, self).tearDown()
diff --git a/example/test_app/documents.py b/example/test_app/documents.py
index 1e7e1dfe..e1e449d9 100644
--- a/example/test_app/documents.py
+++ b/example/test_app/documents.py
@@ -1,7 +1,8 @@
 from elasticsearch_dsl import analyzer
-from django_elasticsearch_dsl import DocType, Index, fields
+from django_elasticsearch_dsl import Document, Index, fields
+from django_elasticsearch_dsl.registries import registry
 
-from .models import Ad, Category, Car, Manufacturer
+from .models import Ad, Car, Manufacturer
 
 car = Index('test_cars')
@@ -19,8 +20,8 @@
 )
 
 
-@car.doc_type
-class CarDocument(DocType):
+@registry.register_document
+class CarDocument(Document):
     manufacturer = fields.ObjectField(properties={
         'name': fields.TextField(),
         'country': fields.TextField(),
@@ -37,19 +38,16 @@ class CarDocument(DocType):
         'title': fields.TextField(),
     })
 
-    class Meta:
+    class Django:
         model = Car
-        related_models = [Ad, Manufacturer, Category]
         fields = [
             'name',
             'launched',
             'type',
         ]
 
-    def get_queryset(self):
-        return super(CarDocument, self).get_queryset().select_related(
-            'manufacturer'
-        )
+    class Index:
+        name = "car"
 
     def get_instances_from_related(self, related_instance):
         if isinstance(related_instance, Ad):
@@ -59,11 +57,11 @@ def get_instances_from_related(self, related_instance):
         return related_instance.car_set.all()
 
 
-@car.doc_type
-class ManufacturerDocument(DocType):
+@registry.register_document
+class ManufacturerDocument(Document):
     country = fields.TextField()
 
-    class Meta:
+    class Django:
         model = Manufacturer
         fields = [
             'name',
@@ -72,54 +70,61 @@ class Meta:
             'logo',
         ]
 
-
-class CarWithPrepareDocument(DocType):
-    manufacturer = fields.ObjectField(properties={
-        'name': fields.TextField(),
-        'country': fields.TextField(),
-    })
-
-    manufacturer_short = fields.ObjectField(properties={
-        'name': fields.TextField(),
-    })
-
-    class Meta:
-        model = Car
-        related_models = [Manufacturer]
-        index = 'car_with_prepare_index'
-        fields = [
-            'name',
-            'launched',
-            'type',
-        ]
-
-    def prepare_manufacturer_with_related(self, car, related_to_ignore):
-        if (car.manufacturer is not None and car.manufacturer !=
-                related_to_ignore):
-            return {
-                'name': car.manufacturer.name,
-                'country': car.manufacturer.country(),
-            }
-        return {}
-
-    def prepare_manufacturer_short(self, car):
-        if car.manufacturer is not None:
-            return {
-                'name': car.manufacturer.name,
-            }
-        return {}
-
-    def get_instances_from_related(self, related_instance):
-        return related_instance.car_set.all()
-
-
-class AdDocument(DocType):
+    class Index:
+        name = "manufacturer"
+
+
+# @registry.register_document
+# class CarWithPrepareDocument(Document):
+#     manufacturer = fields.ObjectField(properties={
+#         'name': fields.TextField(),
+#         'country': fields.TextField(),
+#     })
+#
+#     manufacturer_short = fields.ObjectField(properties={
+#         'name': fields.TextField(),
+#     })
+#
+#     class Index:
+#         name = "car_with_prepare_index"
+#
+#     class Django:
+#         model = Car
+#         related_models = [Manufacturer]
+#         fields = [
+#             'name',
+#             'launched',
+#             'type',
+#         ]
+#
+#     def prepare_manufacturer_with_related(self, car, related_to_ignore):
+#         if (car.manufacturer is not None and car.manufacturer !=
+#                 related_to_ignore):
+#             return {
+#                 'name': car.manufacturer.name,
+#                 'country': car.manufacturer.country(),
+#             }
+#         return {}
+#
+#     def prepare_manufacturer_short(self, car):
+#         if car.manufacturer is not None:
+#             return {
+#                 'name': car.manufacturer.name,
+#             }
+#         return {}
+#
+#     def get_instances_from_related(self, related_instance):
+#         return related_instance.car_set.all()
+
+
+@registry.register_document
+class AdDocument(Document):
     description = fields.TextField(
         analyzer=html_strip,
         fields={'raw': fields.KeywordField()}
     )
 
-    class Meta:
+    class Django:
         model = Ad
         index = 'test_ads'
         fields = [
@@ -129,14 +134,21 @@ class Meta:
             'url',
         ]
 
+    class Index:
+        name = "add"
 
-class AdDocument2(DocType):
+
+@registry.register_document
+class AdDocument2(Document):
     def __init__(self, *args, **kwargs):
         super(AdDocument2, self).__init__(*args, **kwargs)
 
-    class Meta:
+    class Django:
         model = Ad
         index = 'test_ads2'
         fields = [
             'title',
         ]
+
+    class Index:
+        name = "add2"
diff --git a/tests/documents.py b/tests/documents.py
index 04e0ebfb..77e3668a 100644
--- a/tests/documents.py
+++ b/tests/documents.py
@@ -148,25 +148,5 @@ class Index:
         settings = index_settings
 
 
-@registry.register_document
-class PaginatedAdDocument(Document):
-
-    class Django:
-        model = Ad
-        queryset_pagination = 2
-        fields = [
-            'title',
-            'created',
-            'modified',
-            'url',
-        ]
-
-    class Index:
-        name = 'ad_index'
-
-    def get_queryset(self):
-        return Ad.objects.all().order_by('-id')
-
-
 ad_index = AdDocument._index
 car_index = CarDocument._index
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 2267d75c..9b66fa38 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -112,14 +112,15 @@ def test_create_all_indices(self):
 
     def test_populate_all_doc_type(self):
         call_command('search_index', stdout=self.out, action='populate')
-        self.doc_a1.get_queryset.assert_called_once()
-        self.doc_a1.update.assert_called_once_with(self.doc_a1_qs)
-        self.doc_a2.get_queryset.assert_called_once()
-        self.doc_a2.update.assert_called_once_with(self.doc_a2_qs)
-        self.doc_b1.get_queryset.assert_called_once()
-        self.doc_b1.update.assert_called_once_with(self.doc_b1_qs)
-        self.doc_c1.get_queryset.assert_called_once()
-        self.doc_c1.update.assert_called_once_with(self.doc_c1_qs)
+        # One call for the "Indexing NNN ... objects" log line, one for the indexing itself (via get_indexing_queryset()).
+        assert self.doc_a1.get_queryset.call_count == 2
+        self.doc_a1.update.assert_called_once_with(self.doc_a1_qs.iterator(), parallel=False)
+        assert self.doc_a2.get_queryset.call_count == 2
+        self.doc_a2.update.assert_called_once_with(self.doc_a2_qs.iterator(), parallel=False)
+        assert self.doc_b1.get_queryset.call_count == 2
+        self.doc_b1.update.assert_called_once_with(self.doc_b1_qs.iterator(), parallel=False)
+        assert self.doc_c1.get_queryset.call_count == 2
+        self.doc_c1.update.assert_called_once_with(self.doc_c1_qs.iterator(), parallel=False)
 
     def test_rebuild_indices(self):
 
diff --git a/tests/test_documents.py b/tests/test_documents.py
index d6e4e618..8d27d5be 100644
--- a/tests/test_documents.py
+++ b/tests/test_documents.py
@@ -3,7 +3,7 @@
 from django.db import models
 from django.utils.translation import ugettext_lazy as _
 from elasticsearch_dsl import GeoPoint, MetaField
-from mock import patch
+from mock import patch, Mock, PropertyMock
 
 from django_elasticsearch_dsl import fields
 from django_elasticsearch_dsl.documents import DocType
@@ -274,8 +274,75 @@ class Django:
         car1 = Car()
         car2 = Car()
         car3 = Car()
-        with patch('django_elasticsearch_dsl.documents.bulk') as mock:
-            doc.update([car1, car2, car3])
+
+        bulk = "django_elasticsearch_dsl.documents.bulk"
+        parallel_bulk = "django_elasticsearch_dsl.documents.parallel_bulk"
+        with patch(bulk) as mock_bulk, patch(parallel_bulk) as mock_parallel_bulk:
+            doc.update([car3, car2, car3])
             self.assertEqual(
-                3, len(list(mock.call_args_list[0][1]['actions']))
+                3, len(list(mock_bulk.call_args_list[0][1]['actions']))
             )
+            self.assertEqual(mock_bulk.call_count, 1, "bulk is called")
+            self.assertEqual(mock_parallel_bulk.call_count, 0, "parallel bulk is not called")
+
+    def test_model_instance_iterable_update_with_parallel(self):
+        class CarDocument2(DocType):
+            class Django:
+                model = Car
+
+        doc = CarDocument()
+        car1 = Car()
+        car2 = Car()
+        car3 = Car()
+        bulk = "django_elasticsearch_dsl.documents.bulk"
+        parallel_bulk = "django_elasticsearch_dsl.documents.parallel_bulk"
+        with patch(bulk) as mock_bulk, patch(parallel_bulk) as mock_parallel_bulk:
+            doc.update([car1, car2, car3], parallel=True)
+            self.assertEqual(mock_bulk.call_count, 0, "bulk is not called")
+            self.assertEqual(mock_parallel_bulk.call_count, 1, "parallel bulk is called")
+
+    def test_init_prepare_correct(self):
+        """Does init_prepare() run and collect the right preparation functions?"""
+
+        d = CarDocument()
+        self.assertEqual(len(d._prepared_fields), 4)
+
+        expect = {
+            'color': ("", ("", "")),  # py3, py2
+            'type': ("", ("", "")),
+            'name': ("", ("", "")),
+            'price': ("", ("", "")),
+        }
+
+        for name, field, prep in d._prepared_fields:
+            e = expect[name]
+            self.assertEqual(str(type(field)), e[0], 'field type should be copied over')
+            self.assertTrue('__call__' in dir(prep), 'prep function should be callable')
+            self.assertTrue(str(type(prep)) in e[1], 'prep function is correct partial or method')
+
+    def test_init_prepare_results(self):
+        """Are the results from init_prepare() actually used in prepare()?"""
+        d = CarDocument()
+
+        car = Car()
+        setattr(car, 'name', "Tusla")
+        setattr(car, 'price', 340123.21)
+        setattr(car, 'color', "polka-dots")  # Overwritten by prepare function
+        setattr(car, 'pk', 4701)  # Ignored, not in document
+        setattr(car, 'type', "imaginary")
+
+        self.assertEqual(d.prepare(car), {'color': 'blue', 'type': 'imaginary', 'name': 'Tusla', 'price': 340123.21})
+
+        m = Mock()
+        # This will blow up if prepare() accesses _fields and tries to iterate over it.
+        # Since init_prepare() compiles the list of prepare functions up front,
+        # prepare() should not need to access _fields at all.
+        with patch.object(CarDocument, '_fields', 33):
+            d.prepare(m)
+        self.assertEqual(sorted([tuple(x) for x in m.method_calls], key=lambda _: _[0]),
+                         [('name', (), {}), ('price', (), {}), ('type', (), {})]
+                         )
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 2d65cf52..7ccb60ff 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -17,7 +17,6 @@
     car_index,
     CarDocument,
     CarWithPrepareDocument,
-    PaginatedAdDocument,
     ManufacturerDocument,
     index_settings
 )
@@ -337,18 +336,19 @@ def test_to_queryset(self):
         self.assertEqual(qs.count(), 2)
         self.assertEqual(list(qs), [self.ad2, self.ad1])
 
-    def test_queryset_pagination(self):
+    def test_queryset_iterator_queries(self):
         ad3 = Ad(title="Ad 3", car=self.car1)
         ad3.save()
 
         with self.assertNumQueries(1):
             AdDocument().update(Ad.objects.all())
 
-        doc = PaginatedAdDocument()
+        doc = AdDocument()
 
-        with self.assertNumQueries(3):
+        with self.assertNumQueries(1):
             doc.update(Ad.objects.all().order_by('-id'))
 
         self.assertEqual(
             set(int(instance.meta.id) for instance in doc.search().query('match', title="Ad")),
             set([ad3.pk, self.ad1.pk, self.ad2.pk])
         )
+
diff --git a/tox.ini b/tox.ini
index 013f62c6..906ef578 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,10 +1,10 @@
 [tox]
 envlist =
-    {py27,py36,py37}-django-110-{es6}
-    {py27,py36,py37}-django-111-{es6}
-    {py36,py37}-django-2-{es6}
-    {py36,py37}-django-21-{es6}
-    {py36,py37}-django-22-{es6}
+    {py27,py36,py37}-django-110-{es7}
+    {py27,py36,py37}-django-111-{es7}
+    {py36,py37}-django-2-{es7}
+    {py36,py37}-django-21-{es7}
+    {py36,py37}-django-22-{es7}
 
 [testenv]
 setenv =
@@ -19,6 +19,7 @@ deps =
     django-21: Django>=2.1,<2.2
     django-22: Django>=2.2,<2.3
     es6: elasticsearch-dsl>=6.4.0,<7.0.0
+    es7: elasticsearch-dsl>=7,<8
     -r{toxinidir}/requirements_test.txt
 
 basepython =
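A minimal usage sketch of the parallel indexing added above, assuming the
``ELASTICSEARCH_DSL_PARALLEL`` setting and the ``parallel`` keyword from this patch,
plus a hypothetical ``CarDocument`` registered in an app's ``documents.py``::

    # settings.py: opt in globally; the search_index command can still
    # override this per run with --parallel / --no-parallel.
    ELASTICSEARCH_DSL_PARALLEL = True

    # Programmatic equivalent: feed a document's indexing iterator through
    # parallel_bulk(). get_indexing_queryset() and the parallel kwarg come
    # from this patch; CarDocument is a placeholder document class.
    from my_app.documents import CarDocument

    doc = CarDocument()
    doc.update(doc.get_indexing_queryset(), parallel=True)

On the command line the same run would look roughly like
``python manage.py search_index --populate --parallel``, with the flag names as
added to ``search_index.py`` above.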