diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 5dcbdc21521d9..3ea24d5c24b08 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1806,7 +1806,8 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun - dag_runs = DagRun.get_running_dag_runs_to_examine(session=session) + dag_runs = DagRun.get_running_dag_runs_to_examine(session=session).all() + dag_runs_examined = len(dag_runs) callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) @@ -1857,6 +1858,14 @@ def _do_scheduling(self, session: Session) -> int: guard.commit() + stats.gauge("scheduler.dag_runs.examined", dag_runs_examined) + stats.gauge("scheduler.executor.open_slots", total_free_executor_slots) + self.log.info( + "Scheduling loop summary | dag_runs_examined=%d queued_tis=%d open_executor_slots=%d", + dag_runs_examined, + num_queued_tis, + total_free_executor_slots, + ) return num_queued_tis def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]: @@ -2988,6 +2997,7 @@ def _purge_task_instances_without_heartbeats( else: dag_id_to_team_name = {} + stats.gauge("scheduler.tasks.heartbeat_timeout", len(task_instances_without_heartbeats)) for ti in task_instances_without_heartbeats: task_instance_heartbeat_timeout_message_details = ( self._generate_task_instance_heartbeat_timeout_message_details(ti) @@ -3031,11 +3041,22 @@ def _purge_task_instances_without_heartbeats( ), ) ) + _heartbeat_age_secs = ( + (timezone.utcnow() - ti.last_heartbeat_at).total_seconds() if ti.last_heartbeat_at else None + ) + _task_running_secs = ( + (timezone.utcnow() - ti.start_date).total_seconds() if ti.start_date else None + ) self.log.error( "Detected a task instance without a heartbeat: %s " + "heartbeat_age_seconds=%s hostname=%s pid=%s task_running_seconds=%s " "(See https://airflow.apache.org/docs/apache-airflow/" "stable/core-concepts/tasks.html#task-instance-heartbeat-timeout)", request, + f"{_heartbeat_age_secs:.1f}" if _heartbeat_age_secs is not None else "unknown", + ti.hostname or "unknown", + ti.pid or "unknown", + f"{_task_running_secs:.1f}" if _task_running_secs is not None else "unknown", ) self.executor.send_callback(request) executor = self._try_to_load_executor( diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 876cc3cc8953b..d49f021fa80dc 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -633,6 +633,26 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.dag_runs.examined" + description: "Number of active DAG runs examined in a scheduler loop iteration." + type: "gauge" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.executor.open_slots" + description: "Number of open executor slots available at the end of a scheduler loop iteration." + type: "gauge" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.tasks.heartbeat_timeout" + description: >- + Number of task instances whose heartbeat timed out and were + terminated in a scheduler loop iteration. + type: "gauge" + legacy_name: "-" + name_variables: [] + - name: "dagrun.first_task_scheduling_delay" description: "Milliseconds elapsed between first task start_date and dagrun expected start" type: "timer"