
Add Execution API endpoints for callback lifecycle and token swap#66141

Open
wjddn279 wants to merge 3 commits intoapache:mainfrom
wjddn279:add-callback-lifecycle-api

Conversation


@wjddn279 wjddn279 commented Apr 30, 2026

Problems

1. Workload token leaks into callback supervisor's runtime API calls

Related: #60108. When the executor enqueues an ExecuteCallback, the supervisor receives a long-lived workload-scoped token, intended only as a one-time admission ticket (as for ExecuteTask). However, unlike ExecuteTask, which immediately swaps it for an execution-scoped token via PATCH /task-instances/{id}/run, supervise_callback never performs the swap. It carries the workload token straight through into runtime API calls (e.g. Connection.get(), Variable.get()).

2026-04-29T11:08:35.766821Z [warning  ] Token type not allowed for endpoint [airflow.api_fastapi.execution_api.security] allowed_types=['execution'] correlation_id=019dd8ed-2e34-7b0d-bfba-d7fe16612411 loc=security.py:174 path=/execution/connections/slack_default token_scope=workload
2026-04-29T11:08:35.767466Z [info     ] request finished               [http.access] client_addr=172.18.0.7:47640 duration_us=1634 loc=http_access_log.py:98 method=GET path=/execution/connections/slack_default query= status_code=403

These shared endpoints only accept execution-scoped tokens, so any callback that touches Connections or Variables hits 403 from require_auth's scope check.
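The failing scope check can be illustrated with a minimal sketch. The Token dataclass and require_scope helper below are hypothetical stand-ins, not Airflow's actual security module (the real check lives in airflow.api_fastapi.execution_api.security):

```python
# Hypothetical sketch of the scope gate on shared runtime endpoints.
from dataclasses import dataclass


@dataclass
class Token:
    scope: str  # "workload" (one-time admission ticket) or "execution"


def require_scope(token: Token, allowed: tuple[str, ...] = ("execution",)) -> int:
    """Return the HTTP status a shared runtime endpoint would produce."""
    return 200 if token.scope in allowed else 403


# ExecuteTask swaps its workload token for an execution token before any
# runtime call, so Connection/Variable lookups succeed:
assert require_scope(Token(scope="execution")) == 200
# supervise_callback skips the swap, so the same lookups return 403:
assert require_scope(Token(scope="workload")) == 403
```
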

2. process_executor_events is the only writer of callback state, and it's executor-specific

Today, callback state transitions (QUEUED → RUNNING → SUCCESS/FAILED) are written only by the scheduler's _process_executor_events, which reads from each executor's in-process event_buffer. It does not work for executors that don't use the scheduler's in-process event channel — most notably EdgeExecutor, where workers run on remote machines and report back via HTTP. Their callbacks stay forever in QUEUED.
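For context, the in-process event channel works roughly like this. This is a simplified sketch with hypothetical names: the real reader is _process_executor_events, and the real buffer holds richer event tuples than a plain state string:

```python
# Simplified sketch of the scheduler's in-process event channel. EdgeExecutor
# workers run on remote machines and never populate this structure inside the
# scheduler's process, which is why their callbacks stay QUEUED forever.
event_buffer: dict[str, str] = {}  # callback_id -> reported state


def process_executor_events(db: dict[str, str]) -> None:
    """Drain the in-process buffer and write state transitions to the DB."""
    for callback_id, state in event_buffer.items():
        db[callback_id] = state
    event_buffer.clear()


db = {"cb-1": "queued"}
event_buffer["cb-1"] = "success"  # only possible for in-process executors
process_executor_events(db)
assert db["cb-1"] == "success"
```
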

Solution

Move callback state transitions to the Execution API, mirroring the ExecuteTask pattern:

  • POST /callbacks/{id}/run — accepts both workload and execution tokens; transitions QUEUED → RUNNING and returns a fresh execution token via the Refreshed-API-Token header (the same swap mechanism #60108 introduced for task instances). All subsequent callback API calls use the execution token.
  • PATCH /callbacks/{id}/state — execution-only; records the terminal SUCCESS / FAILED outcome with optional output.
  • supervise_callback calls client.callbacks.start() before forking the subprocess and client.callbacks.finish() afterward, so the API is the single source of truth for callback state — this works uniformly across all executors (Local, Celery, Kubernetes, Edge).
  • _process_executor_events keeps its callback handling as a forward-only fallback for cases where the supervisor crashes before reporting a terminal state; it never regresses a state already advanced by the API.
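The bookend pattern from the bullets above can be sketched as follows. The client.callbacks.start()/finish() names follow the PR description, while run_subprocess is a hypothetical stand-in for the forked callback process:

```python
# Sketch of the supervisor bookends; a shape under stated assumptions,
# not the PR's actual implementation.
def supervise_callback(client, callback_id, run_subprocess):
    # POST /callbacks/{id}/run: QUEUED -> RUNNING. The server also returns a
    # fresh execution token via the Refreshed-API-Token header, which the
    # client adopts for every subsequent call.
    client.callbacks.start(callback_id)
    try:
        output = run_subprocess()
    except Exception as exc:
        # PATCH /callbacks/{id}/state with the terminal FAILED outcome.
        client.callbacks.finish(callback_id, state="failed", output=str(exc))
        raise
    client.callbacks.finish(callback_id, state="success", output=output)
```

Because both bookends go through the API, every executor (Local, Celery, Kubernetes, Edge) reports state the same way, and the scheduler's event buffer is no longer load-bearing.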

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    claude code (opus 4.7) to make test codes and ci green

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:Scheduler including HA (high availability) scheduler area:task-sdk labels Apr 30, 2026

anishgirianish commented Apr 30, 2026

@wjddn279 Hey! Confirmed the regression is real and on me: generate_token flipped to workload scope at the base class, but supervise_callback never got the paired swap. Thanks for catching it.

Quick thought before you go deeper on the PR: this is now the third place we'd be hand-rolling the workload→execution swap (tasks /run, your callback /run, and #62343's connection-test PATCH). Wonder if it's worth lifting it into a generic POST /auth/exchange + a transparent swap in the SDK client, so resource endpoints stay execution-only and future workload types inherit the fix automatically. Happy to chat / draft if it sounds reasonable.

Would love to hear @ashb @amoghrajesh @kaxil thoughts on this as well

@wjddn279
Contributor Author

@anishgirianish

So you're suggesting we separate out only the API that exchanges the token into its own? Considering that workload types will likely grow, that sounds like a reasonable idea. But right now, we're tightening the security of the token swap by extremely restricting when the API can succeed, right? (e.g., only allowing it to succeed on the queued → running state transition.) Would that still be possible if we generalize it?


anishgirianish commented Apr 30, 2026

> @anishgirianish
>
> So you're suggesting we separate out only the API that exchanges the token into its own? Considering that workload types will likely grow, that sounds like a reasonable idea. But right now, we're tightening the security of the token swap by extremely restricting when the API can succeed, right? (e.g., only allowing it to succeed on the queued → running state transition.) Would that still be possible if we generalize it?

Yeah, thinking about it again, you're right: the QUEUED → RUNNING gate gives us single-use structurally, and that's worth keeping. Maybe the abstraction isn't worth it as of now. Thanks for pointing it out.

Comment thread airflow-core/src/airflow/jobs/scheduler_job_runner.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/callbacks.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/callbacks.py Outdated
Comment thread task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Comment thread airflow-core/src/airflow/jobs/scheduler_job_runner.py Outdated
Comment thread task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py Outdated
@wjddn279 wjddn279 force-pushed the add-callback-lifecycle-api branch from 7709b17 to 4af312d Compare May 5, 2026 13:22

potiuk commented May 5, 2026

@wjddn279 — Your unresolved review thread(s) from @amoghrajesh appear to have been addressed (post-review commits and/or in-thread replies on every thread, with the latest commit pushed after the most recent thread). I've added the ready for maintainer review label so the PR re-enters the maintainer review queue.

@amoghrajesh — could you take another look when you have a chance? If you agree the feedback was addressed, please mark the threads as resolved so the queue signal stays accurate. If a thread still needs work, please reply in-line — @wjddn279 will follow up.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 5, 2026
@wjddn279 wjddn279 force-pushed the add-callback-lifecycle-api branch 3 times, most recently from 1325cbb to 137b659 Compare May 7, 2026 02:43

potiuk commented May 10, 2026

conflicts :(

@wjddn279 wjddn279 force-pushed the add-callback-lifecycle-api branch from 137b659 to a95e678 Compare May 10, 2026 08:31

@potiuk potiuk left a comment


Solid architectural change — moving callback state transitions onto the Execution API closes the EdgeExecutor (and any future remote-execution executor) gap, and the workload-token swap fixes the 403s on Connection / Variable lookups inside callbacks. The _process_executor_events fallback is correctly forward-only — terminal states won't regress on stale executor events. Tests are decent: 264 lines for the new endpoints, parametrized forward-only matrix in test_scheduler_job.py, and the supervise_callback bookend tests cover all three subprocess outcomes (success / non-zero exit / raise). CI is green.

@amoghrajesh's earlier review was substantially addressed in 7a0e3523 / a95e678d — the race-condition concern (callbacks.py:88), the callback.output-before-assignment regression, and the spec-less mocks all look fixed. One thread is still open: the dead-code branch on RUNNING events in scheduler_job_runner.py (see inline #1).

My own pass added a few more observations inline. Disposition: COMMENT — none are blocking, but I'd want #1, #2, and #3 addressed before APPROVE. The architecture is sound; this is about tightening edges.


This review was drafted by an AI-assisted tool and
confirmed by an Apache Airflow maintainer. The findings
below are observations, not blockers; an Apache Airflow
maintainer — a real person — will take the next look at the
PR. If you think a finding is mis-applied, please reply on
the PR and a maintainer will weigh in.

More on how Apache Airflow handles maintainer review:
contributing-docs/05_pull_requests.rst.


need_to_modify = False

if state == CallbackState.RUNNING:

Major (open thread, not yet addressed). @amoghrajesh flagged this in the prior review pass: the if state == CallbackState.RUNNING: branch only emits a log and modifies nothing, because the API is now the source of truth for RUNNING transitions. With the API-driven design, executor RUNNING events are informational at best.

Two cleanups worth picking from:

  1. Drop the branch entirely (and the corresponding entry from the state in (...) filter at the top of process_executor_events). The fallback only needs to advance terminal states.
  2. Keep the branch, but downgrade the log to cls.logger().debug(...) and add a one-line comment that the API is authoritative for RUNNING transitions and this log is informational.

Either is fine; the current shape sits in the middle and reads as dead code at first glance.

value: Annotated[str, Field(title="Value")]


class State(str, Enum):

Major. The generic class name State in a shared _generated.py is collision-prone (it's literally one of the most-used identifiers in the codebase). The cause is the source: CallbackTerminalState = Literal["success", "failed"] in execution_api/datamodels/callback.py — Cadwyn's OpenAPI generation can't see through type aliases and falls back to naming the enum after the field (state → State).

Fix is in the source, not here. Replace the Literal alias with a real Enum:

# execution_api/datamodels/callback.py
class CallbackTerminalState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


class CallbackTerminalStatePayload(StrictBaseModel):
    state: CallbackTerminalState
    output: str | None = None

Then the generated module gets class CallbackTerminalState(str, Enum), which is collision-safe and self-documenting. The prek hook regenerates _generated.py automatically.

clear_db_callbacks()

@pytest.mark.parametrize("scope", ["workload", "execution"])
def test_run_marks_callback_running_and_swaps_workload_token(self, client, exec_app, session, scope):

Major — missing test for the highest-value behaviour change in this PR. The race-condition fix (RUNNING-idempotent retry on POST /callbacks/{id}/run, with the workload-token swap still firing) is the exact path @amoghrajesh's earlier comment was about. The fix landed in the route, but no test covers it.

Worth adding alongside the existing run test:

def test_run_idempotent_when_already_running_with_workload_token(
    self, client, exec_app, session
):
    cb = _make_callback(CallbackState.RUNNING, session)
    mock_gen = mock.MagicMock(spec=JWTGenerator)
    mock_gen.generate.return_value = "mock-execution-token"
    lifespan.registry.register_value(JWTGenerator, mock_gen)
    _override_require_auth(exec_app, scope="workload")

    response = client.post(f"/execution/callbacks/{cb.id}/run")
    exec_app.dependency_overrides.pop(require_auth, None)

    assert response.status_code == 204
    assert response.headers["Refreshed-API-Token"] == "mock-execution-token"
    session.expire_all()
    assert session.get(ExecutorCallback, cb.id).state == CallbackState.RUNNING

Locks in (a) idempotent 204, (b) fresh execution token issued for the retry, (c) state stays RUNNING. Also worth a sibling case for execution-scoped retry where no swap should happen.

def finish(
self,
id: uuid.UUID | str,
state: str,

Minor. state: str is too loose — the only valid values are "success" and "failed". A typo like state="successs" or state="FAILED" would only surface as a 422 from the API. Tighten with Literal (or import CallbackTerminalState once #2 lands):

from typing import Literal

def finish(
    self,
    id: uuid.UUID | str,
    state: Literal["success", "failed"],
    output: str | None = None,
) -> None:

Matches the API's actual contract and gives the supervisor's caller a compile-time check.

from cadwyn import VersionChange, endpoint


class AddCallbackEndpoints(VersionChange):

Minor / question. The version label 2026-04-30 is the date this PR's first commit was authored. The convention in the rest of the bundle (and what an archeologist reading the file later would expect) is the date the change LANDED. With the PR opened in late April and now mid-May, a 2026-05-XX label is closer to reality.

Not a blocker — if apache/airflow has settled on a different convention (dev-start date, milestone date, etc.), happy to be corrected. The mechanical change is just renaming the file and the version-bundle entry. cadwyn's Version("YYYY-MM-DD", AddCallbackEndpoints) doesn't care about the actual date semantically; it's a sortable label.


@potiuk potiuk left a comment


Follow-up from a second-read pass through Codex (adversarial reviewer). It found a real stranded-state path I missed in the primary review — [high] needs-attention, see inline. The fix interacts with my finding #1 (dead-code RUNNING branch): both are about the fallback being too narrow, and the right resolution probably addresses both at once.



Comment on lines 1278 to +1284

  elif state == CallbackState.FAILED:
-     callback.state = CallbackState.FAILED
-     callback.output = str(info) if info else "Execution failed"
-     cls.logger().error("Callback %s failed: %s", callback_id, callback.output)
-     session.add(callback)
+     callback_output = str(info) if info else "Execution failed"
+     cls.logger().error("Callback %s failed: %s", callback_id, callback_output)
+     if callback.state == CallbackState.RUNNING:
+         callback.state = CallbackState.FAILED
+         callback.output = callback_output
+         need_to_modify = True

Flagged by the adversarial reviewer (Codex). Cross-checked — the trace holds.

[high] Executor callback failures are ignored while the DB row is still QUEUED

The scheduler now only persists executor FAILED / SUCCESS events when the callback row is already RUNNING. But the worker reports RUNNING through the executor event channel before supervise_callback() calls client.callbacks.start(), and that RUNNING event no longer updates the DB. If the start API call fails before it changes the row to RUNNING, the worker raises and emits a FAILED event while the database row is still QUEUED. This branch then logs the failure but does not save it, leaving the callback in QUEUED forever; _enqueue_executor_callbacks() only selects PENDING callbacks, so the callback is neither retried nor marked failed.

Recommendation: Make the executor-event fallback handle QUEUED as well as RUNNING for terminal FAILED / SUCCESS events, or restore persisting the RUNNING event when the current DB state is QUEUED. Add a regression test where callbacks.start() raises and the scheduler receives a FAILED callback event.

This pairs with my finding #1 above (the dead-code RUNNING branch). Both are symptoms of the same constraint — the fallback assuming the API has already advanced the row to RUNNING. A clean resolution would probably (a) keep the RUNNING branch live and update the DB when DB-state is QUEUED, or (b) loosen the terminal-state branches to accept QUEUED → terminal directly. The forward-only invariant the rest of the PR relies on still holds either way: a callback in a terminal state never regresses; what changes is that QUEUED → terminal becomes a legal one-step jump in the safety net.
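Resolution (b) can be sketched in a few lines. The state strings and the fallback_apply helper are illustrative, not the scheduler's actual code:

```python
# Sketch of the loosened safety-net rule: terminal states never regress,
# but a QUEUED (or RUNNING) row may jump straight to an executor-reported
# terminal state, closing the stranded-QUEUED path described above.
TERMINAL = {"success", "failed"}


def fallback_apply(db_state: str, event_state: str) -> str:
    """Decide what the executor-event fallback should persist."""
    if db_state in TERMINAL:
        return db_state      # forward-only invariant: never regress
    if event_state in TERMINAL:
        return event_state   # QUEUED -> terminal is a legal one-step jump
    return db_state          # informational RUNNING events change nothing


assert fallback_apply("queued", "failed") == "failed"    # the stranded path, handled
assert fallback_apply("running", "success") == "success"
assert fallback_apply("success", "failed") == "success"  # never regress
```
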

Failure modes worth testing for:

  • client.callbacks.start() raises before the route runs (network, DNS, 5xx) — current head leaves the row stuck.
  • Worker process killed (SIGKILL) between the executor emitting RUNNING and supervise_callback() reaching client.callbacks.start() — same stuck-QUEUED outcome.
  • Worker process killed after start() returns but before subprocess exits — the supervisor's finish() in finally runs (good), but a hard kill bypasses finally too. Currently OK because DB is RUNNING and the executor's terminal event drives the fallback, but worth a parametrized test in test_scheduler_job.py's forward-only matrix.


@potiuk potiuk left a comment


Requesting changes until the adversarial review finding is fixed; the stranded-state trace looks pretty plausible to me.

