Skip to content

Commit

Permalink
Add task_acks_late configuration to Celery Executor (#37066)
Browse files Browse the repository at this point in the history
* get task_acks_late from config

* add task_acks_late to config

* Update airflow/providers/celery/provider.yaml

Co-authored-by: Hussein Awala <hussein@awala.fr>

* Update airflow/providers/celery/executors/default_celery.py

Co-authored-by: Hussein Awala <hussein@awala.fr>

* add basic test

* test acks_late False

* linting

* Update airflow/providers/celery/provider.yaml

Co-authored-by: Niko Oliveira <onikolas@amazon.com>

* Update airflow/providers/celery/provider.yaml

Co-authored-by: Niko Oliveira <onikolas@amazon.com>

* linting

* Update spelling_wordlist.txt

* alphabetic order

* define version added

* Update airflow/providers/celery/provider.yaml

* Update airflow/providers/celery/provider.yaml

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>

---------

Co-authored-by: Hussein Awala <hussein@awala.fr>
Co-authored-by: Niko Oliveira <onikolas@amazon.com>
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
  • Loading branch information
4 people committed Feb 5, 2024
1 parent 41ebf28 commit 6c72223
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/celery/executors/default_celery.py
Expand Up @@ -72,7 +72,7 @@ def _broker_supports_visibility_timeout(url):
"accept_content": ["json"],
"event_serializer": "json",
"worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier", fallback=1),
"task_acks_late": True,
"task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=True),
"task_default_queue": conf.get("operators", "DEFAULT_QUEUE"),
"task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
"task_track_started": conf.getboolean("celery", "task_track_started", fallback=True),
Expand Down
15 changes: 15 additions & 0 deletions airflow/providers/celery/provider.yaml
Expand Up @@ -276,6 +276,21 @@ config:
type: float
example: ~
default: "1.0"
task_acks_late:
description: |
If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the
task to a Celery worker, even if the original task is still running successfully. The new task
instance then runs concurrently with the original task and the Airflow UI and logs only show an
error message:
'Task Instance Not Running' FAILED: Task is in the running state'
Setting task_acks_late to True will force Celery to wait until a task is finished before a
new task instance is assigned. This effectively overrides the visibility timeout.
See also:
https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
version_added: 3.6.0
type: boolean
example: "True"
default: "True"
task_track_started:
description: |
Celery task will report its status as 'started' when the task is executed by a worker.
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -7,6 +7,7 @@ Ack
ack
ackIds
acknowledgement
acks
actionCard
Acyclic
acyclic
Expand Down
9 changes: 9 additions & 0 deletions tests/providers/celery/executors/test_celery_executor.py
Expand Up @@ -358,3 +358,12 @@ def test_sentinel_kwargs_loaded_from_string():
assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"] == {
"service_name": "mymaster"
}


@conf_vars({("celery", "task_acks_late"): "False"})
def test_celery_task_acks_late_loaded_from_string():
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)
assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False

0 comments on commit 6c72223

Please sign in to comment.