Skip to content

Commit

Permalink
Improve celery extension documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
matllubos committed Jan 20, 2023
1 parent 1946cdc commit 8ed0e31
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 30 deletions.
2 changes: 1 addition & 1 deletion django_celery_extensions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
DEFAULTS = {
'CACHE_NAME': 'default',
'UNIQUE_TASK_KEY_PREFIX': 'django-celery-extensions|unique',
'IGNORE_TASK_AFTER_SUCCESS_KEY_PREFIX': 'django-celery-extensions|ignore-after-success',
'IGNORE_TASK_KEY_PREFIX': 'django-celery-extensions|ignore',
'BEATER_LOCK_KEY': 'django-celery-extensions|lock',
'LOCK_TIMEOUT': DEFAULT_MAX_INTERVAL * 5,
'LOCK_SLEEP': DEFAULT_MAX_INTERVAL,
Expand Down
2 changes: 1 addition & 1 deletion django_celery_extensions/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _get_unique_key(self, task_args, task_kwargs):

def _get_ignore_task_key(self, task_args, task_kwargs):
return self.unique_key_generator(
settings.IGNORE_TASK_AFTER_SUCCESS_KEY_PREFIX, task_args, task_kwargs
settings.IGNORE_TASK_KEY_PREFIX, task_args, task_kwargs
)

def is_processing(self, args=None, kwargs=None):
Expand Down
60 changes: 48 additions & 12 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,81 @@ You can configure the library in Django settings. Following options are availabl
Name of the django cache used for locked scheduler and uniqueness of a task.


* ``DJANGO_CELERY_EXTENSIONS_KEY_PREFIX`` (default: ``'django-celery-extensions'``)
* ``DJANGO_CELERY_EXTENSIONS_BEATER_LOCK_KEY`` (default: ``'django-celery-extensions|lock''``)

Cache key prefix used for locked scheduler and uniqueness of a task.
Cache key prefix used for locked scheduler.

* ``DJANGO_CELERY_EXTENSIONS_IUNIQUE_TASK_KEY_PREFIX`` (default: ``'django-celery-extensions|unique''``)

* ``DJANGO_CELERY_EXTENSIONS_LOCK_KEY`` (default: ``'lock'``)
Value of the setting is used to generate cache key for the uniqueness of the tasks.

Name of the key used for locked scheduler.
* ``DJANGO_CELERY_EXTENSIONS_IGNORE_TASK_KEY_PREFIX`` (default: ``'django-celery-extensions|ignore''``)

Value of the setting is used to generate cache key for the tasks ignore logic.

* ``DJANGO_CELERY_EXTENSIONS_LOCK_TIMEOUT`` (default: ``celery.beat.DEFAULT_MAX_INTERVAL * 5``)

Time of the scheduler lock.


* ``DJANGO_CELERY_EXTENSIONS_LOCK_SLEEP`` (default: ``celery.beat.DEFAULT_MAX_INTERVAL``)

Sleep time interval to check if schduler is still locked.


* ``DJANGO_CELERY_EXTENSIONS_DEFAULT_TASK_STALE_TIME_LIMIT`` (default: ``None``)

Default time of the task when it will be set as stale.


* ``DJANGO_CELERY_EXTENSIONS_DEFAULT_TASK_MAX_QUEUE_WAITING_TIME`` (default: ``None``)

Maximal time which task will be waiting in the queue. Value of ``task_stale_limit`` can be computed from this value.


* ``DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS`` (default: ``{}``)

Dictionary of django commands which will be converted into celery tasks.
Dictionary of django commands which will be converted into celery tasks::

DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS = {
'django_command_name': {
'queue': 'fast',
# Another celery task configuration
}


}

* ``DJANGO_CELERY_EXTENSIONS_CELERY_AUTODISCOVER`` (default: ``False``)

Run the celery tasks auto discover after the Django application initialization.

* ``DJANGO_CELERY_EXTENSIONS_CELERY_SETTINGS`` (default: ``None``)

Path to the celery setting which is automatically imported after the Django application initialization.

* ``DJANGO_CELERY_EXTENSIONS_CELERY_TASK_CHECKER`` (default: ``None``)

Path to the module with implemented Django checker of the celery tasks configuration::

# Django settings
DJANGO_CELERY_EXTENSIONS_CELERY_TASK_CHECKER = 'celery_check'

# celery_check.py file
from django.core.checks import Error

def check_celery_task(task_name, task):
if not hasattr(task, 'queue'):
return Error(
f'Task with name "{task_name}" has not set queue',
hint=f'Set one of celery task queues "{CeleryQueue}"',
id='celery_task.E001',
obj=task
)

* ``DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DEFAULT_CELERY_KWARGS`` (default: ``{}``)

Default celery task kwargs which will be used for auto generated tasks from django commands.
Default celery task kwargs which will be used for auto generated tasks from django commands::

DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DEFAULT_CELERY_KWARGS = {'queue': 'fast'} # All celery tasks will be set with the fast celery queue


* ``AUTO_SQS_MESSAGE_GROUP_ID_FROM_TASK_NAME`` (default: ``False``)
* ``DJANGO_CELERY_EXTENSIONS_AUTO_SQS_MESSAGE_GROUP_ID`` (default: ``False``)

Whether to automatically set `MessageGroupId <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html>` to Celery task name or not.
Whether to automatically set `MessageGroupId <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html>` to Celery task name or not. Is recommended use this settings with the AWS SQS queue.
35 changes: 30 additions & 5 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@
Installation
============

Library purpose
---------------

Library extends celery framework with these improvements:
* Automatic Django commands conversion to the celery tasks
* Improve celery signals with on apply, trigger, unique or timeout events
* Add possibility to create the unique celery task
* Add possibility to ignore task invocation for the defined timeout
* Fix some celery bugs with the expiration
* Better AWS SQS support
* Apply a task and wait for the result for the timeout
* Celery beater implementation which will ensure that only one beater can be active if more beater are running at the same time
* Define celery queues in the enums
* Use Django checks to validate celery tasks settings

Stable release
--------------
Expand Down Expand Up @@ -59,16 +73,16 @@ Once installed, add the library to ``INSTALLED_APPS`` in your Django project set
INSTALLED_APPS = [
...
'django_celery_extensions',
...
]

For your celery configuration use ``django_celery_extensions.celery.Celery`` class::

from django_celery_extensions.celery import Celery


app = Celery('example')

You can use ``django_celery_extensions.celery.CeleryQueueEnum`` to define default configuration for tasks in this queue::
You can use ``django_celery_extensions.celery.CeleryQueueEnum`` to define default configuration for tasks in the queue::


from django_celery_extensions.celery import CeleryQueueEnum
Expand All @@ -77,9 +91,20 @@ You can use ``django_celery_extensions.celery.CeleryQueueEnum`` to define defaul
FAST = ('fast', {'time_limit': 10})


You can now define task and set the right queue::
You can define task and set the right queue now::

@celery_app.task(
queue=CeleryQueue.FAST)
@celery_app.task(queue=CeleryQueue.FAST)
def task_with_fast_queue():
return 'result'

If you need to override celery task class you should use ``django_celery_extensions.task.DjangoTask`` class::


from django_celery_extensions.task import DjangoTask

class YourTask(DjangoTask):
...

@celery_app.task(base=YourTask)
def your_task():
return 'result'
118 changes: 107 additions & 11 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,26 @@ Because ``Django`` framework often uses atomic transactions it is recomended cal
notify_user.delay_on_commit(user.pk) # similar to delay but with on_commit
notify_user.apply_async_and_get_result(args=(user.pk,), timeout=None, propagate=True) # call apply_async and wait specified timeout to task result. If result is not obtained to the specified time ``TimeoutError`` is raised

Default retry delays
^^^^^^^^^^^^^^^^^^^^

If you want to use celery autoretry but you want different retry times for different attempts, you can use default_retry_delays::

@celery_app.task(
base=DjangoTask,
bind=True,
autoretry_for=(NotifyException,),
default_retry_delays(60, 120, 180))
default_retry_delays=(60, 120, 180))
def notify_user(self, user_pk):
user = User.objects.get(pk=user_pk)
was_notified = notify(user)
return was_notified

The task will be retried three times. First attempt will be after 60 second, the second attempt will be after 120 second and third after 180 second.

Unique
^^^^^^

Sometimes it is necessary for a task with the same input to run only once. For this purpose you can use unique configuration::

@celery_app.task(
Expand All @@ -65,7 +71,29 @@ For unique tasks you can use ``is_processing`` method to check if task is runnin

notify_user.is_processing(args=(user_pk,))

Sometimes it is good convert ``Django`` commands to celery task. For example when you want to use celery beater instead of cron. For this purpose you can use ``DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS`` setting to define which commands you want to convert into tasks::
For every unique task is generated and stored a key in the cache. The key is generated by a unique key generator. The function ``default_unique_key_generator`` is used as a generator by default and you can change change it with ``unique_key_generator`` property::

from kombu import serialization

def custom_unique_key_generator(task, prefix, task_args, task_kwargs):
return 'custome unique_key'

@celery_app.task(
base=DjangoTask,
bind=True,
unique=True,
unique_key_generator=custom_unique_key_generator)
def notify_user(self, user_pk):
user = User.objects.get(pk=user_pk)
was_notified = notify(user)
return was_notified

Default unique key generator generates unique key from celery task name and task input arguments.

Django commands to celery tasks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Django commands can be converted automatically to the celery task. For example when you want to use celery beater instead of cron. For this purpose you can use ``DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS`` setting to define which commands you want to convert into tasks::

DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS = {
'clearsessions': {'unique': True},
Expand All @@ -79,13 +107,16 @@ If you want to call the command tasky by hand, you can use ``get_django_command_

get_django_command_task('clearsessions').delay_on_commit()

Ignore
^^^^^^

Some tasks can be run only once per specific time. For this purpose you can use ``ignore_task_after_success_timedelta``::
Celery tasks can be run only once per specific time. For this purpose you can use ``ignore_task_after_success`` and ``ignore_task_timedelta``::

@celery_app.task(
base=DjangoTask,
bind=True,
ignore_task_after_success_timedelta=timedelta(hours=5))
ignore_task_after_success=True,
ignore_task_timedelta=timedelta(hours=5))
def notify_user(self, user_pk):
user = User.objects.get(pk=user_pk)
was_notified = notify(user)
Expand All @@ -100,6 +131,78 @@ Now ``notify_user`` task will be ignored for 5 hours after the last successful c

If task ends in failure state it can be run again and will not be ignored.

A task can be set to ignore by hand with ``set_ignore_task`` method::

@celery_app.task(
base=DjangoTask,
bind=True,
ignore_task_timedelta=timedelta(hours=5))
def notify_user(self, user_pk):
user = User.objects.get(pk=user_pk)
was_notified = notify(user)
if was_notified:
self.set_ignore_task() # there can be specified the ignore task timedelta too self.set_ignore_task(ignore_task_timedelta=timedelta(hours=2))
return was_notified

Now the task will be ignored only if a user was successfully notified.

Signals
^^^^^^^

You can use ``DjangoTask`` new event methods to simplify your celery tasks logic. The `invocation_id` is unique UUID value generated with the task invocation::

class CustomTask(DjangoTask):

def on_invocation_apply(self, invocation_id, args, kwargs, options, result):
"""
Method is called when task was applied with the requester.
"""

def on_invocation_trigger(self, invocation_id, args, kwargs, task_id, options, result):
"""
Task has been triggered and placed in the queue.
"""

def on_invocation_unique(self, invocation_id, args, kwargs, task_id, options, result):
"""
Task has been triggered but the same task is already active.
"""

def on_invocation_ignored(self, invocation_id, args, kwargs, task_id, options, result):
"""
Task has been triggered but the task has set ignore_task_timedelta
and task was successfully completed in this timeout.
"""

def on_invocation_timeout(self, invocation_id, args, kwargs, task_id, ex, options, result):
"""
Task has been joined to another unique async result.
"""

def on_task_start(self, task_id, args, kwargs):
"""
Task has been started with worker.
"""

def on_task_retry(self, task_id, args, kwargs, exc, eta):
"""
Task failed but will be retried.
"""

def on_task_failure(self, task_id, args, kwargs, exc, einfo):
"""
Task failed and will not be retried.
"""

def on_task_success(self, task_id, args, kwargs, retval):
"""
Task was successful.
"""

@celery_app.task(base=CustomTask)
def custom_task(self, user_pk):
pass


Beater
------
Expand All @@ -109,10 +212,3 @@ Celery documentation warns against running more than one beater. But sometimes i
celery -A proj beat -s django_celery_extensions.beat.LockedPersistentScheduler

The scheduler will only work with configured ``redis_cache.RedisCache`` in ``Django`` settings.

Commands
--------

For development purposes ``Django`` provides autoreload funkcionality, which restarts django application when code is changes. Celery unfortunately doesn't support it but you can run celery via django command to achieve it::

./manage.py run_celery --type=worker/beater --celerysettings=settings.celery --autoreload --extra="some extra celery arguments"

0 comments on commit 8ed0e31

Please sign in to comment.