Honor AirflowFailException raised inside on_retry_callback #66781
1fanwang wants to merge 3 commits into
SameerMesiah97
left a comment
Approach looks fine to me but someone more familiar with this area should weigh in. I have left some comments.
# For UP_FOR_RETRY, defer sending the message until after on_retry_callback has run
# (finalize() sends it). This lets an AirflowFailException raised inside the callback
# promote the state to FAILED instead of letting the supervisor record a retry that
# the user explicitly asked to skip. See #60172.
nit: this could be clearer:
# Delay reporting UP_FOR_RETRY to the supervisor until after
# on_retry_callback runs so AirflowFailException can promote
# the task to FAILED and suppress the retry.
I don't think the issue reference is needed. But this is a more subjective point.
Done in 193d2d6 — applied the shorter wording and dropped the issue reference.
except Exception:
    log.exception("error calling listener")
if error and task.email_on_retry and task.email:
    _send_error_email_notification(task, ti, context, error, log)
Right now, this is a bit hard to follow. I would recommend extracting the listener and email notification into a helper like this:
def _handle_failure_notifications(
    *,
    task,
    ti,
    context,
    error,
    log,
    send_email: bool,
) -> None:
    try:
        get_listener_manager().hook.on_task_instance_failed(
            previous_state=TaskInstanceState.RUNNING,
            task_instance=ti,
            error=error,
        )
    except Exception:
        log.exception("error calling listener")
    if send_email and task.email:
        _send_error_email_notification(task, ti, context, error, log)
Then plug _handle_failure_notifications in so that lines 1960-1976 are replaced by this:
    _handle_failure_notifications(
        task=task,
        ti=ti,
        context=context,
        error=error,
        log=log,
        send_email=task.email_on_failure,
    )
else:
    _handle_failure_notifications(
        task=task,
        ti=ti,
        context=context,
        error=error,
        log=log,
        send_email=bool(error and task.email_on_retry),
    )
Done in 193d2d6 — added _handle_failure_notifications and applied it across all three failure-path branches in finalize (AirflowFailException, retry, and FAILED) so the listener+email pattern only lives in one place.
"""
AirflowFailException raised in on_retry_callback should fail the task without retrying.

Regression test for #60172.
Done in f59182e — dropped the docstring; the test name already describes the behavior.
# Both callbacks should have run (retry callback first, then failure callback after
# AirflowFailException promoted the state to FAILED).
assert len(retry_callback_calls) == 1
assert len(failure_callback_calls) == 1
Maybe it would be better to assert the state transition here instead of the number of calls:
assert retry_callback_calls == [TaskInstanceState.UP_FOR_RETRY]
assert failure_callback_calls == [TaskInstanceState.FAILED]
Done in f59182e — retry_callback_calls == [UP_FOR_RETRY] and failure_callback_calls == [FAILED] pin both the count and the state at callback time.
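The difference between the two assertion styles can be seen in a small standalone sketch. The `State` enum and the `run_callbacks` driver are hypothetical stand-ins, not Airflow's `TaskInstanceState` or task runner; only the recording pattern is the point.

```python
from enum import Enum


class State(str, Enum):
    # Hypothetical stand-in for TaskInstanceState.
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"


retry_callback_calls: list[State] = []
failure_callback_calls: list[State] = []


def on_retry(context: dict) -> None:
    # Record the state the task was in when the callback fired.
    retry_callback_calls.append(context["state"])


def on_failure(context: dict) -> None:
    failure_callback_calls.append(context["state"])


def run_callbacks() -> None:
    # Hypothetical driver: the retry callback fires under UP_FOR_RETRY,
    # the failure callback fires after promotion to FAILED.
    on_retry({"state": State.UP_FOR_RETRY})
    on_failure({"state": State.FAILED})


run_callbacks()

# A length check passes no matter which state was recorded ...
assert len(retry_callback_calls) == 1
# ... whereas list equality fails if a callback ran twice or under the wrong state.
assert retry_callback_calls == [State.UP_FOR_RETRY]
assert failure_callback_calls == [State.FAILED]
```

Comparing against a one-element list covers the count implicitly, so the separate `len(...) == 1` check becomes redundant.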
f59182e to 8f538c8
Raising AirflowFailException is the documented way to fail a task without retrying. Until now that signal was silently swallowed when raised from on_retry_callback: the catch-all `except Exception` inside `_run_task_state_change_callbacks` ate it, the task stayed UP_FOR_RETRY, and another attempt was scheduled.

The retry path now defers sending its terminal message until after on_retry_callback has run. If the callback raises AirflowFailException, the state is promoted to FAILED, the pending RetryTask is replaced with TaskState(FAILED), and the failure-path finalizers (on_failure_callback, listener.on_task_instance_failed, email_on_failure) run as if the task had failed without ever attempting a retry.

Other exceptions from on_retry_callback are still logged and swallowed, so callbacks that optimistically clean up partial data continue to work unchanged.

closes: apache#60172
…mment

- Rename newsfragment to match PR number (60172 -> 66781).
- Tighten the comment in `run()` explaining why UP_FOR_RETRY messages are deferred until after on_retry_callback runs.
- Extract `_handle_failure_notifications` for the listener + email pattern repeated across the three failure-path branches in `finalize` (AirflowFailException, retry, and FAILED).

Signed-off-by: 1fanwang <1fannnw@gmail.com>
…transitions

- Drop the docstring on `test_airflow_fail_exception_in_on_retry_callback_fails_task`; the test name already describes the assertion, and the project style guide prefers no docstrings in tests that merely repeat the function name.
- Assert the recorded callback states directly (`retry_callback_calls == [UP_FOR_RETRY]`, `failure_callback_calls == [FAILED]`) instead of the call counts. Pins both the count AND the state at the moment each callback ran -- proving on_retry_callback fired while the task was UP_FOR_RETRY and on_failure_callback fired after AirflowFailException promoted it to FAILED.

Signed-off-by: 1fanwang <1fannnw@gmail.com>
8f538c8 to c47b237
Closes #60172.
`_run_task_state_change_callbacks` in `task-sdk/src/airflow/sdk/execution_time/task_runner.py` catches every exception from a callback and logs it. That's the right default for noisy cleanup work, but it also swallows the explicit `AirflowFailException` signal: a user raising it inside `on_retry_callback` to say "fail without retrying" had no way to actually fail the task. The state stayed `UP_FOR_RETRY` and another attempt was scheduled.

The fix narrows the catch in the retry-callback path:

- `AirflowFailException` re-raises so the caller can react.
- `Exception` is still logged + swallowed, which preserves Wei's concern on the prior attempt (PR Fix AirflowFailException in on_retry_callback not preventing retries #64198) about cleanup callbacks that may fail.

`run()` now defers the supervisor `RetryTask` message until after the retry callback runs.

`finalize()` gained an optional `msg` parameter: when the retry callback raises `AirflowFailException`, finalize promotes the state to `FAILED`, replaces the pending `RetryTask` with `TaskState(FAILED)`, and runs the failure-path finalizers (`on_failure_callback`, listener hook, `email_on_failure`).

Two prior attempts (#60415 closed 2026-03-05, #64198 closed 2026-05-05) tried to solve this; this PR picks up the Wei-shaped design and ships it with a regression test.
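The narrowed catch described above might look roughly like the following sketch. It is not the PR's actual code: `run_retry_callbacks` is a hypothetical name, the `AirflowFailException` class is a local stand-in so the snippet runs without Airflow installed, and stopping at the first `AirflowFailException` (rather than running the remaining callbacks) is an assumption.

```python
import logging

log = logging.getLogger(__name__)


class AirflowFailException(Exception):
    """Local stand-in for Airflow's AirflowFailException (assumption for this sketch)."""


def run_retry_callbacks(callbacks, context):
    """Run each callback; re-raise AirflowFailException, log and swallow anything else."""
    for callback in callbacks:
        try:
            callback(context)
        except AirflowFailException:
            # Explicit "fail without retrying" signal: let the caller react.
            raise
        except Exception:
            # Cleanup callbacks may fail; keep the log-and-continue behaviour.
            log.exception("error calling callback %r", callback)


def flaky_cleanup(context):
    raise ValueError("partial data already gone")


def give_up(context):
    raise AirflowFailException("do not retry")


# A generic error is logged and swallowed, so the retry would still be scheduled.
run_retry_callbacks([flaky_cleanup], {})

# AirflowFailException propagates to the caller, which can promote the task to FAILED.
promoted = False
try:
    run_retry_callbacks([flaky_cleanup, give_up], {})
except AirflowFailException:
    promoted = True
```

The order of the `except` clauses matters: because the stand-in subclasses `Exception`, the specific handler has to come first or the generic one would swallow it.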
Tests
`task-sdk/tests/task_sdk/execution_time/test_task_runner.py::TestTaskRunnerCallsCallbacks::test_airflow_fail_exception_in_on_retry_callback_fails_task` exercises the full path: retry callback raises `AirflowFailException`, the failure callback runs, the supervisor receives `TaskState(FAILED)` instead of `RetryTask`. The test fails on main and passes with this PR; existing callback tests in the file behave unchanged (generic `Exception` from a retry callback still swallowed + retry still scheduled).
Reproducer
Reverting just the production changes (`task-sdk/src/airflow/sdk/execution_time/task_runner.py` and `supervisor.py`) back to `upstream/main` while keeping the new test surfaces the bug directly. With the call site adapted to the older `finalize()` signature (no `msg` kwarg), `runtime_ti.state` lands on `UP_FOR_RETRY` instead of `FAILED`:
Restoring the production code (the full PR diff) flips the same test to PASSED:
The `UP_FOR_RETRY` line is the literal symptom from #60172: the user raised `AirflowFailException` inside `on_retry_callback` and the task still ended up scheduled for another attempt. With the fix, the same callback raise promotes the task to `FAILED` and the failure-path finalizers fire under that state.
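The before/after behaviour can be approximated with a toy model of the deferred message. Everything here is a simplified assumption: `finalize_sketch`, the string states, and the `RetryTask` / `TaskState` dataclasses are stand-ins named after the PR's concepts, and the failure-path finalizers are omitted.

```python
from dataclasses import dataclass


class AirflowFailException(Exception):
    """Local stand-in for Airflow's AirflowFailException (assumption for this sketch)."""


@dataclass
class RetryTask:
    """Stand-in for the pending retry message."""


@dataclass
class TaskState:
    """Stand-in for the terminal-state message."""

    state: str


def finalize_sketch(state, msg, on_retry_callback):
    """Return the (state, message) pair that would be reported to the supervisor."""
    if state == "up_for_retry":
        try:
            on_retry_callback()
        except AirflowFailException:
            # Promote to FAILED and replace the pending retry message.
            return "failed", TaskState(state="failed")
        except Exception:
            # Other callback errors do not change the outcome.
            pass
    return state, msg


def give_up():
    raise AirflowFailException("do not retry")


# Callback asks to fail: the deferred RetryTask is replaced before anything is sent.
failed_state, failed_msg = finalize_sketch("up_for_retry", RetryTask(), give_up)

# Callback returns normally: the original retry message goes out unchanged.
retry_state, retry_msg = finalize_sketch("up_for_retry", RetryTask(), lambda: None)
```

Because the message is only sent after the callback has run, there is a single place where the outcome can still be changed; sending `RetryTask` first would leave nothing to replace.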