Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add atomic/zero-downtime index rebuilding #358

Merged
merged 20 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a3105c3
Added atomic/zero-downtime index rebuilding
oehrlein Jul 9, 2021
f8d1961
Update django_elasticsearch_dsl/management/commands/search_index.py
oehrlein Jul 29, 2021
2ffc083
Update django_elasticsearch_dsl/management/commands/search_index.py
oehrlein Jul 29, 2021
332cbb4
Update django_elasticsearch_dsl/management/commands/search_index.py
oehrlein Jul 29, 2021
b81f1af
Update django_elasticsearch_dsl/management/commands/search_index.py
oehrlein Jul 29, 2021
f7de595
Update django_elasticsearch_dsl/management/commands/search_index.py
oehrlein Jul 29, 2021
8a61845
Added index name slice to ensure index name length does not exceed 25…
oehrlein Aug 4, 2021
d9586e3
Updated naming of flags
oehrlein Sep 16, 2021
4848c43
Updated arg help messages for '--use-alias' and '--use-alias-keep-index'
oehrlein Sep 30, 2021
e86580a
Updated 'indexes' to 'indices' for verbiage consistency
oehrlein Sep 30, 2021
fa596e8
Simplified stdout messaging
oehrlein Sep 30, 2021
2070d36
Made adjustments for better switching between using aliases and not
oehrlein Sep 30, 2021
a78571f
Updated super() call for compatibility with Python 2
oehrlein Sep 30, 2021
34e74a7
Simplified logic for stdout messaging
oehrlein Sep 30, 2021
e19ee7a
Merge branch 'django-es:master' into master
oehrlein Oct 1, 2021
75a45ce
Fixed test_custom_generate_id_is_called test
oehrlein Oct 1, 2021
4ba7059
Added comment explaining need for aliases regardless of using the '--…
oehrlein Oct 6, 2021
3ad4a35
Moved alias indices deletion for 'rebuild' without aliases to _delete()
oehrlein Oct 6, 2021
39d1ba2
Added 'return False' in case of undeleted index or alias in _delete()
oehrlein Oct 6, 2021
ce05715
Added _matches() to DocType for alias pattern matching
oehrlein Oct 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions django_elasticsearch_dsl/documents.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import unicode_literals

from collections import deque
from fnmatch import fnmatch
from functools import partial

from django import VERSION as DJANGO_VERSION
Expand Down Expand Up @@ -66,6 +67,18 @@ def __eq__(self, other):
def __hash__(self):
return id(self)

@classmethod
def _matches(cls, hit):
    """
    Tell whether a raw search hit belongs to this document class.

    The hit's ``_index`` value is compared against this class's index
    name used as a prefix pattern (``<name>*``). A plain equality test is
    not enough when aliases are in use: the hit carries the name of the
    concrete index, whereas the class refers to the alias.

    :param hit: mapping describing the hit; only its ``_index`` key is
        read, and a missing key is treated as an empty index name.
    :return: ``True`` when the hit's index matches the pattern, or when
        the class has no index name to compare against.
    """
    index_name = cls._index._name
    if index_name is None:
        # Without a configured name there is no pattern to build;
        # accept the hit instead of failing on ``None + "*"``.
        return True
    return fnmatch(hit.get("_index", ""), index_name + "*")

@classmethod
def search(cls, using=None, index=None):
return Search(
Expand Down
173 changes: 159 additions & 14 deletions django_elasticsearch_dsl/management/commands/search_index.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import unicode_literals, absolute_import
from datetime import datetime

from elasticsearch_dsl import connections
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from six.moves import input
Expand All @@ -9,6 +11,10 @@
class Command(BaseCommand):
help = 'Manage elasticsearch index.'

def __init__(self, *args, **kwargs):
    """
    Initialise the command and keep a handle on the Elasticsearch client.

    All arguments are forwarded untouched to the parent command. The
    explicit ``super(Command, self)`` spelling is kept so the module
    still runs on Python 2.
    """
    super(Command, self).__init__(*args, **kwargs)
    # Looked up once (no connection alias is passed) and reused by the
    # alias helper methods through ``self.es_conn``.
    connection = connections.get_connection()
    self.es_conn = connection

def add_arguments(self, parser):
parser.add_argument(
'--models',
Expand Down Expand Up @@ -63,6 +69,21 @@ def add_arguments(self, parser):
dest='parallel',
help='Run populate/rebuild update single threaded'
)
parser.add_argument(
'--use-alias',
action='store_true',
dest='use_alias',
help='Use alias with indices'
)
parser.add_argument(
'--use-alias-keep-index',
action='store_true',
dest='use_alias_keep_index',
help="""
Do not delete replaced indices when used with '--rebuild' and
'--use-alias' args
"""
)
parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False))
parser.add_argument(
'--refresh',
Expand Down Expand Up @@ -107,10 +128,18 @@ def _get_models(self, args):

return set(models)

def _create(self, models, options):
def _create(self, models, aliases, options):
    """
    Create the index of every given model whose name is not an alias.

    :param models: models whose indices are looked up in the registry.
    :param aliases: alias names to check each index name against.
    :param options: parsed command options; ``options['action']`` is
        read only when an index name collides with an alias.
    """
    for index in registry.get_indices(models):
        index_name = index._name
        if index_name not in aliases:
            self.stdout.write("Creating index '{}'".format(index_name))
            index.create()
            continue

        # The name is already taken by an alias, so nothing is created.
        # Only an explicit 'create' action gets a hint about it.
        if options['action'] == 'create':
            message = (
                "'{}' already exists as an alias. Run '--delete' with"
                " '--use-alias' arg to delete indices pointed at the "
                "alias to make index name available."
            )
            self.stdout.write(message.format(index_name))

def _populate(self, models, options):
parallel = options['parallel']
Expand All @@ -123,29 +152,138 @@ def _populate(self, models, options):
qs = doc().get_indexing_queryset()
doc().update(qs, parallel=parallel, refresh=options['refresh'])

def _delete(self, models, options):
def _get_alias_indices(self, alias):
    """
    Return the names of the indices that ``alias`` is attached to.

    The names are the keys of the ``indices.get_alias(name=alias)``
    response, returned as a new list.
    """
    indices_by_name = self.es_conn.indices.get_alias(name=alias)
    return [index_name for index_name in indices_by_name.keys()]

def _delete_alias_indices(self, alias):
    """
    Delete every index that ``alias`` currently points at.

    The indices are removed with ``remove_index`` actions sent in a
    single ``update_aliases`` request, then one line per deleted index
    is written to stdout.

    :param alias: name of the alias whose indices are deleted.
    """
    alias_indices = self._get_alias_indices(alias)
    if not alias_indices:
        # Nothing to delete. Skip the request rather than send an empty
        # action list (presumably rejected by Elasticsearch; see the
        # aliases API reference).
        return

    alias_delete_actions = [
        {"remove_index": {"index": index}} for index in alias_indices
    ]
    # Pass the request body by keyword: client versions whose API methods
    # are keyword-only reject a positional body.
    self.es_conn.indices.update_aliases(
        body={"actions": alias_delete_actions}
    )
    for index in alias_indices:
        self.stdout.write("Deleted index '{}'".format(index))

def _delete(self, models, aliases, options):
index_names = [index._name for index in registry.get_indices(models)]

if not options['force']:
response = input(
"Are you sure you want to delete "
"the '{}' indexes? [y/N]: ".format(", ".join(index_names)))
"the '{}' indices? [y/N]: ".format(", ".join(index_names)))
if response.lower() != 'y':
self.stdout.write('Aborted')
return False

for index in registry.get_indices(models):
self.stdout.write("Deleting index '{}'".format(index._name))
index.delete(ignore=404)
if options['use_alias']:
for index in index_names:
alias_exists = index in aliases
if alias_exists:
self._delete_alias_indices(index)
elif self.es_conn.indices.exists(index=index):
self.stdout.write(
"'{}' refers to an index, not an alias. Run "
"'--delete' without '--use-alias' arg to delete "
"index.".format(index)
)
oehrlein marked this conversation as resolved.
Show resolved Hide resolved
return False
else:
for index in registry.get_indices(models):
alias_exists = index._name in aliases
if not alias_exists:
self.stdout.write("Deleting index '{}'".format(index._name))
index.delete(ignore=404)
elif options['action'] == 'rebuild':
self._delete_alias_indices(index._name)
elif options['action'] == 'delete':
self.stdout.write(
"'{}' refers to an alias, not an index. Run "
"'--delete' with '--use-alias' arg to delete indices "
"pointed at the alias.".format(index._name)
)
oehrlein marked this conversation as resolved.
Show resolved Hide resolved
return False

return True

def _rebuild(self, models, options):
if not self._delete(models, options):
def _update_alias(self, alias, new_index, alias_exists, options):
alias_actions = [{"add": {"alias": alias, "index": new_index}}]

delete_existing_index = False
if not alias_exists and self.es_conn.indices.exists(index=alias):
# Elasticsearch will return an error if an index already
# exists with the desired alias name. Therefore, we need to
# delete that index.
delete_existing_index = True
alias_actions.append({"remove_index": {"index": alias}})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add a warning when this is used with --use-alias-keep-index, as this will delete the old index anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By warning, do you just mean a stdout message explaining why the index is being deleted? If so, that's fine with me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A message sounds good to me. We may also add a prompt for that.
@safwanrahman what are your thoughts on this?


old_indices = []
alias_delete_actions = []
if alias_exists:
# Elasticsearch will return an error if we search for
# indices by alias but the alias doesn't exist. Therefore,
# we want to be sure the alias exists.
old_indices = self._get_alias_indices(alias)
alias_actions.append(
{"remove": {"alias": alias, "indices": old_indices}}
)
alias_delete_actions = [
{"remove_index": {"index": index}} for index in old_indices
]

self.es_conn.indices.update_aliases({"actions": alias_actions})
if delete_existing_index:
self.stdout.write("Deleted index '{}'".format(alias))

self.stdout.write(
"Added alias '{}' to index '{}'".format(alias, new_index)
)

if old_indices:
oehrlein marked this conversation as resolved.
Show resolved Hide resolved
for index in old_indices:
self.stdout.write(
"Removed alias '{}' from index '{}'".format(alias, index)
)

if alias_delete_actions and not options['use_alias_keep_index']:
self.es_conn.indices.update_aliases(
{"actions": alias_delete_actions}
)
for index in old_indices:
self.stdout.write("Deleted index '{}'".format(index))

def _rebuild(self, models, aliases, options):
    """
    Rebuild the indices of ``models``, either in place or behind aliases.

    Without ``--use-alias`` the existing indices are deleted first (the
    rebuild is abandoned if that deletion is refused), then recreated and
    populated. With ``--use-alias`` a new, timestamp-suffixed index is
    created and populated for each registered index, and the alias named
    after the original index is then pointed at it.

    :param models: models whose indices are rebuilt.
    :param aliases: alias names that existed when the command started.
    :param options: parsed command options; ``use_alias`` is read here,
        the rest is passed on to the helper methods.
    """
    use_alias = options['use_alias']

    if not use_alias:
        if not self._delete(models, aliases, options):
            return
        # The delete step has removed the indices behind any alias named
        # like one of our indices, so those names are free again. Drop
        # them from the alias list we pass on; otherwise _create() would
        # still see the stale alias and skip creating the index.
        freed_names = set(
            index._name for index in registry.get_indices(models)
        )
        aliases = [name for name in aliases if name not in freed_names]
    else:
        alias_index_pairs = []
        index_suffix = "-" + datetime.now().strftime("%Y%m%d%H%M%S%f")
        for index in registry.get_indices(models):
            # The alias takes the original index name value. The
            # index name sent to Elasticsearch will be the alias
            # plus the suffix from above. In addition, the index
            # name needs to be limited to 255 characters, of which
            # 21 will always be taken by the suffix, leaving 234
            # characters from the original index name value.
            new_index = index._name[:234] + index_suffix
            alias_index_pairs.append(
                {'alias': index._name, 'index': new_index}
            )
            # From here on the registered index object refers to the
            # new concrete index, so create/populate target it.
            index._name = new_index

    self._create(models, aliases, options)
    self._populate(models, options)

    if use_alias:
        for alias_index_pair in alias_index_pairs:
            alias = alias_index_pair['alias']
            alias_exists = alias in aliases
            self._update_alias(
                alias, alias_index_pair['index'], alias_exists, options
            )

def handle(self, *args, **options):
if not options['action']:
raise CommandError(
Expand All @@ -156,14 +294,21 @@ def handle(self, *args, **options):
action = options['action']
models = self._get_models(options['models'])

# We need to know whether any aliases exist, and which ones, to
# avoid naming conflicts with indices; this is needed whether or
# not the '--use-alias' arg is used.
aliases = []
for index in self.es_conn.indices.get_alias().values():
aliases += index['aliases'].keys()

if action == 'create':
self._create(models, options)
self._create(models, aliases, options)
elif action == 'populate':
self._populate(models, options)
elif action == 'delete':
self._delete(models, options)
self._delete(models, aliases, options)
elif action == 'rebuild':
self._rebuild(models, options)
self._rebuild(models, aliases, options)
else:
raise CommandError(
"Invalid action. Must be one of"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,5 +485,5 @@ def generate_id(cls, article):

# Get the data from the elasticsearch low level API because
# The generator get executed there.
data = json.loads(mock_bulk.call_args[0][0].split("\n")[0])
data = json.loads(mock_bulk.call_args[1]['body'].split("\n")[0])
assert data["index"]["_id"] == article.slug