AIP-97 (draft): add FailureDetails primitive for infrastructure-side failure context#66405
Draft
1fanwang wants to merge 8 commits intoapache:mainfrom
Draft
AIP-97 (draft): add FailureDetails primitive for infrastructure-side failure context#664051fanwang wants to merge 8 commits intoapache:mainfrom
1fanwang wants to merge 8 commits intoapache:mainfrom
Conversation
…context Foundation for AIP-97 (Infrastructure-Aware Task Execution). Today the on_task_instance_failed listener only sees the worker-side error exception; failure causes that originate outside the worker process — OOMKilled / PodEvicted on Kubernetes, WorkerLost / SoftTimeLimit on Celery, oom-killer SIGKILL on the local executor, preemption / eviction on resource-managed clusters — are visible only to the executor and never reach the listener. This PR adds the executor-agnostic FailureDetails type (executor_kind, infra_reason, infra_metadata) and extends the on_task_instance_failed hookspec to accept it as an optional keyword argument. Per-executor wiring is intentionally deferred to follow-up PRs so each executor's surfacing PR can iterate against a fixed contract. pluggy dispatches by parameter name, so existing hookimpls that don't declare failure_details keep working unchanged. Tests cover the FailureDetails dataclass surface (parametrized over five realistic executor / reason / metadata shapes, default factory empty dict, frozen immutability) and verify both the new listener-receives-failure-details path and back-compat for legacy listeners that don't declare the parameter.
20d6bd9 to
8c53810
Compare
…site Hookspec extension required all callers to provide the kwarg explicitly. Real e2e against airflow standalone surfaced HookCallError 'hook call must provide argument failure_details' on the existing call sites in task_runner.py, taskinstance.py, and the API server's manual-set path. Pass None until each executor's wiring PR populates it.
This was referenced May 5, 2026
…ire significant type)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
AIP-97 (Infrastructure-Aware Task Execution) is currently in DRAFT on the
cwiki. Opening this as the smallest concrete artifact so the listener-side
API shape can be discussed against real code — marking the PR draft for
that reason.
The gap:
on_task_instance_failedonly sees the worker-sideerrorexception. Failure causes that originate outside the worker process
(Kubernetes
OOMKilled/PodEvicted/ImagePullBackOff, CeleryWorkerLost/SoftTimeLimit, LocalExecutor oom-killerSIGKILL, and soon) are visible only to the executor and never reach the listener. The
shape is the same across executors — a kind tag, a categorical reason,
structured metadata — and
FailureDetailsformalises it:FailureDetailsis a frozenattrsclass withexecutor_kind,infra_reason,infra_metadata. The hookspec gains the optionalfailure_detailskeyword arg; pluggy dispatches by name, so existinghookimpls that don't declare it keep working.
Per-executor wiring (Kubernetes, Celery, LocalExecutor, and any other
executor that surfaces eviction/preemption) is intentionally deferred so
each can iterate against a fixed contract. Persisting
FailureDetailsonthe TaskInstance for UI rendering is also a separate question.
A few questions worth settling on the discussion:
infra_reasonas free-form string vs constrained enum — free-form herefor permissiveness, open to switching.
FailureDetailsbelongs onon_task_instance_successfor success-with-warnings cases (out of scope here; the failure path is
the demonstrably-broken one).
FailureDetailsvsInfraFailureDetailsvsExecutorFailureContext.Testing
TestFailureDetails.test_constructcovers fiverealistic executor / reason / metadata shapes (k8s OOM, k8s
evicted, celery worker-lost, local SIGKILL, unknown reason).
test_default_metadata_is_empty_dictandtest_frozenlockthe dataclass invariants.
TestOnTaskInstanceFailedAcceptsFailureDetailsexercises thepluggy-dispatch contract: the new listener receives the payload,
the legacy listener (no
failure_detailsparameter) still getscalled.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
E2E validation
Bug found and fixed during validation
Initial implementation had
failure_details: FailureDetails | None = Nonein the hookspec. End-to-end pluggy dispatch revealed a quirk: when a listener implementation declares the same parameter with a default (failure_details=None), pluggy treats the impl-side default as authoritative and silently overrides the value the caller passed. Removed the default from the hookspec and added an explicit warning to the docstring telling impl authors to declarefailure_detailswithout a default.After-fix runtime check
The fix means the impl-side signature must be exactly
failure_details(no default). Documented in the hookspec.Real e2e validation surfaced another bug — fixed in this PR
Re-ran with
airflow standaloneagainst the worktree's editable install and dropped a listener that declaresfailure_details(no default, per the hookspec docstring). Triggered a failing DAG.First run revealed a follow-on bug from removing the default: every existing
on_task_instance_failedcall site in the codebase (task_runner.pyfor the worker,taskinstance.pyfor the API server retry path,_emit_state_listener_hooksfor manual API state changes) didn't passfailure_details, and pluggy raisedHookCallError: hook call must provide argument 'failure_details'once the hookspec required it. Every task failure on the upstream branch would have hit this.Fix: pass
failure_details=Noneat every call site until each executor's wiring PR populates it. Commit9a51cbbe60updates the three call sites.After the fix, the listener receives the call cleanly:
failure_details=Noneis the expected value today — no executor populates this field yet, that's deferred to per-executor follow-up PRs (Kubernetes, Celery, LocalExecutor). The listener interface is wired and back-compat: a listener that doesn't declarefailure_detailskeeps working unchanged (pluggy's name-based dispatch); a listener that declares it (without a default) getsNoneuntil an executor populates it.Repro
DAG used:
Integrated mega-branch validation (all 7 PRs composed)
This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services —
airflow standalonerunning scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered + a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it:What this validates jointly:
msg=...started,success,failed,skipped,up_for_retry,manually_set_to_failed)error: BaseException | NoneRuntimeError(wasstron PR-A alone)AirflowTaskCheckpointedrunning → checkpointedtransition observed at the listener and at the supervisor message boundaryfailure_detailskwargfailure_details=Noneflowing through every failure (no executor populates yet)checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}Repro
Bugs surfaced and fixed during this validation
This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:
AirflowTaskCheckpointedimport inrun()(NameError)_generated.pyTaskInstanceStatemissing CHECKPOINTED (AttributeError)TaskStatesupervisor message Literal rejected CHECKPOINTED (PydanticValidationError)_generated.pyIntermediateTIStatemissing CHECKPOINTEDfailure_details=Nonedefault silently received Noneon_task_instance_failedcall site missingfailure_detailskwarg (HookCallError: hook call must provide argument)Last two would have broken every task failure on apache/airflow
mainif the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.Documented gap (deliberately not fixed in this stack)
task-sdk/.../supervisor.py:STATES_SENT_DIRECTLYlists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back toclient.task_instances.finish()which the API server constrains to terminal states. The mega listener log shows the worker successfully loggingTask checkpointed; reporting CHECKPOINTED state.andon_task_instance_checkpointedfiring with the correct payload — but the DB row eventually transitions tofailedbecause the supervisor cannot persist CHECKPOINTED throughfinish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.