Skip to content

Purge heartbeat-timed-out tasks whose last_heartbeat_at is still NULL#66773

Closed
1fanwang wants to merge 2 commits into
apache:mainfrom
1fanwang:fix/heartbeat-null-timeout
Closed

Purge heartbeat-timed-out tasks whose last_heartbeat_at is still NULL#66773
1fanwang wants to merge 2 commits into
apache:mainfrom
1fanwang:fix/heartbeat-null-timeout

Conversation

@1fanwang
Copy link
Copy Markdown
Contributor

@1fanwang 1fanwang commented May 12, 2026

Closed. Live repro on a freshly-migrated A3 deployment showed adopt_or_reset_orphaned_tasks rotates any NULL-heartbeat TIs at scheduler startup, before _find_task_instances_without_heartbeats runs. The OR last_heartbeat_at IS NULL predicate this PR adds is unreachable on a normal restart path. See the close-comment for the trace and the real-bug hypothesis on #58307 (which stays open).


Original PR body — the proposed-fix hypothesis the live repro disproved

When a worker pod dies early enough that last_heartbeat_at never got its first write, the row sits with NULL. The purge query today filters last_heartbeat_at < cutoff, which silently skips NULL rows — those tasks stay in running forever, with knock-on effects on pool slot occupancy and scheduler queue depth.

_find_task_instances_without_heartbeats filters with TI.last_heartbeat_at < limit_dttm. In SQL three-valued logic, that predicate evaluates to NULL (not TRUE) when the row's last_heartbeat_at IS NULL, so the row is never returned and the TI never gets purged.

When last_heartbeat_at IS NULL can hold for a RUNNING TI

In a steady-state Airflow 3 deployment, the /run execution-API endpoint sets state=RUNNING and last_heartbeat_at=utcnow() atomically in the same UPDATE (airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:236-237), so a freshly-running TI is not normally NULL. The state can still appear NULL on a real cluster:

  1. Airflow 2 → 3 upgrade. Migration 0045_3_0_0_add_last_heartbeat_at_directly_to_ti adds the column as nullable=True without a backfill. Any TI that was RUNNING at upgrade time has the column unset until something writes to it.

  2. Restored DB dumps / manual DB intervention that put a TI back into RUNNING without touching last_heartbeat_at.

  3. The window between scheduler startup and adopt_or_reset_orphaned_tasks clearing the migration state.

The codebase already acknowledges this state explicitly inside adopt_or_reset_orphaned_tasks:

# scheduler_job_runner.py:2854-2856
# If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
if ti.last_heartbeat_at is None:
    ti.last_heartbeat_at = timezone.utcnow()

adopt_or_reset_orphaned_tasks only runs at scheduler startup and on the scheduler-lock timer. Between those events — or before the first invocation completes — _find_task_instances_without_heartbeats runs on a tighter loop and currently has no matching fallback. A TI that lands in this state is invisible to the cleanup query and stays in RUNNING forever.

Fix

Extend the predicate so a NULL last_heartbeat_at falls through to start_date for the staleness check:

or_(
    TI.last_heartbeat_at < limit_dttm,
    and_(TI.last_heartbeat_at.is_(None), TI.start_date < limit_dttm),
)

A TI that started long enough ago to be past the heartbeat-timeout and has still never reported a heartbeat is the exact stuck-forever case the cleanup is meant to handle. The fix is the heartbeat-cleanup-path counterpart to the existing adopt_or_reset_orphaned_tasks fallback — both arms cover the same edge case.

Reproducer and before/after evidence

Two new cases in tests/unit/jobs/test_scheduler_job.py:

  • test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeatRUNNING TI with last_heartbeat_at=NULL and start_date past the timeout window. Should be caught by the cleanup query.
  • test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat_fresh_start — same shape but with start_date=utcnow(). Must NOT be caught (guards against killing newly-started tasks).

Before fix (with the new or_(...) predicate removed), the first case fails — the cleanup query silently skips the NULL row, so the TI's key stays in executor.running:

FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
    AssertionError: assert TaskInstanceKey(
        dag_id='example_branch_operator',
        task_id='run_this_first',
        run_id='scheduled__2016-01-01T00:00:00+00:00',
        ...
    ) not in {TaskInstanceKey(... [same key] ...)}
    where the LHS is ti.key after _find_and_purge_task_instances_without_heartbeats ran
    and the RHS is MockExecutor.running (the cleanup did not remove it).

PASSED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat_fresh_start

The fresh-start case still passes on main because that TI's predicate is also NULL — it just happens to be the correct outcome there.

After fix, both cases pass:

PASSED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
PASSED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat_fresh_start

A worker that crashed before sending its first heartbeat leaves the task
instance in RUNNING with last_heartbeat_at IS NULL. The scheduler's
cleanup query filtered on ``last_heartbeat_at < limit_dttm``, which SQL
evaluates to NULL — not TRUE — for those rows, so the stuck instances
were silently skipped and stayed RUNNING forever.

Fall back to ``start_date`` for the staleness check when no heartbeat
has been recorded yet.

closes: apache#58307
@1fanwang 1fanwang requested review from XD-DENG and ashb as code owners May 12, 2026 14:46
@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label May 12, 2026
Signed-off-by: 1fanwang <1fannnw@gmail.com>
TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
TI.last_heartbeat_at < limit_dttm,
or_(
TI.last_heartbeat_at < limit_dttm,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you walk me through how TI.last_heartbeat_at can be null for a running task as I thought it's set here:

Copy link
Copy Markdown
Contributor Author

@1fanwang 1fanwang May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update 2026-05-19: the timing assumption in this argument was wrong. Live repro on a freshly-migrated A3 deployment showed adopt_or_reset_orphaned_tasks fires within ~14 ms of scheduler startup, rotates the NULL-heartbeat TIs into task_instance_history, and _find_task_instances_without_heartbeats only ever runs against the fresh rows that already have last_heartbeat_at populated by /run. The cleanup query never observes the NULL state, so the predicate this PR adds is unreachable on a normal restart path. See the close-comment for the trace and the real-bug hypothesis on #58307.

Original argument, preserved for thread continuity

Fair point — let me walk through it. The /run endpoint you linked does set state=RUNNING and last_heartbeat_at=utcnow() atomically in the same UPDATE, so in steady-state Airflow 3 a freshly-running TI is not NULL on that path.

The case I'm targeting is the Airflow 2 → 3 upgrade legacy state. Migration 0045_3_0_0_add_last_heartbeat_at_directly_to_ti adds the column as nullable=True without a backfill. Any TI that was already RUNNING at upgrade time has last_heartbeat_at IS NULL until something writes to it. The codebase already acknowledges this exact state inside adopt_or_reset_orphaned_tasks:

# scheduler_job_runner.py:2854-2856
# If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
if ti.last_heartbeat_at is None:
    ti.last_heartbeat_at = timezone.utcnow()

adopt_or_reset only runs at scheduler startup / on the scheduler-lock timer. _find_task_instances_without_heartbeats runs on a tighter loop and currently has no matching fallback — a TI in the migration state is invisible to the cleanup query and stays RUNNING forever. This PR is the heartbeat-cleanup-path counterpart to the existing adopt_or_reset fallback.

Captured the regression deterministically — reverting the new or_(...) predicate and rerunning the regression test on main:

FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
    AssertionError: assert ti.key not in MockExecutor.running
    (the cleanup query silently skipped the NULL row; the TI key is still in executor.running)

With the fix in place, the test passes — and the companion ..._null_last_heartbeat_fresh_start case pins that a newly-started TI inside its first timeout window is left alone. Updated the PR body with the full before/after snippet and the steady-state walkthrough.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this doesn't match your PR description and even at that, at scheduler startup after upgrade, the adoption happens and is scheduled to repeat at intervals. This scenario you painted here won't happen in real life.

Copy link
Copy Markdown
Contributor Author

@1fanwang 1fanwang May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Circling back — you're right, @ephraimbuddy. Closing this; the live repro shows this isn't the right fix for #58307.

Ran the repro on a freshly-migrated A3 sqlite deployment — airflow db migrate, a real DAG, airflow standalone. Triggered, two TIs reached running through /run. Stopped the scheduler, set last_heartbeat_at = NULL on both rows to simulate the post-migration state, restarted with DEBUG logging.

19:38:30.413671 | info | Adopting or resetting orphaned tasks for active dag runs (scheduler_job_runner.py:2815)
19:38:30.445094 | info | Reset the following 2 orphaned TaskInstances (scheduler_job_runner.py:2894)

adopt_or_reset_orphaned_tasks rotates both into task_instance_history within 32 ms of startup; fresh rows come up with last_heartbeat_at populated by /run in 4 s. The cleanup query only ever sees the fresh rows, so the OR last_heartbeat_at IS NULL predicate never fires.

The repro did surface something different in #58307. vgl-grin is on heartbeat_sec=0, heartbeat_timeout=300, containers respawning between runs. Their last_heartbeat_at isn't NULL; /run sets it. But the adopt path on each respawn looks like it can land a fresh last_heartbeat_at, which would restart the timeout clock indefinitely. That fits the asymmetry — timeout=1 or 2 terminate, 300/4/5/6/10 don't. Different code path than this PR — leaving #58307 open.

Thanks for the pushback.

Copy link
Copy Markdown
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you produce a producer that doesn't manually set queued_by_job_id and last_heartbeat_at=None together i.e., one where the state arises through the actual scheduler /run / adopt_or_reset paths on a freshly-migrated A3 deployment.

TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
TI.last_heartbeat_at < limit_dttm,
or_(
TI.last_heartbeat_at < limit_dttm,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this doesn't match your PR description and even at that, at scheduler startup after upgrade, the adoption happens and is scheduled to repeat at intervals. This scenario you painted here won't happen in real life.

@1fanwang
Copy link
Copy Markdown
Contributor Author

#66773 (comment)

@1fanwang 1fanwang closed this May 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants