From b4920c9520a3a511214f0b1bec2782380f59789a Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Thu, 17 Oct 2019 20:41:09 +0200 Subject: [PATCH 01/23] Use elasticsearch's parallel_bulk for indexing, add ELASTICSEARCH_DSL_PARALLEL default setting and parameters to management command. Use qs.iterator() for fetching data during reindex, as this is much more memory efficient and performant. Instead of finding out which methods to call to prepare fields, do that finagling once and cache it for subsequent model instance prepares. See issue #154 for performance analysis and details. --- README.rst | 13 ++- django_elasticsearch_dsl/documents.py | 85 +++++++++++++------ .../management/commands/search_index.py | 25 +++++- 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/README.rst b/README.rst index 8dc42b8c..b4d2ba51 100644 --- a/README.rst +++ b/README.rst @@ -131,10 +131,12 @@ It required to defined ``Document`` class in ``documents.py`` in your app direc # Ignore auto updating of Elasticsearch when a model is saved # or deleted: # ignore_signals = True + # Don't perform an index refresh after every update (overrides global setting): # auto_refresh = False + # Paginate the django queryset used to populate the index with the specified size - # (by default there is no pagination) + # (by default it uses the database driver's default setting) # queryset_pagination = 5000 @@ -551,6 +553,15 @@ Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``. You could, for instance, make a ``CelerySignalProcessor`` which would add update jobs to the queue to for delayed processing. +ELASTICSEARCH_DSL_PARALLEL +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Default: ``False`` + +Run indexing (populate and rebuild) in parallel using ES' parallel_bulk() method. +Note that some databases (e.g. sqlite) do not play well with this option. 
+ + Testing ------- diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index 753e6fc7..c3c55bbd 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -1,9 +1,13 @@ from __future__ import unicode_literals +from collections import deque +from copy import deepcopy +from functools import partial + from django.core.paginator import Paginator from django.db import models from django.utils.six import iteritems -from elasticsearch.helpers import bulk +from elasticsearch.helpers import bulk, parallel_bulk from elasticsearch_dsl import Document as DSLDocument from .exceptions import ModelFieldNotMappedError @@ -45,7 +49,6 @@ models.URLField: TextField, } - class DocType(DSLDocument): def __init__(self, related_instance_to_ignore=None, **kwargs): super(DocType, self).__init__(**kwargs) @@ -73,36 +76,54 @@ def get_queryset(self): primary_key_field_name = self.django.model._meta.pk.name return self.django.model._default_manager.all().order_by(primary_key_field_name) - def prepare(self, instance): + def get_indexing_queryset(self): + qs = self.get_queryset() + kwargs = {} + if self.django.queryset_pagination: + kwargs = {'chunk_size': self.django.queryset_pagination} + return qs.iterator(**kwargs) + + def init_prepare(self): """ - Take a model instance, and turn it into a dict that can be serialized - based on the fields defined on this DocType subclass + Initialise the data model preparers once here. Extracts the preparers + from the model and generate a list of callables to avoid doing that + work on every object instance over. 
""" - data = {} + fields = [] for name, field in iteritems(self._fields): if not isinstance(field, DEDField): continue - if field._path == []: + if not field._path: field._path = [name] prep_func = getattr(self, 'prepare_%s_with_related' % name, None) if prep_func: - field_value = prep_func( - instance, - related_to_ignore=self._related_instance_to_ignore - ) + fn = partial(prep_func, related_to_ignore=self._related_instance_to_ignore) else: prep_func = getattr(self, 'prepare_%s' % name, None) if prep_func: - field_value = prep_func(instance) + fn = prep_func else: - field_value = field.get_value_from_instance( - instance, self._related_instance_to_ignore - ) + fn = partial(field.get_value_from_instance, field_value_to_ignore=self._related_instance_to_ignore) + + fields.append((name, field, fn)) - data[name] = field_value + self._doc_type._prepared_fields = fields + def prepare(self, instance): + """ + Take a model instance, and turn it into a dict that can be serialized + based on the fields defined on this DocType subclass + """ + if getattr(self._doc_type, '_prepared_fields', None) is None: + self.init_prepare() + + data = { + name: prep_func(instance) + for name, field, prep_func in self._doc_type._prepared_fields + } + # print("-> %s" % data) return data @classmethod @@ -124,6 +145,12 @@ def to_field(cls, field_name, model_field): def bulk(self, actions, **kwargs): return bulk(client=self._get_connection(), actions=actions, **kwargs) + def parallel_bulk(self, actions, **kwargs): + deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0) + # Fake return value to emulate bulk() since we don't have a result yet, + # the result is currently not used upstream anyway. 
+ return (1, []) + def _prepare_action(self, object_instance, action): return { '_op_type': action, @@ -135,18 +162,18 @@ def _prepare_action(self, object_instance, action): } def _get_actions(self, object_list, action): - if self.django.queryset_pagination is not None: - paginator = Paginator( - object_list, self.django.queryset_pagination - ) - for page in paginator.page_range: - for object_instance in paginator.page(page).object_list: - yield self._prepare_action(object_instance, action) + for object_instance in object_list: + yield self._prepare_action(object_instance, action) + + def _bulk(self, *args, **kwargs): + """Helper for switching between normal and parallel bulk operation""" + parallel = kwargs.pop('parallel', False) + if parallel: + return self.parallel_bulk(*args, **kwargs) else: - for object_instance in object_list: - yield self._prepare_action(object_instance, action) + return self.bulk(*args, **kwargs) - def update(self, thing, refresh=None, action='index', **kwargs): + def update(self, thing, refresh=None, action='index', parallel=False, **kwargs): """ Update each document in ES for a model, iterable of models or queryset """ @@ -160,8 +187,10 @@ def update(self, thing, refresh=None, action='index', **kwargs): else: object_list = thing - return self.bulk( - self._get_actions(object_list, action), **kwargs + return self._bulk( + self._get_actions(object_list, action), + parallel=parallel, + **kwargs ) diff --git a/django_elasticsearch_dsl/management/commands/search_index.py b/django_elasticsearch_dsl/management/commands/search_index.py index c73a87cd..38596039 100644 --- a/django_elasticsearch_dsl/management/commands/search_index.py +++ b/django_elasticsearch_dsl/management/commands/search_index.py @@ -1,4 +1,6 @@ from __future__ import unicode_literals, absolute_import + +from django.conf import settings from django.core.management.base import BaseCommand, CommandError from django.utils.six.moves import input from ...registries import registry 
@@ -49,6 +51,19 @@ def add_arguments(self, parser): dest='force', help="Force operations without asking" ) + parser.add_argument( + '--parallel', + action='store_true', + dest='parallel', + help='Run populate/rebuild update multi threaded' + ) + parser.add_argument( + '--no-parallel', + action='store_false', + dest='parallel', + help='Run populate/rebuild update single threaded' + ) + parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False)) def _get_models(self, args): """ @@ -84,12 +99,14 @@ def _create(self, models, options): index.create() def _populate(self, models, options): + parallel = options['parallel'] for doc in registry.get_documents(models): - qs = doc().get_queryset() - self.stdout.write("Indexing {} '{}' objects".format( - qs.count(), doc.django.model.__name__) + self.stdout.write("Indexing {} '{}' objects {}".format( + doc().get_queryset().count(), doc.django.model.__name__, + "(parallel)" if parallel else "") ) - doc().update(qs) + qs = doc().get_indexing_queryset() + doc().update(qs, parallel=parallel) def _delete(self, models, options): index_names = [index._name for index in registry.get_indices(models)] From e5cd2df9b02dde7990542b9f0f9d9b378515464f Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Thu, 17 Oct 2019 23:46:46 +0200 Subject: [PATCH 02/23] Move collection of prepare functions to __init__, where it's conceptually cleaner. Also shaves off a test per object. 
--- django_elasticsearch_dsl/documents.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index c3c55bbd..9feccbf2 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -53,6 +53,8 @@ class DocType(DSLDocument): def __init__(self, related_instance_to_ignore=None, **kwargs): super(DocType, self).__init__(**kwargs) self._related_instance_to_ignore = related_instance_to_ignore + if getattr(self._doc_type, '_prepared_fields', None) is None: + self._doc_type._prepared_fields = self.init_prepare() def __eq__(self, other): return id(self) == id(other) @@ -109,16 +111,13 @@ def init_prepare(self): fields.append((name, field, fn)) - self._doc_type._prepared_fields = fields + return fields def prepare(self, instance): """ Take a model instance, and turn it into a dict that can be serialized based on the fields defined on this DocType subclass """ - if getattr(self._doc_type, '_prepared_fields', None) is None: - self.init_prepare() - data = { name: prep_func(instance) for name, field, prep_func in self._doc_type._prepared_fields From 861f7045b9af830feff6293607369e998d26fe01 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 18 Oct 2019 00:25:23 +0200 Subject: [PATCH 03/23] Minor cleanup: Move prepare cache to Document object instead of Model, as it's conceptually possible to have several indices on the same model. Also remove forced ordering that is a remnant of earlier code. 
--- django_elasticsearch_dsl/documents.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index 9feccbf2..7981b0f4 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -53,8 +53,7 @@ class DocType(DSLDocument): def __init__(self, related_instance_to_ignore=None, **kwargs): super(DocType, self).__init__(**kwargs) self._related_instance_to_ignore = related_instance_to_ignore - if getattr(self._doc_type, '_prepared_fields', None) is None: - self._doc_type._prepared_fields = self.init_prepare() + self._prepared_fields = self.init_prepare() def __eq__(self, other): return id(self) == id(other) @@ -75,10 +74,12 @@ def get_queryset(self): """ Return the queryset that should be indexed by this doc type. """ - primary_key_field_name = self.django.model._meta.pk.name - return self.django.model._default_manager.all().order_by(primary_key_field_name) + return self.django.model._default_manager.all() def get_indexing_queryset(self): + """ + Build queryset (iterator) for use by indexing. + """ qs = self.get_queryset() kwargs = {} if self.django.queryset_pagination: @@ -120,7 +121,7 @@ def prepare(self, instance): """ data = { name: prep_func(instance) - for name, field, prep_func in self._doc_type._prepared_fields + for name, field, prep_func in self._prepared_fields } # print("-> %s" % data) return data From 827800d8700ddbb396ef5cddea3902428bf210ad Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Fri, 18 Oct 2019 10:10:43 +0200 Subject: [PATCH 04/23] chunk_size parameter for queryset.iterator() appeared in Django 2 --- django_elasticsearch_dsl/documents.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index 7981b0f4..5249a4c5 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -4,6 +4,7 @@ from copy import deepcopy from functools import partial +from django import VERSION as DJANGO_VERSION from django.core.paginator import Paginator from django.db import models from django.utils.six import iteritems @@ -82,7 +83,7 @@ def get_indexing_queryset(self): """ qs = self.get_queryset() kwargs = {} - if self.django.queryset_pagination: + if DJANGO_VERSION >= (2,) and self.django.queryset_pagination: kwargs = {'chunk_size': self.django.queryset_pagination} return qs.iterator(**kwargs) From 1773ed65647a5f8527bff8c3e4f408d3b2c7a4fd Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 18 Oct 2019 10:46:38 +0200 Subject: [PATCH 05/23] Do not crash in init_prepare when no fields have been defined --- django_elasticsearch_dsl/documents.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index 5249a4c5..f7c83600 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -93,8 +93,9 @@ def init_prepare(self): from the model and generate a list of callables to avoid doing that work on every object instance over. """ + index_fields = getattr(self, '_fields', {}) fields = [] - for name, field in iteritems(self._fields): + for name, field in iteritems(index_fields): if not isinstance(field, DEDField): continue From acda7e7e612b7b91fb3df279b44b6abdcd750f49 Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Fri, 18 Oct 2019 11:23:57 +0200 Subject: [PATCH 06/23] Crank up diff size to see what is going on --- tests/test_integration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index 2d65cf52..7dcef5c4 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -135,6 +135,7 @@ def test_get_doc_with_many_to_many_relationships(self): ]) def test_doc_to_dict(self): + self.maxDiff = None # XXX: mjl temporary s = CarDocument.search().query("match", name=self.car2.name) result = s.execute() self.assertEqual(len(result), 1) From 827974696efc13e1d9dbbff6ff6ae56177097590 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 18 Oct 2019 11:25:01 +0200 Subject: [PATCH 07/23] Adapt test to changed call pattern --- tests/test_commands.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 2267d75c..bd8f2d9b 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -112,7 +112,8 @@ def test_create_all_indices(self): def test_populate_all_doc_type(self): call_command('search_index', stdout=self.out, action='populate') - self.doc_a1.get_queryset.assert_called_once() + # One call for "Indexing NNN documents", one for indexing itself (via get_index_queryset). + assert self.doc_a1.get_queryset.call_count == 2 self.doc_a1.update.assert_called_once_with(self.doc_a1_qs) self.doc_a2.get_queryset.assert_called_once() self.doc_a2.update.assert_called_once_with(self.doc_a2_qs) From c4f230d0339857c0570eec611cd5c1a61a86498e Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Fri, 18 Oct 2019 11:52:32 +0200 Subject: [PATCH 08/23] Adapt tests to changed call patterns --- tests/test_commands.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index bd8f2d9b..9b66fa38 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -114,13 +114,13 @@ def test_populate_all_doc_type(self): call_command('search_index', stdout=self.out, action='populate') # One call for "Indexing NNN documents", one for indexing itself (via get_index_queryset). assert self.doc_a1.get_queryset.call_count == 2 - self.doc_a1.update.assert_called_once_with(self.doc_a1_qs) - self.doc_a2.get_queryset.assert_called_once() - self.doc_a2.update.assert_called_once_with(self.doc_a2_qs) - self.doc_b1.get_queryset.assert_called_once() - self.doc_b1.update.assert_called_once_with(self.doc_b1_qs) - self.doc_c1.get_queryset.assert_called_once() - self.doc_c1.update.assert_called_once_with(self.doc_c1_qs) + self.doc_a1.update.assert_called_once_with(self.doc_a1_qs.iterator(), parallel=False) + assert self.doc_a2.get_queryset.call_count == 2 + self.doc_a2.update.assert_called_once_with(self.doc_a2_qs.iterator(), parallel=False) + assert self.doc_b1.get_queryset.call_count == 2 + self.doc_b1.update.assert_called_once_with(self.doc_b1_qs.iterator(), parallel=False) + assert self.doc_c1.get_queryset.call_count == 2 + self.doc_c1.update.assert_called_once_with(self.doc_c1_qs.iterator(), parallel=False) def test_rebuild_indices(self): From 1dcc0138cd98a5d0b71205cc04c11775b266dc3b Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 18 Oct 2019 13:32:55 +0200 Subject: [PATCH 09/23] Mark pagination test as expected failure for now. 
--- tests/test_integration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index 7dcef5c4..842dd965 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -338,6 +338,7 @@ def test_to_queryset(self): self.assertEqual(qs.count(), 2) self.assertEqual(list(qs), [self.ad2, self.ad1]) + @unittest.expectedFailure # ripping out pagination made this fail (mjl) def test_queryset_pagination(self): ad3 = Ad(title="Ad 3", car=self.car1) ad3.save() From 505406ed0bf720c18f5458ca61984dba01fe7fbe Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 18 Oct 2019 13:35:16 +0200 Subject: [PATCH 10/23] Define _prepared_fields as attribute in class so to_dict() won't pick it up as document field --- django_elasticsearch_dsl/documents.py | 1 + 1 file changed, 1 insertion(+) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index f7c83600..82b5fcc3 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -51,6 +51,7 @@ } class DocType(DSLDocument): + _prepared_fields = [] def __init__(self, related_instance_to_ignore=None, **kwargs): super(DocType, self).__init__(**kwargs) self._related_instance_to_ignore = related_instance_to_ignore From 93f7d7ca008815a1f4ca9e58dd6922d5c9ee692e Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Mon, 21 Oct 2019 10:46:52 +0200 Subject: [PATCH 11/23] remove debugging --- tests/test_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index 842dd965..c3033895 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -135,7 +135,6 @@ def test_get_doc_with_many_to_many_relationships(self): ]) def test_doc_to_dict(self): - self.maxDiff = None # XXX: mjl temporary s = CarDocument.search().query("match", name=self.car2.name) result = s.execute() self.assertEqual(len(result), 1) From c4a8a24d64d59dbb66684a0ad69601cedccb9002 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Thu, 24 Oct 2019 10:38:18 +0200 Subject: [PATCH 12/23] Add parameter no to do a count(*) before indexing, as for complex querysets that might be expensive. --- .../management/commands/search_index.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/django_elasticsearch_dsl/management/commands/search_index.py b/django_elasticsearch_dsl/management/commands/search_index.py index 38596039..8898928a 100644 --- a/django_elasticsearch_dsl/management/commands/search_index.py +++ b/django_elasticsearch_dsl/management/commands/search_index.py @@ -64,6 +64,13 @@ def add_arguments(self, parser): help='Run populate/rebuild update single threaded' ) parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False)) + parser.add_argument( + '--no-count', + action='store_false', + default=True, + dest='count', + help='Do not include a total count in the summary log line' + ) def _get_models(self, args): """ @@ -102,7 +109,8 @@ def _populate(self, models, options): parallel = options['parallel'] for doc in registry.get_documents(models): self.stdout.write("Indexing {} '{}' objects {}".format( - doc().get_queryset().count(), doc.django.model.__name__, + doc().get_queryset().count() if options['count'] else "all", + doc.django.model.__name__, "(parallel)" if parallel 
else "") ) qs = doc().get_indexing_queryset() From 2d2e996b10a44cdb334a92b06ff7acd981feb53e Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 25 Oct 2019 14:39:18 +0600 Subject: [PATCH 13/23] Fixing example application --- example/test_app/documents.py | 128 +++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 58 deletions(-) diff --git a/example/test_app/documents.py b/example/test_app/documents.py index 1e7e1dfe..e1e449d9 100644 --- a/example/test_app/documents.py +++ b/example/test_app/documents.py @@ -1,7 +1,8 @@ from elasticsearch_dsl import analyzer -from django_elasticsearch_dsl import DocType, Index, fields +from django_elasticsearch_dsl import Document, Index, fields +from django_elasticsearch_dsl.registries import registry -from .models import Ad, Category, Car, Manufacturer +from .models import Ad, Car, Manufacturer car = Index('test_cars') @@ -19,8 +20,8 @@ ) -@car.doc_type -class CarDocument(DocType): +@registry.register_document +class CarDocument(Document): manufacturer = fields.ObjectField(properties={ 'name': fields.TextField(), 'country': fields.TextField(), @@ -37,19 +38,16 @@ class CarDocument(DocType): 'title': fields.TextField(), }) - class Meta: + class Django: model = Car - related_models = [Ad, Manufacturer, Category] fields = [ 'name', 'launched', 'type', ] - def get_queryset(self): - return super(CarDocument, self).get_queryset().select_related( - 'manufacturer' - ) + class Index: + name = "car" def get_instances_from_related(self, related_instance): if isinstance(related_instance, Ad): @@ -59,11 +57,11 @@ def get_instances_from_related(self, related_instance): return related_instance.car_set.all() -@car.doc_type -class ManufacturerDocument(DocType): +@registry.register_document +class ManufacturerDocument(Document): country = fields.TextField() - class Meta: + class Django: model = Manufacturer fields = [ 'name', @@ -72,54 +70,61 @@ class Meta: 'logo', ] - -class CarWithPrepareDocument(DocType): - manufacturer = 
fields.ObjectField(properties={ - 'name': fields.TextField(), - 'country': fields.TextField(), - }) - - manufacturer_short = fields.ObjectField(properties={ - 'name': fields.TextField(), - }) - - class Meta: - model = Car - related_models = [Manufacturer] - index = 'car_with_prepare_index' - fields = [ - 'name', - 'launched', - 'type', - ] - - def prepare_manufacturer_with_related(self, car, related_to_ignore): - if (car.manufacturer is not None and car.manufacturer != - related_to_ignore): - return { - 'name': car.manufacturer.name, - 'country': car.manufacturer.country(), - } - return {} - - def prepare_manufacturer_short(self, car): - if car.manufacturer is not None: - return { - 'name': car.manufacturer.name, - } - return {} - - def get_instances_from_related(self, related_instance): - return related_instance.car_set.all() - - -class AdDocument(DocType): + class Index: + name = "manufacturer" + + +# @registry.register_document +# class CarWithPrepareDocument(Document): +# manufacturer = fields.ObjectField(properties={ +# 'name': fields.TextField(), +# 'country': fields.TextField(), +# }) +# +# manufacturer_short = fields.ObjectField(properties={ +# 'name': fields.TextField(), +# }) +# +# class Index: +# name = "car_with_prepare_index" +# +# class Django: +# model = Car +# related_models = [Manufacturer] +# fields = [ +# 'name', +# 'launched', +# 'type', +# ] +# +# def prepare_manufacturer_with_related(self, car, related_to_ignore): +# if (car.manufacturer is not None and car.manufacturer != +# related_to_ignore): +# return { +# 'name': car.manufacturer.name, +# 'country': car.manufacturer.country(), +# } +# return {} +# +# def prepare_manufacturer_short(self, car): +# if car.manufacturer is not None: +# return { +# 'name': car.manufacturer.name, +# } +# return {} +# +# def get_instances_from_related(self, related_instance): +# return related_instance.car_set.all() + + +@registry.register_document +class AdDocument(Document): description = fields.TextField( 
analyzer=html_strip, fields={'raw': fields.KeywordField()} ) - class Meta: + class Django: model = Ad index = 'test_ads' fields = [ @@ -129,14 +134,21 @@ class Meta: 'url', ] + class Index: + name = "add" -class AdDocument2(DocType): + +@registry.register_document +class AdDocument2(Document): def __init__(self, *args, **kwargs): super(AdDocument2, self).__init__(*args, **kwargs) - class Meta: + class Django: model = Ad index = 'test_ads2' fields = [ 'title', ] + + class Index: + name = "add2" From 3b690090dffa28812ec7536d24559024dc4db288 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 25 Oct 2019 15:08:38 +0200 Subject: [PATCH 14/23] Correctly clean up after test run (delete indices with the right name). --- django_elasticsearch_dsl/test/testcases.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/django_elasticsearch_dsl/test/testcases.py b/django_elasticsearch_dsl/test/testcases.py index 83ea3401..ba29aaf3 100644 --- a/django_elasticsearch_dsl/test/testcases.py +++ b/django_elasticsearch_dsl/test/testcases.py @@ -19,11 +19,11 @@ def setUp(self): def tearDown(self): pattern = re.compile(self._index_suffixe + '$') - for doc in registry.get_documents(): - doc._index._name = pattern.sub('', doc._index._name) - for index in registry.get_indices(): index.delete(ignore=[404, 400]) index._name = pattern.sub('', index._name) + for doc in registry.get_documents(): + doc._index._name = pattern.sub('', doc._index._name) + super(ESTestCase, self).tearDown() From f613926a608e07ad7efb8984df4d2a24c659431f Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 25 Oct 2019 15:09:46 +0200 Subject: [PATCH 15/23] Remove paginator test. Add tests for usage of init_prepare() and _prepared_fields. Add tests for correct calling of bulk/parallel_bulk. 
--- tests/documents.py | 20 ------------ tests/test_documents.py | 64 +++++++++++++++++++++++++++++++++++++-- tests/test_integration.py | 9 +++--- 3 files changed, 65 insertions(+), 28 deletions(-) diff --git a/tests/documents.py b/tests/documents.py index 04e0ebfb..77e3668a 100644 --- a/tests/documents.py +++ b/tests/documents.py @@ -148,25 +148,5 @@ class Index: settings = index_settings -@registry.register_document -class PaginatedAdDocument(Document): - - class Django: - model = Ad - queryset_pagination = 2 - fields = [ - 'title', - 'created', - 'modified', - 'url', - ] - - class Index: - name = 'ad_index' - - def get_queryset(self): - return Ad.objects.all().order_by('-id') - - ad_index = AdDocument._index car_index = CarDocument._index diff --git a/tests/test_documents.py b/tests/test_documents.py index d6e4e618..587d9dc0 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -3,7 +3,7 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from elasticsearch_dsl import GeoPoint, MetaField -from mock import patch +from mock import patch, Mock, PropertyMock from django_elasticsearch_dsl import fields from django_elasticsearch_dsl.documents import DocType @@ -274,8 +274,66 @@ class Django: car1 = Car() car2 = Car() car3 = Car() - with patch('django_elasticsearch_dsl.documents.bulk') as mock: + with patch('django_elasticsearch_dsl.documents.bulk') as mock_bulk, \ + patch('django_elasticsearch_dsl.documents.parallel_bulk') as mock_parallel_bulk: doc.update([car1, car2, car3]) self.assertEqual( - 3, len(list(mock.call_args_list[0][1]['actions'])) + 3, len(list(mock_bulk.call_args_list[0][1]['actions'])) ) + self.assertEqual(mock_bulk.call_count, 1, "bulk is no called") + self.assertEqual(mock_parallel_bulk.call_count, 0, "parallel bulk is no called") + + def test_model_instance_iterable_update_with_parallel(self): + class CarDocument2(DocType): + class Django: + model = Car + + doc = CarDocument() + car1 = Car() + 
car2 = Car() + car3 = Car() + with patch('django_elasticsearch_dsl.documents.bulk') as mock_bulk, \ + patch('django_elasticsearch_dsl.documents.parallel_bulk') as mock_parallel_bulk: + doc.update([car1, car2, car3], parallel=True) + self.assertEqual(mock_bulk.call_count, 0, "bulk is not called") + self.assertEqual(mock_parallel_bulk.call_count, 1, "parallel bulk is called") + + def test_init_prepare_correct(self): + """Does init_prepare() run and collect the right preparation functions?""" + + d = CarDocument() + self.assertEqual(len(d._prepared_fields), 4) + + expect = { + 'color': ("", ""), + 'type': ("", ""), + 'name': ("", ""), + 'price': ("", ""), + } + + for name, field, prep in d._prepared_fields: + e = expect[name] + self.assertEqual(str(type(field)), e[0], 'field type should be copied over') + self.assertTrue('__call__' in dir(prep), 'prep function should be callable') + self.assertEqual(str(type(prep)), e[1], 'prep function is correct partial or method') + + def test_init_prepare_results(self): + """Are the results from init_prepare() actually used in prepare()?""" + d = CarDocument() + + car = Car() + setattr(car, 'name', "Tusla") + setattr(car, 'price', 340123.21) + setattr(car, 'color', "polka-dots") # Overwritten by prepare function + setattr(car, 'pk', 4701) # Ignored, not in document + setattr(car, 'type', "imaginary") + + self.assertEqual(d.prepare(car), {'color': 'blue', 'type': 'imaginary', 'name': 'Tusla', 'price': 340123.21}) + + m = Mock() + # This will blow up should we access _fields and try to iterate over it. 
+ # Since init_prepare compiles a list of prepare functions, while + # preparing no access to _fields should happen + with patch.object(CarDocument, '_fields', 33): + d.prepare(m) + self.assertEqual([tuple(x) for x in m.method_calls], [('type', (), {}), ('name', (), {}), ('price', (), {})]) diff --git a/tests/test_integration.py b/tests/test_integration.py index c3033895..7ccb60ff 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -17,7 +17,6 @@ car_index, CarDocument, CarWithPrepareDocument, - PaginatedAdDocument, ManufacturerDocument, index_settings ) @@ -337,19 +336,19 @@ def test_to_queryset(self): self.assertEqual(qs.count(), 2) self.assertEqual(list(qs), [self.ad2, self.ad1]) - @unittest.expectedFailure # ripping out pagination made this fail (mjl) - def test_queryset_pagination(self): + def test_queryset_iterator_queries(self): ad3 = Ad(title="Ad 3", car=self.car1) ad3.save() with self.assertNumQueries(1): AdDocument().update(Ad.objects.all()) - doc = PaginatedAdDocument() + doc = AdDocument() - with self.assertNumQueries(3): + with self.assertNumQueries(1): doc.update(Ad.objects.all().order_by('-id')) self.assertEqual( set(int(instance.meta.id) for instance in doc.search().query('match', title="Ad")), set([ad3.pk, self.ad1.pk, self.ad2.pk]) ) + From 51360c3f02cb947d569835198af87f78e8598c71 Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Fri, 25 Oct 2019 15:21:23 +0200 Subject: [PATCH 16/23] Make sure we compare w/ stable order --- tests/test_documents.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_documents.py b/tests/test_documents.py index 587d9dc0..61d03db0 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -336,4 +336,6 @@ def test_init_prepare_results(self): # preparing no access to _fields should happen with patch.object(CarDocument, '_fields', 33): d.prepare(m) - self.assertEqual([tuple(x) for x in m.method_calls], [('type', (), {}), ('name', (), {}), ('price', (), {})]) + self.assertEqual(sorted([tuple(x) for x in m.method_calls], key=lambda _: _[0]), + [('name', (), {}), ('price', (), {}), ('type', (), {})] + ) From 475e8a896270ac0402bfc092c0a157b4a9d70295 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 25 Oct 2019 15:36:17 +0200 Subject: [PATCH 17/23] Adjust for different types for methods/partials in py2 --- tests/test_documents.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/test_documents.py b/tests/test_documents.py index 61d03db0..6fbd5d83 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -305,17 +305,21 @@ def test_init_prepare_correct(self): self.assertEqual(len(d._prepared_fields), 4) expect = { - 'color': ("", ""), - 'type': ("", ""), - 'name': ("", ""), - 'price': ("", ""), + 'color': ("", + ("", "")), # py3, py2 + 'type': ("", + ("","")), + 'name': ("", + ("","")), + 'price': ("", + ("","")), } for name, field, prep in d._prepared_fields: e = expect[name] self.assertEqual(str(type(field)), e[0], 'field type should be copied over') self.assertTrue('__call__' in dir(prep), 'prep function should be callable') - self.assertEqual(str(type(prep)), e[1], 'prep function is correct partial or method') + self.assertTrue(str(type(prep)) in e[1], 'prep function is correct partial or method') def test_init_prepare_results(self): """Are the 
results from init_prepare() actually used in prepare()?""" From 45f62e790c70d2bfadea6c7fe82faef0e21a8543 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Fri, 25 Oct 2019 15:41:45 +0200 Subject: [PATCH 18/23] Correct es dependency (was conflicting with requirements.txt) --- tox.ini | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tox.ini b/tox.ini index 013f62c6..906ef578 100644 --- a/tox.ini +++ b/tox.ini @@ -1,10 +1,10 @@ [tox] envlist = - {py27,py36,py37}-django-110-{es6} - {py27,py36,py37}-django-111-{es6} - {py36,py37}-django-2-{es6} - {py36,py37}-django-21-{es6} - {py36,py37}-django-22-{es6} + {py27,py36,py37}-django-110-{es7} + {py27,py36,py37}-django-111-{es7} + {py36,py37}-django-2-{es7} + {py36,py37}-django-21-{es7} + {py36,py37}-django-22-{es7} [testenv] setenv = @@ -19,6 +19,7 @@ deps = django-21: Django>=2.1,<2.2 django-22: Django>=2.2,<2.3 es6: elasticsearch-dsl>=6.4.0,<7.0.0 + es7: elasticsearch-dsl>=7,<8 -r{toxinidir}/requirements_test.txt basepython = From c933fb7eef703c7fe1da0480613cade034aef587 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Mon, 28 Oct 2019 10:38:58 +0100 Subject: [PATCH 19/23] Pass queryset_pagination as chunk_size into parallel_bulk too. 
--- django_elasticsearch_dsl/documents.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index 82b5fcc3..1e26ba8f 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -5,7 +5,6 @@ from functools import partial from django import VERSION as DJANGO_VERSION -from django.core.paginator import Paginator from django.db import models from django.utils.six import iteritems from elasticsearch.helpers import bulk, parallel_bulk @@ -149,6 +148,8 @@ def bulk(self, actions, **kwargs): return bulk(client=self._get_connection(), actions=actions, **kwargs) def parallel_bulk(self, actions, **kwargs): + if self.django.queryset_pagination and 'chunk_size' not in kwargs: + kwargs['chunk_size'] = self.django.queryset_pagination deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0) # Fake return value to emulate bulk() since we don't have a result yet, # the result is currently not used upstream anyway. From 96c3883e11a1a3754e490a9895886e834752da86 Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Mon, 28 Oct 2019 10:42:33 +0100 Subject: [PATCH 20/23] Add explanation why we use deque() --- django_elasticsearch_dsl/documents.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index 1e26ba8f..b8977049 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -150,7 +150,10 @@ def bulk(self, actions, **kwargs): def parallel_bulk(self, actions, **kwargs): if self.django.queryset_pagination and 'chunk_size' not in kwargs: kwargs['chunk_size'] = self.django.queryset_pagination - deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0) + bulk_actions = parallel_bulk(client=self._get_connection(), actions=actions, **kwargs) + # As the `parallel_bulk` is lazy, we need to get it into `deque` to run it instantly + # See https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/2 + deque(bulk_actions, maxlen=0) # Fake return value to emulate bulk() since we don't have a result yet, # the result is currently not used upstream anyway. return (1, []) From 9c138d6e4d71f7411e1b393f9fb0c55d84825038 Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Mon, 28 Oct 2019 10:49:03 +0100 Subject: [PATCH 21/23] Correct typo in explanation of test --- tests/test_documents.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_documents.py b/tests/test_documents.py index 6fbd5d83..4dd53ebb 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -280,8 +280,8 @@ class Django: self.assertEqual( 3, len(list(mock_bulk.call_args_list[0][1]['actions'])) ) - self.assertEqual(mock_bulk.call_count, 1, "bulk is no called") - self.assertEqual(mock_parallel_bulk.call_count, 0, "parallel bulk is no called") + self.assertEqual(mock_bulk.call_count, 1, "bulk is called") + self.assertEqual(mock_parallel_bulk.call_count, 0, "parallel bulk is not called") def test_model_instance_iterable_update_with_parallel(self): class CarDocument2(DocType): From 9ec41a0ced24cd9a0d4ebf4818d8580c2c2846e0 Mon Sep 17 00:00:00 2001 From: "Martin J. Laubach" Date: Mon, 28 Oct 2019 11:15:17 +0100 Subject: [PATCH 22/23] Remove leftover instrumentation print --- django_elasticsearch_dsl/documents.py | 1 - 1 file changed, 1 deletion(-) diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index b8977049..719ec9c1 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -125,7 +125,6 @@ def prepare(self, instance): name: prep_func(instance) for name, field, prep_func in self._prepared_fields } - # print("-> %s" % data) return data @classmethod From 5086396f8672529e35746aafb3f7123871e70b83 Mon Sep 17 00:00:00 2001 From: "Martin J. 
Laubach" Date: Mon, 28 Oct 2019 11:38:07 +0100 Subject: [PATCH 23/23] Better formatting to avoid backslash-continuation line --- tests/test_documents.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_documents.py b/tests/test_documents.py index 4dd53ebb..8d27d5be 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -274,9 +274,11 @@ class Django: car1 = Car() car2 = Car() car3 = Car() - with patch('django_elasticsearch_dsl.documents.bulk') as mock_bulk, \ - patch('django_elasticsearch_dsl.documents.parallel_bulk') as mock_parallel_bulk: - doc.update([car1, car2, car3]) + + bulk = "django_elasticsearch_dsl.documents.bulk" + parallel_bulk = "django_elasticsearch_dsl.documents.parallel_bulk" + with patch(bulk) as mock_bulk, patch(parallel_bulk) as mock_parallel_bulk: + doc.update([car1, car2, car3]) self.assertEqual( 3, len(list(mock_bulk.call_args_list[0][1]['actions'])) ) @@ -292,8 +294,9 @@ class Django: car1 = Car() car2 = Car() car3 = Car() - with patch('django_elasticsearch_dsl.documents.bulk') as mock_bulk, \ - patch('django_elasticsearch_dsl.documents.parallel_bulk') as mock_parallel_bulk: + bulk = "django_elasticsearch_dsl.documents.bulk" + parallel_bulk = "django_elasticsearch_dsl.documents.parallel_bulk" + with patch(bulk) as mock_bulk, patch(parallel_bulk) as mock_parallel_bulk: doc.update([car1, car2, car3], parallel=True) self.assertEqual(mock_bulk.call_count, 0, "bulk is not called") self.assertEqual(mock_parallel_bulk.call_count, 1, "parallel bulk is called")