diff --git a/airflow-core/newsfragments/66773.bugfix.rst b/airflow-core/newsfragments/66773.bugfix.rst new file mode 100644 index 0000000000000..16ebc1e80b8d4 --- /dev/null +++ b/airflow-core/newsfragments/66773.bugfix.rst @@ -0,0 +1 @@ +Detect task instances stuck in ``RUNNING`` that never sent a first heartbeat. The scheduler's heartbeat-timeout query previously used ``last_heartbeat_at < limit_dttm``, which SQL evaluates to NULL for rows that have never heartbeated, so those task instances were silently skipped and stayed ``RUNNING`` forever. The query now falls back to ``start_date`` when ``last_heartbeat_at`` is NULL. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 3eed95a8bb030..bcffd0790fdaf 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2927,6 +2927,12 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T self.log.debug("Finding 'running' jobs without a recent heartbeat") limit_dttm = timezone.utcnow() - timedelta(seconds=self._task_instance_heartbeat_timeout_secs) asset_loader, alias_loader = _eager_load_dag_run_for_validation() + # A task instance can be in RUNNING/RESTARTING without ever having sent a heartbeat + # (worker crashed before the first heartbeat, or the supervisor was killed mid-startup). + # In that case ``last_heartbeat_at`` is NULL, and ``last_heartbeat_at < limit_dttm`` + # evaluates to NULL in SQL — not TRUE — so the row would be silently skipped and the + # task would stay in RUNNING forever. Fall back to ``start_date`` for the staleness + # check when no heartbeat has been recorded yet. task_instances_without_heartbeats = list( session.scalars( select(TI) @@ -2938,7 +2944,10 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T .join(DM, TI.dag_id == DM.dag_id) .where( TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)), - TI.last_heartbeat_at < limit_dttm, + or_( + TI.last_heartbeat_at < limit_dttm, + and_(TI.last_heartbeat_at.is_(None), TI.start_date < limit_dttm), + ), ) .where(TI.queued_by_job_id == self.job.id) ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 56e69459104ea..7997c25956c66 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -7278,6 +7278,114 @@ def test_find_and_purge_task_instances_without_heartbeats(self, session, create_ assert callback_request.context_from_server.dag_run.logical_date == ti.dag_run.logical_date assert callback_request.context_from_server.max_tries == ti.max_tries + @pytest.mark.usefixtures("testing_dag_bundle") + def test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat( + self, session, create_dagrun + ): + """ + Regression test for a task instance that transitions to RUNNING but is + killed before it sends its first heartbeat: ``last_heartbeat_at`` stays + NULL, and the scheduler must still notice the stalled task once + ``start_date`` is older than the heartbeat-timeout window. + + Previously the cleanup query used ``last_heartbeat_at < limit_dttm``, + which SQL evaluates to NULL (not TRUE) for NULL rows, so these + instances were silently skipped and stayed in RUNNING forever. + """ + dagfile = EXAMPLE_STANDARD_DAGS_FOLDER / "example_branch_operator.py" + dagbag = DagBag(dagfile) + dag = dagbag.get_dag("example_branch_operator") + scheduler_dag = sync_dag_to_db(dag) + + dag_v = DagVersion.get_latest_version(dag.dag_id) + + data_interval = infer_automated_data_interval(scheduler_dag.timetable, DEFAULT_LOGICAL_DATE) + dag_run = create_dagrun( + scheduler_dag, + logical_date=DEFAULT_DATE, + run_type=DagRunType.SCHEDULED, + data_interval=data_interval, + ) + + executor = MockExecutor() + scheduler_job = Job() + with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock: + loader_mock.return_value = executor + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + task = dag.get_task(task_id="run_this_first") + ti = create_task_instance( + task, + run_id=dag_run.run_id, + state=State.RUNNING, + dag_version_id=dag_v.id, + ) + + # The task transitioned to RUNNING but was killed before the worker + # had a chance to send a heartbeat — last_heartbeat_at stays NULL. + ti.last_heartbeat_at = None + ti.start_date = timezone.utcnow() - timedelta(minutes=10) + ti.queued_by_job_id = scheduler_job.id + + session.add(ti) + session.flush() + executor.running.add(ti.key) # The executor normally does this during heartbeat. + + self.job_runner._find_and_purge_task_instances_without_heartbeats() + assert ti.key not in executor.running + + executor.callback_sink.send.assert_called_once() + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat_fresh_start( + self, session, create_dagrun + ): + """ + A task that just transitioned to RUNNING and has not yet sent its first + heartbeat must NOT be purged: ``last_heartbeat_at`` is NULL but + ``start_date`` is within the heartbeat-timeout window. + """ + dagfile = EXAMPLE_STANDARD_DAGS_FOLDER / "example_branch_operator.py" + dagbag = DagBag(dagfile) + dag = dagbag.get_dag("example_branch_operator") + scheduler_dag = sync_dag_to_db(dag) + + dag_v = DagVersion.get_latest_version(dag.dag_id) + + data_interval = infer_automated_data_interval(scheduler_dag.timetable, DEFAULT_LOGICAL_DATE) + dag_run = create_dagrun( + scheduler_dag, + logical_date=DEFAULT_DATE, + run_type=DagRunType.SCHEDULED, + data_interval=data_interval, + ) + + executor = MockExecutor() + scheduler_job = Job() + with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock: + loader_mock.return_value = executor + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + task = dag.get_task(task_id="run_this_first") + ti = create_task_instance( + task, + run_id=dag_run.run_id, + state=State.RUNNING, + dag_version_id=dag_v.id, + ) + ti.last_heartbeat_at = None + ti.start_date = timezone.utcnow() # just started, no heartbeat yet + ti.queued_by_job_id = scheduler_job.id + + session.add(ti) + session.flush() + executor.running.add(ti.key) + + self.job_runner._find_and_purge_task_instances_without_heartbeats() + assert ti.key in executor.running + + executor.callback_sink.send.assert_not_called() + @pytest.mark.usefixtures("testing_dag_bundle") def test_task_instance_heartbeat_timeout_message(self, session, create_dagrun): """