Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2358,7 +2358,7 @@ def _schedule_dag_run(
DualStatsManager.timing(
"dagrun.duration.failed",
duration,
tags={},
tags=dag_run.stats_tags,
extra_tags={"dag_id": dag_run.dag_id},
)
return callback_to_execute
Expand Down
33 changes: 33 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,39 @@ def test_dagrun_timeout_fails_run(self, dag_maker):
session.rollback()
session.close()

def test_dagrun_timeout_emits_run_type_in_stats_tags(self, dag_maker):
"""
Test that dagrun.duration.failed metric includes run_type tag when failure
is caused by dagrun_timeout (regression test for missing run_type tag).
"""
session = settings.Session()
with dag_maker(
dag_id="test_scheduler_fail_dagrun_timeout_stats",
dagrun_timeout=datetime.timedelta(seconds=60),
session=session,
):
EmptyOperator(task_id="dummy")

dr = dag_maker.create_dagrun(
start_date=timezone.utcnow() - datetime.timedelta(days=1),
run_type=DagRunType.SCHEDULED,
)

Comment on lines +3395 to +3406
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dag_maker.create_dagrun(...) defaults run_type to DagRunType.MANUAL unless explicitly provided (see devel-common/src/tests_common/pytest_plugin.py:1118-1120), so schedule="@daily" here doesn’t make this DagRun “scheduled”. To avoid the test being misleading (and to better match the reported impact), consider either passing run_type=DagRunType.SCHEDULED and asserting the expected value, or dropping the schedule argument.

Copilot uses AI. Check for mistakes.
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job)

with mock.patch("airflow.jobs.scheduler_job_runner.DualStatsManager.timing") as mock_timing:
self.job_runner._schedule_dag_run(dr, session)
session.flush()

timing_calls = {call.args[0]: call.kwargs for call in mock_timing.call_args_list}
assert "dagrun.duration.failed" in timing_calls
tags = timing_calls["dagrun.duration.failed"].get("tags", {})
assert tags.get("run_type") == "scheduled", f"Expected run_type=scheduled in dagrun.duration.failed tags: {tags}"

session.rollback()
session.close()

def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
"""
Test that dagrun timeout fails run and update the next dagrun
Expand Down