Skip to content

Commit

Permalink
Improve trigger assign_unassigned by merging alive_triggerer_ids and …
Browse files Browse the repository at this point in the history
…get_sorted_triggers queries (apache#38664)
  • Loading branch information
hussein-awala authored and idantepper@gmail.com committed Apr 3, 2024
1 parent d0a94a6 commit 64c7c7a
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.triggers.base import BaseTrigger

Expand Down Expand Up @@ -263,13 +264,11 @@ def assign_unassigned(
if capacity <= 0:
return

alive_triggerer_ids = session.scalars(
select(Job.id).where(
Job.end_date.is_(None),
Job.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=health_check_threshold),
Job.job_type == "TriggererJob",
)
).all()
alive_triggerer_ids = select(Job.id).where(
Job.end_date.is_(None),
Job.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=health_check_threshold),
Job.job_type == "TriggererJob",
)

# Find triggers who do NOT have an alive triggerer_id, and then assign
# up to `capacity` of those to us.
Expand All @@ -287,7 +286,13 @@ def assign_unassigned(
session.commit()

@classmethod
def get_sorted_triggers(cls, capacity, alive_triggerer_ids, session):
def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Select, session: Session):
"""Get sorted triggers based on capacity and alive triggerer ids.
:param capacity: The capacity of the triggerer.
:param alive_triggerer_ids: The alive triggerer ids as a list or a select query.
:param session: The database session.
"""
query = with_row_locks(
select(cls.id)
.join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=False)
Expand Down

0 comments on commit 64c7c7a

Please sign in to comment.