Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.executors import (
celery_executor_utils as _celery_executor_utils, # noqa: F401 # Needed to register Celery tasks at worker startup, see #63043
)
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
from airflow.providers.common.compat.sdk import AirflowTaskTimeout, Stats
from airflow.utils.state import TaskInstanceState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,3 +687,26 @@ def test_task_routing_through_team_specific_app(self, mock_send_tasks, monkeypat
# Critical: task belongs to team A's app, not module-level app
assert task_from_call.app is team_a_executor.celery_app
assert task_from_call.name == "execute_command"


def test_celery_tasks_registered_on_import():
"""
Ensure execute_workload (and execute_command for Airflow 2.x) are registered
with the Celery app when celery_executor is imported.

Regression test for https://github.com/apache/airflow/issues/63043
Celery provider 3.17.0 exposed that celery_executor_utils was never imported
at module level, so tasks were never registered at worker startup.
"""
from airflow.providers.celery.executors.celery_executor_utils import app

registered_tasks = list(app.tasks.keys())
assert "execute_workload" in registered_tasks, (
"execute_workload must be registered with the Celery app at import time. "
"Workers need this to receive tasks without KeyError."
)
# TODO: remove this block when min supported Airflow version is >= 3.0
if not AIRFLOW_V_3_0_PLUS:
assert "execute_command" in registered_tasks, (
"execute_command must be registered for Airflow 2.x compatibility."
)