Under which category would you file this issue?
Airflow Core
Apache Airflow version
3.2.1
What happened and how to reproduce it?
After upgrading 3.1.8 → 3.2.1 we started seeing PostgreSQL deadlocks on task_instance: 0 in the 30 days before the upgrade, 2 in the first 7 hours after it. The pattern is the same cross-index row-lock contention reported in #65818, but between a different pair of writers, so the fixes in #65920 / #65836 do not cover it. It occurs on a setup with a single scheduler, single triggerer, and single api-server colocated in one ECS task, so an HA topology is not required to reproduce it.
Setup is Airflow 3.1.8 (no issue) → 3.2.1 (issue starts), Aurora PostgreSQL 15.10, CeleryExecutor. The workload includes one DAG scheduled every minute (* * * * *); both observed deadlocks involved that DAG, where new dag_runs are constantly being created while task instances of older runs are still completing.
The regression is in _verify_integrity_if_dag_changed (airflow/jobs/scheduler_job_runner.py). In 3.1.8 it iterated dag_run.task_instances in Python and assigned ti.dag_version = latest_dag_version for unfinished TIs, so SQLAlchemy emitted per-row UPDATEs that locked by PK. In 3.2.1 the same logic became a single bulk UPDATE that filters by state IN (State.unfinished) and reaches rows by (dag_id, run_id).
session.execute(
    update(TI)
    .where(
        TI.dag_id == dag_run.dag_id,
        TI.run_id == dag_run.run_id,
        TI.state.in_(State.unfinished),
    )
    .values(dag_version_id=latest_dag_version.id),
    execution_options={"synchronize_session": False},
)
The api-server side is ti_update_state in airflow/api_fastapi/execution_api/routes/task_instances.py. It takes SELECT FOR UPDATE on the target row by primary key twice: once at the top of the function (a joined select with .with_for_update(of=TI)), and again inside _create_ti_state_update_query_and_update_state via session.get(TI, task_instance_id, with_for_update=True). So the api-server locks its target row by the PK index while the scheduler bulk UPDATE reaches its rows via the ti_dag_run (dag_id, run_id) index rather than the PK. When their row sets overlap, the two paths acquire row locks in different orders and PG detects an A→B / B→A deadlock.
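Condensed, the two lock acquisitions look like this (a simplified sketch of the 3.2.1 route; the real top-of-function select joins additional tables):

# 1. Top of ti_update_state: joined select, row lock restricted to TI.
old_ti = session.scalar(
    select(TI).where(TI.id == task_instance_id).with_for_update(of=TI)
)

# 2. Inside _create_ti_state_update_query_and_update_state:
#    a second SELECT ... FOR UPDATE on the same primary key.
ti = session.get(TI, task_instance_id, with_for_update=True)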
Here is the api-server traceback. The deadlock surfaces on the second SELECT FOR UPDATE. We were not capturing pg_locks at the time, so we cannot say with certainty which statement PG picked as the deadlock victim.
Traceback (most recent call last):
  File ".../airflow/api_fastapi/execution_api/routes/task_instances.py", line 390, in ti_update_state
  File ".../airflow/api_fastapi/execution_api/routes/task_instances.py", line 507, in _create_ti_state_update_query_and_update_state
  File ".../sqlalchemy/orm/session.py", line 3680, in get
  ...
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
[parameters: {'pk_1': UUID('...')}]
PG monitoring shows Lock:transactionid waits during these events, and the top SQL in those windows matches the 3.2.1 bulk UPDATE: UPDATE task_instance SET updated_at=?, dag_version_id=? WHERE dag_id=? AND run_id=? AND state IN (?, ?, ?, ?, ?, ?, ?, ?) (8 placeholders = State.unfinished; updated_at is added by SQLAlchemy's onupdate on the column; the source only sets dag_version_id).
The trigger condition for the bulk UPDATE is not dag_run.bundle_version and not check_version_id_exists_in_dr(latest_dag_version.id), so this path runs for any dag_run that lacks a bundle_version and has not yet been tagged with the latest dag_version_id. It covers both the upgrade-time wave and ongoing operation.
There is no user-visible impact today. The Task SDK retries PATCH /state in a fresh session about one second later and the task is recorded as success. But the api-server logs "ERROR - Error updating Task Instance state. Setting the task to failed." along with a 500 traceback, which is misleading. The error handler in ti_update_state looks like this.
except Exception:
    log.exception("Error updating Task Instance state. Setting the task to failed.", ...)
    ti = session.get(TI, task_instance_id, with_for_update=True)  # SELECT FOR UPDATE on the aborted session
    ...
    query = query.values(state=(updated_state := TaskInstanceState.FAILED))

# outer try:
try:
    result = session.execute(query)  # also fails on the aborted session
By the time this runs the session is already in InFailedSqlTransaction, so the recovery session.get(...) fails first, the follow-up session.execute(query) fails too, and the state=failed write never reaches the database. Operators see "failed" in the logs and a 500 response for a task that did not in fact fail. The Task SDK retry path is what currently keeps this from becoming real data corruption, but if that path ever stops compensating, the same bug starts recording successful tasks as failed.
What you think should happen instead?
_verify_integrity_if_dag_changed should adopt the same locking discipline as #65920 / #65836: SELECT id ORDER BY id LIMIT N FOR UPDATE SKIP LOCKED, then UPDATE WHERE id IN (...), in bounded batches, so both writers lock in PK order.
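A sketch of that shape for the scheduler side, assuming an illustrative BATCH_SIZE and keeping the same filter as the current bulk UPDATE (a proposal sketch, not the implementation from #65920 / #65836):

BATCH_SIZE = 500  # illustrative value, not from the Airflow source

# Collect candidate ids once, then lock and update them in PK order, in
# bounded batches, so the lock order matches the api-server's PK locking.
candidate_ids = session.scalars(
    select(TI.id)
    .where(
        TI.dag_id == dag_run.dag_id,
        TI.run_id == dag_run.run_id,
        TI.state.in_(State.unfinished),
    )
    .order_by(TI.id)
).all()

for start in range(0, len(candidate_ids), BATCH_SIZE):
    batch = candidate_ids[start : start + BATCH_SIZE]
    # SKIP LOCKED avoids queueing behind the api-server's row locks; rows
    # skipped here are picked up again on the next scheduler loop.
    locked_ids = session.scalars(
        select(TI.id)
        .where(TI.id.in_(batch))
        .order_by(TI.id)
        .with_for_update(skip_locked=True)
    ).all()
    if locked_ids:
        session.execute(
            update(TI)
            .where(TI.id.in_(locked_ids))
            .values(dag_version_id=latest_dag_version.id),
            execution_options={"synchronize_session": False},
        )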
The error handler in ti_update_state is harder to defend. Treating OperationalError with PG SQLSTATE 40P01 / 40001 (or MySQL 1213) as retryable and returning an explicit retryable response with Retry-After (e.g., 503 Service Unavailable) would make the Task SDK retry a real contract instead of relying on the 500 response being retried. If a non-retryable failure write is still wanted as a fallback, it needs to run in a fresh session, since the one the handler inherits is already aborted.
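A hedged sketch of what that classification could look like; the helper name is_retryable_db_error, the status code, and the Retry-After value are illustrative, not existing Airflow APIs:

from fastapi import HTTPException
from sqlalchemy.exc import OperationalError

# PostgreSQL deadlock_detected and serialization_failure SQLSTATEs; MySQL's
# ER_LOCK_DEADLOCK (1213) would be classified analogously from exc.orig.args[0].
RETRYABLE_SQLSTATES = {"40P01", "40001"}

def is_retryable_db_error(exc: OperationalError) -> bool:
    # psycopg2 exposes the SQLSTATE of the wrapped driver exception as .pgcode
    return getattr(exc.orig, "pgcode", None) in RETRYABLE_SQLSTATES

def handle_state_update_error(exc: Exception) -> None:
    # Hypothetical replacement for the blanket `except Exception` fallback:
    # surface deadlocks as an explicitly retryable 503 instead of attempting
    # a failed-state write on an already-aborted session.
    if isinstance(exc, OperationalError) and is_retryable_db_error(exc):
        raise HTTPException(
            status_code=503,
            detail="Transient database conflict while updating TI state; retry.",
            headers={"Retry-After": "1"},
        ) from exc
    raise exc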
Operating System
Debian 12 (apache/airflow:slim-3.2.1-python3.12 image)
Deployment
Other Docker-based deployment
Apache Airflow Provider(s)
No response
Versions of Apache Airflow Providers
No response
Official Helm Chart version
Not Applicable
Kubernetes Version
No response
Helm Chart configuration
No response
Docker Image customizations
No response
Anything else?
Related: #65818, #65920, #65836.
The traceback and SQL evidence above are from a running Airflow 3.2.1 deployment on Amazon ECS. Let us know if more diagnostic data would help with the investigation.
Drafted with AI assistance (Claude Opus 4.7 with 1M context); content verified against Airflow 3.1.8 and 3.2.1 sources before submission.
Are you willing to submit PR?
Code of Conduct