Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance boost for bulk delete, and additional abstraction in utils #99

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 36 additions & 15 deletions bungiesearch/utils.py
Expand Up @@ -11,7 +11,9 @@
def update_index(model_items, model_name, action='index', bulk_size=100, num_docs=-1, start_date=None, end_date=None, refresh=True):
'''
Updates the index for the provided model_items.
:param model_items: a list of model_items (django Model instances, or proxy instances) which are to be indexed, or updated.
:param model_items: a list of model_items (django Model instances, or proxy instances) which are to be indexed/updated or deleted.
If action is 'index', the model_items must be serializable objects. If action is 'delete', the model_items must be primary keys
corresponding to objects in the index.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*objects

:param model_name: doctype, which must also be the model name.
:param action: the action that you'd like to perform on this group of data. Must be in ('index', 'delete') and defaults to 'index.'
:param bulk_size: bulk size for indexing. Defaults to 100.
Expand All @@ -25,6 +27,9 @@ def update_index(model_items, model_name, action='index', bulk_size=100, num_doc
'''
src = Bungiesearch()

if action == 'delete' and not isinstance(model_items, (list, tuple)):
raise ValueError("If action is 'delete', model_items must be an iterable of primary keys.")

logging.info('Getting index for model {}.'.format(model_name))
for index_name in src.get_index(model_name):
index_instance = src.get_model_index(model_name)
Expand All @@ -34,15 +39,7 @@ def update_index(model_items, model_name, action='index', bulk_size=100, num_doc
if isinstance(model_items, (list, tuple)):
num_docs = len(model_items)
else:
# Let's parse the start date and end date.
if start_date or end_date:
if index_instance.updated_field is None:
raise ValueError('Cannot filter by date on model {}: no updated_field defined in {}\'s Meta class.'.format(model_name, index_instance.__class__.__name__))
if start_date:
model_items = model_items.filter(**{'{}__gte'.format(index_instance.updated_field): __str_to_tzdate__(start_date)})
if end_date:
model_items = model_items.filter(**{'{}__lte'.format(index_instance.updated_field): __str_to_tzdate__(end_date)})
logging.info('Fetching number of documents to {} in {}.'.format(action, model.__name__))
model_items = filter_model_items(index_instance, model_items, model_name, start_date, end_date)
num_docs = model_items.count()
else:
logging.warning('Limiting the number of model_items to {} to {}.'.format(action, num_docs))
Expand All @@ -52,13 +49,10 @@ def update_index(model_items, model_name, action='index', bulk_size=100, num_doc
max_docs = num_docs + bulk_size if num_docs > bulk_size else bulk_size + 1
for next_step in range(bulk_size, max_docs, bulk_size):
logging.info('{}: documents {} to {} of {} total on index {}.'.format(action.capitalize(), prev_step, next_step, num_docs, index_name))
data = [index_instance.serialize_object(doc) for doc in model_items[prev_step:next_step] if index_instance.matches_indexing_condition(doc)]
for entry in data:
# Tell elasticsearch-py what to do with the data internally
entry["_op_type"] = action
data = create_indexed_document(index_instance, model_items[prev_step:next_step], action)
bulk_index(src.get_es_instance(), data, index=index_name, doc_type=model.__name__, raise_on_error=True)
prev_step = next_step

if refresh:
src.get_es_instance().indices.refresh(index=index_name)

Expand All @@ -84,5 +78,32 @@ def delete_index_item(item, model_name, refresh=True):
if refresh:
src.get_es_instance().indices.refresh(index=index_name)

def create_indexed_document(index_instance, model_items, action):
    '''
    Build the payload for the bulk index call.

    :param index_instance: model index used to filter and serialize items (unused for deletes).
    :param model_items: serializable objects when action is 'index'; primary keys when action is 'delete'.
    :param action: bulk operation to perform, either 'index' or 'delete'.
    :return: list of documents ready to hand to the bulk helper — serialized objects
             for indexing, or dicts naming the primary keys to delete.
    '''
    if action == 'delete':
        # Deletes only need the document id plus the op type marker.
        return [{'_id': pk, '_op_type': action} for pk in model_items]
    # Indexing: serialize each item, skipping those the index chooses not to index.
    return [index_instance.serialize_object(item)
            for item in model_items
            if index_instance.matches_indexing_condition(item)]

def filter_model_items(index_instance, model_items, model_name, start_date, end_date):
    '''
    Filter the model items queryset by start and/or end date.

    :param index_instance: model index whose `updated_field` names the date field to filter on.
    :param model_items: queryset of model items to restrict.
    :param model_name: model name, used only for the warning message.
    :param start_date: inclusive lower bound as a parseable date string, or None.
    :param end_date: inclusive upper bound as a parseable date string, or None.
    :return: the (possibly filtered) queryset. If the index defines no
             `updated_field`, the queryset is returned unfiltered with a warning
             rather than raising, so bulk updates can span indices that lack it.
    '''
    if index_instance.updated_field is None:
        logging.warning("No updated date field found for {} - not restricting with start and end date".format(model_name))
    else:
        if start_date:
            model_items = model_items.filter(**{'{}__gte'.format(index_instance.updated_field): __str_to_tzdate__(start_date)})
        if end_date:
            model_items = model_items.filter(**{'{}__lte'.format(index_instance.updated_field): __str_to_tzdate__(end_date)})

    return model_items

def __str_to_tzdate__(date_str):
    '''Parse a date string and return it as a timezone-aware datetime in the current timezone.'''
    naive = parsedt(date_str)
    return timezone.make_aware(naive, timezone.get_current_timezone())
7 changes: 5 additions & 2 deletions tests/core/test_bungiesearch.py
Expand Up @@ -258,7 +258,7 @@ def test_bulk_delete(self):
find_five = Article.objects.search.query('match', title='five')
self.assertEqual(len(find_five), 2, 'Searching for "five" in title did not return exactly two results (got {})'.format(find_five))

model_items = [bulk_obj1, bulk_obj2]
model_items = [bulk_obj1.pk, bulk_obj2.pk]
model_name = Article.__name__
update_index(model_items, model_name, action='delete', bulk_size=2, num_docs=-1, start_date=None, end_date=None, refresh=True)

Expand All @@ -279,7 +279,10 @@ def test_time_indexing(self):
except Exception as e:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While working on this test, can we remove the exception handling here and let the testing framework catch it. Instead of an assertion fail, it's probably better for the test to raise the exception. Hence, this function would become:

def test_time_indexing(self):
    update_index(Article.objects.all(), 'Article', start_date=datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M'))
    update_index(NoUpdatedField.objects.all(), 'NoUpdatedField', end_date=datetime.strftime(datetime.now(), '%Y-%m-%d'))

What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this would be a better way of writing this, now that you mention it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'll fix that up soon then.

self.fail('update_index with a start date failed for model Article: {}.'.format(e))

self.assertRaises(ValueError, update_index, **{'model_items': NoUpdatedField.objects.all(), 'model_name': 'NoUpdatedField', 'end_date': datetime.strftime(datetime.now(), '%Y-%m-%d')})
try:
update_index(NoUpdatedField.objects.all(), 'NoUpdatedField', end_date=datetime.strftime(datetime.now(), '%Y-%m-%d'))
except Exception as e:
self.fail('update_index with a start date failed for model NoUpdatedField, which has no updated field: {}.'.format(e))

def test_optimal_queries(self):
db_item = NoUpdatedField.objects.get(pk=1)
Expand Down