Feature/add max new dagruns to schedule#64294
Feature/add max new dagruns to schedule #64294 — Nataneljpwd wants to merge 12 commits into apache:main from
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a scheduler tuning knob to limit how many new (never-before-examined) running DagRuns are considered per scheduling loop, to reduce starvation/slowdown when large batches of DagRuns are created at once.
Changes:
- Add scheduler.max_new_dagruns_per_loop_to_schedule config (default 0) and plumb it into DagRun selection.
- Update DagRun.get_running_dag_runs_to_examine() to optionally split selection into “previously examined” vs “new” DagRuns.
- Add/adjust unit tests to cover the new selection behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/models/dagrun.py | Adds config-backed limit and changes running DagRun selection logic to optionally fetch “old” and “new” runs separately. |
| airflow-core/src/airflow/config_templates/config.yml | Documents the new scheduler configuration option. |
| airflow-core/tests/unit/models/test_dagrun.py | Adds tests for the new DagRun selection behavior and updates an existing test to handle the new return type. |
| self, session, dag_maker | ||
| ): | ||
|
|
||
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0 | ||
|
|
||
| def create_dagruns( | ||
| last_scheduling_decision: datetime.datetime | None = None, | ||
| count: int = 20, | ||
| ): | ||
| dagrun = dag_maker.create_dagrun( | ||
| run_type=DagRunType.SCHEDULED, | ||
| state=State.RUNNING, | ||
| run_after=datetime.datetime(2024, 1, 1), | ||
| ) | ||
| dagrun.last_scheduling_decision = last_scheduling_decision | ||
| session.merge(dagrun) | ||
| for _ in range(count - 1): | ||
| dagrun = dag_maker.create_dagrun_after( | ||
| dagrun, | ||
| run_type=DagRunType.SCHEDULED, | ||
| state=State.RUNNING, | ||
| run_after=datetime.datetime(2024, 1, 1), | ||
| ) | ||
|
|
||
| dagrun.last_scheduling_decision = last_scheduling_decision | ||
| session.merge(dagrun) | ||
|
|
||
| with dag_maker( | ||
| dag_id="dummy_dag", | ||
| schedule=datetime.timedelta(days=1), | ||
| start_date=datetime.datetime(2024, 1, 1), | ||
| session=session, | ||
| ): | ||
| EmptyOperator(task_id="dummy_task") | ||
|
|
||
| create_dagruns(None, 10) | ||
|
|
||
| with dag_maker( | ||
| dag_id="dummy_dag2", | ||
| schedule=datetime.timedelta(days=1), | ||
| start_date=datetime.datetime(2024, 1, 1), | ||
| session=session, | ||
| ): | ||
| EmptyOperator(task_id="dummy_task2") | ||
|
|
||
| create_dagruns(func.now(), 20) | ||
|
|
||
| session.flush() | ||
|
|
||
| dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session)) | ||
|
|
||
| assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10 | ||
|
|
||
| assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 | ||
|
|
There was a problem hiding this comment.
This test name implies it covers the "< 0" configuration path, but it sets DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0, so the warning/clamping branch is never exercised. Set a negative value here (e.g. -1) and assert the expected warning (via caplog) to actually cover the behavior.
| self, session, dag_maker | |
| ): | |
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0 | |
| def create_dagruns( | |
| last_scheduling_decision: datetime.datetime | None = None, | |
| count: int = 20, | |
| ): | |
| dagrun = dag_maker.create_dagrun( | |
| run_type=DagRunType.SCHEDULED, | |
| state=State.RUNNING, | |
| run_after=datetime.datetime(2024, 1, 1), | |
| ) | |
| dagrun.last_scheduling_decision = last_scheduling_decision | |
| session.merge(dagrun) | |
| for _ in range(count - 1): | |
| dagrun = dag_maker.create_dagrun_after( | |
| dagrun, | |
| run_type=DagRunType.SCHEDULED, | |
| state=State.RUNNING, | |
| run_after=datetime.datetime(2024, 1, 1), | |
| ) | |
| dagrun.last_scheduling_decision = last_scheduling_decision | |
| session.merge(dagrun) | |
| with dag_maker( | |
| dag_id="dummy_dag", | |
| schedule=datetime.timedelta(days=1), | |
| start_date=datetime.datetime(2024, 1, 1), | |
| session=session, | |
| ): | |
| EmptyOperator(task_id="dummy_task") | |
| create_dagruns(None, 10) | |
| with dag_maker( | |
| dag_id="dummy_dag2", | |
| schedule=datetime.timedelta(days=1), | |
| start_date=datetime.datetime(2024, 1, 1), | |
| session=session, | |
| ): | |
| EmptyOperator(task_id="dummy_task2") | |
| create_dagruns(func.now(), 20) | |
| session.flush() | |
| dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session)) | |
| assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10 | |
| assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 | |
| self, session, dag_maker, caplog | |
| ): | |
| original_value = DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE | |
| try: | |
| # Set a negative value to exercise the "< 0" clamping and warning path. | |
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = -1 | |
| # Capture warnings emitted when handling the negative configuration value. | |
| caplog.set_level("WARNING", logger="airflow.models.dagrun") | |
| def create_dagruns( | |
| last_scheduling_decision: datetime.datetime | None = None, | |
| count: int = 20, | |
| ): | |
| dagrun = dag_maker.create_dagrun( | |
| run_type=DagRunType.SCHEDULED, | |
| state=State.RUNNING, | |
| run_after=datetime.datetime(2024, 1, 1), | |
| ) | |
| dagrun.last_scheduling_decision = last_scheduling_decision | |
| session.merge(dagrun) | |
| for _ in range(count - 1): | |
| dagrun = dag_maker.create_dagrun_after( | |
| dagrun, | |
| run_type=DagRunType.SCHEDULED, | |
| state=State.RUNNING, | |
| run_after=datetime.datetime(2024, 1, 1), | |
| ) | |
| dagrun.last_scheduling_decision = last_scheduling_decision | |
| session.merge(dagrun) | |
| with dag_maker( | |
| dag_id="dummy_dag", | |
| schedule=datetime.timedelta(days=1), | |
| start_date=datetime.datetime(2024, 1, 1), | |
| session=session, | |
| ): | |
| EmptyOperator(task_id="dummy_task") | |
| create_dagruns(None, 10) | |
| with dag_maker( | |
| dag_id="dummy_dag2", | |
| schedule=datetime.timedelta(days=1), | |
| start_date=datetime.datetime(2024, 1, 1), | |
| session=session, | |
| ): | |
| EmptyOperator(task_id="dummy_task2") | |
| create_dagruns(func.now(), 20) | |
| session.flush() | |
| dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session)) | |
| # Verify that the negative value was ignored/clamped by checking for the warning. | |
| assert any( | |
| "DEFAULT_NEW_DAGRUNS_TO_EXAMINE" in record.getMessage() | |
| and ("negative" in record.getMessage() or "< 0" in record.getMessage()) | |
| for record in caplog.records | |
| ) | |
| assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10 | |
| assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 | |
| finally: | |
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = original_value |
| self, session, dag_maker | ||
| ): | ||
|
|
||
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0 |
There was a problem hiding this comment.
These tests mutate the class-level DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE but never restore it, which can make later tests order-dependent. Please use monkeypatch.setattr(...) (or save/restore the original value) so the change is scoped to the test.
| self, session, dag_maker | |
| ): | |
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0 | |
| self, session, dag_maker, monkeypatch | |
| ): | |
| monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0) |
| def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker): | ||
|
|
||
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10 |
There was a problem hiding this comment.
Same issue here: DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE is modified without being restored, which can leak state across tests. Please scope this via monkeypatch or restore the previous value in a finally block.
| def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker): | |
| DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10 | |
| def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker, monkeypatch): | |
| monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10) |
| from sqlalchemy import ( | ||
| JSON, | ||
| Enum, | ||
| ForeignKey, | ||
| ForeignKeyConstraint, | ||
| Index, | ||
| Integer, | ||
| PrimaryKeyConstraint, | ||
| SQLColumnExpression, | ||
| String, | ||
| Text, |
There was a problem hiding this comment.
SQLColumnExpression is only used for typing in _get_dagrun_query, and this file already keeps most SQLAlchemy typing-only imports under TYPE_CHECKING. Consider moving this import under TYPE_CHECKING (or using an already-imported typing like ColumnElement[Any]) to avoid adding an extra runtime dependency/import surface.
| new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE | ||
| dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE | ||
|
|
||
| if new_dagruns_to_examine < 0: | ||
| log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller than 0, ignoring configuration") | ||
| new_dagruns_to_examine = 0 |
There was a problem hiding this comment.
If max_new_dagruns_per_loop_to_schedule is configured as a negative value, this warning will be emitted on every scheduler loop, potentially spamming logs. Consider clamping/validating the config once when DEFAULT_NEW_DAGRUNS_TO_EXAMINE is initialized (and logging once), instead of warning on every call.
When new DagRuns are created in bulk (e.g. with TriggerDagRunOperator), the scheduler might struggle with the number created, causing other DagRuns to starve.
This is due to the sort order in get_running_dag_runs_to_examine, which orders by last scheduling decision with NULLs first. This means that if a lot of new DagRuns are created, the scheduler will examine them first, and in situations where the DAGs have a lot of tasks (hundreds to tens of thousands) this can cause the scheduler to stall, as it has to both examine many DagRuns and create new tasks for them.
When we tried to tune max_dagruns_per_loop_to_schedule, we got either starvation of other DagRuns or the scheduler being restarted because it did not return a heartbeat for a long time and failed the readiness probe.
To fix this, a new configuration option, max_new_dagruns_per_loop_to_schedule, is added. It can help when a lot of new DagRuns are created in large batches at the same time, allowing the scheduler to both look at existing DagRuns (not starving them and causing them to time out with no running/scheduled tasks) and create and manage the new DagRuns.
Was generative AI tooling used to co-author this PR?