
Add required context messages to all task instance state change notifications#66394

Draft
1fanwang wants to merge 5 commits into apache:main from 1fanwang:1fanwang/aip97-task-listener-msg

Conversation


1fanwang (Contributor) commented May 5, 2026

Description

Mirrors the DagRun listener msg arg pattern (#56272) for the four task
instance listener hooks: on_task_instance_running,
on_task_instance_success, on_task_instance_failed,
on_task_instance_skipped.

The new msg arg carries short canonical context for the state change so
listeners can route or filter events without re-deriving intent from other
fields. on_task_instance_failed especially benefits — the msg
distinguishes terminal failures from up_for_retry paths without
inspecting error or task config.

Canonical values

| Hook | Worker path | API-driven path |
| --- | --- | --- |
| `on_task_instance_running` | `"started"` | |
| `on_task_instance_success` | `"success"` | `"manually_set_to_success"` |
| `on_task_instance_failed` | `"failed"` (terminal), `"up_for_retry"` (will retry) | `"manually_set_to_failed"` |
| `on_task_instance_skipped` | `"skipped"` | `"manually_set_to_skipped"` |
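As a hedged sketch of what this enables (only the `msg` strings come from this PR; the routing table and `route_failed_event` helper are invented for illustration), a listener can branch on the canonical values without inspecting `error` or task config:

```python
# Hypothetical routing helper: maps the canonical msg values for
# on_task_instance_failed to alerting destinations. ROUTES and
# route_failed_event are illustrative, not part of the PR.
ROUTES = {
    "failed": "pagerduty",                  # terminal failure: page someone
    "up_for_retry": "metrics-only",         # will retry: just count it
    "manually_set_to_failed": "audit-log",  # operator action via the API
}

def route_failed_event(msg: str) -> str:
    """Pick a destination for a failed-task event based on its msg."""
    return ROUTES.get(msg, "unknown")

print(route_failed_event("up_for_retry"))  # metrics-only
```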

Backwards compatibility

pluggy dispatches by parameter name, so existing @hookimpl
implementations that don't declare msg keep working unchanged.
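The name-based dispatch can be illustrated with a stdlib-only sketch (pluggy's actual machinery is more involved; `call_hook` and both listeners here are invented to show the principle):

```python
import inspect

def call_hook(impl, **available_kwargs):
    """Call a hook implementation with only the kwargs it declares,
    mirroring how pluggy prunes arguments by parameter name."""
    wanted = inspect.signature(impl).parameters
    return impl(**{k: v for k, v in available_kwargs.items() if k in wanted})

# Legacy listener: predates the msg arg, keeps working unchanged.
def legacy_failed(task_instance, error):
    return ("legacy", error)

# New listener: declares msg and receives it.
def new_failed(task_instance, error, msg):
    return ("new", error, msg)

kwargs = {"task_instance": "ti", "error": ValueError("boom"), "msg": "up_for_retry"}
# call_hook passes msg only to the implementation that declares it.
```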

Testing

  • CustomListener test fixture in
    task-sdk/tests/task_sdk/execution_time/test_task_runner.py records
    the msg on every hook call and asserts the canonical value on the
    existing running/success/failed/skipped tests.
  • New parametrized test
    test_task_runner_listener_msg_distinguishes_retry_vs_terminal
    exercises both branches in finalize(): should_retry=True
    (UP_FOR_RETRY, "up_for_retry"), should_retry=False
    (FAILED, "failed").
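The retry-vs-terminal branch that the parametrized test exercises can be sketched as follows (the real `finalize()` in the Task SDK is more involved; `failed_msg` is an invented stand-in for the decision it makes):

```python
# Invented stand-in for the branch finalize() takes on failure: when the
# task can still retry, the state is UP_FOR_RETRY and msg is
# "up_for_retry"; otherwise the state is FAILED and msg is "failed".
def failed_msg(should_retry: bool) -> tuple:
    if should_retry:
        return ("UP_FOR_RETRY", "up_for_retry")
    return ("FAILED", "failed")

for should_retry, expected in [
    (True, ("UP_FOR_RETRY", "up_for_retry")),
    (False, ("FAILED", "failed")),
]:
    assert failed_msg(should_retry) == expected
```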


E2E validation

Direct pluggy dispatch test against the worktree source (no full Airflow boot, just the listener manager + new hookspec + recording listeners). Result:

=== new listener (declares msg) received ===
  ('running', 'started')
  ('success', 'success')
  ('skipped', 'skipped')
  ('failed', 'failed', 'ValueError')
  ('failed', 'up_for_retry', 'ValueError')
  ('failed', 'manually_set_to_failed', 'str')

=== legacy listener (no msg arg) still called ===
  ('legacy_running',)
  ('legacy_success',)

Confirms: the new msg arg flows to listeners that declare it, the canonical values match the table above, and pre-existing listeners that don't declare msg continue to work unchanged (pluggy's name-based dispatch).

Real E2E validation (Airflow standalone)

Re-ran with airflow standalone (real scheduler + API server + LocalExecutor worker + sqlite metadata DB) on the worktree's editable install. Recording listener plugin appended every TI hook invocation to a log file. DAGs that exercise success / failed / skipped / retry-then-fail / manual-set paths.

running   prev=TaskInstanceState.QUEUED  msg=started              task=ok_task        # e2e_success
running   prev=TaskInstanceState.QUEUED  msg=started              task=boom_task      # e2e_failed
failed    prev=TaskInstanceState.RUNNING msg=failed               task=boom_task     error_type=ValueError
success   prev=TaskInstanceState.RUNNING msg=success              task=ok_task
running   prev=TaskInstanceState.QUEUED  msg=started              task=skip_task      # e2e_skipped
skipped   prev=TaskInstanceState.RUNNING msg=skipped              task=skip_task
running   prev=TaskInstanceState.QUEUED  msg=started              task=retry_task     # e2e_retry_then_fail (try 1)
failed    prev=TaskInstanceState.RUNNING msg=up_for_retry         task=retry_task    error_type=ValueError
running   prev=TaskInstanceState.QUEUED  msg=started              task=retry_task     # e2e_retry_then_fail (try 2)
failed    prev=TaskInstanceState.RUNNING msg=failed               task=retry_task    error_type=ValueError

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None                      msg=manually_set_to_failed task=ok_task     error_type=str   error=TaskInstance's state was manually set to `failed`.

All 6 canonical msg values land correctly through the real worker/scheduler/API-server pipeline:

  • started / success / failed / skipped / up_for_retry from the worker (Task SDK task_runner.run() / finalize())
  • manually_set_to_failed from the API server (_emit_state_listener_hooks triggered by the PATCH endpoint)

The error_type=str on the manual-set row is the current behavior on this branch — #66399 wraps that in a RuntimeError for type uniformity.
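The type-uniformity fix #66399 applies can be sketched like this (a hedged guess at the shape only; `coerce_error` is an invented name, not the function in that PR):

```python
def coerce_error(error):
    """Ensure listeners always receive a BaseException (or None),
    wrapping plain strings such as the manual-set message in a
    RuntimeError. Illustrative sketch, not the actual #66399 code."""
    if error is None or isinstance(error, BaseException):
        return error
    return RuntimeError(str(error))

wrapped = coerce_error("TaskInstance's state was manually set to `failed`.")
# isinstance(wrapped, RuntimeError) is True; real exceptions pass through.
```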

Integrated mega-branch validation (all 7 PRs composed)

This PR was validated independently, and in addition all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services: airflow standalone running the scheduler, API server, and LocalExecutor, with sqlite as the metadata DB for the test. A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered and a manual set-state PATCH was issued via the public API. The listener log is below; every annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

| PR | Surface | Evidence in log |
| --- | --- | --- |
| #66394 (msg arg) | every TI hook has `msg=...` | 6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed) |
| #66395 (hook-name log, TI) | logs identify the failing hook | tested separately with a throwing listener (see PR body) |
| #66397 (hook-name log, rest) | lifecycle / DagRun / asset surfaces | tested separately with a throwing listener (see PR body) |
| #66399 (tighten error type) | `error: BaseException \| None` | manual-set path delivers RuntimeError (was str on PR-A alone) |
| #66402 (CHECKPOINTED state) | worker catches AirflowTaskCheckpointed | running → checkpointed transition observed at the listener and at the supervisor message boundary |
| #66405 (FailureDetails) | listener can declare failure_details kwarg | failure_details=None flowing through every failure (no executor populates it yet) |
| #66410 (on_task_instance_checkpointed) | new hook fires with payload | checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...} |

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed; every fix is a separate commit on its respective PR's branch.

The last two would have broken every task failure on apache/airflow main if the foundation PRs had landed without the call-site fixes. The standalone-against-editable-install harness is a fast way to catch this class of bug.

Documented gap (deliberately not fixed in this stack)

task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to client.task_instances.finish() which the API server constrains to terminal states. The mega listener log shows the worker successfully logging Task checkpointed; reporting CHECKPOINTED state. and on_task_instance_checkpointed firing with the correct payload — but the DB row eventually transitions to failed because the supervisor cannot persist CHECKPOINTED through finish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.
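The routing described above can be sketched stdlib-only (STATES_SENT_DIRECTLY exists per the text, but its contents below are a guess, and `route_state` is invented; the point is only that CHECKPOINTED is absent from the direct-send set):

```python
# Illustrative sketch of the supervisor-side routing the gap describes.
# The membership of STATES_SENT_DIRECTLY here is assumed, not copied from
# supervisor.py; CHECKPOINTED is deliberately absent, so it falls through
# to finish(), which the API server constrains to terminal states.
STATES_SENT_DIRECTLY = {"DEFERRED", "UP_FOR_RESCHEDULE", "UP_FOR_RETRY"}

def route_state(state: str) -> str:
    if state in STATES_SENT_DIRECTLY:
        return "direct-send"   # dedicated supervisor branch
    return "finish()"          # terminal-states-only fallback

# route_state("CHECKPOINTED") takes the finish() path, which cannot
# persist a non-terminal state -- hence the DB row ending up failed.
```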

@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:task-sdk labels May 5, 2026