
Mjl index speedup #213

Merged
merged 23 commits into from Oct 28, 2019

Conversation

@mjl (Contributor) commented Oct 17, 2019

Use Elasticsearch's parallel_bulk for indexing, add an ELASTICSEARCH_DSL_PARALLEL default setting, and add corresponding parameters to the management command.
Use qs.iterator() for fetching data during reindexing, as it is much more memory efficient and performant.
Instead of working out which methods to call to prepare each field on every instance, do that once and cache the result for subsequent model instance prepares.

See issue #154 for performance analysis and details.

@mjl mjl changed the title Use elasticsearch's parallel_bulk for indexing, add ELASTICSEARCH_DSL… Mjl index speedup Oct 17, 2019
@mjl mjl mentioned this pull request Oct 17, 2019
@@ -124,6 +145,12 @@ def to_field(cls, field_name, model_field):
```python
    def bulk(self, actions, **kwargs):
        return bulk(client=self._get_connection(), actions=actions, **kwargs)

    def parallel_bulk(self, actions, **kwargs):
        deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0)
```
Collaborator:

Why use deque here? Shouldn't parallel_bulk alone be enough?

@mjl (Contributor, author) commented Oct 17, 2019 via email

@safwanrahman (Collaborator) commented:

@mjl Thanks for the update. I will give it a closer review tomorrow and let you know.

@safwanrahman (Collaborator) commented Oct 17, 2019:

I was wondering how a user can configure the parameters of parallel_bulk, e.g. thread_count=4 or chunk_size=500. @mjl Do you have any idea?

@safwanrahman (Collaborator) commented:

> No, parallel_bulk is a generator and thus is only started when iterated.

I understand. As parallel_bulk works lazily, we can use it for further optimization! I will give more review tomorrow.
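The laziness discussed here is why the implementation wraps parallel_bulk in a zero-length deque: consuming a generator with deque(..., maxlen=0) runs it to exhaustion without retaining any results in memory. A minimal stdlib sketch of the pattern (fake_parallel_bulk is a hypothetical stand-in for elasticsearch's helper):

```python
from collections import deque

executed = []

def fake_parallel_bulk(actions):
    # Stand-in for elasticsearch.helpers.parallel_bulk: a lazy generator
    # that only performs work as it is iterated.
    for action in actions:
        executed.append(action)
        yield (True, action)  # (ok, item) tuples, like the real helper

gen = fake_parallel_bulk(["a", "b", "c"])
assert executed == []  # nothing has run yet: generators are lazy

deque(gen, maxlen=0)   # consume the whole generator, keeping nothing
assert executed == ["a", "b", "c"]
```

The maxlen=0 trick is the idiomatic way to drain an iterator for its side effects only; a plain `list(gen)` would run it too, but would buffer every result.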

@mjl (Contributor, author) commented Oct 17, 2019 via email


```python
    def prepare(self, instance):
```

Collaborator:

Why do we need to change the prepare function?

@mjl (Contributor, author) commented Oct 17, 2019 via email

@mjl (Contributor, author) commented Oct 17, 2019 via email

Martin J. Laubach added 2 commits October 17, 2019 23:46
…ally cleaner. Also shaves off a test per object.
…, as it's conceptually possible to have several indices on the same model.

Also remove forced ordering that is a remnant of earlier code.
@safwanrahman (Collaborator) commented:

The test failures seem related. Can you fix them?

@mjl (Contributor, author) commented Oct 18, 2019 via email

@mjl (Contributor, author) commented Oct 18, 2019 via email

"""
qs = self.get_queryset()
kwargs = {}
if DJANGO_VERSION >= (2,) and self.django.queryset_pagination:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to handle the usecase of people who are using django< 2 version. They should be able to paginate the queryset like before.

@@ -135,6 +135,7 @@ def test_get_doc_with_many_to_many_relationships(self):

```python
        ])

    def test_doc_to_dict(self):
        self.maxDiff = None  # XXX: mjl temporary
```

Collaborator:

Remove it.

@safwanrahman (Collaborator) commented:

> I need some guidance. There is a test for pagination. However, pagination has been completely replaced by qs.iterator(). Should I remove the test, adapt it to match the current query patterns, or re-add pagination on top of iterator() (which doesn't make much sense to me and probably negates using iterator in the first place, but...)?

I think the paginator cannot be totally replaced if we want to keep supporting Django < 2.0. So you should use the paginator on Django versions where iterator() has no chunk_size parameter, and use iterator() with chunk_size where it does. Regarding tests, can you fix them so they cover both the iterator and pagination cases?

@mjl Will you consider adding some tests for your workflow? I know it's hard to write tests for this workflow, but I don't want things to get broken in the future.
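The version split described here could be sketched roughly as follows. This is a simplified illustration, not the PR's actual code: iterate_queryset is a hypothetical helper, plain lists stand in for querysets, and the Django < 2.0 paginator branch is reduced to slicing.

```python
CHUNK_SIZE = 2  # stand-in for self.django.queryset_pagination

def iterate_queryset(qs, django_version, chunk_size=CHUNK_SIZE):
    """Yield objects, choosing the fetch strategy by Django version."""
    if django_version >= (2,):
        # Django >= 2.0: iterator() accepts chunk_size and streams rows.
        # Real code would do: yield from qs.iterator(chunk_size=chunk_size)
        yield from iter(qs)
    else:
        # Django < 2.0: iterator() has no chunk_size parameter, so fall
        # back to pagination (django.core.paginator.Paginator in real code).
        for start in range(0, len(qs), chunk_size):
            yield from qs[start:start + chunk_size]

data = list(range(5))
assert list(iterate_queryset(data, (2, 2))) == data
assert list(iterate_queryset(data, (1, 11))) == data
```

Either branch yields the same objects; only the memory profile of fetching differs.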

@mjl (Contributor, author) commented Oct 19, 2019 via email

@safwanrahman (Collaborator) commented:

> Considering that official support for Django 1.10 ended in December 2017, is it worth the complexity?

Django 1.10 is not supported, so that is not needed. But Django 1.11 is an important LTS release and the last one to support Python 2.x, so I would keep supporting Django 1.11 for a long time. As pagination can be configured on the document, we must support it for Django 1.11 as well.

@mjl (Contributor, author) commented Oct 28, 2019

Ok, I added/adapted some tests. I fear it might be a bit brittle because it tests the inner workings of the indexing process, but hey...

@@ -124,6 +148,12 @@ def to_field(cls, field_name, model_field):
```python
    def bulk(self, actions, **kwargs):
        return bulk(client=self._get_connection(), actions=actions, **kwargs)

    def parallel_bulk(self, actions, **kwargs):
        deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0)
```
Collaborator:

@mjl Can you add a chunk_size parameter here, taking its value from queryset_pagination?

```python
        car2 = Car()
        car3 = Car()
        with patch('django_elasticsearch_dsl.documents.bulk') as mock_bulk, \
                patch('django_elasticsearch_dsl.documents.parallel_bulk') as mock_parallel_bulk:
```
Collaborator:

Better to have this:

Suggested change
```python
with (patch('django_elasticsearch_dsl.documents.bulk') as mock_bulk,
      patch('django_elasticsearch_dsl.documents.parallel_bulk') as mock_parallel_bulk):
```

Collaborator:

What about this?

Suggested change
```python
bulk = "django_elasticsearch_dsl.documents.bulk"
parallel_bulk = "django_elasticsearch_dsl.documents.parallel_bulk"
with patch(bulk) as mock_bulk, patch(parallel_bulk) as mock_parallel_bulk:
```

@@ -124,6 +148,12 @@ def to_field(cls, field_name, model_field):
```python
    def bulk(self, actions, **kwargs):
        return bulk(client=self._get_connection(), actions=actions, **kwargs)

    def parallel_bulk(self, actions, **kwargs):
        deque(parallel_bulk(client=self._get_connection(), actions=actions, **kwargs), maxlen=0)
```
Collaborator:

Suggested change
```python
        # As `parallel_bulk` is lazy, drain it through a zero-length deque to run it immediately
        bulk_actions = parallel_bulk(client=self._get_connection(), actions=actions,
                                     chunk_size=self.queryset_pagination, **kwargs)
        deque(bulk_actions, maxlen=0)
```

@safwanrahman (Collaborator) commented:

@mjl Thanks a lot for the work you have done. I have added a couple of review comments; after fixing these, I think it's good to merge. 👍

@mjl (Contributor, author) commented Oct 28, 2019 via email

```python
            name: prep_func(instance)
            for name, field, prep_func in self._prepared_fields
        }
        # print("-> %s" % data)
```

Collaborator:

Can you remove this?

Suggested change: delete the commented-out `# print("-> %s" % data)` line.
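The _prepared_fields list in this hunk is the heart of the caching optimization in this PR: instead of probing for prepare_<field> methods on every instance, the lookup is resolved once and reused. A rough self-contained illustration, with hypothetical names and plain dicts standing in for model instances:

```python
class Doc:
    def __init__(self):
        # field name -> how to serialize its value (toy stand-in for DED fields)
        self._fields = {"name": str.upper, "year": str}
        self._prepared_fields = None

    def init_prepare(self):
        """Resolve, once, how each field is prepared (custom method vs field value)."""
        fields = []
        for name, transform in self._fields.items():
            # django_elasticsearch_dsl checks for a prepare_<name> method
            # on the document here; we just bind a fallback callable.
            fn = getattr(self, "prepare_%s" % name, None)
            if fn is None:
                fn = lambda instance, t=transform, n=name: t(instance[n])
            fields.append((name, None, fn))
        return fields

    def prepare(self, instance):
        # The expensive method resolution happens at most once per document class.
        if self._prepared_fields is None:
            self._prepared_fields = self.init_prepare()
        return {name: prep(instance) for name, _field, prep in self._prepared_fields}

doc = Doc()
data = doc.prepare({"name": "bmw", "year": 2019})
assert data == {"name": "BMW", "year": "2019"}
```

With thousands of instances to index, moving the getattr probing out of the per-instance loop is a meaningful saving.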

@@ -124,6 +147,17 @@ def to_field(cls, field_name, model_field):
```python
    def bulk(self, actions, **kwargs):
        return bulk(client=self._get_connection(), actions=actions, **kwargs)

    def parallel_bulk(self, actions, **kwargs):
        if self.django.queryset_pagination and 'chunk_size' not in kwargs:
            kwargs['chunk_size'] = self.django.queryset_pagination
```
Collaborator:

Fix the indentation.
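The chunk_size handling in the hunk above is a common keyword-argument pattern: honour an explicit caller value, otherwise inject the configured default. A standalone sketch (the module-level constant is hypothetical, standing in for self.django.queryset_pagination):

```python
QUERYSET_PAGINATION = 500  # stand-in for self.django.queryset_pagination

def parallel_bulk(actions, **kwargs):
    # Only inject the default when the caller did not pass chunk_size explicitly.
    if QUERYSET_PAGINATION and 'chunk_size' not in kwargs:
        kwargs['chunk_size'] = QUERYSET_PAGINATION
    # The real method forwards these kwargs to the Elasticsearch helper;
    # returning them here just makes the behaviour visible.
    return kwargs

assert parallel_bulk([])['chunk_size'] == 500
assert parallel_bulk([], chunk_size=100)['chunk_size'] == 100
```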

@safwanrahman (Collaborator) commented:

> Doesn't work, this isn't valid syntax according to my Python 3.7. I incorporated the rest of your suggestions.

That is weird. Can you check how to break the line without using \? A backslash is not very readable.
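For context: the parenthesized multi-context with statement suggested earlier only became valid syntax in Python 3.10, which is why it failed on 3.7. Before 3.10 the usual options are a backslash continuation or contextlib.ExitStack. A sketch of both, patching stdlib functions so it is self-contained:

```python
import os
from contextlib import ExitStack
from unittest.mock import patch

# Option 1: backslash continuation (valid on every Python 3 version)
with patch('os.getcwd') as mock_getcwd, \
        patch('os.listdir') as mock_listdir:
    mock_getcwd.return_value = '/mocked-1'
    mock_listdir.return_value = []
    assert os.getcwd() == '/mocked-1'
    assert os.listdir('.') == []

# Option 2: contextlib.ExitStack, no backslash needed
with ExitStack() as stack:
    mock_getcwd = stack.enter_context(patch('os.getcwd'))
    mock_getcwd.return_value = '/mocked-2'
    assert os.getcwd() == '/mocked-2'

# The parenthesized form `with (patch(...) as a, patch(...) as b):`
# is only valid on Python 3.10+.
```

ExitStack also scales nicely when the set of patches is built dynamically.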

@mjl (Contributor, author) commented Oct 28, 2019 via email

@safwanrahman (Collaborator) commented:

> ERR_CONFUSED, I don't understand? Looks fine to me?

Oh, sorry. I did not notice that it's a separate block. Sorry.

@mjl (Contributor, author) commented Oct 28, 2019 via email

@safwanrahman (Collaborator) left a review:

Looks really nice! Thanks a lot for the awesome work, @mjl. It is really nice to see this kind of significant contribution merged into open source projects. r++ 💯

It is amazing that, after many rounds of review and a long wait, these changes are finally in a state to be merged.

@safwanrahman safwanrahman merged commit 13a4d5c into django-es:master Oct 28, 2019
@pySilver (Contributor) commented:

Congratulations to you both! Thank you for your time and hard work!

@mjl (Contributor, author) commented Oct 28, 2019 via email

@safwanrahman (Collaborator) commented:

Just released a new version, 7.1.0, with the changes: https://pypi.org/project/django-elasticsearch-dsl/7.1.0/#description

@mjl mjl deleted the mjl-index-speedup branch October 29, 2019 12:01
@mjl (Contributor, author) commented Oct 29, 2019

@safwanrahman The release notes description is not quite complete. Most of the speed improvement comes from using queryset.iterator(), which also uses far less memory. Next comes precomputing the prepare functions, which moves all those comparisons out of the critical path; and then there is the parallel indexing.

I'm not sure end users need that level of detail in the release notes, though; perhaps it is too much.

@safwanrahman (Collaborator) commented:

@mjl Oops. I have updated the release note. Check it and let me know if anything needs to be changed.
