Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/65400.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Set ``task_callback_type`` when scheduler handles tasks that lose heartbeat. Previously ``_purge_task_instances_without_heartbeats`` created a ``TaskCallbackRequest`` without ``task_callback_type``, which defaults to ``None``; the DAG processor then treated the request as a failure and invoked ``on_failure_callback`` even when the task still had retries remaining.
5 changes: 5 additions & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2926,6 +2926,11 @@ def _purge_task_instances_without_heartbeats(
bundle_version=_hb_bundle_version,
ti=ti,
msg=str(task_instance_heartbeat_timeout_message_details),
task_callback_type=(
TaskInstanceState.UP_FOR_RETRY
if ti.max_tries > 0 and ti.try_number <= ti.max_tries
else TaskInstanceState.FAILED
),
context_from_server=TIRunContext(
dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
max_tries=ti.max_tries,
Expand Down
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7638,6 +7638,46 @@ def test_scheduler_passes_context_from_server_on_heartbeat_timeout(self, dag_mak
assert callback_request.context_from_server.dag_run.logical_date == dag_run.logical_date
assert callback_request.context_from_server.max_tries == ti.max_tries

@pytest.mark.parametrize(
("retries", "expected_callback_type"),
[
(1, TaskInstanceState.UP_FOR_RETRY),
(0, TaskInstanceState.FAILED),
],
)
def test_heartbeat_timeout_sets_callback_type_based_on_retry_eligibility(
self, dag_maker, session, retries, expected_callback_type
):
"""Heartbeat-timeout cleanup should set ``task_callback_type`` based on retry eligibility.

Regression test for the case where a TI's worker stops heartbeating (e.g. OOMKilled): the
scheduler created a ``TaskCallbackRequest`` with ``task_callback_type`` unset (defaulting to
``None``). The DAG processor then dispatched ``on_failure_callback`` even when the TI still
had retries remaining, producing spurious failure alerts.
"""
with dag_maker(dag_id=f"hb_timeout_{expected_callback_type}", session=session):
EmptyOperator(task_id="test_task", retries=retries)

dag_run = dag_maker.create_dagrun(run_id="test_run", state=DagRunState.RUNNING)

mock_executor = MagicMock()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(scheduler_job, executors=[mock_executor])

ti = dag_run.get_task_instance(task_id="test_task")
ti.state = TaskInstanceState.RUNNING
ti.queued_by_job_id = scheduler_job.id
ti.last_heartbeat_at = timezone.utcnow() - timedelta(seconds=600)
session.merge(ti)
session.commit()

self.job_runner._find_and_purge_task_instances_without_heartbeats()

mock_executor.send_callback.assert_called_once()
request = mock_executor.send_callback.call_args[0][0]
assert isinstance(request, TaskCallbackRequest)
assert request.task_callback_type == expected_callback_type

@pytest.mark.parametrize(
("retries", "callback_kind", "expected"),
[
Expand Down