Skip to content

Commit

Permalink
Fix future DagRun rarely triggered by race conditions when max_active…
Browse files Browse the repository at this point in the history
…_runs reached its upper limit. (#31414)

* feat: select dag_model with row lock

* fix: logging that scheduling was skipped

* fix: remove unused get_dagmodel

* fix: correct log message to more generic word

---------

Co-authored-by: doiken <doiken@users.noreply.github.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com>
  • Loading branch information
4 people committed Aug 8, 2023
1 parent e43206e commit b53e2ae
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.engine import Result
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Query, Session, load_only, make_transient, selectinload
from sqlalchemy.orm import Query, Session, joinedload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression

from airflow import settings
Expand Down Expand Up @@ -1397,11 +1397,24 @@ def _schedule_dag_run(
callback: DagCallbackRequest | None = None

dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
dag_model = DM.get_dagmodel(dag_run.dag_id, session)
# Adopt row locking to account for inconsistencies when next_dagrun_create_after = None
query = (
session.query(DagModel)
.filter(DagModel.dag_id == dag_run.dag_id)
.options(joinedload(DagModel.parent_dag))
)
dag_model = with_row_locks(
query, of=DagModel, session=session, **skip_locked(session=session)
).one_or_none()

if not dag or not dag_model:
if not dag:
self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id)
return callback
if not dag_model:
self.log.info(
"DAG %s scheduling was skipped, probably because the DAG record was locked", dag_run.dag_id
)
return callback

if (
dag_run.start_date
Expand Down

0 comments on commit b53e2ae

Please sign in to comment.