
Save scheduler execution time by caching dags #30704

Merged
merged 8 commits into apache:main on May 18, 2023

Conversation

AutomationDev85
Contributor

Hi Airflow community,
this is my second PR, and I'm happy to work on scheduler runtime again. We faced slow scheduler execution times caused by having millions of queued dag_runs for one DAG.

This PR adds caching of the dag at two points in the code, which saved a lot of scheduler runtime when scheduling many dag_runs for the same dag. The code currently reads the dag from the DB, and if you have a lot of short-running tasks this is executed very often. E.g. we wanted to schedule a DAG with:
max_active_tasks=60,
max_active_runs=180,
most tasks with an execution time of about 2 seconds, and 1 million dag_runs in queued state. With the caching we were able to increase scheduler performance: on our slow DB, querying the dag took between 50 ms and 250 ms, so executing it once instead of 60 times during one scheduler loop run makes a big difference.
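For illustration, a minimal sketch of the idea (hypothetical helper name, not the exact code in the diff): look each dag up at most once per scheduling pass instead of once per dag_run.

    def get_dags_for_runs(dagbag, dag_runs, session):
        # Without caching, every dag_run triggers a DB read; with a
        # per-pass dict, each dag_id is fetched from the DB only once.
        cache = {}
        for dag_run in dag_runs:
            if dag_run.dag_id not in cache:
                cache[dag_run.dag_id] = dagbag.get_dag(dag_run.dag_id, session=session)
            yield dag_run, cache[dag_run.dag_id]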

@vandonr-amz fyi, as discussed with @jens-scheffler-bosch

@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label Apr 18, 2023
Review comments on airflow/jobs/scheduler_job_runner.py and airflow/utils/helpers.py (outdated, resolved)
Contributor

@vandonr-amz vandonr-amz left a comment


I like it.

Member

@potiuk potiuk left a comment


Looks like a nice optimisation; I cannot see any bad side effects of it.

But I'd love another maintainer's opinion.

@uranusjr
Member

uranusjr commented Apr 26, 2023

This functionality is basically functools.lru_cache.

@vandonr-amz
Contributor

This functionality is basically functools.lru_cache.

Yes and no, because this is an in-function cache. functools.lru_cache is intended to provide a cache that persists at the object level, I think?

@potiuk
Member

potiuk commented Apr 26, 2023

This functionality is basically functools.lru_cache.

Yes and no, because this is an in-function cache. functools.lru_cache is intended to provide a cache that persists at the object level, I think?

Not really. @uranusjr is right. lru_cache caches the function's return value keyed by the function's arguments (the only condition is that the arguments must be hashable). So it does precisely what the extra (unneeded) code does in this case.
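A quick standard-library illustration of that behavior (generic example, not code from this PR):

    from functools import lru_cache

    @lru_cache()
    def slow_lookup(dag_id):
        print(f"querying DB for {dag_id}")  # printed only on a cache miss
        return {"dag_id": dag_id}           # stand-in for an expensive DB read

    slow_lookup("dag_a")  # executes the body and caches the result
    slow_lookup("dag_a")  # cache hit: returns the cached dict, nothing printed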

@vandonr-amz
Contributor

lru_cache caches the function's return value keyed by the function's arguments (the only condition is that the arguments must be hashable). So it does precisely what the extra (unneeded) code does in this case.

OK, but where is that value cached?
If the annotated function is global, the cache is global, I suppose? If it's a class method, is the cache stored with the object instance?
Here we want to cache just within the scope of the function, so we could write an annotated function as a sub-method, but then we'd have to write it twice, in both locations where it's used?

We can certainly do that, but I suggested extracting the duplicated code to a function in the first place to avoid this ^^

Or maybe there is a third way I'm missing? With an lru_cache that is not duplicated and that doesn't cache too much?

@jscheffl
Contributor

This functionality is basically functools.lru_cache.

Yes and no, because this is an in-function cache. functools.lru_cache is intended to provide a cache that persists at the object level, I think?

Not really. @uranusjr is right. lru_cache caches the function's return value keyed by the function's arguments (the only condition is that the arguments must be hashable). So it does precisely what the extra (unneeded) code does in this case.

Only being a 95% Python expert (I need to earn some stars), removing code with functools sounds good. But I also suppose that adding @lru_cache to self.dagbag.get_dag() at a general level is not what we want - so to optimize only locally, I assume the best approach would be to use a lambda as proposed in answer #1 here (see the sketch below)? https://stackoverflow.com/questions/10270360/python-use-lru-cache-on-lambda-function-or-other-ways-to-create-cache-for-lamb
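A minimal sketch of that lambda pattern (assumed names; the point is that the cache lives only as long as the local variable does):

    from functools import lru_cache

    def scheduling_pass(dagbag, dag_runs, session):
        # Wrap the lookup in a lambda and cache it in a local variable: the
        # cache is dropped when cached_get_dag goes out of scope at the end
        # of the pass, so nothing is cached longer than one loop run.
        cached_get_dag = lru_cache()(lambda dag_id: dagbag.get_dag(dag_id, session=session))
        for dag_run in dag_runs:
            dag = cached_get_dag(dag_run.dag_id)  # DB hit at most once per dag_id
            ...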

@vandonr-amz
Contributor

Oh nice, I didn't know it could be used like that! Looks great to me!

Contributor Author

@AutomationDev85 AutomationDev85 left a comment


Nice idea to use lru_cache. Thanks for helping a newbie cross the street.

@potiuk
Member

potiuk commented Apr 27, 2023

Or maybe there is a third way I'm missing? With an lru_cache that is not duplicated and that doesn't cache too much?

  1. Make a class/object method wrapping the global method call
  2. Annotate it with @lru_cache

Saves about 80% of the code.

This is another way without using a lambda; see the sketch below.
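A hedged sketch of that wrapped-method approach (assumed constructor and attribute; _get_dag_cached is a hypothetical name):

    from functools import lru_cache

    class SchedulerJobRunner:
        def __init__(self, dagbag):
            self.dagbag = dagbag

        @lru_cache(maxsize=None)
        def _get_dag_cached(self, dag_id):
            # Keyed on (self, dag_id); the cache lives on the class-level
            # function object and keeps self alive, so it caches more (and
            # for longer) than a per-loop cache would - the concern above.
            return self.dagbag.get_dag(dag_id)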

@potiuk
Member

potiuk commented Apr 27, 2023

BTW, if you really want, you could even annotate a named (non-anonymous, non-lambda) function inside the function. Functions in Python can easily be declared and run inside another function (or method):


from functools import lru_cache

class Class:

    def method(self):

        @lru_cache()
        def cached_call(arg):
            # body of cached call
            ...

        # body of the object method; the cache exists only
        # for the duration of this method invocation
        ...

@vandonr-amz
Contributor

I like the lambda solution; I think it looks cleaner than having a function inside the function.

@potiuk
Member

potiuk commented Apr 27, 2023

I like the lambda solution; I think it looks cleaner than having a function inside the function.

Yes, it has an appeal. Most importantly, we are building on lru_cache rather than trying to redo what it does.

@potiuk
Member

potiuk commented Apr 29, 2023

A second pair of eyes is needed here, as this is a core part.

@potiuk
Member

potiuk commented May 1, 2023

Second pair of 👀 and 🙌 needed :)

@@ -1083,8 +1084,11 @@ def _do_scheduling(self, session: Session) -> int:
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

# Send the callbacks after we commit to ensure the context is up to date when it gets run
# cache saves time during scheduling of many dag_runs for same dag
cached_get_dag: Callable = lru_cache()(lambda dag_id: self.dagbag.get_dag(dag_id, session=session))
Member


Could benefit from using functools.partial instead. Also, it'd be a good idea to improve the type hint here so that when the callable is called, its return value is typed as well instead of being a useless Any.

Contributor


I tried to think about how functools.partial could make this better. The proposed statement is a one-liner, adding the lru_cache wrapper inline without an additional local function declaration. In what way do you mean it would be better?

In my VSCode at least, type hints from parsing the Python are shown: the resulting callable is hinted, and the returned dag variable in line 1090 is displayed as (variable) dag: DAG | None - is it displayed without type hints in your IDE?

Member


VS Code uses Pylance, which applies additional guessing to infer types. The heuristics mostly work well and are very helpful, but they are not always flawless. Correctly annotating the callable would be friendlier to people using other tools.

Mypy identifies the type to be Any (using typing.reveal_type() to inspect):

$ mypy airflow/jobs/scheduler_job_runner.py 
airflow/jobs/scheduler_job_runner.py:1092: note: Revealed type is "Any"
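For reference, a sketch of the suggested combination (assuming from __future__ import annotations is in effect and DAG is imported from airflow.models, with self.dagbag and session in scope as in the diff):

    from functools import lru_cache, partial
    from typing import Callable

    # partial binds the session keyword argument; the explicit Callable
    # annotation gives mypy a concrete DAG | None return type instead of Any.
    cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
        partial(self.dagbag.get_dag, session=session)
    )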

Contributor


Thanks for the proposal and for taking a moment to think about this; it makes the code really "nicer" now. As @AutomationDev85 is off on vacation, I'm pushing the update "on behalf".

@uranusjr
Member

Pending CI

@jscheffl
Contributor

Pending CI

CI is done now :-D

@potiuk potiuk merged commit e065f6a into apache:main May 18, 2023
42 checks passed
@boring-cyborg

boring-cyborg bot commented May 18, 2023

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

potiuk added a commit to potiuk/airflow that referenced this pull request May 19, 2023
potiuk added a commit that referenced this pull request May 19, 2023
potiuk added a commit to potiuk/airflow that referenced this pull request May 19, 2023
potiuk added a commit that referenced this pull request May 19, 2023
* Revert "Revert "Save scheduler execution time by caching dags (#30704)" (#31413)"

This reverts commit e6f2117.

* Revert "Save scheduler execution time by adding new Index idea for dag_run (#30827)"

This reverts commit c63b777.
@ephraimbuddy ephraimbuddy added the type:improvement Changelog: Improvements label Jul 6, 2023
@ephraimbuddy ephraimbuddy added this to the Airflow 2.7.0 milestone Jul 6, 2023
@ashb
Member

ashb commented Oct 28, 2023

@uranusjr @jens-scheffler-bosch @potiuk This PR makes no sense btw -- DagBag.get_dag already has a cache:

if dag_id not in self.dags:
    # Load from DB if not (yet) in the bag
    self._add_dag_from_db(dag_id=dag_id, session=session)
return self.dags.get(dag_id)

ashb added a commit that referenced this pull request Oct 28, 2023
Two things here:

1. By the point we are looking at the "callbacks", `dagrun.dag` will
   already be set (the `or dagbag.get_dag` is a safety precaution; it
   might not be required or worth it).
2. DagBag already _is_ a cache. We don't need an extra caching layer on
   top of it.

This "soft reverts" #30704 and removes the lru_cache.