
DagProcessor Performance Regression #30884

Closed
1 of 2 tasks
george-zubrienko opened this issue Apr 26, 2023 · 17 comments · Fixed by #30899
Labels
affected_version:2.5 Issues Reported for 2.5 area:core kind:bug This is a clearly a bug

Comments

@george-zubrienko

george-zubrienko commented Apr 26, 2023

Apache Airflow version

2.5.3

What happened

Upgrading from 2.4.3 to 2.5.3 caused a significant increase in dag processing time on the standalone dag processor (from ~1-2s to ~60s):

/opt/airflow/dags/ecco_airflow/dags/image_processing/product_image_load.py                                                0          -1  56.68s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/known_consumers/known_consumers.py                                                    0          -1  56.64s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/monitoring/row_counts.py                                                              0          -1  56.67s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/omnichannel/base.py                                                                   0          -1  56.66s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_data.py                                                                0          -1  56.67s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_stream.py                                                              0          -1  56.52s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/reporting/reporting_data_foundation.py                                                0          -1  56.63s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/retail_analysis/retail_analysis_dbt.py                                                0          -1  56.66s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/rfm_segments/rfm_segments.py                                                          0          -1  56.02s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/utils/airflow.py                                                                           0          -1  56.65s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/aad_users_listing.py                                                           1           0  55.51s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/funnel_io.py                                                                   1           0  56.13s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/iar_param.py                                                                   1           0  56.50s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/sfmc_copy.py                                                                   1           0  56.59s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/us_legacy_datawarehouse.py                                                     1           0  55.15s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_auditing.py                                                              1           0  56.54s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_budget_daily_phasing.py                                                  1           0  56.63s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_gold_rm_tests.py                                                         1           0  55.00s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/consumer_entity_matching/graph_entity_matching.py                                     1           0  56.67s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/data_backup/data_backup.py                                                            1           0  56.69s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/hive/adhoc_entity_publish.py                                                          1           0  55.33s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/image_regression/train.py                                                             1           0  56.63s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/maintenance/db_maintenance.py                                                         1           0  56.58s          2023-04-26T12:56:15

Also seeing messages like these:

[2023-04-26T12:56:15.322+0000] {manager.py:979} DEBUG - Processor for /opt/airflow/dags/ecco_airflow/dags/bronze/us_legacy_datawarehouse.py finished
[2023-04-26T12:56:15.323+0000] {processor.py:296} DEBUG - Waiting for <ForkProcess name='DagFileProcessor68-Process' pid=116 parent=7 stopped exitcode=0>
[2023-04-26T12:56:15.323+0000] {manager.py:979} DEBUG - Processor for /opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_gold_rm_tests.py finished
[2023-04-26T12:56:15.323+0000] {processor.py:296} DEBUG - Waiting for <ForkProcess name='DagFileProcessor69-Process' pid=122 parent=7 stopped exitcode=0>
[2023-04-26T12:56:15.324+0000] {manager.py:979} DEBUG - Processor for /opt/airflow/dags/ecco_airflow/dags/bronze/streaming/sap_inventory_feed.py finished
[2023-04-26T12:56:15.324+0000] {processor.py:314} DEBUG - Waiting for <ForkProcess name='DagFileProcessor70-Process' pid=128 parent=7 stopped exitcode=-SIGKILL>
[2023-04-26T12:56:15.324+0000] {manager.py:986} ERROR - Processor for /opt/airflow/dags/ecco_airflow/dags/bronze/streaming/sap_inventory_feed.py exited with return code -9.

In 2.4.3:

/opt/airflow/dags/ecco_airflow/dags/image_regression/train.py                                                             1           0  1.34s           2023-04-26T14:19:08
/opt/airflow/dags/ecco_airflow/dags/known_consumers/known_consumers.py                                                    1           0  1.12s           2023-04-26T14:19:00
/opt/airflow/dags/ecco_airflow/dags/maintenance/db_maintenance.py                                                         1           0  0.63s           2023-04-26T14:18:27
/opt/airflow/dags/ecco_airflow/dags/monitoring/row_counts.py                                                              1           0  3.74s           2023-04-26T14:18:45
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_data.py                                                                1           0  1.21s           2023-04-26T14:18:47
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_stream.py                                                              1           0  1.22s           2023-04-26T14:18:30
/opt/airflow/dags/ecco_airflow/dags/reporting/reporting_data_foundation.py                                                1           0  1.39s           2023-04-26T14:19:08
/opt/airflow/dags/ecco_airflow/dags/retail_analysis/retail_analysis_dbt.py                                                1           0  1.32s           2023-04-26T14:18:51
/opt/airflow/dags/ecco_airflow/dags/rfm_segments/rfm_segments.py                                                          1           0  1.20s           2023-04-26T14:18:34

What you think should happen instead

Dag processing time remains unchanged

How to reproduce

Provision Airflow with the following settings:

Airflow 2.5.3

  • K8s 1.25.6
  • Kubernetes executor
  • Postgres backend (Postgres 11.0)
  • Deploy using Airflow Helm v1.9.0 with image 2.5.3-python3.9
    • pgbouncer enabled
    • standalone dag processor with 3500m cpu / 4000Mi memory, single replica
    • dags and logs mounted from RWM volume (Azure files)

Airflow 2.4.3

  • K8s 1.25.6
  • Kubernetes executor
  • Postgres backend (Postgres 11.0)
  • Deploy using Airflow Helm v1.7.0 with image 2.4.3-python3.9
    • pgbouncer enabled
    • standalone dag processor with 2500m cpu / 2000Mi memory, single replica
    • dags and logs mounted from RWM volume (Azure files)

Image modifications

We use an image built from apache/airflow:2.4.3-python3.9, with some dependencies added or reinstalled at different versions.

Poetry dependency spec:

For 2.5.3:

[tool.poetry.dependencies]
python = ">=3.9,<3.11"
authlib = "~1.0.1"
adapta = { version = "==2.2.3", extras = ["azure", "storage"] }
numpy = "==1.23.3"
db-dtypes = "~1.0.4"
gevent = "^21.12.0"
sqlalchemy = ">=1.4,<2.0"
snowflake-sqlalchemy = ">=1.4,<2.0"
esd-services-api-client = "~0.6.0"
apache-airflow-providers-common-sql = "~1.3.1"
apache-airflow-providers-databricks = "~3.1.0"
apache-airflow-providers-google = "==8.4.0"
apache-airflow-providers-microsoft-azure = "~5.2.1"
apache-airflow-providers-datadog = "~3.0.0"
apache-airflow-providers-snowflake = "~3.3.0"
apache-airflow = "==2.5.3"
dataclasses-json = ">=0.5.7,<0.6"

For 2.4.3:

[tool.poetry.dependencies]
python = ">=3.9,<3.11"
authlib = "~1.0.1"
adapta = { version = "==2.2.3", extras = ["azure", "storage"] }
numpy = "==1.23.3"
db-dtypes = "~1.0.4"
gevent = "^21.12.0"
sqlalchemy = ">=1.4,<2.0"
snowflake-sqlalchemy = ">=1.4,<2.0"
esd-services-api-client = "~0.6.0"
apache-airflow-providers-common-sql = "~1.3.1"
apache-airflow-providers-databricks = "~3.1.0"
apache-airflow-providers-google = "==8.4.0"
apache-airflow-providers-microsoft-azure = "~5.2.1"
apache-airflow-providers-datadog = "~3.0.0"
apache-airflow-providers-snowflake = "~3.3.0"
apache-airflow = "==2.4.3"
dataclasses-json = ">=0.5.7,<0.6"

Operating System

Container OS: Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.3.4
apache-airflow-providers-databricks==3.1.0
apache-airflow-providers-datadog==3.0.0
apache-airflow-providers-docker==3.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==8.4.0
apache-airflow-providers-grpc==3.0.0
apache-airflow-providers-hashicorp==3.1.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-microsoft-azure==5.2.1
apache-airflow-providers-mysql==3.2.1
apache-airflow-providers-odbc==3.1.2
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-snowflake==3.3.0
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.2.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

See the "How to reproduce" section.

Anything else

Occurs when upgrading the helm chart from a 1.7.0/2.4.3 to a 1.9.0/2.5.3 installation.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@george-zubrienko george-zubrienko added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Apr 26, 2023
@potiuk
Member

potiuk commented Apr 26, 2023

Can you please check if the 2.6.0rc* version solves your problem?

@potiuk potiuk added pending-response and removed needs-triage label for new issues that we didn't triage yet labels Apr 26, 2023
@potiuk
Member

potiuk commented Apr 26, 2023

@george-zubrienko - I have some extra questions that might help to find out the root cause.

  1. Does it happen all the time for all dags parsed or some of the time for some of the dags? Can you please describe the pattern you see?

  2. Could you please tell us more about (and possibly share some anonymised examples of) the top-level code of the dags that experience this behaviour, and whether you use any of the following:

  • excessive or unusual imports?
  • reaching out to external sources (HTTP/Similar) while parsing top-level code?
  • accessing any kind of database while parsing top-level code?
  • accessing secrets/variables/connections while parsing top-level code?

Generally speaking, I would like to know whether your top-level parsing violates any of the best practices described in https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code.

  3. Do you see any excessive memory use and swapping while parsing happens for the DAG file processor?

  4. Do you use callbacks in your DAGs that experience the problem?

Having those answers might help in investigating the root cause - the problem you describe is not easily reproducible - so we need to know what's so special about your DAGs or environment that triggers that behaviour. With the related #30593 it seemed that the reason might be MySQL, but knowing that you have Postgres suggests either that the problem is elsewhere or that it is something completely different than we suspected before.

@george-zubrienko
Author

george-zubrienko commented Apr 26, 2023

Combining answers here - I'll try with the 2.6.0rc, or maybe the released version if it gets released before I get an hour to do a test.

@george-zubrienko - I have some extra questions that might help to find out the root cause.

  1. Does it happen all the time for all dags parsed or some of the time for some of the dags? Can you please describe the pattern you see?

I can pinpoint a few things:

  • All dags parse much slower than usual
  • Increasing DAG_FILE_PROCESSOR_TIMEOUT doesn't affect anything
  • Increasing the number of parsing processes helps get the number of parsed/active dags up, but the overall process goes very slowly, doing maybe one dag every 30s or so with 128 parsing processes. Initially we see around 20-30 of 65 dags going through.
  • There doesn't seem to be any pattern related to the dag structure. Dags with a single task or 100 tasks can fail to process.
  2. Could you please tell us more about (and possibly share some anonymised examples of) the top-level code of the dags that experience this behaviour, and whether you use any of the following:
  • excessive or unusual imports?

We always try to import only what we need in DAG files, but sometimes there can be imports for type-hinting purposes only.

  • reaching out to external sources (HTTP/Similar) while parsing top-level code?

No, never.

  • accessing any kind of database while parsing top-level code?

The infamous Variable.get(..) is the one thing against Airflow best practices that we use in all dags. However, our Airflow is set to run a parse only every 30s instead of 5s, and we only read a single variable (a top-level JSON configuration for the pipeline).

In some dags we also read 1-2 connections stored in Airflow db. That's it.

  • accessing secrets/variables/connections while parsing top-level code?

See notes above.

  3. Do you see any excessive memory use and swapping while parsing happens for the DAG file processor?

Memory usage on the dag file processor pod starts at around 180 MB, grows to around 1.6 GB on the initial parse, then stabilizes around 700 MB; CPU usage drops to ~2-3% from an initial ~50%.

  4. Do you use callbacks in your DAGs that experience the problem?

No, we also have SLA and email/error callbacks fully disabled.

Please find below a full example of a DAG that consists of one task only:

"""Manages stream lifecycle for OC logs"""
import json
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.models import Variable

from airflow.operators.python import PythonOperator
from airflow.providers.datadog.hooks.datadog import DatadogHook

from esd_services_api_client.beast import ArgumentValue, JobSocket

from ecco_airflow.dags.omnichannel.base import activate_log_stream
from ecco_airflow.utils.k8s import executor_config

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["esdsupport@ecco.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="ocLogStream",
    default_args=default_args,
    description="Activates streaming of OC logs.",
    schedule_interval="0 * * * *",
    max_active_runs=1,
    start_date=pendulum.today().add(days=-2),
    tags=["streaming", "omni_channel", "reporting"],
) as dag:
    # pylint: disable=W0104

    dd_hook = DatadogHook(datadog_conn_id="esd-datadog")
    runtime_conf = Variable.get("omni_channel_logs", deserialize_json=True)

    PythonOperator(
        task_id="stream-oc-logs",
        python_callable=activate_log_stream,
        pool="omnichannel",
        op_kwargs={
            "checkpoint_location": f"'{runtime_conf['checkpoint_location']}'",
            "compact_after": f"'{runtime_conf['compact_after']}'",
            "datadog_config": ArgumentValue(
                value=json.dumps(
                    {
                        "api_key": dd_hook.api_key,
                        "app_key": dd_hook.app_key,
                        "site": "datadoghq.eu",
                    }
                ),
                encrypt=True,
                quote=True,
            ),
            "custom_config": ArgumentValue(
                value=runtime_conf["access_credentials"],
                encrypt=True,
                quote=True,
                is_env=True,
            ),
            "project_name": runtime_conf["project_name"],
            "project_version": runtime_conf["project_version"],
            "project_runnable": runtime_conf["project_runnable"],
            "stream_group": "oc_logs",
            "inputs": [JobSocket.from_dict(src) for src in runtime_conf["sources"]],
            "outputs": [JobSocket.from_dict(trg) for trg in runtime_conf["targets"]],
        },
        executor_config=executor_config(
            nodepool_names=["general"],
            secret_env_vars={
                "RUNTIME_ENCRYPTION_KEY": {
                    "secret_key": "RUNTIME_ENCRYPTION_KEY",
                    "secret_name": "hashicorp-vault-spark-encryption-key",
                },
                "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS": {
                    "secret_key": "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS",
                    "secret_name": "hashicorp-vault-oc-logs",
                },
            },
            cpu_memory_limit={"cpu": "100m", "memory": "500Mi"},
        ),
        retries=3,
        retry_delay=timedelta(seconds=300),
    )
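
For contrast with the top-level Variable.get above: the best-practices document linked earlier suggests deferring the read to task run time via templating. Below is a minimal hedged sketch using the same variable name; op_kwargs is a templated field of PythonOperator, the dag_id and the lambda callable are stand-ins rather than the real activate_log_stream, and, as noted further down in the thread, this approach does not map cleanly onto complex nested configs.

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# Sketch only: the variable is resolved at task run time through templated
# op_kwargs, so the dag file processor never touches the metadata database.
with DAG(
    dag_id="ocLogStream_templated_sketch",
    schedule_interval="0 * * * *",
    start_date=pendulum.today().add(days=-2),
) as dag:
    PythonOperator(
        task_id="stream-oc-logs",
        python_callable=lambda **kwargs: print(kwargs),  # stand-in callable
        op_kwargs={
            "checkpoint_location": "{{ var.json.omni_channel_logs.checkpoint_location }}",
            "compact_after": "{{ var.json.omni_channel_logs.compact_after }}",
            # nested values (sources/targets/credentials) do not translate well
            # to plain Jinja strings, which is the limitation mentioned below.
        },
    )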

@george-zubrienko
Author

george-zubrienko commented Apr 26, 2023

Adding more info:

  • Airflow configs that are different from defaults
resource "kubernetes_config_map" "airflow_additional_config" {
  metadata {
    name      = "${local.application_name}-additional-cfg"
    namespace = kubernetes_namespace.airflow_ns.metadata[0].name
  }
  data = {
    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS                         = "True"
    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE              = terraform.workspace != "production" ? "False" : "True"
    AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE            = terraform.workspace != "production" ? 8 : 16
    AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT_CHECK_INTERVAL = 300
    AIRFLOW__KUBERNETES__DELETE_OPTION_KWARGS                       = "{\"grace_period_seconds\": 10}"
    AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE                       = "True"
    AIRFLOW__KUBERNETES__TCP_KEEP_IDLE                              = 30
    AIRFLOW__KUBERNETES__TCP_KEEP_INTVL                             = 30
    AIRFLOW__KUBERNETES__TCP_KEEP_CNT                               = 30
    AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE                        = 32
    AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW                     = 8
    AIRFLOW__DATABASE__MAX_DB_RETRIES                               = 16
    AIRFLOW__CORE__PARALLELISM                                      = 64
    AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG                         = 512
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG                          = 7
    AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS                         = terraform.workspace != "production" ? "True" : "False"
    AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT                            = 60
    AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT                       = 90
    AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK              = 120
    AIRFLOW__CORE__CHECK_SLAS                                       = "False"
    AIRFLOW__SCHEDULER__PARSING_PROCESSES                           = 2
    AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT                          = "False"
    AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR                    = "True"
    AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION               = "False"
    AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC                           = 60
    AIRFLOW__LOGGING__REMOTE_LOGGING                                = "True"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID                            = "wasb_logstorage"
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER                        = "wasb-${azurerm_storage_container.airflow_logs.name}"
    AIRFLOW__API__AUTH_BACKENDS                                     = "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
    AIRFLOW__METRICS__STATSD_DATADOG_ENABLED                        = "True"
    AIRFLOW__METRICS__STATSD_PREFIX                                 = "airflow"
    AIRFLOW__METRICS__STATSD_DATADOG_TAGS                           = "environment:${terraform.workspace}"
    AIRFLOW__METRICS__STATSD_ON                                     = "True"
    AIRFLOW__METRICS__STATSD_PORT                                   = "8125"
    AIRFLOW__METRICS__STATSD_HOST                                   = "datadog-statsd.${kubernetes_namespace.airflow_ns.metadata[0].name}.svc.cluster.local"
    AIRFLOW__WEBSERVER__AUTO_REFRESH_INTERVAL                       = 10
  }
}

Let me know if you want to see the helm values we pass when deploying.

@potiuk
Member

potiuk commented Apr 27, 2023

Very helpful, thanks!

@potiuk
Member

potiuk commented Apr 27, 2023

Also - I am not sure if you could try it - the observation of @wooklist was that the root cause was this commit: #30079, and it is very small. We could not see so far why it would be the cause, but if it would be possible to modify the installed dag file processor code and revert that commit to see if that helps, that would narrow down the diagnosis immensely.

@potiuk
Member

potiuk commented Apr 27, 2023

And just to explain why - I will (in the meantime) try to reproduce it locally and see if we can observe similar behaviour on our installation, but just knowing that this is the cause could allow us to remediate it by removing that commit in the 2.6.0 RC while we can, even if we do not know exactly what mechanism is behind the slow-down.

@george-zubrienko
Author

george-zubrienko commented Apr 27, 2023

I remember looking at that commit, but I agree that even if it is causing this behaviour, it is not an easy guess at all. For example, these are metrics from the pgbouncer pod that guards our Postgres while it ran with 2.5.3 - not much load at all, and the cluster was not scheduling anything, so that load is webserver + dag processor.

[screenshot: pgbouncer metrics during the 2.5.3 rollout]

In production our xact/s is between ~90 and ~300 depending on the number of running tasks.

However, there was one change that stood out, but I could not explain it. After the upgrade, these are key metrics from Postgres itself (Azure single server, btw):

[screenshot: Postgres key metrics before/after the 2.5.3 rollout]

You can see there is a significant drop in traffic between the db and the airflow cluster after we rolled out 2.5.3. Maybe it is nothing, but just so you have a more or less complete picture from our end.

Since we already have an image with 2.5.3, I can add a layer on top with a modified manager.py and use that image for the dag processor specifically - that way we would test exactly the mentioned commit. I'll try to find time today or Friday to do such a test.

@potiuk
Member

potiuk commented Apr 27, 2023

Oh whoa! That's what I call "deep" data (not big, but "deep") :). Thanks for that - it's super helpful. Quick question though - what was the version you had before 2.5.3 / rolled back to?

@george-zubrienko
Author

george-zubrienko commented Apr 27, 2023

This is the Dockerfile content for our current image - also the one we tried to upgrade from:

FROM apache/airflow:2.4.3-python3.9

COPY requirements.txt /tmp/requirements.txt

RUN pip3 install --user --no-cache -r /tmp/requirements.txt

USER 0

RUN rm /tmp/requirements.txt

USER airflow

I don't have the SHA unfortunately, and we do not rebuild images on a regular basis, so it might not be exactly the one available on Docker Hub right now.

Update to 2.5.3 is simple:

FROM apache/airflow:2.5.3-python3.9
...

And poetry dependency spec I have provided in the issue description.

If you want to build the exact same image, you can use this GH workflow:

name: Build Airflow Image

jobs:
  build_image:
    name: Build Container Image
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0
      - uses: actions/setup-python@v2
        with:
          python-version: '3.9.x'
          architecture: 'x64'
      - name: Install Poetry and dependencies
        uses: SneaksAndData/github-actions/install_poetry@v0.0.21
        with:
          export_requirements: true
          requirements_path: .container/requirements.txt
          skip_dependencies: true
      - name: Build and Push Image (DEV)
        working-directory: .container
        env:
          AZCR_USER: ${{secrets.MY_ACR_USER}}
          AZCR_TOKEN: ${{secrets.MY_ACR_TOKEN}}
          AZCR_REPO: ${{secrets.MY_ACR_REPO}}
        run: |
          set -e
          echo "$AZCR_TOKEN" | docker login $AZCR_REPO.azurecr.io --username $AZCR_USER --password-stdin 
          version=$(git describe --tags --abbrev=7)
          airflow_version=$(cat Dockerfile | grep FROM | cut -d':' -f2)
          
          docker build . --tag=$AZCR_REPO.<my-cloud-registry.io or similar>/my-airflow:$airflow_version-esd-$version && docker push $AZCR_REPO.<my-cloud-registry.io or similar>/my-airflow:$airflow_version-esd-$version

I can provide the lock file if you want to go this way.

@potiuk
Member

potiuk commented Apr 27, 2023

One more (really kind) request - not now, but in case we do not find anything by the time you have a chance to experiment: it almost looks like there is something that prevents Variable.get from obtaining a connection to retrieve anything from the database (and it looks like this single Variable.get() is blocking everything). That gives me some ideas where to look (and why it may not have been experienced by others who have no Variables in top-level code), but if we do not get there, just removing the Variable.get() and hardcoding the output might be a quick way to test that hypothesis.
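
For the example DAG shared earlier, that quick test could look roughly like the sketch below; the keys mirror the omni_channel_logs variable used there, and the values are placeholders rather than the real config.

# Parse-time test sketch: comment out the DB read and hardcode the variable
# payload, to check whether the single Variable.get() is what blocks parsing.
# runtime_conf = Variable.get("omni_channel_logs", deserialize_json=True)
runtime_conf = {
    "checkpoint_location": "abfss://placeholder",
    "compact_after": "7d",
    "access_credentials": "PLACEHOLDER",
    "project_name": "placeholder",
    "project_version": "0.0.0",
    "project_runnable": "placeholder.py",
    "sources": [],
    "targets": [],
}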

@potiuk
Member

potiuk commented Apr 27, 2023

I think I found it - completely different. What is your job_heartbeat_sec?

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#job-heartbeat-sec

@george-zubrienko
Author

george-zubrienko commented Apr 27, 2023

AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC = 60

The dag processor in the 2.5.3 rollout always showed durations for all parse tasks around 56-60 secs, but I didn't know it is that setting that regulates that, btw.

@potiuk
Member

potiuk commented Apr 27, 2023

OK. That would explain it then. It is then indeed the same issue as #30593, but the effect has been magnified by the fact that your job_heartbeat_sec is THAT long.

The dag processor in the 2.5.3 rollout always showed durations for all parse tasks around 56-60 secs, but I didn't know it is that setting that regulates that, btw.

Yes. That was a bug I introduced when fixing the liveness check for the standalone dag file processor in #30278 - we have historically used that heartbeat for all the jobs (it's also used by local_task_job and the triggerer, btw), so the config section it lives in is actually wrong and we should move it out of the scheduler section.

Fix is here: #30899

There are a few workarounds for that:

potiuk added a commit to potiuk/airflow that referenced this issue Apr 27, 2023
The standalone file processor as of apache#30278 accidentally introduced
an artificial delay between dag processing runs by adding a heartbeat
but missing to set the "only_if_necessary" flag to True.

If your dag file processing has been fast (faster than the
scheduler job_heartbeat_sec), this introduced an unnecessary pause
before the next dag file processor loop (up until that time
passed); it also inflated the
dag_processing_last_duration metric (it would always show at least
job_heartbeat_sec).

Adding the "only_if_necessary" flag fixes the problem.

Fixes: apache#30593
Fixes: apache#30884
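
As an illustration of the mechanism described in the commit message above, here is a simplified model (not the actual Airflow code) of a throttled heartbeat: without only_if_necessary the call sleeps out the remainder of the job_heartbeat_sec window, which matches the ~56-60s parse durations reported with the 60s setting.

import time

JOB_HEARTBEAT_SEC = 60  # AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC from the config above

def heartbeat(last_heartbeat: float, only_if_necessary: bool = False) -> float:
    """Toy model of the job heartbeat throttle."""
    remaining = JOB_HEARTBEAT_SEC - (time.monotonic() - last_heartbeat)
    if remaining > 0:
        if only_if_necessary:
            return last_heartbeat  # too early, skip the heartbeat entirely
        time.sleep(remaining)      # blocks the caller until the window elapses
    return time.monotonic()

# Before the fix, the standalone processor called the equivalent of
# heartbeat(last) after each (otherwise ~1s) parsing loop, so every loop
# stalled for up to JOB_HEARTBEAT_SEC; with only_if_necessary=True the
# call returns immediately instead.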
@george-zubrienko
Author

george-zubrienko commented Apr 27, 2023

decrease the job_heartbeat_sec -> 60 seconds seems a bit excessive; the scheduler should rarely run its scheduling loop for longer than a few seconds. Do you know why you have that long a heartbeat expectation?

As I recall, we used to have a lot of issues with tasks (k8s pods) receiving SIGTERM in high-load periods because they didn't heartbeat fast enough (at least that was our theory); then we figured out it was somehow related to database communication. First we tried to play around with pgbouncer settings - increasing the number of allowed client connections and the pool size - then bumped the database from 2 cores / 8 GB to 4 cores / 16 GB and base IOPS from around 100 to >2000, which helped for a couple of months. Then people produced more dags and more tasks, and it seemed unreasonable to bump the database again, while pgbouncer was already running with a 1k connection pool size.

Then we found out that increasing that setting significantly reduces the number of database sessions, and our problems with dying tasks were resolved. I should probably have opened an issue on that as well, but we were so happy our models could be trained again that we sort of let it slip into the backlog.

@george-zubrienko
Author

george-zubrienko commented Apr 27, 2023

I don't know if our Variable.get can contribute to that as well? We actually now mount variables alongside dags on the same RWM mount, so the mid-term goal for me is to get everybody away from reading vars that way and instead just get them from the local fs, as sketched below. The recommended Jinja approach doesn't work very well for us, unfortunately, as we store quite complex configs in vars that define dag layouts and are in some places deserialized into a config dataclass right away - which we find way more convenient than using plaintext Jinja pieces in dag code.
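
A minimal sketch of that local-fs approach, assuming the configs are shipped as JSON files on the same mounted volume as the dags (the path and file name are illustrative):

import json
from pathlib import Path

# Hypothetical layout: pipeline configs live next to the dags on the RWM mount.
CONFIG_ROOT = Path("/opt/airflow/dags/ecco_airflow/configs")

def load_runtime_conf(name: str) -> dict:
    """Read a pipeline config from the mounted volume instead of Variable.get()."""
    return json.loads((CONFIG_ROOT / f"{name}.json").read_text())

runtime_conf = load_runtime_conf("omni_channel_logs")  # no metadata DB access at parse time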

I'd be happy to try out 2.6.0 when it releases then, as I'm not so sure about changing that setting, at least at the moment - until I understand better how it affects task execution.

@eladkal eladkal added affected_version:2.5 Issues Reported for 2.5 and removed pending-response labels Apr 27, 2023
potiuk added a commit that referenced this issue Apr 27, 2023
ephraimbuddy pushed a commit that referenced this issue Apr 27, 2023
(cherry picked from commit 00ab45f)
@potiuk
Member

potiuk commented Apr 27, 2023

I'd be happy to try out 2.6.0 when it releases then, as I'm not so sure about changing that setting, at least at the moment - until I understand better how it affects task execution.

Cool. We will get RC3 soon

Then we found out that increasing that setting significantly reduces the number of database sessions, and our problems with dying tasks were resolved. I should probably have opened an issue on that as well, but we were so happy our models could be trained again that we sort of let it slip into the backlog.

I think this is actually a very good point and comment. It makes me think that we should have DIFFERENT heartbeat expectations for the local task (and generally for each job type we have - as mentioned above, historically they are the same).

I created an issue for that, and I will implement it during some of the tasks I am working on soon as part of our AIP-44 work. Issue here: #30908
