Skip to content

Commit

Permalink
[bug 720865] Remove race conditions that were causing mapping conflic…
Browse files Browse the repository at this point in the history
…ts in production.

* Remove support for putting different doctypes in different indices.
* Remove the ability to limit reindexing to given doctypes.
* Make get_mapping() return less boilerplate. Factor that up.
  • Loading branch information
erikrose authored and willkg committed Jan 25, 2012
1 parent 57f4faf commit f2b4463
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 133 deletions.
34 changes: 15 additions & 19 deletions apps/forums/models.py
Expand Up @@ -179,25 +179,21 @@ def update_last_post(self, exclude_post=None):

@classmethod
def get_mapping(cls):
mapping = {
'properties': {
'id': {'type': 'integer'},
'thread_id': {'type': 'integer'},
'forum_id': {'type': 'integer'},
'title': {'type': 'string', 'analyzer': 'snowball'},
'is_sticky': {'type': 'boolean'},
'is_locked': {'type': 'boolean'},
'author_id': {'type': 'integer'},
'author_ord': {'type': 'string'},
'content': {'type': 'string', 'analyzer': 'snowball',
'store': 'yes',
'term_vector': 'with_positions_offsets'},
'created': {'type': 'date'},
'updated': {'type': 'date'},
'replies': {'type': 'integer'}
}
}
return mapping
return {
'id': {'type': 'integer'},
'thread_id': {'type': 'integer'},
'forum_id': {'type': 'integer'},
'title': {'type': 'string', 'analyzer': 'snowball'},
'is_sticky': {'type': 'boolean'},
'is_locked': {'type': 'boolean'},
'author_id': {'type': 'integer'},
'author_ord': {'type': 'string'},
'content': {'type': 'string', 'analyzer': 'snowball',
'store': 'yes',
'term_vector': 'with_positions_offsets'},
'created': {'type': 'date'},
'updated': {'type': 'date'},
'replies': {'type': 'integer'}}

def extract_document(self):
"""Extracts interesting thing from a Thread and its Posts"""
Expand Down
52 changes: 24 additions & 28 deletions apps/questions/models.py
Expand Up @@ -283,34 +283,30 @@ def my_tags(self):

@classmethod
def get_mapping(cls):
mapping = {
'properties': {
'id': {'type': 'long'},
'question_id': {'type': 'long'},
'title': {'type': 'string', 'analyzer': 'snowball'},
'question_content':
{'type': 'string', 'analyzer': 'snowball',
# TODO: Stored because originally, this is the
# only field we were excerpting on. Standardize
# one way or the other.
'store': 'yes', 'term_vector': 'with_positions_offsets'},
'answer_content':
{'type': 'string', 'analyzer': 'snowball'},
'replies': {'type': 'integer'},
'is_solved': {'type': 'boolean'},
'is_locked': {'type': 'boolean'},
'has_answers': {'type': 'boolean'},
'has_helpful': {'type': 'boolean'},
'created': {'type': 'date'},
'updated': {'type': 'date'},
'question_creator': {'type': 'string'},
'answer_creator': {'type': 'string'},
'question_votes': {'type': 'integer'},
'answer_votes': {'type': 'integer'},
'tag': {'type': 'string'}
}
}
return mapping
return {
'id': {'type': 'long'},
'question_id': {'type': 'long'},
'title': {'type': 'string', 'analyzer': 'snowball'},
'question_content':
{'type': 'string', 'analyzer': 'snowball',
# TODO: Stored because originally, this is the
# only field we were excerpting on. Standardize
# one way or the other.
'store': 'yes', 'term_vector': 'with_positions_offsets'},
'answer_content':
{'type': 'string', 'analyzer': 'snowball'},
'replies': {'type': 'integer'},
'is_solved': {'type': 'boolean'},
'is_locked': {'type': 'boolean'},
'has_answers': {'type': 'boolean'},
'has_helpful': {'type': 'boolean'},
'created': {'type': 'date'},
'updated': {'type': 'date'},
'question_creator': {'type': 'string'},
'answer_creator': {'type': 'string'},
'question_votes': {'type': 'integer'},
'answer_votes': {'type': 'integer'},
'tag': {'type': 'string'}}

def extract_document(self):
"""Extracts indexable attributes from a Question and its answers."""
Expand Down
38 changes: 17 additions & 21 deletions apps/search/es_utils.py
Expand Up @@ -66,49 +66,45 @@ def format_time(time_to_go):
return "%dm %ds" % (time_to_go / 60, time_to_go % 60)


def es_reindex_with_progress(doctypes=None, percent=100):
def es_reindex_with_progress(percent=100):
"""Rebuild Elastic indexes as you iterate over yielded progress ratios.
:arg doctypes: Defaults to None which will index all doctypes.
Otherwise indexes the doctypes specified. See
:py:func:`.get_doctype_stats()` for what doctypes look like.
:arg percent: Defaults to 100. Allows you to specify how much of
each doctype you want to index. This is useful for
development where doing a full reindex takes an hour.
"""
from search.models import get_search_models

es = elasticutils.get_es()

search_models = get_search_models()
if doctypes:
search_models = [cls for cls in search_models
if cls._meta.db_table in doctypes]

if len(search_models) == len(get_search_models()):
index = settings.ES_INDEXES.get('default')
if index is not None:
# If we're indexing everything and there's a default index
# specified in settings, then we delete and recreate it.
es.delete_index_if_exists(index)
es.create_index(index)

total = sum([cls.objects.count() for cls in search_models])
es = elasticutils.get_es()
index = settings.ES_INDEXES['default']
es.delete_index_if_exists(index)
# There should be no mapping-conflict race here since the index doesn't
# exist. Live indexing should just fail.

# Simultaneously create the index and the mappings, so live indexing
# doesn't get a chance to index anything between the two and infer a bogus
# mapping (which ES then freaks out over when we try to lay in an
# incompatible explicit mapping).
mappings = dict((cls._meta.db_table, {'properties': cls.get_mapping()})
for cls in search_models)
es.create_index(index, settings={'mappings': mappings})

total = sum([cls.objects.count() for cls in search_models])
to_index = [cls.index_all(percent) for cls in search_models]

return (float(done) / total for done, _ in
izip(count(1), chain(*to_index)))


def es_reindex(doctypes=None, percent=100):
def es_reindex(percent=100):
"""Rebuild ElasticSearch indexes
See :py:func:`.es_reindex_with_progress` for argument details.
"""
[x for x in es_reindex_with_progress(doctypes, percent) if False]
[x for x in es_reindex_with_progress(percent) if False]


def es_whazzup():
Expand Down
19 changes: 5 additions & 14 deletions apps/search/management/commands/esreindex.py
@@ -1,6 +1,8 @@
import logging
from django.core.management.base import BaseCommand, CommandError
from optparse import make_option

from django.core.management.base import BaseCommand, CommandError

from search.es_utils import es_reindex
from search.models import get_search_models

Expand All @@ -14,17 +16,6 @@ class Command(BaseCommand):
def handle(self, *args, **options):
logging.basicConfig(level=logging.INFO)
percent = options['percent']
if percent > 100 or percent < 1:
if not 1 <= percent <= 100:
raise CommandError('percent should be between 1 and 100')

if args:
search_models = get_search_models()
possible_doctypes = dict((cls._meta.db_table, cls)
for cls in search_models)
for mem in args:
if mem not in possible_doctypes:
raise CommandError('"%s" is not a valid doctype (%s)' %
(mem, possible_doctypes.keys()))

# args are the list of doctypes to index.
es_reindex(args, percent)
es_reindex(percent)
40 changes: 14 additions & 26 deletions apps/search/models.py
Expand Up @@ -60,11 +60,11 @@ class SearchMixin(object):
"""
@classmethod
def get_mapping(self):
"""Returns the ES mapping defition for this document type
"""Return the ES mapping properties for this document type.
This must be implemented. It should return an ES mapping.
For example... ::
For examples, see the codebase.
{'id': {'type': 'integer'}, ...}
"""
raise NotImplementedError
Expand All @@ -80,12 +80,6 @@ def extract_document(self):
"""
raise NotImplementedError

@classmethod
def get_es_index(cls):
"""Returns the index for this class"""
indexes = settings.ES_INDEXES
return indexes.get(cls._meta.db_table) or indexes['default']

def index_later(self):
"""Register myself to be indexed at the end of the request."""
_local_tasks().add((index_task.delay, (self.__class__, (self.id,))))
Expand All @@ -110,23 +104,12 @@ def index_all(cls, percent=100):
es = es_utils.get_es()

doc_type = cls._meta.db_table
index = cls.get_es_index()

if index != settings.ES_INDEXES.get('default'):
# If this doctype isn't using the default index, then this
# doctype is responsible for deleting and re-creating the
# index.
es.delete_index_if_exists(index)
es.create_index(index)
index = settings.ES_INDEXES['default']

start_time = time.time()

log.info('reindex %s into %s index', doc_type, index)

log.info('setting up mapping....')
mapping = cls.get_mapping()
es.put_mapping(doc_type, mapping, index)

log.info('iterating through %s....', doc_type)
total = cls.objects.count()
to_index = int(total * (percent / 100.0))
Expand Down Expand Up @@ -187,12 +170,16 @@ def index(cls, document, bulk=False, force_insert=False, refresh=False,
if es is None:
es = elasticutils.get_es()

index = cls.get_es_index()
index = settings.ES_INDEXES['default']
doc_type = cls._meta.db_table

# TODO: handle pyes.urllib3.TimeoutErrors here.
es.index(document, index=index, doc_type=doc_type, id=document['id'],
bulk=bulk, force_insert=force_insert)
es.index(document,
index=index,
doc_type=doc_type,
id=document['id'],
bulk=bulk,
force_insert=force_insert)

if refresh:
es.refresh(timesleep=0)
Expand All @@ -203,10 +190,11 @@ def unindex(cls, id):
if not settings.ES_LIVE_INDEXING:
return

index = cls.get_es_index()
doc_type = cls._meta.db_table
try:
elasticutils.get_es().delete(index, doc_type, id)
elasticutils.get_es().delete(settings.ES_INDEXES['default'],
doc_type,
id)
except pyes.exceptions.NotFoundException:
# Ignore the case where we try to delete something that's
# not there.
Expand Down
8 changes: 3 additions & 5 deletions apps/sumo/tests/__init__.py
Expand Up @@ -78,16 +78,15 @@ def tearDown(self):
self.teardown_indexes()
super(ElasticTestCase, self).tearDown()

def refresh(self, index='default'):
def refresh(self):
# Any time we're doing a refresh, we're making sure that the
# index is ready to be queried. Given that, it's almost
# always the case that we want to run all the generated tasks,
# then refresh.
from search.models import generate_tasks
generate_tasks()

es = get_es()
es.refresh(settings.ES_INDEXES[index], timesleep=0)
get_es().refresh(settings.ES_INDEXES['default'], timesleep=0)

def setup_indexes(self):
"""(Re-)create ES indexes."""
Expand Down Expand Up @@ -115,8 +114,7 @@ def setup_indexes(self):

def teardown_indexes(self):
es = get_es()
for index in settings.ES_INDEXES.values():
es.delete_index_if_exists(index)
es.delete_index_if_exists(settings.ES_INDEXES['default'])

settings.ES_LIVE_INDEXING = False

Expand Down
33 changes: 14 additions & 19 deletions apps/wiki/models.py
Expand Up @@ -628,25 +628,20 @@ def is_watched_by(self, user):

@classmethod
def get_mapping(cls):
mapping = {
'properties': {
'id': {'type': 'integer'},
'title': {'type': 'string', 'analyzer': 'snowball'},
'locale': {'type': 'string', 'index': 'not_analyzed'},
'current': {'type': 'integer'},
'parent_id': {'type': 'integer'},
'content':
{'type': 'string', 'analyzer': 'snowball'},
'category': {'type': 'integer'},
'slug': {'type': 'string'},
'is_archived': {'type': 'boolean'},
'summary': {'type': 'string', 'analyzer': 'snowball'},
'keywords': {'type': 'string', 'analyzer': 'snowball'},
'updated': {'type': 'date'},
'tag': {'type': 'string'}
}
}
return mapping
return {
'id': {'type': 'integer'},
'title': {'type': 'string', 'analyzer': 'snowball'},
'locale': {'type': 'string', 'index': 'not_analyzed'},
'current': {'type': 'integer'},
'parent_id': {'type': 'integer'},
'content': {'type': 'string', 'analyzer': 'snowball'},
'category': {'type': 'integer'},
'slug': {'type': 'string'},
'is_archived': {'type': 'boolean'},
'summary': {'type': 'string', 'analyzer': 'snowball'},
'keywords': {'type': 'string', 'analyzer': 'snowball'},
'updated': {'type': 'date'},
'tag': {'type': 'string'}}

def extract_document(self):
d = {}
Expand Down
2 changes: 1 addition & 1 deletion settings.py
Expand Up @@ -592,7 +592,7 @@ def JINJA_CONFIG():
#
# Connection information for Elastic
ES_HOSTS = ['127.0.0.1:9200']
ES_INDEXES = {'default': 'sumo'}
ES_INDEXES = {'default': 'sumo'} # Doesn't support non-default indexes atm.
ES_LIVE_INDEXING = False # Keep indexes up to date as objects are made/deleted
ES_TIMEOUT = 5 # 5 second timeouts for querying
ES_INDEXING_TIMEOUT = 30 # 30 second timeouts for all things indexing
Expand Down

0 comments on commit f2b4463

Please sign in to comment.