Skip to content

Commit

Permalink
[AIRFLOW-6881] Bulk fetch DAGRun for create_dag_run (#7502)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Feb 29, 2020
1 parent e936159 commit cc562dd
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,20 +504,27 @@ def update_import_errors(session, dagbag):

# pylint: disable=too-many-return-statements,too-many-branches
@provide_session
def create_dag_run(self, dag, session=None):
def create_dag_run(self, dag, dag_runs=None, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval.
Returns DagRun if one is scheduled. Otherwise returns None.
"""
# pylint: disable=too-many-nested-blocks
if dag.schedule_interval and conf.getboolean('scheduler', 'USE_JOB_SCHEDULE'):
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False,
session=session
)
if dag_runs is None:
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False,
session=session
)
else:
active_runs = [
dag_run
for dag_run in dag_runs
if not dag_run.external_trigger
]
# return if already reached maximum active runs and no timeout setting
if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
return None
Expand Down Expand Up @@ -716,7 +723,7 @@ def _process_dags(self, dags: List[DAG], session=None):
# Only creates DagRun for DAGs that are not subdag since
# DagRun of subdags are created when SubDagOperator executes.
if not dag.is_subdag:
dag_run = self.create_dag_run(dag)
dag_run = self.create_dag_run(dag, dag_runs=dag_runs_for_dag)
if dag_run:
dag_runs_for_dag.append(dag_run)
expected_start_date = dag.following_schedule(dag_run.execution_date)
Expand Down

0 comments on commit cc562dd

Please sign in to comment.