Fix in-process execution API startup for triggerer #65993
    try:
        asyncio.run_coroutine_threadsafe(start_lifespan(self._cm, self.app), middleware.loop).result(
            timeout=30
        )
    except TimeoutError as err:
        raise RuntimeError("Timed out while starting the in-process execution API lifespan") from err
This fix improves observability (loud failure instead of silent hang) but does not actually resolve #65945. The reporter's stack trace shows their custom SlurmSharedFileTaskHandler.format crashing inside svcs.register_factory's log.debug() call -- the lifespan startup raises because of a buggy user log handler. After this PR merges they'll see a clear RuntimeError propagated from transport, but the triggerer will still fail to start; the user-side handler bug is the actual cause.
Two follow-ups worth considering: (a) call this out explicitly in the PR description so the reporter understands what behavior to expect post-merge; (b) think about whether the log.debug call inside the svcs/lifespan setup should be made more defensive (or routed through a logger that bypasses the user task handler) so a single broken user handler doesn't take down triggerer startup.
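For follow-up (b), one possible shape is a dedicated logger that does not propagate to ancestor handlers. This is a minimal stdlib `logging` sketch only: the logger names and `BrokenUserHandler` are hypothetical, and it assumes the failing handler is attached to an ancestor logger, which may not match how Airflow actually wires task handlers.

```python
import logging


class BrokenUserHandler(logging.Handler):
    """Stand-in for a user handler whose formatting code raises."""

    def emit(self, record):
        raise ValueError("broken user formatter")


# Hypothetical logger names, chosen only for this illustration.
parent = logging.getLogger("demo")
parent.setLevel(logging.DEBUG)
parent.addHandler(BrokenUserHandler())

startup_log = logging.getLogger("demo.startup")
startup_log.propagate = False  # do not hand records to ancestor handlers
startup_log.addHandler(logging.NullHandler())


def log_startup_step(message: str) -> bool:
    """Emit a debug record on the isolated logger; return True if nothing raised."""
    startup_log.debug(message)
    return True
```

The trade-off is that startup messages on the isolated logger no longer reach the user's handlers at all, so they would need their own destination.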
    asyncio.run_coroutine_threadsafe(start_lifespan(self._cm, self.app), middleware.loop)
    try:
        asyncio.run_coroutine_threadsafe(start_lifespan(self._cm, self.app), middleware.loop).result(
            timeout=30
Hardcoded magic number. Lift to a module-level constant (e.g. _LIFESPAN_STARTUP_TIMEOUT_SECONDS = 30) so it's discoverable and tunable, and include the value in the RuntimeError message on line 367 so operators have a hint when a slow real-world startup gets clipped (f"Timed out after {_LIFESPAN_STARTUP_TIMEOUT_SECONDS}s ...").
Has 30s been sanity-checked against the slowest realistic deployment (cold filesystems, large dag_bag warmup, Postgres connection setup on a sleepy network)? If not, it might be worth bumping it or making it configurable.
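A rough sketch of what the suggestion could look like, outside of Airflow. The constant name comes from the comment above; `start_background_loop` and `wait_for_startup` are hypothetical helpers for illustration, not functions in `app.py`.

```python
import asyncio
import threading
from concurrent.futures import TimeoutError as FutureTimeoutError

# Name suggested in the review; the value is the one currently hardcoded in the PR.
_LIFESPAN_STARTUP_TIMEOUT_SECONDS = 30


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Start an event loop in a daemon thread (stand-in for the middleware loop)."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def wait_for_startup(coro, loop, timeout=_LIFESPAN_STARTUP_TIMEOUT_SECONDS):
    """Schedule ``coro`` on ``loop`` and block until it finishes or times out."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError as err:
        future.cancel()  # ask the loop to cancel the still-running startup task
        raise RuntimeError(
            f"Timed out after {timeout}s while starting the in-process execution API lifespan"
        ) from err
```

Taking the timeout as a parameter with the constant as its default also lets a test drive the timeout path with a much smaller value.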
    import json
    import time
    from concurrent.futures import TimeoutError
This shadows the builtin TimeoutError. In Python 3.11+ they're the same class, but Airflow still supports 3.10 where concurrent.futures.TimeoutError and the builtin are distinct. Anywhere later in this module that does except TimeoutError will now silently mean concurrent.futures.TimeoutError.
Prefer from concurrent.futures import TimeoutError as FutureTimeoutError, or move the import inside transport() since that's the only use site.
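A small illustration of the aliased import, using a bare `concurrent.futures.Future`. `result_or_none` is a hypothetical helper; the point is only that the builtin `TimeoutError` name stays untouched in the module namespace.

```python
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeoutError


def result_or_none(future: Future, timeout: float):
    """Return the future's result, or None if it is not ready within ``timeout``."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        # On Python 3.10 this is a different class from the builtin TimeoutError;
        # from 3.11 onward the two names refer to the same class.
        return None
```

With the alias, any other `except TimeoutError` in the module keeps referring to the builtin on every supported Python version.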
@@ -358,7 +359,12 @@ async def start_lifespan(cm: AsyncExitStack, app: FastAPI):

    self._cm = AsyncExitStack()
Pre-existing, but visible here: self._cm is set up to register the lifespan's exit handlers, yet I don't see anywhere in InProcessExecutionAPI that calls self._cm.aclose(). That means the lifespan's shutdown code (svcs registry teardown, anything yielded back from registered factories) never runs.
Since you're already touching this block, worth confirming whether the missing teardown is intentional (process exits anyway) or a separate latent bug worth filing.
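To make the teardown concern concrete, here is a standalone sketch of the `AsyncExitStack` pattern. The `lifespan` below is a toy stand-in, not the real Execution API lifespan, and `run_with_teardown` is a hypothetical name.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def lifespan(events):
    """Toy lifespan: records when startup and shutdown code runs."""
    events.append("startup")
    yield
    events.append("shutdown")


async def run_with_teardown():
    events = []
    stack = AsyncExitStack()
    # Entering through the stack registers the lifespan's exit handler on it.
    await stack.enter_async_context(lifespan(events))
    events.append("serving")
    # The code after ``yield`` runs here, when the stack is closed.
    await stack.aclose()
    return events
```

The stack only runs the registered exit handlers when something calls `aclose()` (or the stack is used with `async with`), which is why the missing call matters for the shutdown half of the lifespan.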
        api._app = FastAPI(lifespan=lifespan)

        api.transport
Stylistic nit: accessing a property purely for its side effect reads like a typo to a future reader. Prefer _ = api.transport or assert api.transport is not None to make the intent explicit.
    def test_transport_waits_for_lifespan_startup(self):
        entered_lifespan = threading.Event()

        @asynccontextmanager
        async def lifespan(app):
            await asyncio.sleep(0.05)
            app.state.lifespan_called = True
            entered_lifespan.set()
            yield

        api = InProcessExecutionAPI()
        api._app = FastAPI(lifespan=lifespan)

        api.transport

        assert entered_lifespan.is_set()
        assert api.app.state.lifespan_called

    def test_transport_surfaces_lifespan_startup_errors(self):
        @asynccontextmanager
        async def lifespan(app):
            raise RuntimeError("lifespan failed")
            yield

        api = InProcessExecutionAPI()
        api._app = FastAPI(lifespan=lifespan)

        with pytest.raises(RuntimeError, match="lifespan failed"):
            api.transport
Both tests bypass api.app (the real cached_property that wires up dependency_overrides, dag_bag, and the actual Execution API routes at app.py:325-342) by assigning api._app = FastAPI(lifespan=lifespan) directly. That isolates the timeout/exception logic cleanly, which is fine for a unit test, but it doesn't exercise the real Execution API lifespan -- which is the thing that actually broke in #65945.
Consider also adding an integration-style test that uses the real api.app and asserts startup completes, plus one that injects a slow lifespan to drive the 30s timeout path end-to-end.
@shaealh: There are 6 unresolved review thread(s) on this PR from @kaxil. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? Once you believe the feedback is addressed, mark the thread as resolved so the reviewer isn't re-pinged needlessly. Thanks!
Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer (a real person) will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.
Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting
Fixes #65945
The triggerer uses the in-process Execution API through a2wsgi, but the app lifespan startup was scheduled on the background loop without waiting for it to complete. That could let triggerer startup continue while the Execution API was not fully initialized, and any startup failure could be hidden until later request handling.

This change waits for the in-process Execution API lifespan to complete before returning the transport, and surfaces startup timeout/failure immediately.
Tests:
uv run ruff check airflow-core/src/airflow/api_fastapi/execution_api/app.py airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
AIRFLOW_HOME=/tmp/airflow-65945-test uv run pytest airflow-core/tests/unit/api_fastapi/execution_api/test_app.py -q --with-db-init