Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66773.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Detect task instances stuck in ``RUNNING`` that never sent a first heartbeat. The scheduler's heartbeat-timeout query previously used ``last_heartbeat_at < limit_dttm``, which SQL evaluates to NULL for rows that have never heartbeated, so those task instances were silently skipped and stayed ``RUNNING`` forever. The query now falls back to ``start_date`` when ``last_heartbeat_at`` is NULL.
11 changes: 10 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2927,6 +2927,12 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._task_instance_heartbeat_timeout_secs)
asset_loader, alias_loader = _eager_load_dag_run_for_validation()
# A task instance can be in RUNNING/RESTARTING without ever having sent a heartbeat
# (worker crashed before the first heartbeat, or the supervisor was killed mid-startup).
# In that case ``last_heartbeat_at`` is NULL, and ``last_heartbeat_at < limit_dttm``
# evaluates to NULL in SQL — not TRUE — so the row would be silently skipped and the
# task would stay in RUNNING forever. Fall back to ``start_date`` for the staleness
# check when no heartbeat has been recorded yet.
task_instances_without_heartbeats = list(
session.scalars(
select(TI)
Expand All @@ -2938,7 +2944,10 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
.join(DM, TI.dag_id == DM.dag_id)
.where(
TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
TI.last_heartbeat_at < limit_dttm,
or_(
TI.last_heartbeat_at < limit_dttm,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you walk me through how TI.last_heartbeat_at can be null for a running task as I thought it's set here:

Copy link
Copy Markdown
Contributor Author

@1fanwang 1fanwang May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update 2026-05-19: the timing assumption in this argument was wrong. Live repro on a freshly-migrated A3 deployment showed adopt_or_reset_orphaned_tasks fires within ~14 ms of scheduler startup, rotates the NULL-heartbeat TIs into task_instance_history, and _find_task_instances_without_heartbeats only ever runs against the fresh rows that already have last_heartbeat_at populated by /run. The cleanup query never observes the NULL state, so the predicate this PR adds is unreachable on a normal restart path. See the close-comment for the trace and the real-bug hypothesis on #58307.

Original argument, preserved for thread continuity

Fair point — let me walk through it. The /run endpoint you linked does set state=RUNNING and last_heartbeat_at=utcnow() atomically in the same UPDATE, so in steady-state Airflow 3 a freshly-running TI is not NULL on that path.

The case I'm targeting is the Airflow 2 → 3 upgrade legacy state. Migration 0045_3_0_0_add_last_heartbeat_at_directly_to_ti adds the column as nullable=True without a backfill. Any TI that was already RUNNING at upgrade time has last_heartbeat_at IS NULL until something writes to it. The codebase already acknowledges this exact state inside adopt_or_reset_orphaned_tasks:

# scheduler_job_runner.py:2854-2856
# If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
if ti.last_heartbeat_at is None:
    ti.last_heartbeat_at = timezone.utcnow()

adopt_or_reset only runs at scheduler startup / on the scheduler-lock timer. _find_task_instances_without_heartbeats runs on a tighter loop and currently has no matching fallback — a TI in the migration state is invisible to the cleanup query and stays RUNNING forever. This PR is the heartbeat-cleanup-path counterpart to the existing adopt_or_reset fallback.

Captured the regression deterministically — reverting the new or_(...) predicate and rerunning the regression test on main:

FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
    AssertionError: assert ti.key not in MockExecutor.running
    (the cleanup query silently skipped the NULL row; the TI key is still in executor.running)

With the fix in place, the test passes — and the companion ..._null_last_heartbeat_fresh_start case pins that a newly-started TI inside its first timeout window is left alone. Updated the PR body with the full before/after snippet and the steady-state walkthrough.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this doesn't match your PR description and even at that, at scheduler startup after upgrade, the adoption happens and is scheduled to repeat at intervals. This scenario you painted here won't happen in real life.

Copy link
Copy Markdown
Contributor Author

@1fanwang 1fanwang May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Circling back — you're right, @ephraimbuddy. Closing this; the live repro shows this isn't the right fix for #58307.

Ran the repro on a freshly-migrated A3 sqlite deployment — airflow db migrate, a real DAG, airflow standalone. Triggered, two TIs reached running through /run. Stopped the scheduler, set last_heartbeat_at = NULL on both rows to simulate the post-migration state, restarted with DEBUG logging.

19:38:30.413671 | info | Adopting or resetting orphaned tasks for active dag runs (scheduler_job_runner.py:2815)
19:38:30.445094 | info | Reset the following 2 orphaned TaskInstances (scheduler_job_runner.py:2894)

adopt_or_reset_orphaned_tasks rotates both into task_instance_history within 32 ms of startup; fresh rows come up with last_heartbeat_at populated by /run in 4 s. The cleanup query only ever sees the fresh rows, so the OR last_heartbeat_at IS NULL predicate never fires.

The repro did surface something different in #58307. vgl-grin is on heartbeat_sec=0, heartbeat_timeout=300, containers respawning between runs. Their last_heartbeat_at isn't NULL; /run sets it. But the adopt path on each respawn looks like it can land a fresh last_heartbeat_at, which would restart the timeout clock indefinitely. That fits the asymmetry — timeout=1 or 2 terminate, 300/4/5/6/10 don't. Different code path than this PR — leaving #58307 open.

Thanks for the pushback.

and_(TI.last_heartbeat_at.is_(None), TI.start_date < limit_dttm),
),
)
.where(TI.queued_by_job_id == self.job.id)
)
Expand Down
108 changes: 108 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7278,6 +7278,114 @@ def test_find_and_purge_task_instances_without_heartbeats(self, session, create_
assert callback_request.context_from_server.dag_run.logical_date == ti.dag_run.logical_date
assert callback_request.context_from_server.max_tries == ti.max_tries

@pytest.mark.usefixtures("testing_dag_bundle")
def test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat(
self, session, create_dagrun
):
"""
Regression test for a task instance that transitions to RUNNING but is
killed before it sends its first heartbeat: ``last_heartbeat_at`` stays
NULL, and the scheduler must still notice the stalled task once
``start_date`` is older than the heartbeat-timeout window.

Previously the cleanup query used ``last_heartbeat_at < limit_dttm``,
which SQL evaluates to NULL (not TRUE) for NULL rows, so these
instances were silently skipped and stayed in RUNNING forever.
"""
dagfile = EXAMPLE_STANDARD_DAGS_FOLDER / "example_branch_operator.py"
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
scheduler_dag = sync_dag_to_db(dag)

dag_v = DagVersion.get_latest_version(dag.dag_id)

data_interval = infer_automated_data_interval(scheduler_dag.timetable, DEFAULT_LOGICAL_DATE)
dag_run = create_dagrun(
scheduler_dag,
logical_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
data_interval=data_interval,
)

executor = MockExecutor()
scheduler_job = Job()
with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
loader_mock.return_value = executor
self.job_runner = SchedulerJobRunner(job=scheduler_job)

task = dag.get_task(task_id="run_this_first")
ti = create_task_instance(
task,
run_id=dag_run.run_id,
state=State.RUNNING,
dag_version_id=dag_v.id,
)

# The task transitioned to RUNNING but was killed before the worker
# had a chance to send a heartbeat — last_heartbeat_at stays NULL.
ti.last_heartbeat_at = None
ti.start_date = timezone.utcnow() - timedelta(minutes=10)
ti.queued_by_job_id = scheduler_job.id

session.add(ti)
session.flush()
executor.running.add(ti.key) # The executor normally does this during heartbeat.

self.job_runner._find_and_purge_task_instances_without_heartbeats()
assert ti.key not in executor.running

executor.callback_sink.send.assert_called_once()

@pytest.mark.usefixtures("testing_dag_bundle")
def test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat_fresh_start(
self, session, create_dagrun
):
"""
A task that just transitioned to RUNNING and has not yet sent its first
heartbeat must NOT be purged: ``last_heartbeat_at`` is NULL but
``start_date`` is within the heartbeat-timeout window.
"""
dagfile = EXAMPLE_STANDARD_DAGS_FOLDER / "example_branch_operator.py"
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
scheduler_dag = sync_dag_to_db(dag)

dag_v = DagVersion.get_latest_version(dag.dag_id)

data_interval = infer_automated_data_interval(scheduler_dag.timetable, DEFAULT_LOGICAL_DATE)
dag_run = create_dagrun(
scheduler_dag,
logical_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
data_interval=data_interval,
)

executor = MockExecutor()
scheduler_job = Job()
with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
loader_mock.return_value = executor
self.job_runner = SchedulerJobRunner(job=scheduler_job)

task = dag.get_task(task_id="run_this_first")
ti = create_task_instance(
task,
run_id=dag_run.run_id,
state=State.RUNNING,
dag_version_id=dag_v.id,
)
ti.last_heartbeat_at = None
ti.start_date = timezone.utcnow() # just started, no heartbeat yet
ti.queued_by_job_id = scheduler_job.id

session.add(ti)
session.flush()
executor.running.add(ti.key)

self.job_runner._find_and_purge_task_instances_without_heartbeats()
assert ti.key in executor.running

executor.callback_sink.send.assert_not_called()

@pytest.mark.usefixtures("testing_dag_bundle")
def test_task_instance_heartbeat_timeout_message(self, session, create_dagrun):
"""
Expand Down
Loading