Permalink
Browse files

Post commit hooks for ElasticSearch

  • Loading branch information...
1 parent e317075 commit 1214449dc319220cfa6706d03bc4975a2406dd29 @davedash davedash committed Jun 10, 2011
Showing with 116 additions and 2 deletions.
  1. +40 −1 apps/feedback/models.py
  2. +1 −1 apps/input/__init__.py
  3. +24 −0 apps/search/cron.py
  4. +12 −0 apps/search/tasks.py
  5. +30 −0 apps/search/tests/test_elastic.py
  6. +8 −0 docs/elasticsearch.rst
  7. +1 −0 settings_test.py
View
@@ -5,13 +5,19 @@
from django.db.models import Count, signals
import caching.base
+import commonware.log
+from elasticutils import es_required
+from pyes import djangoutils
+from pyes.exceptions import NotFoundException as PyesNotFoundException
from feedback import query, utils
from feedback.utils import ua_parse, extract_terms, smart_truncate
from input import PRODUCT_IDS, OPINION_TYPES, OPINION_PRAISE, PLATFORMS
from input.models import ModelBase
from input.urlresolvers import reverse
+log = commonware.log.getLogger('feedback')
+
class OpinionManager(caching.base.CachingManager):
def browse(self, **kwargs):
@@ -41,7 +47,7 @@ class Opinion(ModelBase):
"""A single feedback item."""
_type = models.PositiveSmallIntegerField(blank=True,
db_column='type', default=OPINION_PRAISE.id, db_index=True)
-
+
url = models.URLField(verify_exists=False, blank=True)
description = models.TextField(blank=True)
terms = models.ManyToManyField('Term', related_name='used_in')
@@ -103,6 +109,28 @@ def type(self):
def get_url_path(self):
return reverse('opinion.detail', args=(self.id,))
+ @es_required
+ def update_index(self, es, bulk=False):
+ data = djangoutils.get_values(self)
+ try:
+ es.index(data, settings.ES_INDEX, 'opinion', self.id, bulk=bulk)
+ except Exception, e:
+ log.error("ElasticSearch errored for opinion (%s): %s" % (self, e))
+ else:
+ log.debug('Opinion %d added to search index.' % self.id)
+
+ @es_required
+ def remove_from_index(self, es, bulk=False):
+ try:
+ es.delete(settings.ES_INDEX, 'opinion', self.id, bulk=bulk)
+ except PyesNotFoundException:
+ pass
+ except Exception, e:
+ log.error("ElasticSearch error removing opinion (%s): %s" %
+ (self, e))
+ else:
+ log.debug('Opinion %d removed from search index.' % self.id)
+
def parse_user_agent(sender, instance, **kw):
parsed = ua_parse(instance.user_agent)
@@ -112,6 +140,7 @@ def parse_user_agent(sender, instance, **kw):
instance.platform = parsed['platform']
+# TODO: add this to celery
def extract_terms(sender, instance, **kw):
if instance.terms.all() or settings.DISABLE_TERMS:
return
@@ -121,9 +150,19 @@ def extract_terms(sender, instance, **kw):
this_term, created = Term.objects.get_or_create(term=term)
instance.terms.add(this_term)
+def post_to_elastic(sender, instance, **kw):
+ """Asynchronously update the opinion in ElasticSearch."""
+ from search import tasks
+ tasks.add_to_index.delay([instance.id])
+
signals.pre_save.connect(parse_user_agent, sender=Opinion)
signals.post_save.connect(extract_terms, sender=Opinion,
dispatch_uid='extract_terms')
+signals.post_save.connect(post_to_elastic, sender=Opinion)
+
+unindex_opinion = lambda instance, **kwargs: instance.remove_from_index()
+signals.post_delete.connect(unindex_opinion, sender=Opinion)
+
# post_Save for POST to metrics
class TermManager(models.Manager):
View
@@ -69,7 +69,7 @@ class OPINION_BROKEN:
OPINION_TYPES_USAGE = OPINION_PRAISE, OPINION_ISSUE, OPINION_IDEA
OPINION_TYPES = dict((type.id, type) for type in OPINION_TYPES_USAGE)
-
+OPINION_USAGE = (OPINION_PRAISE, OPINION_ISSUE, OPINION_IDEA)
## Applications
class FIREFOX:
View
@@ -0,0 +1,24 @@
+import commonware.log
+import cronjobs
+from celery.messaging import establish_connection
+from celeryutils import chunked
+
+import input
+from feedback.models import Opinion
+from search import tasks
+
+log = commonware.log.getLogger('i.cron')
+
+
+@cronjobs.register
+def index_all():
+ """
+ This reindexes all the Opinions in usage. This is not intended to be run
+ other than to initially seed Elastic Search.
+ """
+ ids = (Opinion.objects
+ .filter(_type__in=[i.id for i in input.OPINION_USAGE])
+ .values_list('id', flat=True))
+ with establish_connection() as conn:
+ for chunk in chunked(ids, 1000):
+ tasks.add_to_index.apply_async(args=[chunk], connection=conn)
View
@@ -0,0 +1,12 @@
+from celeryutils import task
+
+from feedback.models import Opinion
+from elasticutils import get_es, es_required
+
+
+@task
+@es_required
+def add_to_index(pks, es, **kw):
+ for opinion in Opinion.objects.filter(pk__in=pks):
+ opinion.update_index(bulk=True)
+ es.force_bulk()
@@ -0,0 +1,30 @@
+"""
+These tests specifically test elasticsearch related things.
+
+Specifically:
+
+ * creating an opinion will add it to the elastic search index.
+ * deleting an opinion will remove it from the index.
+"""
+
+from elasticutils.tests import ESTestCase
+from elasticutils import S
+from nose.tools import eq_
+
+from feedback.models import Opinion
+
+
+class TestElastic(ESTestCase):
+ def test_index(self):
+ o = Opinion.objects.create(
+ product=1,
+ description='Get me a chocolate milk, with extra salt.')
+ self.es.refresh()
+ eq_(len(S('chocolate')), 1)
+ return o
+
+ def test_delete(self):
+ a = self.test_index()
+ a.delete()
+ self.es.refresh()
+ eq_(len(S('chocolate')), 0, 'We deleted this... WTF?')
View
@@ -0,0 +1,8 @@
+==============
+Elastic Search
+==============
+
+We use ElasticSearch to index opinions for search. Download the latest version
+and follow the setup instructions in the ElasticUtils_ documentation.
+
+.. _ElasticUtils: http://elasticutils.rtfd.org
View
@@ -1 +1,2 @@
DISABLE_TERMS = True
+ES_DISABLED = True

0 comments on commit 1214449

Please sign in to comment.