Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.10.1
What happened?
We use dynamic DAG generation in our Airflow environment. We have one base DAG definition file, which we will call big_dag.py, that generates more than 1500 DAGs (roughly the pattern sketched below). Recently, after a handful more DAGs were added to big_dag.py, all of the DAGs generated from it disappeared from the UI and kept reappearing and disappearing in a loop.
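For context, the generation pattern is roughly the following (illustrative names only, not our real code): a single .py file builds DAG objects in a loop and registers them in globals() so the DAG file processor discovers them all.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative only: in big_dag.py the per-DAG configs come from an external source.
for i in range(1500):
    dag_id = f"generated_dag_{i}"
    with DAG(dag_id=dag_id, start_date=datetime(2024, 1, 1), schedule=None) as dag:
        EmptyOperator(task_id="placeholder")
    # Registering in globals() is what makes each generated DAG discoverable.
    globals()[dag_id] = dag
```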
We noticed that if we restarted our environment a couple of times, we could sometimes get back to a stable state, which made us suspect that a timing issue was at play.
What you think should happen instead?
Goal state: DAG files that generate >1500 DAGs should not cause any disruption to the environment, given appropriate timeouts.
After checking the dag_process_manager log stream we noticed a prevalence of this error:
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey" DETAIL: Key (dag_id)=(<dag_name>)
I believe the issue is on this line of the write_dag function of SerializedDagModel (this is from the main branch; I believe the issue is still present there):

airflow/airflow/models/serialized_dag.py, line 197 at commit 7bfe283:

select(literal(True)).where(
The check for whether a serialized DAG should be updated is NOT ATOMIC, so with more than one scheduler / DAG file processor running, two of them can race while trying to update the serialization. I believe the check-then-update should be made atomic here, for example via a row-level SELECT ... FOR UPDATE.
How to reproduce
You can reproduce this with an environment that has multiple schedulers/standalone DAG file processors and DAG files that dynamically generate >1500 DAGs. A full parse of such a file took ~200 seconds for us (make sure the timeouts accommodate this).
To increase the likelihood of hitting the duplicate serialized_dag_pkey violation, reduce min_file_process_interval to something like 30 seconds; example overrides are below.
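For reference, the overrides we used while reproducing look roughly like this in airflow.cfg form (on MWAA they are set as configuration options):

```ini
[scheduler]
standalone_dag_processor = True
min_file_process_interval = 30

[core]
dag_file_processor_timeout = 900
dagbag_import_timeout = 900
```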
Operating System
Amazon Linux 2023
Versions of Apache Airflow Providers
No response
Deployment
Amazon (AWS) MWAA
Deployment details
2.10.1
2 Schedulers
Environment size: xL
min_file_process_interval = 600
standalone_dag_processor = True (we believe MWAA creates one per scheduler)
dag_file_processor_timeout = 900
dagbag_import_timeout = 900
Anything else?
I am not sure why the timing works out when DAG definition files generate far fewer than 1500 DAGs; it could just be that the environment finishes all of the work before a race condition can occur.
(Update)
After splitting the base DAG file into thousands of separate .py DAG definition files, the issue still persists.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
