Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2543,6 +2543,17 @@ scheduler:
type: integer
default: "20"
see_also: ":ref:`scheduler:ha:tunables`"
max_new_dagruns_per_loop_to_schedule:
description: |
How many NEW dagruns should be scheduled and examined (and locked) when scheduling
and queuing tasks.
If set, select `max_dagruns_per_loop_to_schedule` old dagruns (that have been
examined before) and `max_new_dagruns_per_loop_to_schedule` new dagruns
(that have not yet been examined).
example: ~
version_added: 3.2.1
type: integer
default: "0"
use_job_schedule:
description: |
Turn off scheduler use of cron intervals by setting this to ``False``.
Expand Down
90 changes: 71 additions & 19 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
Index,
Integer,
PrimaryKeyConstraint,
SQLColumnExpression,
String,
Text,
UniqueConstraint,
Expand All @@ -56,7 +57,15 @@
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Mapped, declared_attr, joinedload, mapped_column, relationship, synonym, validates
from sqlalchemy.orm import (
Mapped,
declared_attr,
joinedload,
mapped_column,
relationship,
synonym,
validates,
)
from sqlalchemy.sql.expression import false, select
from sqlalchemy.sql.functions import coalesce

Expand Down Expand Up @@ -313,6 +322,11 @@ class DagRun(Base, LoggingMixin):
"max_dagruns_per_loop_to_schedule",
fallback=20,
)
DEFAULT_NEW_DAGRUNS_TO_EXAMINE = airflow_conf.getint(
"scheduler",
"max_new_dagruns_per_loop_to_schedule",
fallback=0,
)
_ti_dag_versions = association_proxy("task_instances", "dag_version")
_tih_dag_versions = association_proxy("task_instances_histories", "dag_version")

Expand Down Expand Up @@ -615,7 +629,7 @@ def active_runs_of_dags(

@classmethod
@retry_db_transaction
def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRun]:
def get_running_dag_runs_to_examine(cls, session: Session) -> Sequence[DagRun]:
"""
Return the next DagRuns that the scheduler should attempt to schedule.

Expand All @@ -628,27 +642,65 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu
from airflow.models.backfill import BackfillDagRun
from airflow.models.dag import DagModel

query = (
select(cls)
.with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql")
.where(cls.state == DagRunState.RUNNING)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True)
.where(
DagModel.is_paused == false(),
DagModel.is_stale == false(),
)
.order_by(
nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session),
nulls_first(cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session),
cls.run_after,
def _get_dagrun_query(
filters: list[ColumnElement[bool]], order_by: list[SQLColumnExpression[Any]], limit: int
):
return (
select(DagRun)
.with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql")
.where(DagRun.state == DagRunState.RUNNING)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True)
.where(*filters)
.order_by(*order_by)
.limit(limit)
)
.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)

filters = [
DagRun.run_after <= func.now(),
DagModel.is_paused == false(),
DagModel.is_stale == false(),
]

order = [
nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session),
nulls_first(cast("ColumnElement[Any]", DagRun.last_scheduling_decision), session=session),
DagRun.run_after,
]

new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE

if new_dagruns_to_examine < 0:
log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller than 0, ignoring configuration")
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning message for negative max_new_dagruns_per_loop_to_schedule could be more actionable if it included the configured value and the fallback behavior (treated as 0). Also, “less than 0” reads more naturally than “smaller than 0”.

Suggested change
log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller than 0, ignoring configuration")
log.warning(
"'max_new_dagruns_per_loop_to_schedule' is configured as %s, which is less than 0; "
"treating it as 0",
new_dagruns_to_examine,
)

Copilot uses AI. Check for mistakes.
new_dagruns_to_examine = 0
Comment on lines +671 to +676
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If max_new_dagruns_per_loop_to_schedule is configured as a negative value, this warning will be emitted on every scheduler loop, potentially spamming logs. Consider clamping/validating the config once when DEFAULT_NEW_DAGRUNS_TO_EXAMINE is initialized (and logging once), instead of warning on every call.

Copilot uses AI. Check for mistakes.

query = _get_dagrun_query(
filters=filters
if new_dagruns_to_examine == 0
else [*filters, DagRun.last_scheduling_decision.is_not(None)],
order_by=order,
limit=dagruns_to_examine,
)

query = query.where(DagRun.run_after <= func.now())
result: Sequence[DagRun] = (
session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique().all()
)

if new_dagruns_to_examine > 0:
new_dagruns_query = _get_dagrun_query(
filters=[*filters, DagRun.last_scheduling_decision.is_(None)],
order_by=order,
limit=new_dagruns_to_examine,
)
new_dagruns: Sequence[DagRun] = (
session.scalars(with_row_locks(new_dagruns_query, of=cls, session=session, skip_locked=True))
.unique()
.all()
)

result = [*result, *new_dagruns]

result = session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique()
return result

@classmethod
Expand Down
124 changes: 122 additions & 2 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session, prev_ti_state, is_ti_sche
schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
assert (upstream.task_id in schedulable_tis) == is_ti_schedulable

def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
self, session, dag_maker
):

DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
Comment on lines +997 to +1000
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests mutate the class-level DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE but never restore it, which can make later tests order-dependent. Please use monkeypatch.setattr(...) (or save/restore the original value) so the change is scoped to the test.

Suggested change
self, session, dag_maker
):
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
self, session, dag_maker, monkeypatch
):
monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0)

Copilot uses AI. Check for mistakes.

def create_dagruns(
last_scheduling_decision: datetime.datetime | None = None,
count: int = 20,
):
dagrun = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
for _ in range(count - 1):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)

dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)

with dag_maker(
dag_id="dummy_dag",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task")

create_dagruns(None, 10)

with dag_maker(
dag_id="dummy_dag2",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task2")

create_dagruns(func.now(), 20)

session.flush()

dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))

assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10

assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10

Comment on lines +997 to +1051
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test name implies it covers the "< 0" configuration path, but it sets DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0, so the warning/clamping branch is never exercised. Set a negative value here (e.g. -1) and assert the expected warning (via caplog) to actually cover the behavior.

Suggested change
self, session, dag_maker
):
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
def create_dagruns(
last_scheduling_decision: datetime.datetime | None = None,
count: int = 20,
):
dagrun = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
for _ in range(count - 1):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
with dag_maker(
dag_id="dummy_dag",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task")
create_dagruns(None, 10)
with dag_maker(
dag_id="dummy_dag2",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task2")
create_dagruns(func.now(), 20)
session.flush()
dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))
assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10
assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10
self, session, dag_maker, caplog
):
original_value = DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
try:
# Set a negative value to exercise the "< 0" clamping and warning path.
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = -1
# Capture warnings emitted when handling the negative configuration value.
caplog.set_level("WARNING", logger="airflow.models.dagrun")
def create_dagruns(
last_scheduling_decision: datetime.datetime | None = None,
count: int = 20,
):
dagrun = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
for _ in range(count - 1):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
with dag_maker(
dag_id="dummy_dag",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task")
create_dagruns(None, 10)
with dag_maker(
dag_id="dummy_dag2",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task2")
create_dagruns(func.now(), 20)
session.flush()
dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))
# Verify that the negative value was ignored/clamped by checking for the warning.
assert any(
"DEFAULT_NEW_DAGRUNS_TO_EXAMINE" in record.getMessage()
and ("negative" in record.getMessage() or "< 0" in record.getMessage())
for record in caplog.records
)
assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10
assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10
finally:
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = original_value

Copilot uses AI. Check for mistakes.
def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker):

DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10
Comment on lines +1052 to +1054
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue here: DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE is modified without being restored, which can leak state across tests. Please scope this via monkeypatch or restore the previous value in a finally block.

Suggested change
def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker):
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10
def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker, monkeypatch):
monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10)

Copilot uses AI. Check for mistakes.

def create_dagruns(
last_scheduling_decision: datetime.datetime | None = None,
count: int = 20,
):
dagrun = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
for _ in range(count - 1):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)

dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)

with dag_maker(
dag_id="dummy_dag",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task")

create_dagruns(None)

with dag_maker(
dag_id="dummy_dag2",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task2")

create_dagruns(func.now())

session.flush()

dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))

assert (
len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None])
== DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
)
assert (
len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None])
== DagRun.DEFAULT_DAGRUNS_TO_EXAMINE
)

@pytest.mark.parametrize("state", [DagRunState.QUEUED, DagRunState.RUNNING])
def test_next_dagruns_to_examine_only_unpaused(self, session, state, testing_dag_bundle):
"""
Expand Down Expand Up @@ -1031,17 +1146,22 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state, testing_dag

if state == DagRunState.RUNNING:
func = DagRun.get_running_dag_runs_to_examine
runs = func(session)
else:
func = DagRun.get_queued_dag_runs_to_set_running
runs = func(session).all()
runs = func(session).all()

assert runs == [dr]

orm_dag.is_paused = True
session.merge(orm_dag)
session.commit()

runs = func(session).all()
if state == DagRunState.RUNNING:
runs = func(session)
else:
runs = func(session).all()

assert runs == []

@mock.patch.object(Stats, "timing")
Expand Down
Loading