Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mjl index speedup #213

Merged
merged 23 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b4920c9
Use elasticsearch's parallel_bulk for indexing, add ELASTICSEARCH_DSL…
Oct 17, 2019
e5cd2df
Move collection of prepare functions to __init__, where it's conceptu…
Oct 17, 2019
861f704
Minor cleanup: Move prepare cache to Document object instead of Model…
Oct 17, 2019
827800d
chunk_size parameter for queryset.iterator() appeared in Django 2
Oct 18, 2019
1773ed6
Do not crash in init_prepare when no fields have been defined
Oct 18, 2019
acda7e7
Crank up diff size to see what is going on
Oct 18, 2019
8279746
Adapt test to changed call pattern
Oct 18, 2019
c4f230d
Adapt tests to changed call patterns
Oct 18, 2019
1dcc013
Mark pagination test as expected failure for now.
Oct 18, 2019
505406e
Define _prepared_fields as attribute in class so to_dict() won't pick…
Oct 18, 2019
93f7d7c
remove debugging
Oct 21, 2019
c4a8a24
Add parameter to not do a count(*) before indexing, as for complex que…
Oct 24, 2019
2d2e996
Fixing example application
safwanrahman Oct 25, 2019
3b69009
Correctly clean up after test run (delete indices with the right name).
Oct 25, 2019
f613926
Remove paginator test.
Oct 25, 2019
51360c3
Make sure we compare w/ stable order
Oct 25, 2019
475e8a8
Adjust for different types for methods/partials in py2
Oct 25, 2019
45f62e7
Correct es dependency (was conflicting with requirements.txt)
Oct 25, 2019
c933fb7
Pass queryset_pagination as chunk_size into parallel_bulk too.
Oct 28, 2019
96c3883
Add explanation why we use deque()
Oct 28, 2019
9c138d6
Correct typo in explanation of test
Oct 28, 2019
9ec41a0
Remove leftover instrumentation print
Oct 28, 2019
5086396
Better formatting to avoid backslash-continuation line
Oct 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ It is required to define a ``Document`` class in ``documents.py`` in your app direc
# Ignore auto updating of Elasticsearch when a model is saved
# or deleted:
# ignore_signals = True

# Don't perform an index refresh after every update (overrides global setting):
# auto_refresh = False

# Paginate the django queryset used to populate the index with the specified size
# (by default there is no pagination)
# (by default it uses the database driver's default setting)
# queryset_pagination = 5000


Expand Down Expand Up @@ -551,6 +553,15 @@ Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``.
You could, for instance, make a ``CelerySignalProcessor`` which would add
update jobs to the queue for delayed processing.

ELASTICSEARCH_DSL_PARALLEL
~~~~~~~~~~~~~~~~~~~~~~~~~~

Default: ``False``

Run indexing (populate and rebuild) in parallel using ES' parallel_bulk() method.
Note that some databases (e.g. sqlite) do not play well with this option.


Testing
-------

Expand Down
100 changes: 68 additions & 32 deletions django_elasticsearch_dsl/documents.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from __future__ import unicode_literals

from django.core.paginator import Paginator
from collections import deque
from copy import deepcopy
from functools import partial

from django import VERSION as DJANGO_VERSION
from django.db import models
from django.utils.six import iteritems
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, parallel_bulk
from elasticsearch_dsl import Document as DSLDocument

from .exceptions import ModelFieldNotMappedError
Expand Down Expand Up @@ -45,11 +49,12 @@
models.URLField: TextField,
}


class DocType(DSLDocument):
_prepared_fields = []
def __init__(self, related_instance_to_ignore=None, **kwargs):
super(DocType, self).__init__(**kwargs)
self._related_instance_to_ignore = related_instance_to_ignore
self._prepared_fields = self.init_prepare()

def __eq__(self, other):
return id(self) == id(other)
Expand All @@ -70,39 +75,57 @@ def get_queryset(self):
"""
Return the queryset that should be indexed by this doc type.
"""
primary_key_field_name = self.django.model._meta.pk.name
return self.django.model._default_manager.all().order_by(primary_key_field_name)
return self.django.model._default_manager.all()

def prepare(self, instance):
def get_indexing_queryset(self):
"""
Take a model instance, and turn it into a dict that can be serialized
based on the fields defined on this DocType subclass
Build queryset (iterator) for use by indexing.
"""
qs = self.get_queryset()
kwargs = {}
if DJANGO_VERSION >= (2,) and self.django.queryset_pagination:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to handle the usecase of people who are using django< 2 version. They should be able to paginate the queryset like before.

kwargs = {'chunk_size': self.django.queryset_pagination}
return qs.iterator(**kwargs)

def init_prepare(self):
"""
Initialise the data model preparers once here. Extracts the preparers
from the model and generate a list of callables to avoid doing that
work on every object instance over.
"""
data = {}
for name, field in iteritems(self._fields):
index_fields = getattr(self, '_fields', {})
fields = []
for name, field in iteritems(index_fields):
if not isinstance(field, DEDField):
continue

if field._path == []:
if not field._path:
field._path = [name]

prep_func = getattr(self, 'prepare_%s_with_related' % name, None)
if prep_func:
field_value = prep_func(
instance,
related_to_ignore=self._related_instance_to_ignore
)
fn = partial(prep_func, related_to_ignore=self._related_instance_to_ignore)
else:
prep_func = getattr(self, 'prepare_%s' % name, None)
if prep_func:
field_value = prep_func(instance)
fn = prep_func
else:
field_value = field.get_value_from_instance(
instance, self._related_instance_to_ignore
)
fn = partial(field.get_value_from_instance, field_value_to_ignore=self._related_instance_to_ignore)

data[name] = field_value
fields.append((name, field, fn))

return fields

def prepare(self, instance):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change the prepare function?

"""
Take a model instance, and turn it into a dict that can be serialized
based on the fields defined on this DocType subclass
"""
data = {
name: prep_func(instance)
for name, field, prep_func in self._prepared_fields
}
# print("-> %s" % data)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove this?

Suggested change
# print("-> %s" % data)

return data

@classmethod
Expand All @@ -124,6 +147,17 @@ def to_field(cls, field_name, model_field):
def bulk(self, actions, **kwargs):
return bulk(client=self._get_connection(), actions=actions, **kwargs)

def parallel_bulk(self, actions, **kwargs):
if self.django.queryset_pagination and 'chunk_size' not in kwargs:
kwargs['chunk_size'] = self.django.queryset_pagination
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix the indent.

bulk_actions = parallel_bulk(client=self._get_connection(), actions=actions, **kwargs)
# As the `parallel_bulk` is lazy, we need to get it into `deque` to run it instantly
# See https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/2
deque(bulk_actions, maxlen=0)
# Fake return value to emulate bulk() since we don't have a result yet,
# the result is currently not used upstream anyway.
return (1, [])

def _prepare_action(self, object_instance, action):
return {
'_op_type': action,
Expand All @@ -135,18 +169,18 @@ def _prepare_action(self, object_instance, action):
}

def _get_actions(self, object_list, action):
if self.django.queryset_pagination is not None:
paginator = Paginator(
object_list, self.django.queryset_pagination
)
for page in paginator.page_range:
for object_instance in paginator.page(page).object_list:
yield self._prepare_action(object_instance, action)
for object_instance in object_list:
yield self._prepare_action(object_instance, action)

def _bulk(self, *args, **kwargs):
"""Helper for switching between normal and parallel bulk operation"""
parallel = kwargs.pop('parallel', False)
if parallel:
return self.parallel_bulk(*args, **kwargs)
else:
for object_instance in object_list:
yield self._prepare_action(object_instance, action)
return self.bulk(*args, **kwargs)

def update(self, thing, refresh=None, action='index', **kwargs):
def update(self, thing, refresh=None, action='index', parallel=False, **kwargs):
"""
Update each document in ES for a model, iterable of models or queryset
"""
Expand All @@ -160,8 +194,10 @@ def update(self, thing, refresh=None, action='index', **kwargs):
else:
object_list = thing

return self.bulk(
self._get_actions(object_list, action), **kwargs
return self._bulk(
self._get_actions(object_list, action),
parallel=parallel,
**kwargs
)


Expand Down
33 changes: 29 additions & 4 deletions django_elasticsearch_dsl/management/commands/search_index.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import unicode_literals, absolute_import

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.six.moves import input
from ...registries import registry
Expand Down Expand Up @@ -49,6 +51,26 @@ def add_arguments(self, parser):
dest='force',
help="Force operations without asking"
)
parser.add_argument(
'--parallel',
action='store_true',
dest='parallel',
help='Run populate/rebuild update multi threaded'
)
parser.add_argument(
'--no-parallel',
action='store_false',
dest='parallel',
help='Run populate/rebuild update single threaded'
)
parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False))
parser.add_argument(
'--no-count',
action='store_false',
default=True,
dest='count',
help='Do not include a total count in the summary log line'
)

def _get_models(self, args):
"""
Expand Down Expand Up @@ -84,12 +106,15 @@ def _create(self, models, options):
index.create()

def _populate(self, models, options):
parallel = options['parallel']
for doc in registry.get_documents(models):
qs = doc().get_queryset()
self.stdout.write("Indexing {} '{}' objects".format(
qs.count(), doc.django.model.__name__)
self.stdout.write("Indexing {} '{}' objects {}".format(
doc().get_queryset().count() if options['count'] else "all",
doc.django.model.__name__,
"(parallel)" if parallel else "")
)
doc().update(qs)
qs = doc().get_indexing_queryset()
doc().update(qs, parallel=parallel)

def _delete(self, models, options):
index_names = [index._name for index in registry.get_indices(models)]
Expand Down
6 changes: 3 additions & 3 deletions django_elasticsearch_dsl/test/testcases.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ def setUp(self):
def tearDown(self):
pattern = re.compile(self._index_suffixe + '$')

for doc in registry.get_documents():
doc._index._name = pattern.sub('', doc._index._name)

for index in registry.get_indices():
index.delete(ignore=[404, 400])
index._name = pattern.sub('', index._name)

for doc in registry.get_documents():
doc._index._name = pattern.sub('', doc._index._name)

super(ESTestCase, self).tearDown()