From b38d59bdce302ba393398f85f5bdf95d6ae25f5b Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 21 Feb 2024 18:00:37 +0100 Subject: [PATCH] Revert "Fix future DagRun rarely triggered by race conditions when max_active_runs reached its upper limit. (#31414)" (#37596) This reverts commit b53e2aeefc1714d306f93e58d211ad9d52356470. --- airflow/jobs/scheduler_job_runner.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index c0c9474913482..32cc9f5a634ab 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -33,7 +33,7 @@ from sqlalchemy import and_, delete, func, not_, or_, select, text, update from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload +from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression from airflow import settings @@ -1418,22 +1418,11 @@ def _schedule_dag_run( callback: DagCallbackRequest | None = None dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) - # Adopt row locking to account for inconsistencies when next_dagrun_create_after = None - query = ( - select(DagModel).where(DagModel.dag_id == dag_run.dag_id).options(joinedload(DagModel.parent_dag)) - ) - dag_model = session.scalars( - with_row_locks(query, of=DagModel, session=session, skip_locked=True) - ).one_or_none() + dag_model = DM.get_dagmodel(dag_run.dag_id, session) - if not dag: + if not dag or not dag_model: self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id) return callback - if not dag_model: - self.log.info( - "DAG %s scheduling was skipped, probably because the DAG record was locked", dag_run.dag_id - ) - return callback if ( dag_run.start_date