Skip to content

Commit

Permalink
Custom ES Celery Signal Processing
Browse files Browse the repository at this point in the history
* using celery signal processing custom class for model upserts
* celery tasks to trigger model upserts on ES
  • Loading branch information
snyaggarwal committed Aug 21, 2020
1 parent c3873ef commit f1bce45
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 0 deletions.
22 changes: 22 additions & 0 deletions core/common/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from celery.result import AsyncResult
from django.conf import settings
from django.contrib.postgres.fields import JSONField, ArrayField
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models, IntegrityError
from django.db.models import Max, Value
from django.db.models.expressions import CombinedExpression, F
from django.utils import timezone
from django_elasticsearch_dsl.registries import registry
from django_elasticsearch_dsl.signals import RealTimeSignalProcessor
from pydash import get

from core.common.services import S3
Expand All @@ -15,6 +18,7 @@
ACCESS_TYPE_CHOICES, DEFAULT_ACCESS_TYPE, NAMESPACE_REGEX,
ACCESS_TYPE_VIEW, ACCESS_TYPE_EDIT, SUPER_ADMIN_USER_ID,
HEAD)
from .tasks import handle_save, handle_m2m_changed


class BaseModel(models.Model):
Expand Down Expand Up @@ -53,6 +57,14 @@ class Meta:
extras_have_been_decoded = False
is_being_saved = False

@property
def model_name(self):
return self.__class__.__name__

@property
def app_name(self):
return self.__module__.split('.')[1]

def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
if not self.internal_reference_id and self.id:
self.internal_reference_id = str(self.id)
Expand Down Expand Up @@ -569,3 +581,13 @@ def get_export_url(self):

def has_export(self):
return bool(self.get_export_url())


class CelerySignalProcessor(RealTimeSignalProcessor):
def handle_save(self, sender, instance, **kwargs):
if settings.ES_SYNC and instance.__class__ in registry.get_models():
handle_save.delay(instance.app_name, instance.model_name, instance.id)

def handle_m2m_changed(self, sender, instance, action, **kwargs):
if settings.ES_SYNC and instance.__class__ in registry.get_models():
handle_m2m_changed.delay(instance.app_name, instance.model_name, instance.id, action)
39 changes: 39 additions & 0 deletions core/common/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from celery.utils.log import get_task_logger
from celery_once import QueueOnce
from django.apps import apps
from django.core.management import call_command
from django_elasticsearch_dsl.registries import registry

from core.celery import app
from core.common.utils import write_export_file
Expand Down Expand Up @@ -59,3 +62,39 @@ def add_references(
head.remove_processing(self.request.id)

return added_references, errors


def __handle_save(instance):
registry.update(instance)
registry.update_related(instance)


def __handle_pre_delete(instance):
registry.delete_related(instance)


@app.task
def handle_save(app_name, model_name, instance_id):
__handle_save(apps.get_model(app_name, model_name).objects.get(id=instance_id))


@app.task
def handle_m2m_changed(app_name, model_name, instance_id, action):
instance = apps.get_model(app_name, model_name).objects.get(id=instance_id)
if action in ('post_add', 'post_remove', 'post_clear'):
__handle_save(instance)
elif action in ('pre_remove', 'pre_clear'):
__handle_pre_delete(instance)


@app.task
def handle_pre_delete(app_name, model_name, instance_id):
__handle_pre_delete(apps.get_model(app_name, model_name).objects.get(id=instance_id))


@app.task
def populate_indexes(app_names=None): # app_names has to be an iterable of strings
if app_names:
call_command('search_index', '--populate', '-f', '--models', *app_names, '--parallel')
else:
call_command('search_index', '--populate', '-f', '--parallel')
5 changes: 5 additions & 0 deletions core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,8 @@
}
CELERY_TASK_RESULT_EXPIRES = 259200 # 72 hours
CELERY_TRACK_STARTED = True
ELASTICSEARCH_DSL_PARALLEL = True
ELASTICSEARCH_DSL_AUTO_REFRESH = True
ELASTICSEARCH_DSL_AUTOSYNC = True
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = 'core.common.models.CelerySignalProcessor'
ES_SYNC = True

0 comments on commit f1bce45

Please sign in to comment.