Description
Apache Airflow Provider(s)
elasticsearch
Versions of Apache Airflow Providers
Latest version of the ElasticSearch provider
Apache Airflow version
2.9 but all others as well
Operating System
Debian
Deployment
Astronomer
Deployment details
Affects any environment where multiple Airflow systems depend on a single, monolithic ElasticSearch cluster. Not specific to a deployment pattern beyond the fact that the Astronomer model creates indices at a per-day level. I believe this is the same in the OSS Airflow Helm chart with fluentd.
What happened
To fetch logs, Airflow's query filter searches '_all' indices in the underlying ElasticSearch for a specific string pattern that is a concatenation of dag_run_id, task_run_id, and a few other details. Because the Webserver is not specific about which indices it queries, this puts a tremendous amount of stress on the underlying ElasticSearch. ElasticSearch parallelizes the search across all the indices and across its multiple nodes, but at scale this results in significant latency when fetching logs as the number of indices grows. If multiple Airflow clusters use the same ElasticSearch, the issue is compounded further. The built-in index_patterns parameter that Airflow exposes is not enough to limit the query radius in all cases: because the filter is static, an operator cannot dynamically choose a day/month filter for a given task_log. Where logging is configured to create a new index per day, filter patterns can still end up querying hundreds of indices.
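To illustrate the difference in query radius, here is a small simulation in plain Python. It does not talk to ElasticSearch; `fnmatch` is only a stand-in for how a wildcard index pattern resolves to concrete indices, and the index names (`airflow-a-YYYY.MM.DD`, `airflow-b-YYYY.MM.DD`) are hypothetical examples of a per-day naming scheme, not what any particular deployment uses.

```python
from datetime import date, timedelta
from fnmatch import fnmatch


def daily_indices(prefix, start, days):
    """Build hypothetical per-day index names such as 'airflow-a-2024.01.01'."""
    return [
        f"{prefix}-{(start + timedelta(days=i)).strftime('%Y.%m.%d')}"
        for i in range(days)
    ]


def matched(pattern, indices):
    """Return the index names a wildcard pattern would resolve to (simulated)."""
    return [name for name in indices if fnmatch(name, pattern)]


# Two Airflow systems sharing one cluster, each with 365 daily indices.
all_indices = (
    daily_indices("airflow-a", date(2024, 1, 1), 365)
    + daily_indices("airflow-b", date(2024, 1, 1), 365)
)

everything = matched("*", all_indices)                 # roughly what '_all' touches
static = matched("airflow-a-*", all_indices)           # a static per-system pattern
scoped = matched("airflow-a-2024.03.15", all_indices)  # a date-scoped query

print(len(everything), len(static), len(scoped))
```

Under these assumptions, the static pattern still fans out to every daily index of one system, while a date-scoped name resolves to a single index.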
What you think should happen instead
The Airflow Webserver should use a more pointed query filter that queries only the relevant index for a given log request. While I understand that Airflow is not responsible for shipping logs to the underlying logging store, it would help if Airflow could be configured with, or could make certain assumptions about, the index pattern it needs to query for a given log. Alternatively, reindexing on dag_id (with a secondary task_id) could make queries much more scoped and greatly reduce cluster load (not an Airflow responsibility). But allowing a user to set a function that provides an index pattern for a given task_run_id would give the most flexibility in constraining queries.
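As a rough sketch of the kind of user-supplied function proposed here, the snippet below maps a date to the daily indices that could hold a task's logs. Everything in it is an assumption for illustration: the function name `daily_index_pattern`, the prefix `fluentd.airflow-a`, and the `%Y.%m.%d` suffix are hypothetical and are not part of the current provider API. How the date would be derived from a task run (for example, from its start time) is also left open. The result is a comma-separated list of index names, which ElasticSearch search requests accept as a target.

```python
from datetime import date, timedelta


def daily_index_pattern(prefix: str, start: date, days: int = 1,
                        fmt: str = "%Y.%m.%d") -> str:
    """Return a comma-separated list of daily index names.

    Hypothetical helper: covers `days` consecutive days beginning at `start`,
    so a task that runs past midnight can still be found with days=2.
    """
    if days < 1:
        raise ValueError("days must be at least 1")
    names = [
        f"{prefix}-{(start + timedelta(days=offset)).strftime(fmt)}"
        for offset in range(days)
    ]
    return ",".join(names)


# Example: a task that started on 31 May 2024 and may have logged into 1 June.
pattern = daily_index_pattern("fluentd.airflow-a", date(2024, 5, 31), days=2)
print(pattern)
```

Keeping the function a plain callable that returns a string means each deployment can encode its own naming scheme (per day, per month, per DAG) without Airflow needing to know how logs were shipped.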
How to reproduce
Create multiple Airflow systems using Astronomer or the OSS chart with FluentD. Configure FluentD to push to a different index prefix per Airflow. For a minimal ElasticSearch cluster with just 3 nodes, the default number of indices is 2000 per node. At around 5,000 indices, any log query will result in 100% CPU utilization on the ElasticSearch cluster.
Anything else
At sufficient scale of Airflow + ElasticSearch, this issue occurs every time a user views the logs of any task.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct