Skip to content

Commit

Permalink
Added acks_on_failure_or_timeout as a setting.
Browse files Browse the repository at this point in the history
Fixes #5377.
  • Loading branch information
thedrow committed Mar 12, 2019
1 parent 7d69b74 commit e6ca997
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
1 change: 1 addition & 0 deletions celery/app/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def __repr__(self):
task=Namespace(
__old__=OLD_NS,
acks_late=Option(False, type='bool'),
acks_on_failure_or_timeout=Option(True, type='bool'),
always_eager=Option(False, type='bool'),
annotations=Option(type='any'),
compression=Option(type='string', old={'celery_message_compression'}),
Expand Down
5 changes: 3 additions & 2 deletions celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,12 @@ class Task(object):
#: fails or times out.
#:
#: Configuring this setting only applies to tasks that are
#: acknowledged **after** they have been executed.
#: acknowledged **after** they have been executed and only if
#: :setting:`task_acks_late` is enabled.
#:
#: The application default can be overridden with the
#: :setting:`task_acks_on_failure_or_timeout` setting.
acks_on_failure_or_timeout = True
acks_on_failure_or_timeout = None

#: Even if :attr:`acks_late` is enabled, the worker will
#: acknowledge tasks when the worker process executing them abruptly
Expand Down
15 changes: 15 additions & 0 deletions docs/userguide/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ have been moved into a new ``task_`` prefix.
``CELERY_SECURITY_CERT_STORE`` :setting:`security_cert_store`
``CELERY_SECURITY_KEY`` :setting:`security_key`
``CELERY_TASK_ACKS_LATE`` :setting:`task_acks_late`
``CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT`` :setting:`task_acks_on_failure_or_timeout`
``CELERY_TASK_ALWAYS_EAGER`` :setting:`task_always_eager`
``CELERY_TASK_ANNOTATIONS`` :setting:`task_annotations`
``CELERY_TASK_COMPRESSION`` :setting:`task_compression`
Expand Down Expand Up @@ -525,6 +526,20 @@ has been executed, not *just before* (the default behavior).

FAQ: :ref:`faq-acks_late-vs-retry`.

.. setting:: task_acks_on_failure_or_timeout

``task_acks_on_failure_or_timeout``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Default: Enabled

When enabled messages for all tasks will be acknowledged even if they
fail or time out.

Configuring this setting only applies to tasks that are
acknowledged **after** they have been executed and only if
:setting:`task_acks_late` is enabled.

.. setting:: task_reject_on_worker_lost

``task_reject_on_worker_lost``
Expand Down
44 changes: 41 additions & 3 deletions t/unit/worker/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,34 @@ def test_on_failure_acks_late(self):
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged
assert job.acknowledged

def test_on_failure_acks_on_failure_or_timeout_disabled_for_task(self):
job = self.xRequest()
job.time_start = 1
self.mytask.acks_late = True
self.mytask.acks_on_failure_or_timeout = False
try:
raise KeyError('foo')
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged is False

def test_on_failure_acks_on_failure_or_timeout_enabled_for_task(self):
job = self.xRequest()
job.time_start = 1
self.mytask.acks_late = True
self.mytask.acks_on_failure_or_timeout = True
try:
raise KeyError('foo')
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged is True

def test_on_failure_acks_on_failure_or_timeout(self):
def test_on_failure_acks_on_failure_or_timeout_disabled(self):
self.app.conf.acks_on_failure_or_timeout = False
job = self.xRequest()
job.time_start = 1
self.mytask.acks_late = True
Expand All @@ -628,7 +653,20 @@ def test_on_failure_acks_on_failure_or_timeout(self):
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged is False
assert job.acknowledged is False
self.app.conf.acks_on_failure_or_timeout = True

def test_on_failure_acks_on_failure_or_timeout_enabled(self):
self.app.conf.acks_on_failure_or_timeout = True
job = self.xRequest()
job.time_start = 1
self.mytask.acks_late = True
try:
raise KeyError('foo')
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged is True

def test_from_message_invalid_kwargs(self):
m = self.TaskMessage(self.mytask.name, args=(), kwargs='foo')
Expand Down

0 comments on commit e6ca997

Please sign in to comment.