Don't get DAG out of DagBag when we already have it #35243

Closed · wants to merge 1 commit

Conversation

@ashb (Member) commented Oct 28, 2023

Two things here:

  1. By the point we are looking at the "callbacks", `dagrun.dag` will already be set (the `or dagbag.get_dag` is a safety precaution; it might not be required or worth it).
  2. DagBag already _is_ a cache. We don't need an extra caching layer on top of it.

         if dag_id not in self.dags:
             # Load from DB if not (yet) in the bag
             self._add_dag_from_db(dag_id=dag_id, session=session)
         return self.dags.get(dag_id)

This "soft reverts" #30704 and removes the lru_cache

@boring-cyborg bot added the area:Scheduler (Scheduler or dag parsing issues) label Oct 28, 2023
@hussein-awala (Member) left a comment

From the other PR description:

> With the caching we were able to increase scheduler performance. Because the time on our slow DB to query the dag took between 50ms and 250ms and if you execute this only once or 60 times during one scheduler loop run this makes a big change.

We don't use the dagbag cache directly; instead we check whether we need to update the dag and reload it from the DB:

    # If DAG is in the DagBag, check the following
    # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
    # 2. check the last_updated and hash columns in SerializedDag table to see if
    #    Serialized DAG is updated
    # 3. if (2) is yes, fetch the Serialized DAG.
    # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
    #    if it exists and return None.

So I wonder whether this refresh is actually necessary for some dags in our case (if so, your PR would be a bug fix) or whether we need a local LRU cache to avoid reloading some dags from the DB.
(I'm talking about the revert in the second place, _get_next_dagruns_to_examine, and not the one which uses dag_run.dag.)
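
To make the shape of that check concrete, here is a minimal sketch of the pattern those comments describe: a dict-backed bag with a staleness check against the DB. This is an editorial illustration, not the real dagbag.py; `MiniDagBag`, `fetch_serialized_dag_meta` and `load_serialized_dag` are made-up stand-ins, while `min_serialized_dag_fetch_secs`, `dags` and `dags_last_fetched` mirror names from the discussion.

    import time
    from types import SimpleNamespace

    MIN_SERIALIZED_DAG_FETCH_SECS = 10  # stands in for min_serialized_dag_fetch_secs

    def fetch_serialized_dag_meta(dag_id):
        # Made-up stand-in for the last_updated/hash query against the SerializedDag table.
        return SimpleNamespace(dag_hash="abc123")

    def load_serialized_dag(dag_id):
        # Made-up stand-in for deserializing the DAG from the DB.
        return object(), "abc123"

    class MiniDagBag:
        def __init__(self):
            self.dags = {}                # dag_id -> DAG object
            self.dags_hash = {}           # dag_id -> hash of the serialized DAG
            self.dags_last_fetched = {}   # dag_id -> when the DAG was last (re)loaded

        def get_dag(self, dag_id):
            if dag_id in self.dags:
                age = time.monotonic() - self.dags_last_fetched[dag_id]
                if age > MIN_SERIALIZED_DAG_FETCH_SECS:        # (1) time to re-check?
                    row = fetch_serialized_dag_meta(dag_id)    # (2) compare DB metadata
                    if row is None:                            # (4) serialized DAG deleted
                        self.dags.pop(dag_id, None)
                        return None
                    if row.dag_hash != self.dags_hash.get(dag_id):
                        self._load_from_db(dag_id)             # (3) fetch the new version
            else:
                self._load_from_db(dag_id)
            return self.dags.get(dag_id)

        def _load_from_db(self, dag_id):
            dag, dag_hash = load_serialized_dag(dag_id)
            self.dags[dag_id] = dag
            self.dags_hash[dag_id] = dag_hash
            self.dags_last_fetched[dag_id] = time.monotonic()

Note that in this sketch the "DAG unchanged" branch never refreshes dags_last_fetched, so once an entry is older than the threshold every call repeats the metadata query; that inconsistency comes up again later in this thread.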

    for dag_run, callback_to_run in callback_tuples:
    -    dag = cached_get_dag(dag_run.dag_id)
    +    dag = dag_run.dag or self.dagbag.get_dag(dag_run.dag_id, session=session)
@ashb (Member, Author) replied

For instance, just before this loop are these two calls:

            dag_runs = self._get_next_dagruns_to_examine(DagRunState.RUNNING, session)
            # Bulk fetch the currently active dag runs for the dags we are
            # examining, rather than making one query per DagRun
            callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

Both of those get the dag out of the dagbag in ways that weren't affected by an LRU cache, and every dagrun we have here must have gone through the call to _schedule_all_dag_runs.

@ashb (Member, Author) commented Oct 28, 2023

> Because the time on our slow DB to query the dag took between 50ms and 250ms and if you execute this only once or 60 times during one scheduler loop run this makes a big change.

The point is that dagbag.get_dag is already this cache. There were no numbers provided in that PR showing it actually makes any difference. My assumption is that this doesn't actually save anything (as not every call to dagbag.get_dag in the scheduler loop was replaced by the cache).

@jscheffl (Contributor) commented

Yes, digging in deeper I was also scratching my head. Obviously there is a kind of basic caching, but with an expiry check as well. The main driver for the lru_cache was its use in _get_next_dagruns_to_examine for the case where 200+ runs of the same DAG are queued: then we don't need to check over and over during one iteration whether the DAG changed in between. That was the intent.
But the in-depth investigation, and therefore the previous PR, was done by @AutomationDev85, who is probably back online on Monday.
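
For context, the cached_get_dag seen in the diff hunk above is this kind of per-iteration memoization. A minimal self-contained illustration (the dag id and the get_dag_from_db helper are made up; this is the shape of the idea, not the code from #30704):

    from functools import lru_cache

    def get_dag_from_db(dag_id):
        # Made-up stand-in for DagBag.get_dag hitting a slow DB (50-250 ms per call).
        print(f"DB query for {dag_id}")
        return object()

    # Per-iteration memoization: 200+ queued dag runs of the same DAG trigger
    # only a single lookup within one scheduler loop pass.
    cached_get_dag = lru_cache(maxsize=None)(get_dag_from_db)

    for dag_id in ["etl_daily"] * 3:
        dag = cached_get_dag(dag_id)   # only the first call prints "DB query ..."

    # The cache has to be discarded every loop iteration, otherwise a changed DAG
    # would never be picked up; that expiry concern is exactly what DagBag's own
    # staleness check already handles.
    cached_get_dag.cache_clear()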

@ashb (Member, Author) commented Oct 30, 2023

The difference between an LRU cache and the cache in dagbag is that the latter does a datetime.now() call (more or less).

Additionally, the change here to `dag = dag_run.dag or self.dagbag.get_dag(dag_run.dag_id, session=session)` should negate even that call in 99% of cases without needing the extra cache.

@AutomationDev85 (Contributor) commented

During the runtime measurements a few months ago I ran into the issue that these lines consumed a lot of time when we scheduled a DAG with many DAG runs. I was not aware that there is some basic caching, but my measurements looked like it was not working for our use case :( Any idea why it would not work when you try to schedule 200 DAG runs of the same DAG?

@jscheffl (Contributor) commented Oct 30, 2023

While sitting in the train, failing to build the Airflow container via breeze, I was re-inspecting the code. I believe I have now found the root cause of the performance problem we had and why @AutomationDev85 added the cache around it.

There are multiple dicts used in DagBag to cache the DAG objects, timestamps and versions. The attribute self.dags_last_fetched stores when a DAG was last fetched, and it is checked against the default of 10 seconds to ensure the cache is not too old. But DagBag.get_dag() (current main in https://github.com/apache/airflow/blob/main/airflow/models/dagbag.py#L207) always hits the DB with a query if the cached serialized value looks too old, while the last-check time is only updated if the DAG has been re-parsed in between (see https://github.com/apache/airflow/blob/main/airflow/models/dagbag.py#L221). Otherwise the marker for when the DAG was last checked is not touched. This is inconsistent, and it means that if performance problems hit DAG parsing (or the DAG parser is running in the scheduler and the scheduler loop hits some performance problems >10 seconds), then DB load increases.
So when removing the lru_cache added by the previous PR #30704, we would need to fix the caching logic and at least update self.dags_last_fetched[dag_id] to the current time (and not the time of the last DAG parsing == sd_last_updated_datetime).

But I feel like the code in this section has grown over time, and it took me three passes to understand the logic. Compared to an LRU cache this looks very complex. Maybe a round of refactoring of the DB caching would be good: perhaps we can add something like an LRU cache with a timeout and move the complexity out into a caching utility rather than implementing custom logic in DagBag?
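
To illustrate the kind of utility meant here (purely an editorial sketch of the suggestion, not an existing Airflow helper), an LRU cache with a time-to-live could look roughly like this:

    import time
    from collections import OrderedDict

    class TTLCache:
        """LRU cache with a timeout (illustrative sketch, not an existing Airflow utility)."""

        def __init__(self, loader, maxsize=128, ttl=10.0):
            self._loader = loader        # callable doing the expensive DB fetch
            self._maxsize = maxsize
            self._ttl = ttl              # e.g. min_serialized_dag_fetch_secs
            self._data = OrderedDict()   # key -> (checked_at, value)

        def get(self, key):
            now = time.monotonic()
            entry = self._data.get(key)
            if entry is not None and now - entry[0] <= self._ttl:
                self._data.move_to_end(key)        # still fresh: refresh LRU position
                return entry[1]
            value = self._loader(key)              # expired or missing: reload via loader
            self._data[key] = (now, value)         # the check itself counts as "fresh"
            self._data.move_to_end(key)
            if len(self._data) > self._maxsize:
                self._data.popitem(last=False)     # evict the least recently used entry
            return value

Usage would then be something along the lines of dag_cache = TTLCache(loader=fetch_dag_from_db, ttl=10), where fetch_dag_from_db is whatever does the DB round trip, with DagBag.get_dag reduced to dag_cache.get(dag_id); the expiry bookkeeping, including refreshing the check timestamp even when the DAG is unchanged, would then live in one place.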


This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions bot added the stale label Dec 15, 2023
@github-actions bot closed this Dec 21, 2023