Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,8 @@ def _do_scheduling(self, session: Session) -> int:

# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun
dag_runs = DagRun.get_running_dag_runs_to_examine(session=session)
dag_runs = DagRun.get_running_dag_runs_to_examine(session=session).all()
dag_runs_examined = len(dag_runs)

callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

Expand Down Expand Up @@ -1857,6 +1858,14 @@ def _do_scheduling(self, session: Session) -> int:

guard.commit()

stats.gauge("scheduler.dag_runs.examined", dag_runs_examined)
stats.gauge("scheduler.executor.open_slots", total_free_executor_slots)
self.log.info(
"Scheduling loop summary | dag_runs_examined=%d queued_tis=%d open_executor_slots=%d",
dag_runs_examined,
num_queued_tis,
total_free_executor_slots,
)
return num_queued_tis

def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]:
Expand Down Expand Up @@ -2988,6 +2997,7 @@ def _purge_task_instances_without_heartbeats(
else:
dag_id_to_team_name = {}

stats.gauge("scheduler.tasks.heartbeat_timeout", len(task_instances_without_heartbeats))
for ti in task_instances_without_heartbeats:
task_instance_heartbeat_timeout_message_details = (
self._generate_task_instance_heartbeat_timeout_message_details(ti)
Expand Down Expand Up @@ -3031,11 +3041,22 @@ def _purge_task_instances_without_heartbeats(
),
)
)
_heartbeat_age_secs = (
(timezone.utcnow() - ti.last_heartbeat_at).total_seconds() if ti.last_heartbeat_at else None
)
_task_running_secs = (
(timezone.utcnow() - ti.start_date).total_seconds() if ti.start_date else None
)
self.log.error(
"Detected a task instance without a heartbeat: %s "
"heartbeat_age_seconds=%s hostname=%s pid=%s task_running_seconds=%s "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#task-instance-heartbeat-timeout)",
request,
f"{_heartbeat_age_secs:.1f}" if _heartbeat_age_secs is not None else "unknown",
ti.hostname or "unknown",
ti.pid or "unknown",
f"{_task_running_secs:.1f}" if _task_running_secs is not None else "unknown",
)
self.executor.send_callback(request)
executor = self._try_to_load_executor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,26 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.dag_runs.examined"
description: "Number of active DAG runs examined in a scheduler loop iteration."
type: "gauge"
legacy_name: "-"
name_variables: []

- name: "scheduler.executor.open_slots"
description: "Number of open executor slots available at the end of a scheduler loop iteration."
type: "gauge"
legacy_name: "-"
name_variables: []

- name: "scheduler.tasks.heartbeat_timeout"
description: >-
Number of task instances whose heartbeat timed out and were
terminated in a scheduler loop iteration.
type: "gauge"
legacy_name: "-"
name_variables: []

- name: "dagrun.first_task_scheduling_delay"
description: "Milliseconds elapsed between first task start_date and dagrun expected start"
type: "timer"
Expand Down