Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save scheduler execution time by caching dags #30704

Merged
merged 8 commits into from
May 18, 2023
14 changes: 12 additions & 2 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,8 +1083,13 @@ def _do_scheduling(self, session: Session) -> int:
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

# Send the callbacks after we commit to ensure the context is up to date when it gets run
# cache saves time during scheduling of many dag_runs for same dag
cached_dags: dict = {}
for dag_run, callback_to_run in callback_tuples:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if dag_run.dag_id not in cached_dags.keys():
cached_dags[dag_run.dag_id] = self.dagbag.get_dag(dag_run.dag_id, session=session)

dag = cached_dags[dag_run.dag_id]
AutomationDev85 marked this conversation as resolved.
Show resolved Hide resolved
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
Expand Down Expand Up @@ -1347,9 +1352,14 @@ def _update_state(dag: DAG, dag_run: DagRun):
schedule_delay,
tags={"dag_id": dag.dag_id},
)
# cache saves time during scheduling of many dag_runs for same dag
cached_dags: dict = {}

for dag_run in dag_runs:
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if dag_run.dag_id not in cached_dags.keys():
cached_dags[dag_run.dag_id] = self.dagbag.get_dag(dag_run.dag_id, session=session)

dag = dag_run.dag = cached_dags[dag_run.dag_id]
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
Expand Down