Use async DB session for Execution API task-instance heartbeat#67800
Use async DB session for Execution API task-instance heartbeat#67800Dev-iL wants to merge 1 commit into
Conversation
Convert ti_heartbeat from the synchronous SessionDep to the async AsyncSessionDep, adopting the async metadata engine that already ships in Airflow 3.x. The route's behavior is unchanged: same 204/404/409/410 responses, same fast-path UPDATE / SELECT ... FOR UPDATE fallback, same last_heartbeat_at semantics, no version bump. Heartbeat's async writes exposed a test-harness issue on Postgres: the async engine binds its connection pool to the event loop that created it, while the test harness builds a fresh app and loop per test, so a pooled connection from a prior test's closed loop was reused. Add a reconfigure_async_db_engine autouse fixture to the heartbeat tests that rebuilds the async session per test, mirroring the existing workaround in TestWaitDagRun. Document the asyncpg + transaction-mode pgbouncer prepared-statement caveat in the sql_alchemy_connect_args_async config reference and the PGBouncer setup guide; engine-default hardening is deferred.
455e42b to
8d811ea
Compare
|
|
||
| See also :ref:`Helm Chart production guide <production-guide:pgbouncer>` | ||
|
|
||
| Some Airflow database routes use an async engine (the Execution API, for example). The asyncpg |
There was a problem hiding this comment.
I think instead of just noting this (which will get ignored/missed by many users) we should change the default async to psycopg3. The only reason we had it set up as asyncpg was because we were stuck on SQLA1.4, but you've already fixed that :)
There was a problem hiding this comment.
Do we not want to keep asyncpg for the better performance? I did a benchmark recently and this is still the fastest driver.
|
|
||
| # ti_heartbeat runs on the async engine. The async engine binds its pool to | ||
| # the event loop that created it (once per process), but the test harness | ||
| # builds a fresh FastAPI app and event loop per test, so a pooled connection |
There was a problem hiding this comment.
I wonder what impact re-creating the connection many times will have on test durations? (Not much for this case, but as we move more things to async?)
Should we look at changing the default event-loop scoping maybe? That also has draw backs for sure.
I think the agent made these up :) |
Related:
What & why
Converts the Execution API
ti_heartbeatroute from the synchronousSessionDepto the asyncAsyncSessionDep, adopting the async metadata engine that already ships in Airflow 3.x (create_async_engine,async_sessionmaker,AsyncSessionDep,create_session_async).Heartbeat is the highest-QPS write path in the system (one call per running task per heartbeat interval, per worker), so it is a meaningful first production route to run on the async engine. The goal of this PR is behavioral parity with zero regression — not a throughput change. It follows the same low-blast-radius conversion pattern already merged for
GET /execution/variables/keys.What changed
execution_api/routes/task_instances.py):ti_heartbeatis nowasync defand takesAsyncSessionDep; the fast-pathUPDATE, the slow-pathSELECT … FOR UPDATE(.one()), the Task-Instance-History existencescalar, and the finalUPDATEare awaited. No explicitcommit()is added — the async dependency commits on success and rolls back (releasing theFOR UPDATErow lock) on exception, mirroring the sync dependency exactly.synchronize_session=Falseandwith_for_update()are unchanged.versions/head/test_task_instances.py):UPDATEnow patchsqlalchemy.ext.asyncio.AsyncSessionwith an async interceptor (the route nowawaitsAsyncSession.execute); assertions are unchanged.reconfigure_async_db_engineautouse fixture toTestTIHealthEndpoint. The async engine binds its connection pool to the event loop that created it, while the test harness builds a fresh app and event loop per test; without this, a pooled connection from a prior test's closed loop is reused and fails (attached to a different loop). This is the same workaround already used byTestWaitDagRun.sql_alchemy_connect_args_asyncconfig reference and in the PgBouncer section of the database setup guide.max_connectionsfor both.Behavioral parity
No change to the endpoint contract: same
204success, same404/409(running-elsewhere / not-running) /410(cleared-and-archived) responses with identical detail payloads, same request signature. No Execution API version bump. The pre-existing heartbeat tests pass unchanged — that byte-identical pass is the parity proof.Testing
Heartbeat suite (
-k heartbeat, 12 tests covering success, fast path, slow-path fallback,404,409,410, and therowcount == 0/ unknown-rowcountfall-through branches) passes on all three supported backends:ruffandmypy(airflow-core) are clean.Operational notes
max_connectionsaccordingly (called out in the newsfragment).Note to reviewers
A cosmetic
Event loop is closedmessage is logged during session teardown when the async engine is disposed (dispose_ormclosing the async engine synchronously). It appears on all three async drivers, is pre-existing engine disposal behavior unrelated to this change, and every run exits0. Tidying async-engine disposal is out of scope for this PR.Engine-default hardening for asyncpg/PgBouncer will be tracked in a dedicated issue to be opened soon.
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Opus 4.8 following the guidelines
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.