Skip to content

[v3-2-test] Recover stuck TIs when direct terminal-state API call fails (#66574)#67204

Merged
vatsrahul1001 merged 2 commits into
v3-2-testfrom
backport-173c2a1-v3-2-test
May 20, 2026
Merged

[v3-2-test] Recover stuck TIs when direct terminal-state API call fails (#66574)#67204
vatsrahul1001 merged 2 commits into
v3-2-testfrom
backport-173c2a1-v3-2-test

Conversation

@vatsrahul1001
Copy link
Copy Markdown
Contributor

Manual backport of #66574 — auto-backport failed due to the RetryTask branch in supervisor's _handle_request.

Conflict resolution

task-sdk/src/airflow/sdk/execution_time/supervisor.py — one block under elif isinstance(msg, RetryTask):. v3-2-test still had the inline self.client.task_instances.retry(id=..., end_date=..., rendered_map_index=...) call. The PR's squash-merge bundled the "refactor terminal-state dispatch" reshape that replaces all four direct API calls (SucceedTask / RetryTask / DeferTask / RescheduleTask) with self._send_terminal_state_msg(msg). The new _send_terminal_state_msg helper is included in this cherry-pick (defined at supervisor.py:1246 post-merge), so accepting the incoming side at the conflict is the correct resolution — it routes RetryTask through the same dispatcher the other three terminal states already use post-rebase.

The test file (test_supervisor.py) auto-merged cleanly — the new parametrized test_terminal_state_not_set_when_direct_api_fails and test_update_task_state_replays_pending_terminal_state_call (across all four message types) landed as-is.

Scope

2 files changed, +219 / −22 — exact mirror of #66574's merged diff.

* Recover stuck TIs when direct terminal-state API call fails

The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask,
and RescheduleTask set _terminal_state BEFORE calling the matching
client.task_instances.{succeed,retry,defer,reschedule}() API. If that
API call raised (transient network blip, server 5xx, etc.),
_terminal_state was set on the supervisor but the server never saw
the transition. The supervisor's update_task_state_if_needed then
saw final_state in STATES_SENT_DIRECTLY and short-circuited the
recovery finish() call -- leaving the TaskInstance stuck RUNNING
on the server forever, blocking downstream dependencies and
triggering false alerts.

Two-part fix:

1. Make the direct API call FIRST. Only set _terminal_state and the
   new _terminal_state_synced_to_server flag after the call returns
   successfully. If the API raises, both stay unset and the exception
   propagates to handle_requests, where the existing catch-all sends
   an ErrorResponse to the task subprocess.

2. Have update_task_state_if_needed always call finish() when
   _terminal_state_synced_to_server is False, regardless of what
   final_state happens to return. The finish() API takes the state
   value, so a SUCCESS / DEFERRED / etc. transition that originally
   failed is re-attempted via finish() on subprocess exit.
   Pre-existing semantics for the no-direct-API states (FAILED,
   UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in
   the same finish() branch.

Tests added:

- _terminal_state not set when succeed() raises.
- update_task_state_if_needed calls finish() when synced flag is
  False, even with final_state == SUCCESS.
- update_task_state_if_needed skips finish() when synced flag is
  True (preserves the existing happy-path optimisation).

Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007).

* Refactor terminal-state dispatch and parametrize tests across all 4 states

Address review feedback on #66574:

- Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch
  for succeed / retry / defer / reschedule lives in one place. Both
  `_handle_request` and `_replay_pending_terminal_state_msg` now go
  through it instead of duplicating the four-branch isinstance chain.
- Parametrize the two recovery tests over all four terminal-state
  message types (was only Succeed + Defer); add UP_FOR_RETRY and
  UP_FOR_RESCHEDULE coverage.

* Narrow _pending_terminal_state_msg type to satisfy mypy

The field was annotated as BaseModel | None, but _send_terminal_state_msg
expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy
couldn't prove the narrowing at the _replay_pending_terminal_state_msg
call site. Tighten the field type to the exact union the setter assigns
and the consumer accepts.

---------

Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com>
Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
(cherry picked from commit 173c2a1)
…test signature

The cherry-picked _send_terminal_state_msg dispatcher passed
retry_delay_seconds and retry_reason kwargs (via getattr defensive
fallback) to client.task_instances.retry(). The v3-2-test version of
retry() in task-sdk/src/airflow/sdk/api/client.py only accepts
(id, end_date, rendered_map_index) — those kwargs don't exist on this
branch yet.

In mock-based unit tests the extra kwargs were silently accepted by
Mock but tripped assert_called_once_with. In real DB tests (Postgres
test_task_instance_history_is_created_when_ti_goes_for_retry,
MySQL/SQLite equivalents) the retry() call raised TypeError, the API
server never received the retry transition, and TaskInstanceHistory
never got created — the test's UUID-rotation assertion failed.
@vatsrahul1001 vatsrahul1001 merged commit 635fc94 into v3-2-test May 20, 2026
89 checks passed
@vatsrahul1001 vatsrahul1001 deleted the backport-173c2a1-v3-2-test branch May 20, 2026 03:59
vatsrahul1001 added a commit that referenced this pull request May 20, 2026
…ls (#66574) (#67204)

* Recover stuck TIs when direct terminal-state API call fails (#66574)

* Recover stuck TIs when direct terminal-state API call fails

The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask,
and RescheduleTask set _terminal_state BEFORE calling the matching
client.task_instances.{succeed,retry,defer,reschedule}() API. If that
API call raised (transient network blip, server 5xx, etc.),
_terminal_state was set on the supervisor but the server never saw
the transition. The supervisor's update_task_state_if_needed then
saw final_state in STATES_SENT_DIRECTLY and short-circuited the
recovery finish() call -- leaving the TaskInstance stuck RUNNING
on the server forever, blocking downstream dependencies and
triggering false alerts.

Two-part fix:

1. Make the direct API call FIRST. Only set _terminal_state and the
   new _terminal_state_synced_to_server flag after the call returns
   successfully. If the API raises, both stay unset and the exception
   propagates to handle_requests, where the existing catch-all sends
   an ErrorResponse to the task subprocess.

2. Have update_task_state_if_needed always call finish() when
   _terminal_state_synced_to_server is False, regardless of what
   final_state happens to return. The finish() API takes the state
   value, so a SUCCESS / DEFERRED / etc. transition that originally
   failed is re-attempted via finish() on subprocess exit.
   Pre-existing semantics for the no-direct-API states (FAILED,
   UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in
   the same finish() branch.

Tests added:

- _terminal_state not set when succeed() raises.
- update_task_state_if_needed calls finish() when synced flag is
  False, even with final_state == SUCCESS.
- update_task_state_if_needed skips finish() when synced flag is
  True (preserves the existing happy-path optimisation).

Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007).

* Refactor terminal-state dispatch and parametrize tests across all 4 states

Address review feedback on #66574:

- Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch
  for succeed / retry / defer / reschedule lives in one place. Both
  `_handle_request` and `_replay_pending_terminal_state_msg` now go
  through it instead of duplicating the four-branch isinstance chain.
- Parametrize the two recovery tests over all four terminal-state
  message types (was only Succeed + Defer); add UP_FOR_RETRY and
  UP_FOR_RESCHEDULE coverage.

* Narrow _pending_terminal_state_msg type to satisfy mypy

The field was annotated as BaseModel | None, but _send_terminal_state_msg
expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy
couldn't prove the narrowing at the _replay_pending_terminal_state_msg
call site. Tighten the field type to the exact union the setter assigns
and the consumer accepts.

---------

Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com>
Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
(cherry picked from commit 173c2a1)

* Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature

The cherry-picked _send_terminal_state_msg dispatcher passed
retry_delay_seconds and retry_reason kwargs (via getattr defensive
fallback) to client.task_instances.retry(). The v3-2-test version of
retry() in task-sdk/src/airflow/sdk/api/client.py only accepts
(id, end_date, rendered_map_index) — those kwargs don't exist on this
branch yet.

In mock-based unit tests the extra kwargs were silently accepted by
Mock but tripped assert_called_once_with. In real DB tests (Postgres
test_task_instance_history_is_created_when_ti_goes_for_retry,
MySQL/SQLite equivalents) the retry() call raised TypeError, the API
server never received the retry transition, and TaskInstanceHistory
never got created — the test's UUID-rotation assertion failed.

---------

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
vatsrahul1001 added a commit that referenced this pull request May 20, 2026
…ls (#66574) (#67204)

* Recover stuck TIs when direct terminal-state API call fails (#66574)

* Recover stuck TIs when direct terminal-state API call fails

The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask,
and RescheduleTask set _terminal_state BEFORE calling the matching
client.task_instances.{succeed,retry,defer,reschedule}() API. If that
API call raised (transient network blip, server 5xx, etc.),
_terminal_state was set on the supervisor but the server never saw
the transition. The supervisor's update_task_state_if_needed then
saw final_state in STATES_SENT_DIRECTLY and short-circuited the
recovery finish() call -- leaving the TaskInstance stuck RUNNING
on the server forever, blocking downstream dependencies and
triggering false alerts.

Two-part fix:

1. Make the direct API call FIRST. Only set _terminal_state and the
   new _terminal_state_synced_to_server flag after the call returns
   successfully. If the API raises, both stay unset and the exception
   propagates to handle_requests, where the existing catch-all sends
   an ErrorResponse to the task subprocess.

2. Have update_task_state_if_needed always call finish() when
   _terminal_state_synced_to_server is False, regardless of what
   final_state happens to return. The finish() API takes the state
   value, so a SUCCESS / DEFERRED / etc. transition that originally
   failed is re-attempted via finish() on subprocess exit.
   Pre-existing semantics for the no-direct-API states (FAILED,
   UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in
   the same finish() branch.

Tests added:

- _terminal_state not set when succeed() raises.
- update_task_state_if_needed calls finish() when synced flag is
  False, even with final_state == SUCCESS.
- update_task_state_if_needed skips finish() when synced flag is
  True (preserves the existing happy-path optimisation).

Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007).

* Refactor terminal-state dispatch and parametrize tests across all 4 states

Address review feedback on #66574:

- Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch
  for succeed / retry / defer / reschedule lives in one place. Both
  `_handle_request` and `_replay_pending_terminal_state_msg` now go
  through it instead of duplicating the four-branch isinstance chain.
- Parametrize the two recovery tests over all four terminal-state
  message types (was only Succeed + Defer); add UP_FOR_RETRY and
  UP_FOR_RESCHEDULE coverage.

* Narrow _pending_terminal_state_msg type to satisfy mypy

The field was annotated as BaseModel | None, but _send_terminal_state_msg
expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy
couldn't prove the narrowing at the _replay_pending_terminal_state_msg
call site. Tighten the field type to the exact union the setter assigns
and the consumer accepts.

---------

Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com>
Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
(cherry picked from commit 173c2a1)

* Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature

The cherry-picked _send_terminal_state_msg dispatcher passed
retry_delay_seconds and retry_reason kwargs (via getattr defensive
fallback) to client.task_instances.retry(). The v3-2-test version of
retry() in task-sdk/src/airflow/sdk/api/client.py only accepts
(id, end_date, rendered_map_index) — those kwargs don't exist on this
branch yet.

In mock-based unit tests the extra kwargs were silently accepted by
Mock but tripped assert_called_once_with. In real DB tests (Postgres
test_task_instance_history_is_created_when_ti_goes_for_retry,
MySQL/SQLite equivalents) the retry() call raised TypeError, the API
server never received the retry transition, and TaskInstanceHistory
never got created — the test's UUID-rotation assertion failed.

---------

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
vatsrahul1001 added a commit that referenced this pull request May 21, 2026
…ls (#66574) (#67204)

* Recover stuck TIs when direct terminal-state API call fails (#66574)

* Recover stuck TIs when direct terminal-state API call fails

The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask,
and RescheduleTask set _terminal_state BEFORE calling the matching
client.task_instances.{succeed,retry,defer,reschedule}() API. If that
API call raised (transient network blip, server 5xx, etc.),
_terminal_state was set on the supervisor but the server never saw
the transition. The supervisor's update_task_state_if_needed then
saw final_state in STATES_SENT_DIRECTLY and short-circuited the
recovery finish() call -- leaving the TaskInstance stuck RUNNING
on the server forever, blocking downstream dependencies and
triggering false alerts.

Two-part fix:

1. Make the direct API call FIRST. Only set _terminal_state and the
   new _terminal_state_synced_to_server flag after the call returns
   successfully. If the API raises, both stay unset and the exception
   propagates to handle_requests, where the existing catch-all sends
   an ErrorResponse to the task subprocess.

2. Have update_task_state_if_needed always call finish() when
   _terminal_state_synced_to_server is False, regardless of what
   final_state happens to return. The finish() API takes the state
   value, so a SUCCESS / DEFERRED / etc. transition that originally
   failed is re-attempted via finish() on subprocess exit.
   Pre-existing semantics for the no-direct-API states (FAILED,
   UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in
   the same finish() branch.

Tests added:

- _terminal_state not set when succeed() raises.
- update_task_state_if_needed calls finish() when synced flag is
  False, even with final_state == SUCCESS.
- update_task_state_if_needed skips finish() when synced flag is
  True (preserves the existing happy-path optimisation).

Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007).

* Refactor terminal-state dispatch and parametrize tests across all 4 states

Address review feedback on #66574:

- Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch
  for succeed / retry / defer / reschedule lives in one place. Both
  `_handle_request` and `_replay_pending_terminal_state_msg` now go
  through it instead of duplicating the four-branch isinstance chain.
- Parametrize the two recovery tests over all four terminal-state
  message types (was only Succeed + Defer); add UP_FOR_RETRY and
  UP_FOR_RESCHEDULE coverage.

* Narrow _pending_terminal_state_msg type to satisfy mypy

The field was annotated as BaseModel | None, but _send_terminal_state_msg
expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy
couldn't prove the narrowing at the _replay_pending_terminal_state_msg
call site. Tighten the field type to the exact union the setter assigns
and the consumer accepts.

---------

Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com>
Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
(cherry picked from commit 173c2a1)

* Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature

The cherry-picked _send_terminal_state_msg dispatcher passed
retry_delay_seconds and retry_reason kwargs (via getattr defensive
fallback) to client.task_instances.retry(). The v3-2-test version of
retry() in task-sdk/src/airflow/sdk/api/client.py only accepts
(id, end_date, rendered_map_index) — those kwargs don't exist on this
branch yet.

In mock-based unit tests the extra kwargs were silently accepted by
Mock but tripped assert_called_once_with. In real DB tests (Postgres
test_task_instance_history_is_created_when_ti_goes_for_retry,
MySQL/SQLite equivalents) the retry() call raised TypeError, the API
server never received the retry transition, and TaskInstanceHistory
never got created — the test's UUID-rotation assertion failed.

---------

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants