Mjl index speedup #154

Closed
wants to merge 12 commits
8 changes: 8 additions & 0 deletions README.rst
@@ -112,12 +112,20 @@ It required to defined ``Document`` class in ``documents.py`` in your app directory
# Ignore auto updating of Elasticsearch when a model is saved
# or deleted:
# ignore_signals = True

# Don't perform an index refresh after every update (overrides global setting):
# auto_refresh = False

# Paginate the django queryset used to populate the index with the specified size
# (by default there is no pagination)
# queryset_pagination = 5000

# Do indexing in parallel (using ES parallel_bulk)
    # Note that some databases (e.g. SQLite) do not play well with this option
# (defaults to False):
# parallel_indexing = True



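Putting the options together, a document that paginates its queryset and indexes in parallel could look roughly like this (a minimal sketch; ``CarDocument`` and ``Car`` are hypothetical names, and the exact imports and registration style may differ by version)::

    from django_elasticsearch_dsl import Document
    from django_elasticsearch_dsl.registries import registry

    from .models import Car

    @registry.register_document
    class CarDocument(Document):
        class Index:
            name = 'cars'

        class Django:
            model = Car
            fields = ['name', 'color']

            # Fetch the queryset in chunks of 5000 rows while indexing
            queryset_pagination = 5000

            # Use Elasticsearch's parallel_bulk helper when bulk indexing
            parallel_indexing = True
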
To create and populate the Elasticsearch index and mapping use the search_index command::

137 changes: 111 additions & 26 deletions django_elasticsearch_dsl/documents.py
@@ -1,9 +1,13 @@
from __future__ import unicode_literals

from collections import deque
from copy import deepcopy
from functools import partial

from django.core.paginator import Paginator
from django.db import models
from django.utils.six import iteritems
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, parallel_bulk
from elasticsearch_dsl import Document as DSLDocument

from .exceptions import ModelFieldNotMappedError
@@ -44,6 +48,62 @@
models.URLField: TextField,
}

class PagingQuerysetProxy(object):
"""
I am a tiny standin for Django Querysets that implements enough of
the protocol (namely count() and __iter__) to be useful for indexing
large data sets.

When iterated over, I will:
- use qs.iterator() to disable result set caching in queryset.
- chunk fetching the results so that caching in database driver
(especially psycopg2) is kept to a minimum, and database
drivers that do not support streaming (eg. mysql) do not
need to load the whole dataset at once.
"""
def __init__(self, qs, chunk_size=10000):
self.qs = qs
self.chunk_size = chunk_size

def count(self):
"""Pass through to underlying queryset"""
return self.qs.count()

def __iter__(self):
"""Iterate over result set. Internally uses iterator() as not
to cache in the queryset; also supports chunking fetching data
in smaller sets so that databases that do not use server side
cursors (django docs say only postgres and oracle do) or other
optimisations keep memory consumption manageable."""

last_max_pk = None

# Get a clone of the QuerySet so that the cache doesn't bloat up
# in memory. Useful when reindexing large amounts of data.
small_cache_qs = self.qs.order_by('pk')

once = no_data = False
while not no_data and not once:
# If we got the max seen PK from last batch, use it to restrict the qs
# to values above; this optimises the query for Postgres as not to
# devolve into multi-second run time at large offsets.
if self.chunk_size:
if last_max_pk is not None:
current_qs = small_cache_qs.filter(pk__gt=last_max_pk)[:self.chunk_size]
@safwanrahman (Collaborator) commented on Aug 27, 2019:

What about when the primary key is not an integer? This approach cannot be generalized, since the pk can also be a UUID!

else:
current_qs = small_cache_qs[:self.chunk_size]
Collaborator comment:

What's the reason to slice before running the iterator? Django handles it by default if you pass a chunk_size.

else: # Handle "no chunking"
current_qs = small_cache_qs
once = True # force loop exit after fetching all data

no_data = True
for obj in current_qs.iterator():
# Remember maximum PK seen so far
last_max_pk = obj.pk
no_data = False
yield obj

current_qs = None  # release the chunk so it can be garbage collected
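
For context, a rough usage sketch of the proxy (``Car`` and ``do_something_with`` are hypothetical; the indexing code only needs count() and iteration):

    # Wrap a large queryset so that iteration happens in pk-ordered chunks
    # instead of materialising one huge result set.
    qs = Car.objects.all()
    proxy = PagingQuerysetProxy(qs, chunk_size=5000)

    total = proxy.count()          # delegates straight to qs.count()
    for obj in proxy:              # at most chunk_size rows fetched per query
        do_something_with(obj)     # hypothetical per-object work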

class DocType(DSLDocument):
def __init__(self, related_instance_to_ignore=None, **kwargs):
@@ -71,36 +131,55 @@ def get_queryset(self):
"""
return self.django.model._default_manager.all()

def prepare(self, instance):
def get_indexing_queryset(self):
qs = self.get_queryset()
# Note: PagingQuerysetProxy can handle the unpaginated case too, but some
# tests check for the plain queryset, so only wrap when pagination is set.
if self.django.queryset_pagination is not None:
return PagingQuerysetProxy(qs, chunk_size=self.django.queryset_pagination)
return qs

def init_prepare(self):
"""
Take a model instance, and turn it into a dict that can be serialized
based on the fields defined on this DocType subclass
Initialise the data model preparers once here. Extracts the preparers
from the model and generates a list of callables, so that the work is
not repeated for every object instance.
"""
data = {}
fields = []
for name, field in iteritems(self._fields):
if not isinstance(field, DEDField):
continue

if field._path == []:
if not field._path:
field._path = [name]

prep_func = getattr(self, 'prepare_%s_with_related' % name, None)
if prep_func:
field_value = prep_func(
instance,
related_to_ignore=self._related_instance_to_ignore
)
fn = partial(prep_func, related_to_ignore=self._related_instance_to_ignore)
else:
prep_func = getattr(self, 'prepare_%s' % name, None)
if prep_func:
field_value = prep_func(instance)
fn = prep_func
else:
field_value = field.get_value_from_instance(
instance, self._related_instance_to_ignore
)
fn = partial(field.get_value_from_instance, field_value_to_ignore=self._related_instance_to_ignore)

data[name] = field_value
fields.append((name, field, fn))

self._doc_type._prepared_fields = fields

def prepare(self, instance):
"""
Take a model instance, and turn it into a dict that can be serialized
based on the fields defined on this DocType subclass
"""
if getattr(self._doc_type, '_prepared_fields', None) is None:
self.init_prepare()

data = {
name: prep_func(instance)
for name, field, prep_func in self._doc_type._prepared_fields
}
return data
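
As a rough illustration (hypothetical ``CarDocument``/``Car``): the per-field lookups now happen once in ``init_prepare()``, and repeated ``prepare()`` calls just apply the cached callables:

    doc = CarDocument()
    for car in Car.objects.iterator():
        # The first call lazily builds _prepared_fields; later calls reuse it,
        # so no getattr('prepare_<field>') lookups happen per instance.
        data = doc.prepare(car)    # e.g. {'name': 'Saab 900', 'color': 'red'}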

@classmethod
@@ -122,6 +201,10 @@ def to_field(cls, field_name, model_field):
def bulk(self, actions, **kwargs):
return bulk(client=self._get_connection(), actions=actions, **kwargs)

def parallel_bulk(self, actions, **kwargs):
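# parallel_bulk() returns a lazy generator of per-item results; draining
# it through a zero-length deque forces every chunk to be sent while
# discarding the results without keeping them in memory.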
deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0)
return (1, []) # Fake return value to emulate bulk(), not used upstream

def _prepare_action(self, object_instance, action):
return {
'_op_type': action,
@@ -134,16 +217,16 @@ def _prepare_action(self, object_instance, action):
}

def _get_actions(self, object_list, action):
if self.django.queryset_pagination is not None:
paginator = Paginator(
object_list, self.django.queryset_pagination
)
for page in paginator.page_range:
for object_instance in paginator.page(page).object_list:
yield self._prepare_action(object_instance, action)
for object_instance in object_list:
yield self._prepare_action(object_instance, action)

def _bulk(self, *args, **kwargs):
"""Helper for switching between normal and parallel bulk operation"""
parallel = kwargs.pop('parallel', False)
if parallel:
Collaborator comment:

We should find a way to make sure parallel is always False when updating a single object; we only need it when indexing a large number of objects. When a model object is updated, the Django signal calls this same method to update the index.

return self.parallel_bulk(*args, **kwargs)
else:
    return self.bulk(*args, **kwargs)

def update(self, thing, refresh=None, action='index', **kwargs):
"""
Expand All @@ -159,8 +242,10 @@ def update(self, thing, refresh=None, action='index', **kwargs):
else:
object_list = thing

return self.bulk(
self._get_actions(object_list, action), **kwargs
return self._bulk(
self._get_actions(object_list, action),
parallel=self.django.parallel_indexing,
**kwargs
)
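
For reference, a rough sketch of how the flag flows through (hypothetical ``CarDocument``/``Car``): with ``parallel_indexing = True`` on the document's ``Django`` class, both bulk reindexing and signal-driven single-object updates go through ``parallel_bulk``, which is exactly the concern raised in the review comment above:

    # Bulk reindex: actions are generated from the whole queryset and,
    # because Django.parallel_indexing is True, sent via parallel_bulk().
    CarDocument().update(Car.objects.all())

    # A post_save signal ends up calling the same update() for one instance,
    # so it currently takes the parallel path too.
    CarDocument().update(car_instance)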


@@ -85,7 +85,7 @@ def _populate(self, models, options):

def _populate(self, models, options):
for doc in registry.get_documents(models):
qs = doc().get_queryset()
qs = doc().get_indexing_queryset()
self.stdout.write("Indexing {} '{}' objects".format(
qs.count(), doc.django.model.__name__)
)
1 change: 1 addition & 0 deletions django_elasticsearch_dsl/registries.py
@@ -70,6 +70,7 @@ def register_document(self, document):
"auto_refresh", DEDConfig.auto_refresh_enabled())
django_attr.related_models = getattr(django_meta, "related_models", [])
django_attr.queryset_pagination = getattr(django_meta, "queryset_pagination", None)
django_attr.parallel_indexing = getattr(django_meta, "parallel_indexing", False)

# Add django attribute in the document class with all the django attribute
setattr(document, 'django', django_attr)