Recover stuck TIs when direct terminal-state API call fails#66574
Conversation
1c22726 to
6e75113
Compare
6e75113 to
2f7437b
Compare
|
I'd love to get this one merged — and would love it in 3.2.2 if it's not too late. cc @vatsrahul1001 (3.2.2 RM) Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting |
|
@potiuk can you resolve the comments ? |
amoghrajesh
left a comment
There was a problem hiding this comment.
Looks ok, needs update to test
…tates Address review feedback on apache#66574: - Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch for succeed / retry / defer / reschedule lives in one place. Both `_handle_request` and `_replay_pending_terminal_state_msg` now go through it instead of duplicating the four-branch isinstance chain. - Parametrize the two recovery tests over all four terminal-state message types (was only Succeed + Defer); add UP_FOR_RETRY and UP_FOR_RESCHEDULE coverage.
The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask,
and RescheduleTask set _terminal_state BEFORE calling the matching
client.task_instances.{succeed,retry,defer,reschedule}() API. If that
API call raised (transient network blip, server 5xx, etc.),
_terminal_state was set on the supervisor but the server never saw
the transition. The supervisor's update_task_state_if_needed then
saw final_state in STATES_SENT_DIRECTLY and short-circuited the
recovery finish() call -- leaving the TaskInstance stuck RUNNING
on the server forever, blocking downstream dependencies and
triggering false alerts.
Two-part fix:
1. Make the direct API call FIRST. Only set _terminal_state and the
new _terminal_state_synced_to_server flag after the call returns
successfully. If the API raises, both stay unset and the exception
propagates to handle_requests, where the existing catch-all sends
an ErrorResponse to the task subprocess.
2. Have update_task_state_if_needed always call finish() when
_terminal_state_synced_to_server is False, regardless of what
final_state happens to return. The finish() API takes the state
value, so a SUCCESS / DEFERRED / etc. transition that originally
failed is re-attempted via finish() on subprocess exit.
Pre-existing semantics for the no-direct-API states (FAILED,
UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in
the same finish() branch.
Tests added:
- _terminal_state not set when succeed() raises.
- update_task_state_if_needed calls finish() when synced flag is
False, even with final_state == SUCCESS.
- update_task_state_if_needed skips finish() when synced flag is
True (preserves the existing happy-path optimisation).
Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007).
…tates Address review feedback on apache#66574: - Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch for succeed / retry / defer / reschedule lives in one place. Both `_handle_request` and `_replay_pending_terminal_state_msg` now go through it instead of duplicating the four-branch isinstance chain. - Parametrize the two recovery tests over all four terminal-state message types (was only Succeed + Defer); add UP_FOR_RETRY and UP_FOR_RESCHEDULE coverage.
The field was annotated as BaseModel | None, but _send_terminal_state_msg expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy couldn't prove the narrowing at the _replay_pending_terminal_state_msg call site. Tighten the field type to the exact union the setter assigns and the consumer accepts.
043b73a to
88557d8
Compare
Backport failed to create: v3-2-test. View the failure log Run detailsNote: As of Merging PRs targeted for Airflow 3.X In matter of doubt please ask in #release-management Slack channel.
You can attempt to backport this manually by running: cherry_picker 173c2a1 v3-2-testThis should apply the commit to the v3-2-test branch and leave the commit in conflict state marking After you have resolved the conflicts, you can continue the backport process by running: cherry_picker --continueIf you don't have cherry-picker installed, see the installation guide. |
|
Manual backport for review #67204 |
…ls (#66574) (#67204) * Recover stuck TIs when direct terminal-state API call fails (#66574) * Recover stuck TIs when direct terminal-state API call fails The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask, and RescheduleTask set _terminal_state BEFORE calling the matching client.task_instances.{succeed,retry,defer,reschedule}() API. If that API call raised (transient network blip, server 5xx, etc.), _terminal_state was set on the supervisor but the server never saw the transition. The supervisor's update_task_state_if_needed then saw final_state in STATES_SENT_DIRECTLY and short-circuited the recovery finish() call -- leaving the TaskInstance stuck RUNNING on the server forever, blocking downstream dependencies and triggering false alerts. Two-part fix: 1. Make the direct API call FIRST. Only set _terminal_state and the new _terminal_state_synced_to_server flag after the call returns successfully. If the API raises, both stay unset and the exception propagates to handle_requests, where the existing catch-all sends an ErrorResponse to the task subprocess. 2. Have update_task_state_if_needed always call finish() when _terminal_state_synced_to_server is False, regardless of what final_state happens to return. The finish() API takes the state value, so a SUCCESS / DEFERRED / etc. transition that originally failed is re-attempted via finish() on subprocess exit. Pre-existing semantics for the no-direct-API states (FAILED, UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in the same finish() branch. Tests added: - _terminal_state not set when succeed() raises. - update_task_state_if_needed calls finish() when synced flag is False, even with final_state == SUCCESS. - update_task_state_if_needed skips finish() when synced flag is True (preserves the existing happy-path optimisation). Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007). * Refactor terminal-state dispatch and parametrize tests across all 4 states Address review feedback on #66574: - Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch for succeed / retry / defer / reschedule lives in one place. Both `_handle_request` and `_replay_pending_terminal_state_msg` now go through it instead of duplicating the four-branch isinstance chain. - Parametrize the two recovery tests over all four terminal-state message types (was only Succeed + Defer); add UP_FOR_RETRY and UP_FOR_RESCHEDULE coverage. * Narrow _pending_terminal_state_msg type to satisfy mypy The field was annotated as BaseModel | None, but _send_terminal_state_msg expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy couldn't prove the narrowing at the _replay_pending_terminal_state_msg call site. Tighten the field type to the exact union the setter assigns and the consumer accepts. --------- Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com> Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com> (cherry picked from commit 173c2a1) * Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature The cherry-picked _send_terminal_state_msg dispatcher passed retry_delay_seconds and retry_reason kwargs (via getattr defensive fallback) to client.task_instances.retry(). The v3-2-test version of retry() in task-sdk/src/airflow/sdk/api/client.py only accepts (id, end_date, rendered_map_index) — those kwargs don't exist on this branch yet. In mock-based unit tests the extra kwargs were silently accepted by Mock but tripped assert_called_once_with. In real DB tests (Postgres test_task_instance_history_is_created_when_ti_goes_for_retry, MySQL/SQLite equivalents) the retry() call raised TypeError, the API server never received the retry transition, and TaskInstanceHistory never got created — the test's UUID-rotation assertion failed. --------- Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
…ls (#66574) (#67204) * Recover stuck TIs when direct terminal-state API call fails (#66574) * Recover stuck TIs when direct terminal-state API call fails The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask, and RescheduleTask set _terminal_state BEFORE calling the matching client.task_instances.{succeed,retry,defer,reschedule}() API. If that API call raised (transient network blip, server 5xx, etc.), _terminal_state was set on the supervisor but the server never saw the transition. The supervisor's update_task_state_if_needed then saw final_state in STATES_SENT_DIRECTLY and short-circuited the recovery finish() call -- leaving the TaskInstance stuck RUNNING on the server forever, blocking downstream dependencies and triggering false alerts. Two-part fix: 1. Make the direct API call FIRST. Only set _terminal_state and the new _terminal_state_synced_to_server flag after the call returns successfully. If the API raises, both stay unset and the exception propagates to handle_requests, where the existing catch-all sends an ErrorResponse to the task subprocess. 2. Have update_task_state_if_needed always call finish() when _terminal_state_synced_to_server is False, regardless of what final_state happens to return. The finish() API takes the state value, so a SUCCESS / DEFERRED / etc. transition that originally failed is re-attempted via finish() on subprocess exit. Pre-existing semantics for the no-direct-API states (FAILED, UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in the same finish() branch. Tests added: - _terminal_state not set when succeed() raises. - update_task_state_if_needed calls finish() when synced flag is False, even with final_state == SUCCESS. - update_task_state_if_needed skips finish() when synced flag is True (preserves the existing happy-path optimisation). Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007). * Refactor terminal-state dispatch and parametrize tests across all 4 states Address review feedback on #66574: - Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch for succeed / retry / defer / reschedule lives in one place. Both `_handle_request` and `_replay_pending_terminal_state_msg` now go through it instead of duplicating the four-branch isinstance chain. - Parametrize the two recovery tests over all four terminal-state message types (was only Succeed + Defer); add UP_FOR_RETRY and UP_FOR_RESCHEDULE coverage. * Narrow _pending_terminal_state_msg type to satisfy mypy The field was annotated as BaseModel | None, but _send_terminal_state_msg expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy couldn't prove the narrowing at the _replay_pending_terminal_state_msg call site. Tighten the field type to the exact union the setter assigns and the consumer accepts. --------- Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com> Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com> (cherry picked from commit 173c2a1) * Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature The cherry-picked _send_terminal_state_msg dispatcher passed retry_delay_seconds and retry_reason kwargs (via getattr defensive fallback) to client.task_instances.retry(). The v3-2-test version of retry() in task-sdk/src/airflow/sdk/api/client.py only accepts (id, end_date, rendered_map_index) — those kwargs don't exist on this branch yet. In mock-based unit tests the extra kwargs were silently accepted by Mock but tripped assert_called_once_with. In real DB tests (Postgres test_task_instance_history_is_created_when_ti_goes_for_retry, MySQL/SQLite equivalents) the retry() call raised TypeError, the API server never received the retry transition, and TaskInstanceHistory never got created — the test's UUID-rotation assertion failed. --------- Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
…ls (#66574) (#67204) * Recover stuck TIs when direct terminal-state API call fails (#66574) * Recover stuck TIs when direct terminal-state API call fails The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask, and RescheduleTask set _terminal_state BEFORE calling the matching client.task_instances.{succeed,retry,defer,reschedule}() API. If that API call raised (transient network blip, server 5xx, etc.), _terminal_state was set on the supervisor but the server never saw the transition. The supervisor's update_task_state_if_needed then saw final_state in STATES_SENT_DIRECTLY and short-circuited the recovery finish() call -- leaving the TaskInstance stuck RUNNING on the server forever, blocking downstream dependencies and triggering false alerts. Two-part fix: 1. Make the direct API call FIRST. Only set _terminal_state and the new _terminal_state_synced_to_server flag after the call returns successfully. If the API raises, both stay unset and the exception propagates to handle_requests, where the existing catch-all sends an ErrorResponse to the task subprocess. 2. Have update_task_state_if_needed always call finish() when _terminal_state_synced_to_server is False, regardless of what final_state happens to return. The finish() API takes the state value, so a SUCCESS / DEFERRED / etc. transition that originally failed is re-attempted via finish() on subprocess exit. Pre-existing semantics for the no-direct-API states (FAILED, UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in the same finish() branch. Tests added: - _terminal_state not set when succeed() raises. - update_task_state_if_needed calls finish() when synced flag is False, even with final_state == SUCCESS. - update_task_state_if_needed skips finish() when synced flag is True (preserves the existing happy-path optimisation). Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007). * Refactor terminal-state dispatch and parametrize tests across all 4 states Address review feedback on #66574: - Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch for succeed / retry / defer / reschedule lives in one place. Both `_handle_request` and `_replay_pending_terminal_state_msg` now go through it instead of duplicating the four-branch isinstance chain. - Parametrize the two recovery tests over all four terminal-state message types (was only Succeed + Defer); add UP_FOR_RETRY and UP_FOR_RESCHEDULE coverage. * Narrow _pending_terminal_state_msg type to satisfy mypy The field was annotated as BaseModel | None, but _send_terminal_state_msg expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy couldn't prove the narrowing at the _replay_pending_terminal_state_msg call site. Tighten the field type to the exact union the setter assigns and the consumer accepts. --------- Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com> Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com> (cherry picked from commit 173c2a1) * Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature The cherry-picked _send_terminal_state_msg dispatcher passed retry_delay_seconds and retry_reason kwargs (via getattr defensive fallback) to client.task_instances.retry(). The v3-2-test version of retry() in task-sdk/src/airflow/sdk/api/client.py only accepts (id, end_date, rendered_map_index) — those kwargs don't exist on this branch yet. In mock-based unit tests the extra kwargs were silently accepted by Mock but tripped assert_called_once_with. In real DB tests (Postgres test_task_instance_history_is_created_when_ti_goes_for_retry, MySQL/SQLite equivalents) the retry() call raised TypeError, the API server never received the retry transition, and TaskInstanceHistory never got created — the test's UUID-rotation assertion failed. --------- Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
…ls (#66574) (#67204) * Recover stuck TIs when direct terminal-state API call fails (#66574) * Recover stuck TIs when direct terminal-state API call fails The supervisor's _handle_request for SucceedTask, RetryTask, DeferTask, and RescheduleTask set _terminal_state BEFORE calling the matching client.task_instances.{succeed,retry,defer,reschedule}() API. If that API call raised (transient network blip, server 5xx, etc.), _terminal_state was set on the supervisor but the server never saw the transition. The supervisor's update_task_state_if_needed then saw final_state in STATES_SENT_DIRECTLY and short-circuited the recovery finish() call -- leaving the TaskInstance stuck RUNNING on the server forever, blocking downstream dependencies and triggering false alerts. Two-part fix: 1. Make the direct API call FIRST. Only set _terminal_state and the new _terminal_state_synced_to_server flag after the call returns successfully. If the API raises, both stay unset and the exception propagates to handle_requests, where the existing catch-all sends an ErrorResponse to the task subprocess. 2. Have update_task_state_if_needed always call finish() when _terminal_state_synced_to_server is False, regardless of what final_state happens to return. The finish() API takes the state value, so a SUCCESS / DEFERRED / etc. transition that originally failed is re-attempted via finish() on subprocess exit. Pre-existing semantics for the no-direct-API states (FAILED, UP_FOR_RETRY without RetryTask, etc.) preserved -- those land in the same finish() branch. Tests added: - _terminal_state not set when succeed() raises. - update_task_state_if_needed calls finish() when synced flag is False, even with final_state == SUCCESS. - update_task_state_if_needed skips finish() when synced flag is True (preserves the existing happy-path optimisation). Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-007). * Refactor terminal-state dispatch and parametrize tests across all 4 states Address review feedback on #66574: - Extract `_send_terminal_state_msg` helper so the per-msg-type dispatch for succeed / retry / defer / reschedule lives in one place. Both `_handle_request` and `_replay_pending_terminal_state_msg` now go through it instead of duplicating the four-branch isinstance chain. - Parametrize the two recovery tests over all four terminal-state message types (was only Succeed + Defer); add UP_FOR_RETRY and UP_FOR_RESCHEDULE coverage. * Narrow _pending_terminal_state_msg type to satisfy mypy The field was annotated as BaseModel | None, but _send_terminal_state_msg expects SucceedTask | RetryTask | DeferTask | RescheduleTask. mypy couldn't prove the narrowing at the _replay_pending_terminal_state_msg call site. Tighten the field type to the exact union the setter assigns and the consumer accepts. --------- Co-authored-by: vatsrahul1001 <rah.sharma11@gmail.com> Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com> (cherry picked from commit 173c2a1) * Don't pass retry_delay_seconds/retry_reason to retry() — not in v3-2-test signature The cherry-picked _send_terminal_state_msg dispatcher passed retry_delay_seconds and retry_reason kwargs (via getattr defensive fallback) to client.task_instances.retry(). The v3-2-test version of retry() in task-sdk/src/airflow/sdk/api/client.py only accepts (id, end_date, rendered_map_index) — those kwargs don't exist on this branch yet. In mock-based unit tests the extra kwargs were silently accepted by Mock but tripped assert_called_once_with. In real DB tests (Postgres test_task_instance_history_is_created_when_ti_goes_for_retry, MySQL/SQLite equivalents) the retry() call raised TypeError, the API server never received the retry transition, and TaskInstanceHistory never got created — the test's UUID-rotation assertion failed. --------- Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
Summary
The supervisor's
_handle_requestforSucceedTask,RetryTask,DeferTask, andRescheduleTaskset_terminal_statebefore calling the matchingclient.task_instances.{succeed,retry,defer,reschedule}()API. If that API call raised (transient network blip, server 5xx, etc.),_terminal_statewas set on the supervisor but the server never saw the transition. The supervisor'supdate_task_state_if_neededthen sawfinal_state in STATES_SENT_DIRECTLYand short-circuited the recoveryfinish()call — leaving the TaskInstance stuck RUNNING on the server forever, blocking downstream dependencies and triggering false alerts.Fix (two parts)
1. Set
_terminal_stateafter the direct API call succeedsMake the direct API call first. Only set
_terminal_stateand the new_terminal_state_synced_to_serverflag after the call returns successfully. If the API raises, both stay unset and the exception propagates tohandle_requests, where the existing catch-all sends anErrorResponseto the task subprocess.2. Recovery in
update_task_state_if_neededAlways call
finish()when_terminal_state_synced_to_serverisFalse, regardless of whatfinal_statehappens to return. Thefinish()API takes the state value, so a SUCCESS / DEFERRED / etc. transition that originally failed is re-attempted viafinish()on subprocess exit. Pre-existing semantics for the no-direct-API states (FAILED, UP_FOR_RETRY without RetryTask, etc.) are preserved — those land in the samefinish()branch.Tests
_terminal_statenot set whensucceed()raises.update_task_state_if_neededcallsfinish()when synced flag isFalse, even withfinal_state == SUCCESS.update_task_state_if_neededskipsfinish()when synced flag isTrue(preserves the existing happy-path optimisation).Reported by
L3 ASVS sweep — apache/tooling-agents#24 (FINDING-007).
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines