diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 4dee7f4dfbcc..4c976cfe50ff 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -369,9 +369,7 @@ def _check_for_stalled_adopted_tasks(self): "\n\t".join(repr(x) for x in timedout_keys), ) for key in timedout_keys: - self.event_buffer[key] = (State.FAILED, None) - del self.tasks[key] - del self.adopted_task_timeouts[key] + self.change_state(key, State.FAILED) def debug_dump(self) -> None: """Called in response to SIGUSR2 by the scheduler""" diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 91a18c7921f9..cd9f0f9be966 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -371,10 +371,12 @@ def test_check_for_stalled_adopted_tasks(self): key_1: queued_dttm + executor.task_adoption_timeout, key_2: queued_dttm + executor.task_adoption_timeout, } + executor.running = {key_1, key_2} executor.tasks = {key_1: AsyncResult("231"), key_2: AsyncResult("232")} executor.sync() assert executor.event_buffer == {key_1: (State.FAILED, None), key_2: (State.FAILED, None)} assert executor.tasks == {} + assert executor.running == set() assert executor.adopted_task_timeouts == {}