From 298fdb2d9ed3c34b724c2a6173f23e311c571ab0 Mon Sep 17 00:00:00 2001 From: kimhaggie Date: Fri, 17 Apr 2026 15:30:06 +0900 Subject: [PATCH] Set task_callback_type when scheduler purges tasks without heartbeats When a task instance stops heartbeating (e.g. worker OOMKilled, node eviction), ``_purge_task_instances_without_heartbeats`` sends a ``TaskCallbackRequest`` to the DAG processor. The request was created without a ``task_callback_type``, so it defaulted to ``None``; the DAG processor's ``_execute_task_callbacks`` then dispatched the request to ``on_failure_callback`` even when the task still had retries remaining, producing spurious failure alerts. Use a direct ``ti.max_tries > 0 and ti.try_number <= ti.max_tries`` check rather than ``ti.is_eligible_to_retry()``: the scheduler does not eagerly load ``ti.task`` here, and the latter's "task not loaded" fallback ignores ``task.retries`` and can incorrectly report retry-eligibility for tasks declared with ``retries=0``. Fixes #65400 Signed-off-by: kimhaggie --- airflow-core/newsfragments/65400.bugfix.rst | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 5 +++ .../tests/unit/jobs/test_scheduler_job.py | 40 +++++++++++++++++++ 3 files changed, 46 insertions(+) create mode 100644 airflow-core/newsfragments/65400.bugfix.rst diff --git a/airflow-core/newsfragments/65400.bugfix.rst b/airflow-core/newsfragments/65400.bugfix.rst new file mode 100644 index 0000000000000..2c14fd8a9c77b --- /dev/null +++ b/airflow-core/newsfragments/65400.bugfix.rst @@ -0,0 +1 @@ +Set ``task_callback_type`` when scheduler handles tasks that lose heartbeat. Previously ``_purge_task_instances_without_heartbeats`` created a ``TaskCallbackRequest`` without ``task_callback_type``, which defaults to ``None``; the DAG processor then treated the request as a failure and invoked ``on_failure_callback`` even when the task still had retries remaining.
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index d949012d4f5a4..4906e7b135c84 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2926,6 +2926,11 @@ def _purge_task_instances_without_heartbeats( bundle_version=_hb_bundle_version, ti=ti, msg=str(task_instance_heartbeat_timeout_message_details), + task_callback_type=( + TaskInstanceState.UP_FOR_RETRY + if ti.max_tries > 0 and ti.try_number <= ti.max_tries + else TaskInstanceState.FAILED + ), context_from_server=TIRunContext( dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True), max_tries=ti.max_tries, diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b0f72b91db3eb..9c721f1a61a90 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -7638,6 +7638,46 @@ def test_scheduler_passes_context_from_server_on_heartbeat_timeout(self, dag_mak assert callback_request.context_from_server.dag_run.logical_date == dag_run.logical_date assert callback_request.context_from_server.max_tries == ti.max_tries + @pytest.mark.parametrize( + ("retries", "expected_callback_type"), + [ + (1, TaskInstanceState.UP_FOR_RETRY), + (0, TaskInstanceState.FAILED), + ], + ) + def test_heartbeat_timeout_sets_callback_type_based_on_retry_eligibility( + self, dag_maker, session, retries, expected_callback_type + ): + """Heartbeat-timeout cleanup should set ``task_callback_type`` based on retry eligibility. + + Regression test for the case where a TI's worker stops heartbeating (e.g. OOMKilled): the + scheduler created a ``TaskCallbackRequest`` with ``task_callback_type`` unset (defaulting to + ``None``). The DAG processor then dispatched ``on_failure_callback`` even when the TI still + had retries remaining, producing spurious failure alerts. 
+ """ + with dag_maker(dag_id=f"hb_timeout_{expected_callback_type}", session=session): + EmptyOperator(task_id="test_task", retries=retries) + + dag_run = dag_maker.create_dagrun(run_id="test_run", state=DagRunState.RUNNING) + + mock_executor = MagicMock() + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[mock_executor]) + + ti = dag_run.get_task_instance(task_id="test_task") + ti.state = TaskInstanceState.RUNNING + ti.queued_by_job_id = scheduler_job.id + ti.last_heartbeat_at = timezone.utcnow() - timedelta(seconds=600) + session.merge(ti) + session.commit() + + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + mock_executor.send_callback.assert_called_once() + request = mock_executor.send_callback.call_args[0][0] + assert isinstance(request, TaskCallbackRequest) + assert request.task_callback_type == expected_callback_type + @pytest.mark.parametrize( ("retries", "callback_kind", "expected"), [