Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airflow-core/src/airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,12 @@ def get_sorted_triggers(
.where(or_(cls.triggerer_id.is_(None), cls.triggerer_id.not_in(alive_triggerer_ids)))
.order_by(coalesce(TaskInstance.priority_weight, 0).desc(), cls.created_date),
# Asset triggers
select(cls.id).where(cls.assets.any()).order_by(cls.created_date),
select(cls.id)
.where(
cls.assets.any(),
or_(cls.triggerer_id.is_(None), cls.triggerer_id.not_in(alive_triggerer_ids)),
)
.order_by(cls.created_date),
]

# Process each query while avoiding unnecessary queries when capacity is reached
Expand Down
99 changes: 99 additions & 0 deletions airflow-core/tests/unit/models/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,3 +919,102 @@ def test_kwargs_not_encrypted():

assert trigger.kwargs["param1"] == "value1"
assert trigger.kwargs["param2"] == "value2"


def test_asset_trigger_unassigned_included(session):
"""Asset triggers with triggerer_id=None are returned."""
asset = AssetModel("test_asset")
trigger = Trigger(classpath="some.trigger", kwargs={})
session.add(trigger)
session.flush() # ensure trigger.id is available
asset.add_trigger(trigger, "test_watcher")
session.add(asset)
session.flush()

alive_ids = [100, 200]
result = Trigger.get_sorted_triggers(
capacity=10,
alive_triggerer_ids=alive_ids,
queues=None,
session=session,
)
ids = [row[0] for row in result]

assert trigger.id in ids


def test_asset_trigger_dead_triggerer_included(session):
"""Asset triggers assigned to a dead triggerer are returned."""
asset = AssetModel("test_asset")
trigger = Trigger(classpath="some.trigger", kwargs={})
trigger.triggerer_id = 999 # dead
session.add(trigger)
session.flush()
asset.add_trigger(trigger, "test_watcher")
session.add(asset)
session.flush()

alive_ids = [100, 200]
result = Trigger.get_sorted_triggers(
capacity=10,
alive_triggerer_ids=alive_ids,
queues=None,
session=session,
)
ids = [row[0] for row in result]

assert trigger.id in ids


def test_asset_trigger_alive_triggerer_excluded(session):
"""Asset triggers assigned to a living triggerer are not returned."""
asset = AssetModel("test_asset")
trigger = Trigger(classpath="some.trigger", kwargs={})
trigger.triggerer_id = 100 # alive
session.add(trigger)
session.flush()
asset.add_trigger(trigger, "test_watcher")
session.add(asset)
session.flush()

alive_ids = [100, 200]
result = Trigger.get_sorted_triggers(
capacity=10,
alive_triggerer_ids=alive_ids,
queues=None,
session=session,
)
ids = [row[0] for row in result]

assert trigger.id not in ids


def test_asset_trigger_ordering_and_capacity(session):
"""Asset triggers are ordered by created_date (oldest first) and respect capacity."""
now = datetime.datetime(2025, 1, 1, tzinfo=timezone.utc)
asset = AssetModel("test_asset")
triggers = []
for i in range(5):
trigger = Trigger(
classpath="some.trigger",
kwargs={},
created_date=now + datetime.timedelta(hours=i),
)
trigger.triggerer_id = None # all unassigned
session.add(trigger)
session.flush()
asset.add_trigger(trigger, f"watcher_{i}")
triggers.append(trigger)
session.add(asset)
session.flush()

result = Trigger.get_sorted_triggers(
capacity=3,
alive_triggerer_ids=[],
queues=None,
session=session,
)
ids = [row[0] for row in result]

# Only the three oldest should be returned, in order
assert ids == [triggers[0].id, triggers[1].id, triggers[2].id]
Loading