diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dc9daa7d..c633bb28 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,6 +122,7 @@ jobs: run: | TOX_ENV=$(echo "py${{ matrix.python-version }}-django-${{ matrix.django-version }}-es${{ matrix.es-dsl-version }}" | tr -d .) python -m tox -e $TOX_ENV -- --elasticsearch + python -m tox -e $TOX_ENV -- --elasticsearch --signal-processor celery - name: Publish Coverage Report uses: codecov/codecov-action@v1 diff --git a/django_elasticsearch_dsl/documents.py b/django_elasticsearch_dsl/documents.py index dcbfd784..4671064d 100644 --- a/django_elasticsearch_dsl/documents.py +++ b/django_elasticsearch_dsl/documents.py @@ -219,6 +219,13 @@ def _get_actions(self, object_list, action): for object_instance in object_list: if action == 'delete' or self.should_index_object(object_instance): yield self._prepare_action(object_instance, action) + + def get_actions(self, object_list, action): + """ + Generate the elasticsearch payload. 
+ """ + return self._get_actions(object_list, action) + def _bulk(self, *args, **kwargs): """Helper for switching between normal and parallel bulk operation""" diff --git a/django_elasticsearch_dsl/registries.py b/django_elasticsearch_dsl/registries.py index 72510610..e2623ddd 100644 --- a/django_elasticsearch_dsl/registries.py +++ b/django_elasticsearch_dsl/registries.py @@ -174,5 +174,11 @@ def get_indices(self, models=None): return set(iterkeys(self._indices)) + def __contains__(self, model): + """ + Checks that model is in registry + """ + return model in self._models or model in self._related_models + registry = DocumentRegistry() diff --git a/django_elasticsearch_dsl/signals.py b/django_elasticsearch_dsl/signals.py index 35a631c4..48f42249 100644 --- a/django_elasticsearch_dsl/signals.py +++ b/django_elasticsearch_dsl/signals.py @@ -7,10 +7,13 @@ from __future__ import absolute_import from django.db import models +from django.apps import apps from django.dispatch import Signal - from .registries import registry - +from django.core.exceptions import ObjectDoesNotExist +from importlib import import_module +# Sent after document indexing is completed +post_index = Signal() class BaseSignalProcessor(object): """Base signal processor. @@ -96,6 +99,124 @@ def teardown(self): models.signals.m2m_changed.disconnect(self.handle_m2m_changed) models.signals.pre_delete.disconnect(self.handle_pre_delete) +try: + from celery import shared_task +except ImportError: + pass +else: + class CelerySignalProcessor(RealTimeSignalProcessor): + """Celery signal processor. + + Allows automatic updates on the index as delayed background tasks using + Celery. + + NB: We cannot process deletes as background tasks. + By the time the Celery worker would pick up the delete job, the + model instance would already deleted. We can get around this by + setting Celery to use `pickle` and sending the object to the worker, + but using `pickle` opens the application up to security concerns. 
+ """ -# Sent after document indexing is completed -post_index = Signal() + def handle_save(self, sender, instance, **kwargs): + """Handle save with a Celery task. + + Given an individual model instance, update the object in the index. + Update the related objects either. + """ + pk = instance.pk + app_label = instance._meta.app_label + model_name = instance.__class__.__name__ + + self.registry_update_task.delay(pk, app_label, model_name) + self.registry_update_related_task.delay(pk, app_label, model_name) + + def handle_pre_delete(self, sender, instance, **kwargs): + """Handle removing of instance object from related models instance. + We need to do this before the real delete otherwise the relation + doesn't exists anymore and we can't get the related models instance. + """ + self.prepare_registry_delete_related_task(instance) + + def handle_delete(self, sender, instance, **kwargs): + """Handle delete. + + Given an individual model instance, delete the object from index. + """ + self.prepare_registry_delete_task(instance) + + def prepare_registry_delete_related_task(self, instance): + """ + Select its related instance before this instance was deleted. + And pass that to celery. + """ + action = 'index' + for doc in registry._get_related_doc(instance): + doc_instance = doc(related_instance_to_ignore=instance) + try: + related = doc_instance.get_instances_from_related(instance) + except ObjectDoesNotExist: + related = None + if related is not None: + doc_instance.update(related) + if isinstance(related, models.Model): + object_list = [related] + else: + object_list = related + bulk_data = list(doc_instance._get_actions(object_list, action)), + self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data) + + @shared_task() + def registry_delete_task(doc_label, data): + """ + Handle the bulk delete data on the registry as a Celery task. + The different implementations used are due to the difference between delete and update operations. 
+ The update operation can re-read the updated data from the database to ensure eventual consistency, + but the delete needs to be processed before the database record is deleted to obtain the associated data. + """ + doc_instance = import_module(doc_label) + parallel = True + doc_instance._bulk(data, parallel=parallel) + + def prepare_registry_delete_task(self, instance): + """ + Prepare the delete data before the database record is deleted. + """ + action = 'delete' + for doc in registry._get_related_doc(instance): + doc_instance = doc(related_instance_to_ignore=instance) + try: + related = doc_instance.get_instances_from_related(instance) + except ObjectDoesNotExist: + related = None + if related is not None: + doc_instance.update(related) + if isinstance(related, models.Model): + object_list = [related] + else: + object_list = related + bulk_data = list(doc_instance.get_actions(object_list, action)) + self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data) + + @shared_task() + def registry_update_task(pk, app_label, model_name): + """Handle the update on the registry as a Celery task.""" + try: + model = apps.get_model(app_label, model_name) + except LookupError: + pass + else: + registry.update( + model.objects.get(pk=pk) + ) + + @shared_task() + def registry_update_related_task(pk, app_label, model_name): + """Handle the related update on the registry as a Celery task.""" + try: + model = apps.get_model(app_label, model_name) + except LookupError: + pass + else: + registry.update_related( + model.objects.get(pk=pk) + ) diff --git a/docs/source/settings.rst b/docs/source/settings.rst index 27e8e136..ae8d5eef 100644 --- a/docs/source/settings.rst +++ b/docs/source/settings.rst @@ -37,8 +37,15 @@ An example: Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``. -You could, for instance, make a ``CelerySignalProcessor`` which would add -update jobs to the queue to for delayed processing. 
+Options: ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor`` \ ``django_elasticsearch_dsl.signals.CelerySignalProcessor`` + +In this ``CelerySignalProcessor`` implementation, +create and update operations will record the updated data primary key from the database and delay the time to find the association to ensure eventual consistency. +Delete operations are processed to obtain associated data before database records are deleted. +And celery needs to be pre-configured in the django project, for example `Using Celery with Django <https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html>`_. + +You could, for instance, make a ``CustomSignalProcessor`` which would apply +update jobs as you wish. ELASTICSEARCH_DSL_PARALLEL ========================== diff --git a/requirements.txt b/requirements.txt index cb1678c7..77ddcb3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ django>=1.9.6 elasticsearch-dsl>=7.0.0,<8.0.0 - diff --git a/requirements_test.txt b/requirements_test.txt index 9586a2ad..ba387ec5 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -3,6 +3,7 @@ mock>=1.0.1 flake8>=2.1.0 tox>=1.7.0 Pillow==6.2.0 +celery>=4.1.0 # Additional test requirements go here diff --git a/runtests.py b/runtests.py index 89451b54..43cc5178 100644 --- a/runtests.py +++ b/runtests.py @@ -2,11 +2,19 @@ import sys import argparse +from celery import Celery + try: from django.conf import settings from django.test.utils import get_runner - def get_settings(): + def get_settings(signal_processor): + PROCESSOR_CLASSES = { + 'realtime': 'django_elasticsearch_dsl.signals.RealTimeSignalProcessor', + 'celery': 'django_elasticsearch_dsl.signals.CelerySignalProcessor', + } + + signal_processor = PROCESSOR_CLASSES[signal_processor] settings.configure( DEBUG=True, USE_TZ=True, @@ -31,6 +39,10 @@ def get_settings(): }, }, DEFAULT_AUTO_FIELD="django.db.models.BigAutoField", + CELERY_BROKER_URL='memory://localhost/', + CELERY_TASK_ALWAYS_EAGER=True, + CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, + 
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR=signal_processor ) try: @@ -41,6 +53,9 @@ def get_settings(): else: setup() + app = Celery() + app.config_from_object('django.conf:settings', namespace='CELERY') + app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) return settings except ImportError: @@ -59,6 +74,13 @@ def make_parser(): const='localhost:9200', help="To run integration test against an Elasticsearch server", ) + parser.add_argument( + '--signal-processor', + nargs='?', + default='realtime', + choices=('realtime', 'celery'), + help='Defines which signal backend to choose' + ) return parser @@ -70,7 +92,9 @@ def run_tests(*test_args): if not test_args: test_args = ['tests'] - settings = get_settings() + signal_processor = args.signal_processor + + settings = get_settings(signal_processor) TestRunner = get_runner(settings) test_runner = TestRunner() diff --git a/setup.py b/setup.py index 35b815d8..eeccb76f 100755 --- a/setup.py +++ b/setup.py @@ -72,4 +72,7 @@ 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', ], + extras_require={ + 'celery': ["celery>=4.1.0"], + } ) diff --git a/tests/test_documents.py b/tests/test_documents.py index 78e5dc43..40c36ce1 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -1,5 +1,7 @@ import json from unittest import TestCase +from unittest import SkipTest + import django from django.db import models @@ -64,7 +66,19 @@ class Index: doc_type = 'car_document' -class DocTypeTestCase(TestCase): +class BaseDocTypeTestCase(object): + TARGET_PROCESSOR = None + + @classmethod + def setUpClass(cls): + from django.conf import settings + if cls.TARGET_PROCESSOR != settings.ELASTICSEARCH_DSL_SIGNAL_PROCESSOR: + raise SkipTest( + "Skipped because {} is required, not {}".format( + cls.TARGET_PROCESSOR, settings.ELASTICSEARCH_DSL_SIGNAL_PROCESSOR + ) + ) + super(BaseDocTypeTestCase,cls).setUpClass() def test_model_class_added(self): self.assertEqual(CarDocument.django.model, Car) @@ -538,3 
+552,10 @@ def should_index_object(self, obj): data_body = mock_bulk.call_args[1]['body'] self.assertTrue(article1.slug in data_body) self.assertTrue(article2.slug not in data_body) + +class RealTimeDocTypeTestCase(BaseDocTypeTestCase, TestCase): + TARGET_PROCESSOR = 'django_elasticsearch_dsl.signals.RealTimeSignalProcessor' + + +class CeleryDocTypeTestCase(BaseDocTypeTestCase, TestCase): + TARGET_PROCESSOR = 'django_elasticsearch_dsl.signals.CelerySignalProcessor' diff --git a/tests/test_integration.py b/tests/test_integration.py index fad7b753..f01d5781 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -3,7 +3,7 @@ import django from django.core.management import call_command -from django.test import TestCase +from django.test import TestCase, TransactionTestCase if django.VERSION < (4, 0): from django.utils.translation import ugettext_lazy as _ else: @@ -29,7 +29,7 @@ @unittest.skipUnless(is_es_online(), 'Elasticsearch is offline') -class IntegrationTestCase(ESTestCase, TestCase): +class IntegrationTestCase(ESTestCase, TransactionTestCase): def setUp(self): super(IntegrationTestCase, self).setUp() self.manufacturer = Manufacturer(