Don't get DAG out of DagBag when we already have it
Two things here:

1. By the point we are looking at the "callbacks", `dagrun.dag` will
   already be set (the `or dagbag.get_dag` is a safety precaution; it
   might not be required or worth it).
2. DagBag already _is_ a cache. We don't need an extra caching layer on
   top of it.

This "soft reverts" #30704 and removes the lru_cache
ashb committed Oct 28, 2023
1 parent 3592ff4 commit 2352f35
Showing 1 changed file with 3 additions and 13 deletions.
airflow/jobs/scheduler_job_runner.py (3 additions, 13 deletions)

```diff
@@ -27,9 +27,8 @@
 from collections import Counter
 from dataclasses import dataclass
 from datetime import timedelta
-from functools import lru_cache, partial
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
+from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator
 
 from sqlalchemy import and_, delete, func, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
@@ -1064,12 +1063,8 @@ def _do_scheduling(self, session: Session) -> int:
         callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
 
         # Send the callbacks after we commit to ensure the context is up to date when it gets run
-        # cache saves time during scheduling of many dag_runs for same dag
-        cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
-            partial(self.dagbag.get_dag, session=session)
-        )
         for dag_run, callback_to_run in callback_tuples:
-            dag = cached_get_dag(dag_run.dag_id)
+            dag = dag_run.dag or self.dagbag.get_dag(dag_run.dag_id, session=session)
             if dag:
                 # Sending callbacks there as in standalone_dag_processor they are adding to the database,
                 # so it must be done outside of prohibit_commit.
@@ -1373,13 +1368,8 @@ def _update_state(dag: DAG, dag_run: DagRun):
                 tags={"dag_id": dag.dag_id},
             )
 
-        # cache saves time during scheduling of many dag_runs for same dag
-        cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
-            partial(self.dagbag.get_dag, session=session)
-        )
-
         for dag_run in dag_runs:
-            dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
+            dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
 
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
```
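
For reference, the deleted construction layered a fresh `lru_cache` over a `partial` (binding the session) on every scheduling pass, so cached entries never carried over between passes; the only repeats it saved were within a single pass, which DagBag's own cache already covers. A toy reproduction, with a hypothetical `get_dag` standing in for `DagBag.get_dag`:

```python
from functools import lru_cache, partial


def get_dag(dag_id: str, session=None) -> str:
    # Hypothetical stand-in for DagBag.get_dag: "fetches" on every real call.
    print(f"fetching {dag_id}")
    return f"DAG:{dag_id}"


# Pass 1: a brand-new cache wrapping the bound partial (the removed pattern).
cached_get_dag = lru_cache()(partial(get_dag, session=None))
cached_get_dag("etl")  # prints "fetching etl"
cached_get_dag("etl")  # served from the lru_cache, no print

# Pass 2 rebuilds the cache from scratch, so the same DAG is fetched again.
cached_get_dag = lru_cache()(partial(get_dag, session=None))
cached_get_dag("etl")  # prints "fetching etl" a second time
```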
