Skip to content

Commit

Permalink
Revert "Fix future DagRun rarely triggered by race conditions when ma…
Browse files Browse the repository at this point in the history
…x_active_runs reached its upper limit. (#31414)" (#37596)

This reverts commit b53e2ae.
  • Loading branch information
ephraimbuddy committed Feb 21, 2024
1 parent 52d2032 commit b38d59b
Showing 1 changed file with 3 additions and 14 deletions.
17 changes: 3 additions & 14 deletions airflow/jobs/scheduler_job_runner.py
Expand Up @@ -33,7 +33,7 @@

from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression

from airflow import settings
Expand Down Expand Up @@ -1418,22 +1418,11 @@ def _schedule_dag_run(
callback: DagCallbackRequest | None = None

dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
# Adopt row locking to account for inconsistencies when next_dagrun_create_after = None
query = (
select(DagModel).where(DagModel.dag_id == dag_run.dag_id).options(joinedload(DagModel.parent_dag))
)
dag_model = session.scalars(
with_row_locks(query, of=DagModel, session=session, skip_locked=True)
).one_or_none()
dag_model = DM.get_dagmodel(dag_run.dag_id, session)

if not dag:
if not dag or not dag_model:
self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id)
return callback
if not dag_model:
self.log.info(
"DAG %s scheduling was skipped, probably because the DAG record was locked", dag_run.dag_id
)
return callback

if (
dag_run.start_date
Expand Down

0 comments on commit b38d59b

Please sign in to comment.