Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,14 @@ def ti_update_state(
extra=json.dumps({"host_name": hostname}) if hostname else None,
)
)
# Commit the TI state update now to release the task_instance row lock before
# running asset-event queries. The direct-INSERT fix in AssetManager removes
# the O(n) lazy-load on the alias-event table, but register_asset_changes_in_db
# also queries scheduled dags and inserts AssetDagRunQueue rows — all of which
# would otherwise hold the row lock and cause idle-in-transaction pile-up that
# exhausts API server memory and triggers OOMKill under high concurrency.
# The task outcome is durable from this point on.
session.commit()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed after the changes on the alias side? I'm asking because of the silent dropping here https://github.com/apache/airflow/pull/66854/changes#diff-680cf6d70e96761db3869142642ba1df32e5c2c5c8d1b9ab65d0200dca4718daR500. Or could we have better handling there?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The early session.commit() is still needed even with the direct-INSERT fix in manager.py. The manager.py change only eliminates the O(n) lazy-load SELECT on the alias-event table. register_asset_changes_in_db also queries scheduled dags and inserts AssetDagRunQueue rows, all of which would otherwise hold the row lock and cause the same idle-in-transaction pile-up.

For the silent-drop concern: swallowing the exception here is intentional. By the time register_asset_changes_in_db runs, the TI state update is already committed and durable. Returning HTTP 500 at this point would cause the task-SDK worker to retry a state update for a task that has already successfully completed, which is worse than a silent failure. I've improved the comment on both blocks to make this design intent explicit.

except SQLAlchemyError as e:
log.error("Error updating Task Instance state", error=str(e))
raise HTTPException(
Expand Down Expand Up @@ -490,6 +498,27 @@ def ti_update_state(
task_id=task_id,
)

# Asset registration runs outside the TI row lock. The exception is intentionally
# swallowed after logging: the TI state is already committed above, so raising HTTP 500
# here would be misleading (the task did succeed) and would cause the task-SDK worker
# to retry a state update for a task that has already completed.
if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets:
try:
ti_for_assets = session.get(TI, task_instance_id)
if ti_for_assets is not None:
TI.register_asset_changes_in_db(
ti_for_assets,
ti_patch_payload.task_outlets,
ti_patch_payload.outlet_events,
session,
)
except Exception:
log.exception(
"Failed to register asset changes; task state is already committed",
task_instance_id=str(task_instance_id),
new_state=updated_state,
)


def _emit_task_span(ti, state):
# just to be safe
Expand Down Expand Up @@ -578,13 +607,7 @@ def _create_ti_state_update_query_and_update_state(
retry_reason=(ti_patch_payload.retry_reason[:500] if ti_patch_payload.retry_reason else None),
)
elif isinstance(ti_patch_payload, TISuccessStatePayload):
if ti is not None:
TI.register_asset_changes_in_db(
ti,
ti_patch_payload.task_outlets,
ti_patch_payload.outlet_events,
session,
)
pass # Asset registration happens after the TI state is committed; see ti_update_state.
_emit_task_span(ti, state=updated_state)
elif isinstance(ti_patch_payload, TIDeferredStatePayload):
# Calculate timeout if it was passed
Expand Down
16 changes: 13 additions & 3 deletions airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import TYPE_CHECKING

import structlog
from sqlalchemy import exc, or_, select
from sqlalchemy import exc, insert, or_, select
from sqlalchemy.orm import joinedload

from airflow._shared.observability.metrics import stats
Expand All @@ -40,6 +40,7 @@
DagScheduleAssetReference,
DagScheduleAssetUriReference,
PartitionedAssetKeyLog,
asset_alias_asset_event_association_table,
)
from airflow.models.log import Log
from airflow.utils.helpers import is_container
Expand Down Expand Up @@ -323,8 +324,17 @@ def register_asset_change(
).unique()

for asset_alias_model in asset_alias_models:
asset_alias_model.asset_events.append(asset_event)
session.add(asset_alias_model)
# Use a direct INSERT rather than ORM .append() to avoid lazy-loading the
# entire asset_events collection. On long-running deployments that collection
# can contain thousands of rows, and loading it while the task_instance row
# lock is held (from the calling ti_update_state handler) causes the DB
# connection to sit idle-in-transaction for minutes, blocking other workers.
session.execute(
insert(asset_alias_asset_event_association_table).values(
alias_id=asset_alias_model.id,
event_id=asset_event.id,
)
)

dags_to_queue_from_asset_alias |= {
alias_ref.dag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,54 @@ def test_ti_update_state_running_errors(self, client, session, create_task_insta

assert response.status_code == 422

def test_ti_update_state_to_success_asset_registration_failure_returns_204(
    self, client, session, create_task_instance
):
    """Regression: a failing asset registration must not turn a SUCCESS update into HTTP 500.

    ``register_asset_changes_in_db`` is patched to raise. The endpoint is still
    expected to answer 204 and the task instance must be persisted as SUCCESS,
    because the state update is meant to be committed before asset registration
    is attempted.
    """
    # Register an active asset matching the outlet sent in the payload below.
    failing_asset = AssetModel(
        id=42,
        name="fail-asset",
        uri="s3://bucket/fail-asset",
        group="asset",
        extra={},
    )
    session.add_all([failing_asset, AssetActive.for_asset(failing_asset)])

    ti = create_task_instance(
        task_id="test_asset_reg_failure",
        start_date=DEFAULT_START_DATE,
        state=State.RUNNING,
    )
    session.commit()

    success_payload = {
        "state": "success",
        "end_date": DEFAULT_END_DATE.isoformat(),
        "task_outlets": [
            {"name": "fail-asset", "uri": "s3://bucket/fail-asset", "type": "Asset"},
        ],
        "outlet_events": [],
    }
    registration_failure = Exception("simulated DB explosion during asset registration")

    # Only the asset-registration step is made to fail; the state update runs for real.
    with mock.patch(
        "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
        side_effect=registration_failure,
    ):
        response = client.patch(
            f"/execution/task-instances/{ti.id}/state",
            json=success_payload,
        )

    assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}"

    # Drop cached ORM state so the assertion reads what was actually persisted.
    session.expire_all()
    persisted_ti = session.get(TaskInstance, ti.id)
    assert persisted_ti is not None
    assert persisted_ti.state == TaskInstanceState.SUCCESS

def test_ti_update_state_database_error(self, client, session, create_task_instance):
"""
Test that a database error is handled correctly when updating the Task Instance state.
Expand Down
85 changes: 84 additions & 1 deletion airflow-core/tests/unit/assets/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from unittest import mock

import pytest
from sqlalchemy import delete, func, select
from sqlalchemy import delete, func, insert, select
from sqlalchemy.orm import Session

from airflow import settings
Expand All @@ -38,6 +38,7 @@
AssetPartitionDagRun,
DagScheduleAssetAliasReference,
DagScheduleAssetReference,
asset_alias_asset_event_association_table,
)
from airflow.models.dag import DAG, DagModel
from airflow.sdk.definitions.asset import Asset
Expand Down Expand Up @@ -162,6 +163,88 @@ def test_register_asset_change_with_alias(
)
assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2

def test_register_asset_change_with_alias_no_lazy_load(
    self, session, mock_task_instance, testing_dag_bundle
):
    """Regression: linking a new event to an alias must not SELECT the alias's existing events.

    The alias is given five pre-existing association rows and is then expired, so
    any access to its ``asset_events`` collection would have to query the database.
    ``session.execute`` is wrapped to record every SELECT whose SQL mentions the
    association table while ``register_asset_change`` runs; none is expected, and
    the new association row must still be written.
    """
    asset_model = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", group="asset")
    session.add(asset_model)
    alias_model = AssetAliasModel(name="test_nolazy_alias", group="test")
    session.add(alias_model)
    session.flush()

    # Simulate a long-running deployment: the alias already has events attached.
    existing_events = [AssetEvent(asset_id=asset_model.id, extra={}) for _ in range(5)]
    session.add_all(existing_events)
    session.flush()
    for event in existing_events:
        link_stmt = insert(asset_alias_asset_event_association_table).values(
            alias_id=alias_model.id, event_id=event.id
        )
        session.execute(link_stmt)
    session.flush()

    # Expire the alias so nothing about it can be served from in-memory state.
    session.expire(alias_model)

    asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset")
    asset_manager = AssetManager()

    lazy_load_selects: list[str] = []
    real_execute = session.execute

    def tracking_execute(stmt, *args, **kwargs):
        """Record SELECTs touching the alias-event association table, then delegate."""
        try:
            sql_text = str(stmt.compile(compile_kwargs={"literal_binds": True}))
        except Exception:
            # Some statements cannot be rendered with literal binds; fall back to str().
            sql_text = str(stmt)
        # The association table name already contains "asset_event", so this one
        # substring check covers both table names.
        touches_association = "asset_alias_asset_event" in sql_text.lower()
        is_select = sql_text.strip().upper().startswith("SELECT")
        if touches_association and is_select:
            lazy_load_selects.append(sql_text[:120])
        return real_execute(stmt, *args, **kwargs)

    with mock.patch.object(session, "execute", side_effect=tracking_execute):
        asset_manager.register_asset_change(
            task_instance=mock_task_instance,
            asset=asset,
            source_alias_names=["test_nolazy_alias"],
            session=session,
        )
        session.flush()

    # Exactly one event beyond the pre-populated ones must have been created.
    existing_event_ids = [event.id for event in existing_events]
    new_events_query = select(AssetEvent).where(
        AssetEvent.asset_id == asset_model.id,
        AssetEvent.id.notin_(existing_event_ids),
    )
    new_events = session.scalars(new_events_query).all()
    assert len(new_events) == 1, "Expected exactly one new AssetEvent"

    # That event must be linked to the alias through the association table.
    association_count_query = (
        select(func.count())
        .select_from(asset_alias_asset_event_association_table)
        .where(
            asset_alias_asset_event_association_table.c.alias_id == alias_model.id,
            asset_alias_asset_event_association_table.c.event_id == new_events[0].id,
        )
    )
    row_count = session.scalar(association_count_query)
    assert row_count == 1, "Expected the alias-event association row to be written"

    assert not lazy_load_selects, (
        f"Unexpected lazy-load SELECT on asset_alias_asset_event: {lazy_load_selects}"
    )

def test_register_asset_change_no_downstreams(self, session, mock_task_instance):
asset_manager = AssetManager()

Expand Down
Loading