Scheduler heartbeat-timeout cleanup creates TaskCallbackRequest without task_callback_type, causing on_failure_callback to fire while retries remain #65400

@kimhaggie

Description

Apache Airflow version

3.1.8

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When a task instance stops heartbeating (e.g. worker OOMKilled, node eviction), the scheduler's _purge_task_instances_without_heartbeats() sends a TaskCallbackRequest to the DAG processor without setting task_callback_type. The DAG processor then treats the request as a failure callback and invokes on_failure_callback, even though the task still has retries remaining and will subsequently be rescheduled.

Concretely, at airflow-core/src/airflow/jobs/scheduler_job_runner.py (v3.1.8, line 2552):

request = TaskCallbackRequest(
    filepath=ti.dag_model.relative_fileloc,
    bundle_name=ti.dag_version.bundle_name,
    bundle_version=ti.dag_run.bundle_version,
    ti=ti,
    msg=str(task_instance_heartbeat_timeout_message_details),
    context_from_server=TIRunContext(
        dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
        max_tries=ti.max_tries,
        variables=[],
        connections=[],
        xcom_keys_to_clear=[],
    ),
)

task_callback_type is omitted, so it defaults to None per the dataclass definition in airflow-core/src/airflow/callbacks/callback_requests.py (v3.1.8, line 62):

class TaskCallbackRequest(BaseCallbackRequest):
    ...
    task_callback_type: TaskInstanceState | None = None

Notably, the same class's is_failure_callback property treats None as a failure (v3.1.8, lines 68-76):

@property
def is_failure_callback(self) -> bool:
    """Returns True if the callback is a failure callback."""
    if self.task_callback_type is None:
        return True
    return self.task_callback_type in {
        TaskInstanceState.FAILED,
        TaskInstanceState.UP_FOR_RETRY,
        TaskInstanceState.UPSTREAM_FAILED,
    }

So the heartbeat-timeout request passes the is_failure_callback gate at the start of _execute_task_callbacks() (processor.py line 341), then hits this dispatch (v3.1.8, lines 355-358):

if request.task_callback_type is TaskInstanceState.UP_FOR_RETRY:
    callbacks = task.on_retry_callback
else:
    callbacks = task.on_failure_callback

Because task_callback_type is None (not UP_FOR_RETRY), on_failure_callback is executed regardless of whether the task has retries remaining. The task is then transitioned back to UP_FOR_RETRY and eventually succeeds, but the failure callback has already fired — alerting stakeholders about a "failure" that never ultimately happened.

For comparison, the same scheduler file already contains the correct pattern elsewhere when constructing TaskCallbackRequest (line ~1004):

task_callback_type=(
    TaskInstanceState.UP_FOR_RETRY
    if ti.is_eligible_to_retry()
    else TaskInstanceState.FAILED
),

_purge_task_instances_without_heartbeats() appears to be missing that same decision. The bug is present on current main as well (verified at 3361ec5 on 2026-04-17).

Note on the fix direction: adopting the pattern above is the obvious first instinct, but ti.is_eligible_to_retry() has a "task not loaded" fallback (return self.try_number <= self.max_tries) that ignores task.retries. When ti.task is not eagerly loaded (as is the case in _purge_task_instances_without_heartbeats()), the fallback incorrectly reports retry-eligibility for tasks declared with retries=0. A direct check (ti.max_tries > 0 and ti.try_number <= ti.max_tries) avoids this edge case. Happy to discuss the preferred approach in the PR.
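To make the direct check concrete, here is a sketch as a standalone helper. The function name is hypothetical; the try_number/max_tries semantics follow the fields used in this report:

```python
def heartbeat_timeout_callback_type(try_number: int, max_tries: int) -> str:
    """Hypothetical helper illustrating the proposed decision.

    Mirrors the pattern at scheduler_job_runner.py ~line 1004, but uses a
    direct field check instead of ti.is_eligible_to_retry(), so a task
    declared with retries=0 (max_tries == 0) is never reported retry-eligible.
    """
    if max_tries > 0 and try_number <= max_tries:
        return "up_for_retry"  # retries remain -> on_retry_callback path
    return "failed"            # retries exhausted -> on_failure_callback path


print(heartbeat_timeout_callback_type(try_number=1, max_tries=3))  # up_for_retry
print(heartbeat_timeout_callback_type(try_number=1, max_tries=0))  # failed
```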

What you think should happen instead?

When the scheduler creates a TaskCallbackRequest for a task that lost its heartbeat, it should inspect whether the task still has retries available, and set task_callback_type accordingly:

  • TaskInstanceState.UP_FOR_RETRY if retries remain → DAG processor runs on_retry_callback
  • TaskInstanceState.FAILED if retries are exhausted → DAG processor runs on_failure_callback

This matches the contract of _execute_task_callbacks().

How to reproduce

We initially observed this in production when Celery workers were OOMKilled; the task eventually succeeded on retry, but on_failure_callback had already fired, sending spurious failure alerts to our oncall channel.

Rather than relying on a timing-dependent integration reproduction (which requires killing a worker at the right moment and waiting for scheduler_zombie_task_threshold), the bug can be demonstrated by composing four facts from the v3.1.8 source tree:

1. TaskCallbackRequest.task_callback_type defaults to None (airflow-core/src/airflow/callbacks/callback_requests.py#L60-L63).

2. is_failure_callback treats None as a failure (callback_requests.py#L68-L76).

3. _execute_task_callbacks() dispatches to on_failure_callback when task_callback_type is not UP_FOR_RETRY (airflow-core/src/airflow/dag_processing/processor.py#L340-L358).

4. _purge_task_instances_without_heartbeats() constructs TaskCallbackRequest without task_callback_type (airflow-core/src/airflow/jobs/scheduler_job_runner.py#L2552-L2565).

Composing (1)+(4): zombie requests carry task_callback_type=None. Composing (2)+(3): a None request passes the is_failure_callback gate and is then dispatched to on_failure_callback. The retry-eligibility of the TI is never consulted along this path.
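The composition of the four facts can be demonstrated as a self-contained script. Class and function names mirror Airflow's, but this is a standalone mimic, not the real code:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskCallbackRequest:
    msg: str
    task_callback_type: str | None = None  # fact (1): omitted -> None

    @property
    def is_failure_callback(self) -> bool:
        # fact (2): None gates as a failure
        return self.task_callback_type is None or self.task_callback_type in {
            "failed", "up_for_retry", "upstream_failed",
        }


def execute_task_callbacks(request: TaskCallbackRequest, on_retry, on_failure):
    # fact (3): mirrors the dispatch in processor.py#L340-L358
    if not request.is_failure_callback:
        return None
    if request.task_callback_type == "up_for_retry":
        return on_retry()
    return on_failure()


# fact (4): the zombie purge builds the request without task_callback_type
zombie = TaskCallbackRequest(msg="Task Instance Heartbeat Timeout")
fired = execute_task_callbacks(zombie, on_retry=lambda: "retry", on_failure=lambda: "failure")
print(fired)  # "failure" -- retry eligibility was never consulted
```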

Optional integration reproduction (timing-dependent)
  1. Deploy Airflow 3.1.8 with CeleryExecutor (or KubernetesExecutor) and KEDA/HPA-scaled workers.

  2. Define a DAG containing a task with:

    # notify_failure / notify_retry are user-defined alerting callables
    PythonOperator(
        task_id="t",
        python_callable=lambda: ...,
        retries=3,
        retry_delay=timedelta(seconds=10),
        on_failure_callback=notify_failure,
        on_retry_callback=notify_retry,
    )
  3. Trigger the DAG, then while the task is running, kill the worker pod abruptly (e.g. kubectl delete pod --grace-period=0, simulate OOMKilled, or kill -9 the celery worker process).

  4. Wait for [scheduler] scheduler_zombie_task_threshold to elapse.

Observed: notify_failure is invoked in the DAG processor log even though ti.try_number <= ti.max_tries. The task subsequently transitions to up_for_retry and, on retry, succeeds.

Expected: notify_retry is invoked (or no callback at all, deferring to the actual retry/final-failure lifecycle), and notify_failure is invoked only if retries are exhausted.

Operating System

Debian (official apache/airflow:3.1.8-python3.10 image)

Versions of Apache Airflow Providers

apache-airflow-providers-celery, apache-airflow-providers-cncf-kubernetes, apache-airflow-providers-git, apache-airflow-providers-fab (versions from constraints-3.1.8)

Deployment

Official Apache Airflow Helm Chart

Deployment details

Kubernetes, CeleryExecutor, external PostgreSQL.

Anything else?

Possibly related / same code path:

As a workaround, we filter these spurious failure alerts at our notifier layer: the on_failure_callback function inspects ti.try_number against ti.max_tries and returns early (without sending the alert) when retries remain. The unwanted on_failure_callback invocation still happens inside Airflow, but the user-visible symptom (a false alert) is suppressed. We have a PR ready that addresses the root cause in _purge_task_instances_without_heartbeats().
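For concreteness, the workaround can be sketched as follows. This is a standalone illustration with stubbed task instances; send_alert is a placeholder for the real notifier:

```python
from types import SimpleNamespace

alerts: list[str] = []


def send_alert(message: str) -> None:
    # Placeholder for the real notifier (Slack, PagerDuty, ...)
    alerts.append(message)


def notify_failure(context) -> None:
    """on_failure_callback that stays silent while retries remain."""
    ti = context["task_instance"]
    if ti.try_number <= ti.max_tries:
        # Retries remain: this is the spurious zombie-purge invocation.
        return
    send_alert(f"Task {ti.task_id} failed after {ti.try_number} tries")


# Stubbed task instances for illustration:
notify_failure({"task_instance": SimpleNamespace(task_id="t", try_number=1, max_tries=3)})
notify_failure({"task_instance": SimpleNamespace(task_id="t", try_number=4, max_tries=3)})
print(alerts)  # only the retries-exhausted invocation produced an alert
```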

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:Scheduler, area:core, kind:bug, priority:high
