From 03003490b8166a502df390bb73f69563695f477f Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Mon, 18 May 2026 04:31:26 +0530 Subject: [PATCH 1/4] observability: add structured diagnostics to scheduler loop and heartbeat-timeout detection - Capture dag_runs_examined count in _do_scheduling() - Emit scheduler.dag_runs.examined and scheduler.executor.open_slots gauges - Add scheduling loop summary log with dag_runs, queued_tis, open_slots - Emit scheduler.tasks.heartbeat_timeout gauge in _purge_task_instances_without_heartbeats() - Enrich heartbeat-timeout error log with heartbeat_age_seconds, hostname, pid, task_running_seconds Signed-off-by: Prince Kumar --- .../src/airflow/jobs/scheduler_job_runner.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 5dcbdc21521d9..51ba903db469c 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1807,6 +1807,7 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun dag_runs = DagRun.get_running_dag_runs_to_examine(session=session) + dag_runs_examined = len(dag_runs) callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) @@ -1857,6 +1858,14 @@ def _do_scheduling(self, session: Session) -> int: guard.commit() + stats.gauge("scheduler.dag_runs.examined", dag_runs_examined) + stats.gauge("scheduler.executor.open_slots", total_free_executor_slots) + self.log.info( + "Scheduling loop summary | dag_runs_examined=%d queued_tis=%d open_executor_slots=%d", + dag_runs_examined, + num_queued_tis, + total_free_executor_slots, + ) return num_queued_tis def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]: @@ -2988,6 +2997,7 @@ def _purge_task_instances_without_heartbeats( else: dag_id_to_team_name = {} + stats.gauge("scheduler.tasks.heartbeat_timeout", len(task_instances_without_heartbeats)) for ti in task_instances_without_heartbeats: task_instance_heartbeat_timeout_message_details = ( self._generate_task_instance_heartbeat_timeout_message_details(ti) @@ -3031,11 +3041,22 @@ def _purge_task_instances_without_heartbeats( ), ) ) + _heartbeat_age_secs = ( + (timezone.utcnow() - ti.last_heartbeat_at).total_seconds() if ti.last_heartbeat_at else None + ) + _task_running_secs = ( + (timezone.utcnow() - ti.start_date).total_seconds() if ti.start_date else None + ) self.log.error( "Detected a task instance without a heartbeat: %s " + "heartbeat_age_seconds=%s hostname=%s pid=%s task_running_seconds=%s " "(See https://airflow.apache.org/docs/apache-airflow/" "stable/core-concepts/tasks.html#task-instance-heartbeat-timeout)", request, + f"{_heartbeat_age_secs:.1f}" if _heartbeat_age_secs is not None else "unknown", + ti.hostname or "unknown", + ti.pid or "unknown", + f"{_task_running_secs:.1f}" if _task_running_secs is not None else "unknown", ) self.executor.send_callback(request) executor = self._try_to_load_executor( From 1ee5a6af5adf3ee8a5df2e5070e43a9435b9427c Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Mon, 18 May 2026 18:18:08 +0530 Subject: [PATCH 2/4] Register missing scheduler loop metrics in Stats registry - scheduler.dag_runs.examined - scheduler.executor.open_slots - scheduler.tasks.heartbeat_timeout Signed-off-by: Prince Kumar --- .../metrics/metrics_template.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 876cc3cc8953b..a26d6e51ea669 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -633,6 +633,24 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.dag_runs.examined" + description: "Number of active DAG runs examined in a scheduler loop iteration." + type: "gauge" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.executor.open_slots" + description: "Number of open executor slots available at the end of a scheduler loop iteration." + type: "gauge" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.tasks.heartbeat_timeout" + description: "Number of task instances whose heartbeat timed out and were terminated in a scheduler loop iteration." + type: "gauge" + legacy_name: "-" + name_variables: [] + - name: "dagrun.first_task_scheduling_delay" description: "Milliseconds elapsed between first task start_date and dagrun expected start" type: "timer" From ccf495a4c9f28910c975fd3b8cbedac6ddec495b Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Tue, 19 May 2026 06:59:39 +0530 Subject: [PATCH 3/4] fix: call .all() on ScalarResult before len() in _do_scheduling dag_runs is a ScalarResult which has no len(). Calling .all() materializes it into a list, matching the existing test mock contract at test_scheduler_job.py:9215. Signed-off-by: Prince Kumar --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 51ba903db469c..3ea24d5c24b08 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1806,7 +1806,7 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun - dag_runs = DagRun.get_running_dag_runs_to_examine(session=session) + dag_runs = DagRun.get_running_dag_runs_to_examine(session=session).all() dag_runs_examined = len(dag_runs) callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) From 5d18bf6d70b60171f24c80934e505f2603695be6 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Tue, 19 May 2026 21:44:04 +0530 Subject: [PATCH 4/4] fix: wrap long description line in metrics_template.yaml --- .../observability/metrics/metrics_template.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index a26d6e51ea669..d49f021fa80dc 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -646,7 +646,9 @@ metrics: name_variables: [] - name: "scheduler.tasks.heartbeat_timeout" - description: "Number of task instances whose heartbeat timed out and were terminated in a scheduler loop iteration." + description: >- + Number of task instances whose heartbeat timed out and were + terminated in a scheduler loop iteration. type: "gauge" legacy_name: "-" name_variables: []