[AIRFLOW-6921] Fetch celery states in bulk #7542
Conversation
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
CC: @KevinYang21 @saguziel Can you look at this?
airflow/executors/celery_executor.py
```diff
- exception_traceback = "Celery Task ID: {}\n{}".format(async_result, traceback.format_exc())
+ exception_traceback = f"Celery Task ID: {async_result}\n{traceback.format_exc()}"
```
KevinYang21 left a comment:
Elegant solution, nice. Do we have any performance data also to share?
Backend: `<class 'celery.backends.database.DatabaseBackend'>` (Postgres)
Multiprocessing: Diff: -3087 ms (114.00%)

Backend: `<class 'celery.backends.redis.RedisBackend'>`
Multiprocessing: Diff: -775 ms (258.33%)

Details:

```python
import time

from airflow.executors.celery_executor import BulkStateFetcher, execute_command, app
from perf_kit.repeat_and_time import timing, repeat

REPEAT_COUNT = 5


def create_tasks(task_count: int):
    tasks = [["airflow", "version"] for _ in range(task_count)]
    async_tasks = []
    for task in tasks:
        async_tasks.append(execute_command.apply_async(args=[task]))
    time.sleep(1)
    return async_tasks


print("Backend: ", type(app.backend))

tasks = create_tasks(5000)


@timing(REPEAT_COUNT)
@repeat(REPEAT_COUNT)
@timing()
def case():
    fetcher = BulkStateFetcher(sync_parralelism=16)
    fetcher.get_many(tasks)


case()


@timing(REPEAT_COUNT)
@repeat(REPEAT_COUNT)
@timing()
def case():
    fetcher = BulkStateFetcher(sync_parralelism=16)
    fetcher._get_many_using_multiprocessing(tasks)


case()
```
This is a test in which all components are local. In a real environment, network latency has a big impact on the result because of the many round-trips. I mainly try to limit the number of queries in order to limit the number of round-trips.
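To illustrate the round-trip argument, here is a toy sketch (not the Airflow implementation; the `Backend` class and its methods are hypothetical stand-ins): fetching n states one at a time costs n round-trips, while a single bulk query costs one, regardless of n.

```python
class Backend:
    """Hypothetical result backend that counts simulated network round-trips."""

    def __init__(self, states):
        self.states = states
        self.round_trips = 0

    def get_state(self, task_id):
        # One round-trip per task (old behavior).
        self.round_trips += 1
        return self.states[task_id]

    def get_states_bulk(self, task_ids):
        # One round-trip fetches every requested state (bulk behavior).
        self.round_trips += 1
        return {tid: self.states[tid] for tid in task_ids}


backend = Backend({f"task-{i}": "SUCCESS" for i in range(100)})

# Old behavior: one query per task.
one_by_one = {tid: backend.get_state(tid) for tid in list(backend.states)}
print(backend.round_trips)  # 100

backend.round_trips = 0
# Bulk behavior: a single query.
bulk = backend.get_states_bulk(list(backend.states))
print(backend.round_trips)  # 1
```

With any per-query network latency, the saving grows linearly with the number of tasks, which matches the benchmark results above being most dramatic for the remote-backend case.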
@mik-laj big kudos for this change! Should we merge it? :)
Yes. I rebased today and I am waiting for the result.
Great job!
Result for 500 tasks (a more common case):

My solution vs. Multiprocessing: Diff: -469 ms (22.33%)

Backend: `<class 'celery.backends.redis.RedisBackend'>`
Multiprocessing: Diff: -256 ms (128.00%)
For 1 task:

Multiprocessing: Diff: 117 ms

Backend: `<class 'celery.backends.database.DatabaseBackend'>` (Postgres)
Multiprocessing: Diff: -122 ms
Nice one, using a multiprocessing pool there always felt a bit wrong.
@auvipy speaking as a celery dev: everything here is using a "public" API, right? You don't see any long-term problems with this? (It all looks good to my eyes, but I'm not familiar with the internals of Celery.)
Before:
- For each task, we send one query (n queries in total).

After:
- When we use BaseKeyValueStoreBackend or DatabaseBackend, we send only one query.
- When we use other backends, we send n queries (old behavior).

This change will be particularly significant if someone is using a result backend that is on a different node.
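The dispatch described above can be sketched roughly as follows. This is a simplified paraphrase of the PR's logic, not the exact Airflow code; the backend classes here are bare stand-ins for `celery.backends.base.BaseKeyValueStoreBackend` and `celery.backends.database.DatabaseBackend`, and `choose_strategy` is a hypothetical helper.

```python
class BaseKeyValueStoreBackend:
    """Stand-in for celery.backends.base.BaseKeyValueStoreBackend (e.g. Redis)."""


class DatabaseBackend:
    """Stand-in for celery.backends.database.DatabaseBackend (e.g. Postgres)."""


class OtherBackend:
    """Any backend without a known bulk-fetch path."""


def choose_strategy(backend):
    """Pick a state-fetch strategy based on the result backend type."""
    if isinstance(backend, BaseKeyValueStoreBackend):
        # Key-value backends can fetch all task metadata in one mget-style call.
        return "single mget query"
    if isinstance(backend, DatabaseBackend):
        # A database backend can fetch all rows with one SQL query.
        return "single SQL query"
    # Fall back to the old behavior: one query per task, parallelized.
    return "n queries via multiprocessing (old behavior)"


print(choose_strategy(BaseKeyValueStoreBackend()))  # single mget query
print(choose_strategy(DatabaseBackend()))           # single SQL query
print(choose_strategy(OtherBackend()))              # n queries via multiprocessing (old behavior)
```

The fallback keeps unknown backends working exactly as before, so the optimization is strictly opt-in by backend capability.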
Issue link: AIRFLOW-6921
Make sure to mark the boxes below before creating PR:

- [x] Commit message starts with [AIRFLOW-NNNN], where AIRFLOW-NNNN = JIRA ID. For document-only changes the commit message can start with [AIRFLOW-XXXX].
- In case of a fundamental code change, an Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards-incompatible changes, please leave a note in UPDATING.md.

Read the Pull Request Guidelines for more information.