
Set task_callback_type when scheduler purges tasks without heartbeats #65404

Open
kimhaggie wants to merge 1 commit into apache:main from kimhaggie:fix/zombie-callback-type


@kimhaggie

Summary

Fixes #65400.

_purge_task_instances_without_heartbeats was building a TaskCallbackRequest without task_callback_type. The field defaults to None, and is_failure_callback treats None as a failure, so the DAG processor ran on_failure_callback even when the task still had retries left. This caused spurious failure alerts for tasks that ultimately succeeded on retry.
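The failure mode described above can be illustrated with a small stand-alone model. This is only a sketch: `CallbackRequestSketch`, `State`, and `callback_to_run` are hypothetical stand-ins, not Airflow's real classes, and the dispatch to `on_retry_callback` for the non-failure case is an assumption made for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(str, Enum):
    # Stand-in for the two task instance states relevant to this bug.
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"


@dataclass
class CallbackRequestSketch:
    # Hypothetical, simplified stand-in for TaskCallbackRequest.
    task_callback_type: Optional[State] = None

    @property
    def is_failure_callback(self) -> bool:
        # A missing type is treated as a failure, as the summary describes.
        if self.task_callback_type is None:
            return True
        return self.task_callback_type == State.FAILED


def callback_to_run(request: CallbackRequestSketch) -> str:
    # Assumed dispatch rule: failure requests run on_failure_callback,
    # everything else runs on_retry_callback.
    if request.is_failure_callback:
        return "on_failure_callback"
    return "on_retry_callback"
```

In this model, a request built without `task_callback_type` is indistinguishable from an explicit `FAILED` request, which is why a task with retries left could still trigger the failure callback.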

Changes

  • airflow-core/src/airflow/jobs/scheduler_job_runner.py: set task_callback_type when constructing the callback request in _purge_task_instances_without_heartbeats, based on ti.max_tries > 0 and ti.try_number <= ti.max_tries.
  • airflow-core/tests/unit/jobs/test_scheduler_job.py: add parametrized regression test (retries=1 → UP_FOR_RETRY, retries=0 → FAILED).
  • airflow-core/newsfragments/65400.bugfix.rst: changelog entry.
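As a rough sketch of the selection rule named in the first bullet, the callback type could be derived as follows. `TaskInstanceSketch` and `choose_callback_type` are hypothetical names used only for this example; the actual change sets the field inline when the request is constructed, and the string values here stand in for the real state constants.

```python
from dataclasses import dataclass


@dataclass
class TaskInstanceSketch:
    # Hypothetical stand-in carrying only the two fields the check reads.
    try_number: int
    max_tries: int


def choose_callback_type(ti: TaskInstanceSketch) -> str:
    # Retry only if the task was declared with retries (max_tries > 0)
    # and the current attempt has not used them up.
    if ti.max_tries > 0 and ti.try_number <= ti.max_tries:
        return "up_for_retry"
    return "failed"
```

With this rule, a task instance that has exhausted its retries (for example `try_number=2`, `max_tries=1`) and a task declared without retries both map to the failed type.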

Why a direct check instead of ti.is_eligible_to_retry()?

Adopting the existing pattern elsewhere in this file (task_callback_type=(UP_FOR_RETRY if ti.is_eligible_to_retry() else FAILED) near L1372 on main) was the obvious first instinct, but the unit test uncovered that is_eligible_to_retry() has a fallback branch:

if not getattr(self, "task", None):
    return self.try_number <= self.max_tries

_purge_task_instances_without_heartbeats() doesn't eagerly load ti.task, so it takes that branch — which returns True for retries=0 tasks (try_number=0 <= max_tries=0). A direct ti.max_tries > 0 and ti.try_number <= ti.max_tries check matches the "task loaded" branch of is_eligible_to_retry() (bool(self.task.retries and self.try_number <= self.max_tries)) and avoids the false positive.
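To make the difference between the branches concrete, the three predicates can be compared side by side as plain functions. This is a sketch, not Airflow code: the function names are hypothetical, and it assumes `max_tries` equals the declared `retries` on a first run.

```python
def eligible_fallback(try_number: int, max_tries: int) -> bool:
    # "Task not loaded" branch quoted above: ignores task.retries.
    return try_number <= max_tries


def eligible_task_loaded(retries: int, try_number: int, max_tries: int) -> bool:
    # "Task loaded" branch quoted above.
    return bool(retries and try_number <= max_tries)


def eligible_direct(try_number: int, max_tries: int) -> bool:
    # Direct check used in this PR.
    return max_tries > 0 and try_number <= max_tries


# retries=0 case from the description: try_number=0, max_tries=0.
no_retry_case = (
    eligible_fallback(0, 0),
    eligible_task_loaded(0, 0, 0),
    eligible_direct(0, 0),
)

# retries=1 case with one retry still available: try_number=1, max_tries=1.
retry_case = (
    eligible_fallback(1, 1),
    eligible_task_loaded(1, 1, 1),
    eligible_direct(1, 1),
)
```

Only the fallback predicate reports the `retries=0` task as retry-eligible; the direct check agrees with the "task loaded" branch in both cases.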

Happy to discuss whether the preferred fix is this direct check, or to eagerly load ti.task first and keep is_eligible_to_retry() — whichever is more in line with project style.

Test plan

  • Added unit test covering both retry-remaining and retry-exhausted cases.
  • Ran breeze testing core-tests locally — the new test passes; no regressions in neighboring tests (test_scheduler_passes_context_from_server_on_heartbeat_timeout, test_external_kill_sets_callback_type_param).
  • CI on the PR.

Out of scope

  • Backport to 3.1.x / earlier 3.x: can be handled by maintainers per usual policy.
  • The existing usage at scheduler_job_runner.py L1372 may have the same is_eligible_to_retry() fallback issue depending on whether its caller loads ti.task. Left as a follow-up since it's a separate code path.

When a task instance stops heartbeating (e.g. worker OOMKilled, node
eviction), ``_purge_task_instances_without_heartbeats`` sends a
``TaskCallbackRequest`` to the DAG processor. The request was created
without a ``task_callback_type``, so it defaulted to ``None``; the DAG
processor's ``_execute_task_callbacks`` then dispatched the request to
``on_failure_callback`` even when the task still had retries remaining,
producing spurious failure alerts.

Use a direct ``ti.max_tries > 0 and ti.try_number <= ti.max_tries``
check rather than ``ti.is_eligible_to_retry()``: the scheduler does
not eagerly load ``ti.task`` here, and the latter's "task not loaded"
fallback ignores ``task.retries`` and can incorrectly report
retry-eligibility for tasks declared with ``retries=0``.

Fixes apache#65400

Signed-off-by: kimhaggie <kimhaggie@gmail.com>
@kimhaggie kimhaggie requested review from XD-DENG and ashb as code owners April 17, 2026 07:20
boring-cyborg bot commented Apr 17, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide.
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Apr 17, 2026


Development

Successfully merging this pull request may close these issues.

Scheduler heartbeat-timeout cleanup creates TaskCallbackRequest without task_callback_type, causing on_failure_callback to fire while retries remain
