
Airflow workers not finding DAGs code randomly when deployed separately #30263

@ginwakeup

Description

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We have the following Airflow installation using Airflow 2.5.0:

  • Airflow is installed via the Helm chart with worker replicas set to 0
  • the Helm chart uses git-sync to sync DAGs for the webserver, scheduler, and triggerer:
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
    ## optional subpath for dag volume mount
    subPath: ~
  gitSync:
    enabled: true
    repo: ssh:<redacted>
    branch: dev
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: ""
   
    sshKeySecret: airflow-ssh-secret
 
    wait: 30
    containerName: git-sync
    uid: 65533

    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0

    extraVolumeMounts: []
    env:
      - name: GIT_SYNC_DEST
        value: "repo"
    resources: {}

The DAGs are correctly processed by the webserver, scheduler, and triggerer.

Separately, we instantiate the workers with standalone deployment files:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: airflow-worker-redacted
  namespace: "airflow"
  labels:
    app: airflow-worker-redacted
spec:
  replicas: 3
  serviceName: airflow-worker-redacted
  selector:
    matchLabels:
      app.kubernetes.io/name: airflow-worker-redacted
      app.kubernetes.io/component: airflow-worker-redacted
  template:
    metadata:
      labels:
        app.kubernetes.io/name: airflow-worker-redacted
        app.kubernetes.io/component: airflow-worker-redacted
    spec:
      serviceAccountName: airflow-worker

      containers:
        - name: airflow-worker
          image: <redacted>
          imagePullPolicy: "Always"
          env:
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-airflow-metadata
                  key: connection
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-broker-url
                  key: connection
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__WEBSERVER__SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-webserver-secret-key
                  key: webserver-secret-key
            - name: AIRFLOW__CORE__DAGS_FOLDER
              value: "/opt/git-sync/repo"
            - name: AIRFLOW__CELERY__WORKER_CONCURRENCY
              value: "1"
             

          volumeMounts:
            - name: airflow-dags
              mountPath: /opt/git-sync
              readOnly: true

          command:
            - airflow
          args:
            - celery
            - worker
            - -q
            - <redacted>

        - name: gitsync
          image: k8s.gcr.io/git-sync/git-sync:v3.4.0
          imagePullPolicy: "Always"
          securityContext:
            runAsUser: 65533
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - name: airflow-dags
              mountPath: /tmp/git

            - name: ssh-key
              readOnly: true
              mountPath: "/etc/git-secret/ssh"
          env:
            - name: GIT_SYNC_REPO
              value: <redacted>
            - name: GIT_SYNC_DEST
              value: "repo"
            - name: GIT_SYNC_ROOT
              value: "/tmp/git"
            - name: GIT_SYNC_SSH
              value: "true"
            - name: GIT_SSH_KEY_FILE
              value: "/etc/git-secret/ssh/gitSshKey"
            - name: GIT_KNOWN_HOSTS
              value: "false"
            - name: GIT_SYNC_REV
              value: "HEAD"
            - name: GIT_SYNC_BRANCH
              value: "dev"
            - name: GIT_SYNC_DEPTH
              value: "1"
            - name: GIT_SYNC_ADD_USER
              value: "true"
            - name: GIT_SYNC_WAIT
              value: "30"
            - name: GIT_SYNC_MAX_SYNC_FAILURES
              value: "0"

          lifecycle:
            postStart:
              exec:
                command: [ "/bin/sh", "-c", "git config --global --add safe.directory '/tmp/git'" ]

      volumes:
        - name: airflow-dags
          emptyDir: { }
        - name: ssh-key
          secret:
            secretName: airflow-ssh-secret
        - name: tmp-ssh-key
          secret:
            secretName: airflow-tmp-ssh-secret

These workers start correctly and pick up tasks from their queue.
They also run their own git-sync sidecar to sync the DAGs from GitHub.

The bug is that sometimes, at random, some tasks fail before they even start. This does not happen to all DAG tasks, just to some, and the affected tasks do not follow any visible pattern.

The error does not even appear in the task logs (because the tasks never start), so I have to fetch it from the worker pod itself: the worker fails to load the DAGs because it cannot find some custom modules that live in the DAGs folder and are synced by git-sync.

These modules actually exist in the pod, yet the import still fails for some reason.

I wonder if there might be some issue with running separate git-sync instances for the Helm chart and the workers.
Should I maybe consolidate the two git-syncs into a single job that runs separately from all the Airflow components?

Interestingly, if I re-run the task manually from the UI by clearing its state, it runs correctly.

Thanks

What you think should happen instead

The Airflow worker has the DAGs synced by git-sync, so it should be able to find them.

How to reproduce

Run Airflow through the Helm chart with worker replicas set to 0, deploy the workers separately with their own git-sync sidecar, then trigger a DAG run via the API or the web UI.

The error does not appear systematically.

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

Helm chart for the webserver, triggerer, and scheduler, with worker replicas set to 0.

A separate custom deployment for the workers, which live in the same namespace and have their own volume and git-sync sidecar for the DAGs.

Anything else

The problem occurs randomly, without any apparent trigger.

One thing I thought it could be: might there be a moment during the sync when the DAGs are not present on the workers?
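That hypothesis can be illustrated with a small, self-contained simulation. This is not git-sync's actual code; it only sketches the suspected mechanism under the assumption that git-sync checks the repo out into a revision directory and re-points a "repo" symlink at it, so there could be a brief window where the old checkout is gone while a worker still resolves modules through the stale path. All file and module names below (`.rev-abc123`, `my_dag_helpers`) are made up for the example.

```python
import importlib
import os
import shutil
import sys
import tempfile

# Build a fake "git-sync" layout: a revision-named checkout directory
# plus a "repo" symlink that DAGS_FOLDER would point at.
root = tempfile.mkdtemp()
checkout = os.path.join(root, ".rev-abc123")  # made-up revision dir name
os.makedirs(checkout)
with open(os.path.join(checkout, "my_dag_helpers.py"), "w") as f:
    f.write("VALUE = 42\n")

link = os.path.join(root, "repo")  # what AIRFLOW__CORE__DAGS_FOLDER points at
os.symlink(checkout, link)

sys.path.insert(0, link)
import my_dag_helpers  # succeeds while the symlink target exists
assert my_dag_helpers.VALUE == 42

# Simulate the window during a re-sync where the old checkout is removed
# before the new one is visible through the symlink:
shutil.rmtree(checkout)
importlib.invalidate_caches()
failed = False
try:
    importlib.reload(my_dag_helpers)  # re-reads the now-missing source
except (ImportError, OSError):
    failed = True
print("import failed during the sync window:", failed)

shutil.rmtree(root, ignore_errors=True)
```

If the real race works like this, it would also explain why clearing the task and re-running it succeeds: by then the sync has finished and the symlink points at a complete checkout again.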

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


    Labels

    area:core, kind:bug (this is clearly a bug), needs-triage (label for new issues that we didn't triage yet)
