
MappedTasks: Very slow execution :: taskinstance.py seems to be a bottleneck. #35267

Closed
a-meledin opened this issue Oct 30, 2023 · 20 comments
Labels
area:core · area:performance · kind:bug · needs-triage

Comments

@a-meledin

a-meledin commented Oct 30, 2023

Apache Airflow version

2.7.2

What happened

We have a DAG with 116 tasks, each having some number of mapped tasks. The overall number of task instances is ~6000. The problem is that we encounter very slow execution. Investigation and testing with the code below has shown that we have ~110 open connections running the SQL: SELECT dag_run.state AS dag_run_state, ... FROM dag_run WHERE dag_run.dag_id = ... AND dag_run.run_id = ... FOR UPDATE.

In the logs it shows up as a delay caused by taskinstance.py:
[screenshot: task log showing the delay in taskinstance.py]

import logging
from datetime import datetime
import pandas as pd, numpy as np
from airflow.decorators import dag, task, task_group
from airflow.operators.python import get_current_context
from airflow.models.param import Param

@dag(
    start_date=datetime(2023, 10, 29),
    schedule=None,
    max_active_runs=1,
    max_active_tasks=16,
    tags=["TESTS", "MAPPED TASKS"],
    description="",
    params={"MappedNumber": Param(50, type="integer", title="Number of dynamically created Mapped tasks")},
)
def mapped_tests():

    @task(trigger_rule="none_failed")
    def mapped_instantiator():
        context = get_current_context()
        MappedNumber= context["params"]["MappedNumber"]
        return [x for x in range(MappedNumber)]

    @task(trigger_rule="none_failed")
    def mapped_instance(num):
        print(f"!!!!!!!!!!!!!!!!!!!!!!!! I'M NUMBER: {num} !!!!!!!!!!!!!!!!!!!!!!!")
        return num
    
    @task
    def join_all():
        num = 1
        print(f"MY NUM: {num}")
        
    first = mapped_instantiator.override(task_id="first")()    
    join_all = join_all()

    mapped_one = []
    cascade_groups = []    
    for x in range(8):
        mapped_one.append(mapped_instance.override(task_id=f"first_level_{x}").expand(num=first))    

        # do cascading
        @task_group(group_id = f"cascade_group_{x}")
        def cascade_group():
            for y in range(3):
                mapped_instance.override(task_id=f"cascade_{x}_{y}").expand(num=mapped_one[x])    
        cascade_groups.append(cascade_group())
        mapped_one[x] >> cascade_groups[x]
        
        cascade_groups[x] >> join_all
    
dag_id = "mapped_tests"
globals()[dag_id] = mapped_tests()

What you think should happen instead

Investigation required on taskinstance.py code.

How to reproduce

See the code above. Allow 150 connections with pgbouncer, then check query stats with:
SELECT query, wait_event_type, wait_event, count(*),
       min(now() - backend_start), max(now() - backend_start)
FROM pg_stat_activity
WHERE usename = 'airflow_user'::name
GROUP BY 1, 2, 3
ORDER BY 4 DESC;
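
To additionally see which sessions are blocked and which backends are blocking them, a sketch like this can help (assuming Postgres 9.6+ for pg_blocking_pids and the same airflow_user role as above):

-- blocked sessions, who blocks them, and how long they have been waiting
SELECT a.pid,
       pg_blocking_pids(a.pid) AS blocked_by,
       a.wait_event_type,
       a.wait_event,
       now() - a.query_start   AS waiting_for,
       left(a.query, 80)       AS query
FROM pg_stat_activity a
WHERE a.usename = 'airflow_user'::name
  AND cardinality(pg_blocking_pids(a.pid)) > 0
ORDER BY waiting_for DESC;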

Operating System

Ubuntu on Linux and under WSL2

Versions of Apache Airflow Providers

2.7.2

Deployment

Docker-Compose

Deployment details

Tested on Docker, Python 3.8-310, under Celery and LocalExecutor.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@a-meledin a-meledin added the area:core, kind:bug and needs-triage labels on Oct 30, 2023
@boring-cyborg

boring-cyborg bot commented Oct 30, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@Taragolis
Contributor

Investigation required on taskinstance.py code.

Airflow is OSS developed by many different contributors (most of them also users of Airflow), so this requires someone to spend their free time and figure out how this could be optimised on the 3 different DB backends: Postgres, MySQL and SQLite.

@phi-friday
Contributor

I think you can limit it with config

@potiuk
Member

potiuk commented Oct 31, 2023

I think we indeed need more details, an investigation, and answers (from your side, @a-meledin):

  • Is it only with this number of mapped tasks that you see the problem?
  • Have you checked that you have enough resources to run your database with this setup (i.e. handling 150 open connections)?
  • Have you tried setting lower/higher values? What happens then?

Generally speaking, when you allow PGBouncer to open 150 connections to your database, it will open all 150 connections, and Postgres creates a separate process for each connection. So if your Postgres does not have enough resources (for example memory, but it can be CPU or other resources) to run all those processes, it will slow down to a crawl as the processes compete for those resources.

You need to look at the resource usage and see what your bottlenecks are.

Finally the last question and request:

  • Did you see a regression compared to other Airflow versions, or is this the only version of Airflow you tested with? Can you try 2.7.1 and the upcoming release (2.7.3rc, likely out tomorrow) and see if there are any differences? There were some changes in 2.7.2 that should decrease the number of connections used compared to 2.7.1, but I wonder if they had some side effects, so if you can check those versions, that would be great.

@potiuk
Member

potiuk commented Oct 31, 2023

I think you can limit it with config

That will not work: each task establishes its own connection and runs in a separate process, and the SQLAlchemy pool only works per process.

@a-meledin
Author

a-meledin commented Oct 31, 2023

I think you can limit it with config

I. First, I installed pgbouncer with a pool of 20, then increased it to 150. There were ~104 active connections waiting for a lock and no waiting processes in pgbouncer (so a 120-130 connection pool was enough).
II. Second, I removed the "mini scheduling" feature by setting:

  • schedule_after_task_execution = False
  • it can also be disabled via use_row_level_locking = False, but then only a single scheduler can run.

These settings gave roughly a 2x speed-up, but it seems that the more new dynamically mapped task instances this DAG gets, the slower processing becomes.
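
For reference, a minimal airflow.cfg sketch of the two settings (to my understanding both live in the [scheduler] section on Airflow 2.x):

[scheduler]
# disable the "mini scheduler" pass that each task runs right after it finishes
schedule_after_task_execution = False
# alternatively, disabling row-level locking also removes the SELECT ... FOR UPDATE,
# but then only a single scheduler instance can run safely
use_row_level_locking = False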

  • Is it only with mapped tasks in that number you see the problem?

As I mentioned above, when running with the MappedNumber=108 param, execution of 3460 task instances takes 20 minutes. Celery Flower shows a runtime between 1.7 and 4.5 sec per task instance. I used one Celery worker with 8 threads on a Core i5 11th-gen (12 CPU) machine with 32 GB of memory, Ubuntu under WSL2. Memory is sufficient; CPU load is almost 100%.

  • Have you checked if you have enough resources to run your database with this setup (I.e. handling 150 opened connections)

I've used pgbouncer. And with schedule_after_task_execution = False, Airflow doesn't use more than 30-35 connections, as far as I observed.

  • Have you try to set lower/higher values? What happens them?

If I set lower pgbouncer pool limits (e.g. 20), then there are processes waiting for a connection (in the situation where schedule_after_task_execution = True and use_row_level_locking = True). Sure, Airflow and Postgres consumed fewer resources, but the DAG's execution time slowed down even more.

Generally speaking, when you allow PGBouncer to open 150 connections to your database, it will open all 150 connections, and Postgres creates a separate process for each connection. So if your Postgres does not have enough resources (for example memory, but it can be CPU or other resources) to run all those processes, it will slow down to a crawl as the processes compete for those resources.

I observed this. Swap usage was minimal. The problem was with locks; see the explanation above.

  • Did you see a regression compared to other Airflow versions, or is this the only version of Airflow you tested with? Can you try 2.7.1 and the upcoming release (2.7.3rc, likely out tomorrow) and see if there are any differences? There were some changes in 2.7.2 that should decrease the number of connections used compared to 2.7.1, but I wonder if they had some side effects, so if you can check those versions, that would be great.

Tested only 2.7.2, with LocalExecutor and CeleryExecutor + Redis + PG backend. It's a bit problematic to test under 2.7.1.

@potiuk
Member

potiuk commented Oct 31, 2023

Tested only 2.7.2, with LocalExecutor and CeleryExecutor + Redis + PG backend. It's a bit problematic to test under 2.7.1.

It would be great if you could try this and compare. As mentioned, there was a change in 2.7.2 around that area (connections for mapped tasks). It was supposed to improve the number of connections (essentially lowering them by half), but maybe it triggered some unforeseen side effects. It would be great to narrow down the investigation to whether this is a problem with the deployment or something caused by that change.

@a-meledin
Author

a-meledin commented Nov 1, 2023 via email

@potiuk
Member

potiuk commented Nov 1, 2023

Jarek, thank you for your answer. You could run the code above on 2.7.1 too. Actually, I don't have such a possibility right now.

It's not the code, it's the scale. I am a contributor, not a user, so I do not have an Airflow installation to test it on.

@VladimirYushkevich
Contributor

I'm having a similar issue with dynamically mapped tasks on 2.8.4. What is the best way to fix it?

@kaxil
Member

kaxil commented May 15, 2024

@uranusjr Any ideas, or other instances of this that you have noticed from other users?

@uranusjr
Member

I don’t recall anything similar reported elsewhere. I can imagine the FOR UPDATE part causing this though since it can saturate the database with all workers doing it at the same time, especially if you don’t have a good combination of the database connection limit and worker count.

Is it possible to get the full SELECT ... FOR UPDATE query you mentioned in the top message? That would help a lot to pin down the problem.

@VladimirYushkevich
Contributor

I managed to narrow this problem down a bit. When our DAG starts with 1000+ dynamically mapped tasks, we experience significant performance issues: the UI is slow, and there are warnings that the scheduler or triggerer is not available. I found that the only impacted metrics are from pgbouncer, and our Postgres instance (Cloud SQL on GCP) started reporting DB locks. The spikes below correlate with the time when this DAG with dynamically mapped tasks is running:
[screenshot, 2024-05-14 20:32: pgbouncer / Cloud SQL lock metrics]
I selected the long-running queries from pg_stat_activity:
[screenshot, 2024-05-14 20:57: long-running queries in pg_stat_activity]
One of the frequently running queries looks like:

SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS dag_run_run_type, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS dag_run_data_interval_start, dag_run.data_interval_end AS dag_run_data_interval_end, dag_run.last_scheduling_decision AS dag_run_last_scheduling_decision, dag_run.dag_hash AS dag_run_dag_hash, dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number
FROM dag_run
WHERE dag_run.dag_id = 'retryable_dag' AND dag_run.run_id = 'scheduled__2024-05-14T08:00:00+00:00' FOR UPDATE;

I tried to run this query with and without FOR UPDATE (it doesn't really matter). When the DAG is running it takes 3-4 minutes, most of the time waiting for the lock. When I pause the DAG, exactly the same query takes ~1 s.
We have hundreds of other DAGs running at the same time and haven't seen such an issue.
My suspicion is that running dynamically mapped tasks is the source of the locks in the DB.

@dstandish
Contributor

dstandish commented May 15, 2024

This could be related to this PR, which looks like it was released in 2.9.1:

#38914

We observed that when you have a bunch of mapped tasks, running the "mini scheduler" can result in too many processes waiting for the same lock on the same table. With that change, it won't wait if it's already locked.
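
Roughly, the difference looks like this at the SQL level (an illustrative Postgres sketch with a placeholder run_id, not the exact code from that PR):

-- plain FOR UPDATE: every mini-scheduler process queues up behind the same dag_run row
SELECT * FROM dag_run
WHERE dag_id = 'mapped_tests' AND run_id = '<run_id>'
FOR UPDATE;

-- FOR UPDATE NOWAIT gives up immediately ("could not obtain lock on row ...") instead of
-- queueing, and FOR UPDATE SKIP LOCKED would skip the locked row entirely
SELECT * FROM dag_run
WHERE dag_id = 'mapped_tests' AND run_id = '<run_id>'
FOR UPDATE NOWAIT;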

@VladimirYushkevich
Contributor

@dstandish, thanks a lot. I see that after upgrading to 2.9.1 my issue is gone. All queries are fast again and I don't see locks in DB.

@dstandish
Contributor

@dstandish, thanks a lot. I see that after upgrading to 2.9.1 my issue is gone. All queries are fast again and I don't see locks in DB.

Great, glad to hear it.

@kaxil
Member

kaxil commented May 16, 2024

Going to close it as resolved in 2.9.1

@kaxil kaxil closed this as completed May 16, 2024
@potiuk
Member

potiuk commented May 16, 2024

@dstandish, thanks a lot. I see that after upgrading to 2.9.1 my issue is gone. All queries are fast again and I don't see locks in DB.

Great, glad to hear it.

Cool. Thanks @dstandish for the pointer!

@VladimirYushkevich
Contributor

VladimirYushkevich commented May 17, 2024

I think 2.9.1 has another issue. We started to see:

2024-05-17 09:49:19.191 UTC [3586765]: [131-1] db=airflow,user=airflow-cloudsqlproxy@service-dev.iam ERROR:  could not obtain lock on row in relation "dag_run"

in the DB logs after upgrading to 2.9.1.
P.S. I'm fine with opening a new issue.

@kaxil
Member

kaxil commented May 17, 2024

Yes, please open a new issue, and if possible please also add details of any impact on performance or stability.
