diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 153b5a7c09c10..a5d750b6d7a1c 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -459,6 +459,14 @@ def ti_update_state( extra=json.dumps({"host_name": hostname}) if hostname else None, ) ) + # Commit the TI state update now to release the task_instance row lock before + # running asset-event queries. The direct-INSERT fix in AssetManager removes + # the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db + # also queries scheduled dags and inserts AssetDagRunQueue rows — all of which + # would otherwise hold the row lock and cause idle-in-transaction pile-up that + # exhausts API server memory and triggers OOMKill under high concurrency. + # The task outcome is durable from this point on. + session.commit() except SQLAlchemyError as e: log.error("Error updating Task Instance state", error=str(e)) raise HTTPException( @@ -490,6 +498,27 @@ def ti_update_state( task_id=task_id, ) + # Asset registration runs outside the TI row lock. The exception is intentionally + # swallowed after logging: the TI state is already committed above, so raising HTTP 500 + # here would be misleading (the task did succeed) and would cause the task-SDK worker + # to retry a state update for a task that has already completed. + if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets: + try: + ti_for_assets = session.get(TI, task_instance_id) + if ti_for_assets is not None: + TI.register_asset_changes_in_db( + ti_for_assets, + ti_patch_payload.task_outlets, + ti_patch_payload.outlet_events, + session, + ) + except Exception: + log.exception( + "Failed to register asset changes; task state is already committed", + task_instance_id=str(task_instance_id), + new_state=updated_state, + ) + def _emit_task_span(ti, state): # just to be safe @@ -578,13 +607,7 @@ def _create_ti_state_update_query_and_update_state( retry_reason=(ti_patch_payload.retry_reason[:500] if ti_patch_payload.retry_reason else None), ) elif isinstance(ti_patch_payload, TISuccessStatePayload): - if ti is not None: - TI.register_asset_changes_in_db( - ti, - ti_patch_payload.task_outlets, - ti_patch_payload.outlet_events, - session, - ) + pass # Asset registration happens after the TI state is committed; see ti_update_state. _emit_task_span(ti, state=updated_state) elif isinstance(ti_patch_payload, TIDeferredStatePayload): # Calculate timeout if it was passed diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index 9f333efe8510b..ed185ef48b3c4 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -22,7 +22,7 @@ from typing import TYPE_CHECKING import structlog -from sqlalchemy import exc, or_, select +from sqlalchemy import exc, insert, or_, select from sqlalchemy.orm import joinedload from airflow._shared.observability.metrics import stats @@ -40,6 +40,7 @@ DagScheduleAssetReference, DagScheduleAssetUriReference, PartitionedAssetKeyLog, + asset_alias_asset_event_association_table, ) from airflow.models.log import Log from airflow.utils.helpers import is_container @@ -323,8 +324,17 @@ def register_asset_change( ).unique() for asset_alias_model in asset_alias_models: - asset_alias_model.asset_events.append(asset_event) - session.add(asset_alias_model) + # Use a direct INSERT rather than ORM .append() to avoid lazy-loading the + # entire asset_events collection. On long-running deployments that collection + # can contain thousands of rows, and loading it while the task_instance row + # lock is held (from the calling ti_update_state handler) causes the DB + # connection to sit idle-in-transaction for minutes, blocking other workers. + session.execute( + insert(asset_alias_asset_event_association_table).values( + alias_id=asset_alias_model.id, + event_id=asset_event.id, + ) + ) dags_to_queue_from_asset_alias |= { alias_ref.dag diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 6f19cff9389f2..167ec51c16020 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1368,6 +1368,54 @@ def test_ti_update_state_running_errors(self, client, session, create_task_insta assert response.status_code == 422 + def test_ti_update_state_to_success_asset_registration_failure_returns_204( + self, client, session, create_task_instance + ): + """Regression: asset registration failure after TI state commit must return 204, not 500. + + The TI state is committed (and the row lock released) before asset registration runs. + If registration fails at that point, the task outcome is already durable as SUCCESS, + so surfacing HTTP 500 would be misleading and cause unnecessary worker retries. + """ + asset = AssetModel( + id=42, + name="fail-asset", + uri="s3://bucket/fail-asset", + group="asset", + extra={}, + ) + asset_active = AssetActive.for_asset(asset) + session.add_all([asset, asset_active]) + + ti = create_task_instance( + task_id="test_asset_reg_failure", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + with mock.patch( + "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db", + side_effect=Exception("simulated DB explosion during asset registration"), + ): + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": DEFAULT_END_DATE.isoformat(), + "task_outlets": [ + {"name": "fail-asset", "uri": "s3://bucket/fail-asset", "type": "Asset"} + ], + "outlet_events": [], + }, + ) + + assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}" + session.expire_all() + ti_db = session.get(TaskInstance, ti.id) + assert ti_db is not None + assert ti_db.state == TaskInstanceState.SUCCESS + def test_ti_update_state_database_error(self, client, session, create_task_instance): """ Test that a database error is handled correctly when updating the Task Instance state. diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 1f3eead3dc2eb..29d39f785b70e 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -25,7 +25,7 @@ from unittest import mock import pytest -from sqlalchemy import delete, func, select +from sqlalchemy import delete, func, insert, select from sqlalchemy.orm import Session from airflow import settings @@ -38,6 +38,7 @@ AssetPartitionDagRun, DagScheduleAssetAliasReference, DagScheduleAssetReference, + asset_alias_asset_event_association_table, ) from airflow.models.dag import DAG, DagModel from airflow.sdk.definitions.asset import Asset @@ -162,6 +163,88 @@ def test_register_asset_change_with_alias( ) assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2 + def test_register_asset_change_with_alias_no_lazy_load( + self, session, mock_task_instance, testing_dag_bundle + ): + """Regression: alias-event association must use a direct INSERT, not ORM .append(). + + ORM .append() lazy-loads the entire asset_events collection before writing. + On long-running deployments with thousands of past events, this query runs + while the task_instance row lock is held in ti_update_state, causing idle-in-transaction + pile-up that exhausts API server memory and triggers OOMKill. + """ + asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", group="asset") + session.add(asm) + asam = AssetAliasModel(name="test_nolazy_alias", group="test") + session.add(asam) + session.flush() + + # Pre-populate existing alias-event rows to simulate a long-running deployment. + # If .append() is used, SQLAlchemy will lazy-load ALL of these before inserting the new one. + existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in range(5)] + session.add_all(existing_events) + session.flush() + for ev in existing_events: + session.execute( + insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, event_id=ev.id) + ) + session.flush() + + # Expire the alias so a lazy-load would have to hit the DB (no in-memory cache). + session.expire(asam) + + asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset") + asset_manager = AssetManager() + + lazy_load_selects: list[str] = [] + real_execute = session.execute + + def tracking_execute(stmt, *args, **kwargs): + try: + compiled = str(stmt.compile(compile_kwargs={"literal_binds": True})) + except Exception: + compiled = str(stmt) + # Detect a lazy-load SELECT joining asset_alias_asset_event with asset_event + if ( + "asset_alias_asset_event" in compiled.lower() + and "asset_event" in compiled.lower() + and compiled.strip().upper().startswith("SELECT") + ): + lazy_load_selects.append(compiled[:120]) + return real_execute(stmt, *args, **kwargs) + + with mock.patch.object(session, "execute", side_effect=tracking_execute): + asset_manager.register_asset_change( + task_instance=mock_task_instance, + asset=asset, + source_alias_names=["test_nolazy_alias"], + session=session, + ) + session.flush() + + # The new association row must exist + new_events = session.scalars( + select(AssetEvent).where( + AssetEvent.asset_id == asm.id, + AssetEvent.id.notin_([ev.id for ev in existing_events]), + ) + ).all() + assert len(new_events) == 1, "Expected exactly one new AssetEvent" + + row_count = session.scalar( + select(func.count()) + .select_from(asset_alias_asset_event_association_table) + .where( + asset_alias_asset_event_association_table.c.alias_id == asam.id, + asset_alias_asset_event_association_table.c.event_id == new_events[0].id, + ) + ) + assert row_count == 1, "Expected the alias-event association row to be written" + + assert lazy_load_selects == [], ( + f"Unexpected lazy-load SELECT on asset_alias_asset_event: {lazy_load_selects}" + ) + def test_register_asset_change_no_downstreams(self, session, mock_task_instance): asset_manager = AssetManager()