Edge worker _launch_job corrupts import state on Python 3.12 — fork() in multi-threaded process inherits stale import locks #65942

@diogosilva30

Description

Under which category would you file this issue?

Providers

Apache Airflow version

3.0+ (tested with providers built from main branch)

What happened and how to reproduce it?

Issue Description

Edge worker tasks intermittently fail at startup with ModuleNotFoundError cascades — every plugin and DAG import fails even though the modules exist on disk. The forked child process inherits corrupted import lock / sys.modules state from the multi-threaded parent, producing broken Python import machinery.

In more severe cases (higher thread contention at fork time), the child process can deadlock outright, hanging with no error output until the scheduler's zombie-task threshold marks the task as failed. In our production environment, however, the observed failure mode is exclusively the corrupted-import-state variant, not a full deadlock.

Root cause

The edge worker main process runs 22+ threads at runtime (asyncio event loop, ThreadPoolExecutor, httpx/httpcore connection pools, API server HTTP clients).

EdgeWorker._launch_job() in providers/edge3/src/airflow/providers/edge3/cli/worker.py uses multiprocessing.Process with the default fork start method:

def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
    results_queue: Queue[Exception] = Queue()
    process = Process(
        target=self._run_job_via_supervisor,
        kwargs={"workload": workload, "results_queue": results_queue},
    )
    process.start()   # ← os.fork() here, copies 22 threads' lock state
    return process, results_queue

os.fork() copies the entire address space — including import locks held by the other 21 threads. Since only the forking thread survives in the child, those locks are never released. Depending on timing, this causes:

  1. Corrupted import state (primary production failure): The child inherits a partially-consistent sys.modules / importlib._bootstrap._module_locks snapshot. Plugin and DAG imports fail with ModuleNotFoundError even though the modules exist. Tasks fail with exit_code=1 after exhausting reschedule attempts.
  2. Full deadlock (theoretical / synthetic): If a thread held an import lock at the exact moment of os.fork(), any subsequent import in the child that needs that lock blocks permanently. Reproducible synthetically with high thread contention but not observed in our production environment.
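The mechanism is reproducible without Airflow or the import machinery at all. A minimal standalone demonstration (POSIX only): a lock held by another thread at the moment of fork stays locked forever in the child, because its owner does not exist there.

import os, threading, time

lock = threading.Lock()

def hold_lock():
    with lock:          # this thread owns the lock...
        time.sleep(5)   # ...across the moment of fork

threading.Thread(target=hold_lock, daemon=True).start()
time.sleep(0.2)  # make sure the lock is held before forking

pid = os.fork()
if pid == 0:
    # Child: only the forking thread survives; the lock's owner is gone.
    got = lock.acquire(timeout=2)
    os._exit(0 if got else 1)
_, status = os.waitpid(pid, 0)
print("child acquired the lock" if os.WEXITSTATUS(status) == 0
      else "child blocked on the inherited lock (a deadlock without the timeout)")

The per-module locks in importlib._bootstrap._module_locks behave just like this plain threading.Lock, which is why any import in the child can hang or misbehave.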

Python 3.12 emits a deprecation warning on every task launch:

DeprecationWarning: This process (pid=XXXXXX) is multi-threaded, use of fork() may lead to deadlocks in the child.
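The warning is straightforward to reproduce standalone on Python 3.12+; one extra thread is enough to make the process multi-threaded:

import os, threading, warnings

# One background thread makes the interpreter multi-threaded at fork time.
threading.Thread(target=threading.Event().wait, daemon=True).start()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    pid = os.fork()
    if pid == 0:
        os._exit(0)      # child: exit immediately
    os.waitpid(pid, 0)   # parent: reap the child

print([str(w.message) for w in caught])
# ["This process (pid=...) is multi-threaded, use of fork() may lead to deadlocks in the child."]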

Production failure pattern (corrupted import state)

Logs from a forked task child process (parent PID 1 → child PID 1435804):

INFO  - Stats instance was created in PID 1 but accessed in PID 1435804. Re-initializing.
        source=airflow.stats

ERROR - Failed to import plugin plugins/common/operators/mypackage/operator_a.py
        source=airflow.plugins_manager loc=plugins_manager.py:298
ModuleNotFoundError: No module named 'common'
  File "airflow/plugins_manager.py", line 291 in load_plugins_from_plugin_directory
  File "<frozen importlib._bootstrap_external>", line 999 in exec_module
  File "<frozen importlib._bootstrap>", line 488 in _call_with_frames_cleaned

ERROR - Failed to import plugin plugins/mypackage/cache.py
        source=airflow.plugins_manager loc=plugins_manager.py:298
ModuleNotFoundError: No module named 'mypackage'
  ...

This cascades into DAG import failure and task death:

ERROR - Failed to import: dags/my_dag.py
        source=airflow.models.dagbag.BundleDagBag loc=dagbag.py:415
ModuleNotFoundError: No module named 'common'

ERROR - Dag not found during start up
        source=task loc=task_runner.py:664

ERROR - Startup reschedule limit exceeded  reschedule_count=3 max_reschedules=3
        source=task loc=task_runner.py:631

WARNING - Process exited abnormally  exit_code=1

Key observations:

  • "Stats instance was created in PID 1 but accessed in PID 1435804" confirms the child was forked from the edge worker main process
  • Plugin imports fail because import lock / sys.modules state inherited from the 22-thread parent is corrupted
  • The task fails with exit code 1 after exhausting reschedule attempts (3/3)
  • This happens intermittently — some forks get lucky and inherit consistent state, others don't

Steps to reproduce

  1. Deploy edge workers with concurrency >= 2 on Linux (Python 3.12)
  2. Trigger multiple tasks to the edge worker queue
  3. Observe intermittent task failures — tasks fail at import time with ModuleNotFoundError for modules that exist, or (less commonly) hang permanently

Observed on live edge worker pod:

$ kubectl exec deploy/airflow-worker -- python3 -c "import threading; print(threading.active_count())"
22

$ kubectl exec deploy/airflow-worker -- python3 -c "import multiprocessing; print(multiprocessing.get_start_method())"
fork

Reproduction script (demonstrates the deadlock variant under high thread contention):

import os, threading, time, signal

# Simulate edge worker's thread profile
stop = threading.Event()
def import_loop():
    while not stop.is_set():
        import importlib
        importlib.import_module('json')
        importlib.import_module('email.mime.text')
        importlib.import_module('http.client')
        time.sleep(0.001)

threads = [threading.Thread(target=import_loop, daemon=True) for _ in range(20)]
for t in threads:
    t.start()
time.sleep(0.5)

success = 0
for i in range(10):
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(r)
        signal.alarm(3)  # 3s timeout to detect deadlock
        try:
            from airflow.sdk.execution_time.supervisor import supervise
            os.write(w, b'1')
        except Exception:
            os.write(w, b'0')
        os._exit(0)
    else:
        os.close(w)
        result = os.read(r, 1)
        os.close(r)
        os.waitpid(pid, 0)
        if result == b'1':
            success += 1

stop.set()
print(f'fork: {success}/10 succeeded')
# Typical result: 0-1/10. A child killed by SIGALRM while deadlocked never
# writes to the pipe; its write end closes on death, os.read() returns b'',
# and the iteration counts as a failure.
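For contrast, the same import attempted via fork+exec of a fresh interpreter (the shape of the fix proposed below) is unaffected by the 20 importing threads; with the thread loop above still running, the expected result is 10/10:

import subprocess, sys

ok = 0
for _ in range(10):
    result = subprocess.run(
        [sys.executable, "-c",
         "from airflow.sdk.execution_time.supervisor import supervise"],
    )
    if result.returncode == 0:
        ok += 1
print(f'exec: {ok}/10 succeeded')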

Notes on why existing mitigations don't apply:

  • AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER is only read by the Celery executor path (celery_executor_utils.py). The edge3 path (_run_job_via_supervisor → supervise() → ActivitySubprocess.start()) never consults it.
  • ActivitySubprocess.start() has its own fork+exec for macOS (_FORK_EXEC_PLATFORMS) but that's the inner fork (supervisor → task), not the outer fork (_launch_job → supervisor) where the 22-thread problem occurs.
  • forkserver/spawn start methods can't be a drop-in fix because _launch_job passes self._run_job_via_supervisor (a bound method on EdgeWorker which carries unpicklable state: HTTP clients, running-job dicts, multiprocessing.Queue).
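The pickling constraint in the last point is easy to confirm. Worker below is a hypothetical stand-in for EdgeWorker, not the real class:

import pickle, threading

class Worker:
    def __init__(self):
        # stand-in for EdgeWorker's unpicklable runtime state
        # (HTTP clients, running-job dicts, multiprocessing.Queue, ...)
        self._lock = threading.Lock()

    def _run_job_via_supervisor(self):
        pass

w = Worker()
try:
    # spawn/forkserver must pickle the Process target; pickling a bound
    # method drags the whole instance along with it.
    pickle.dumps(w._run_job_via_supervisor)
except TypeError as e:
    print(e)  # cannot pickle '_thread.lock' object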

What you think should happen instead?

_launch_job should not fork() from a multi-threaded process. The fix is to replace multiprocessing.Process with subprocess.Popen using the existing airflow.sdk.execution_time.execute_workload CLI entrypoint. This creates a fresh Python interpreter — no inherited locks, no corrupted sys.modules.

This is the approach taken in PR #65943.
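For illustration, a sketch of the shape of that change. This is not the PR's actual code: the entrypoint invocation and the workload hand-off (assumed here to be Pydantic JSON over stdin) are illustrative assumptions.

import subprocess
import sys

def _launch_job(self, workload: ExecuteTask) -> subprocess.Popen:
    # fork+exec a fresh interpreter: no inherited threads, locks, or sys.modules.
    process = subprocess.Popen(
        [sys.executable, "-m", "airflow.sdk.execution_time.execute_workload"],
        stdin=subprocess.PIPE,
    )
    # Hand the workload over explicitly instead of relying on fork's memory copy.
    process.stdin.write(workload.model_dump_json().encode())
    process.stdin.close()
    return process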

Operating System

Linux (Debian-based container, Python 3.12.13)

Versions of Apache Airflow Providers

apache-airflow-providers-edge3 (latest, 2026-04-27)

Official Helm Chart version

Not Applicable

Kubernetes Version

v1.30 (RKE2)

Helm Chart configuration

Not Applicable — issue is in the edge3 provider Python code, not Helm chart.

Docker Image customizations

Custom image based on official Airflow image with additional pip packages. The issue is independent of image customizations — it's inherent to the edge3 provider's use of multiprocessing.Process with fork start method in a multi-threaded process.

Anything else?

  • Production failure rate: ~30-50% of task launches fail with corrupted import state (varies with cluster load / thread activity at fork time)
  • Python 3.14 impact: Will change the default multiprocessing start method away from fork, which will also break edge workers since EdgeWorker is not picklable for spawn/forkserver
  • Related prior art in the codebase: ActivitySubprocess.start() already handles fork+exec for macOS via _FORK_EXEC_PLATFORMS; Celery executor has execute_tasks_new_python_interpreter
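A note on the Python 3.14 point above: pinning the start method keeps today's behavior after the default changes, but it is a compatibility stopgap only and does not address this bug:

import multiprocessing

# Pinning "fork" preserves current (broken) behavior on Python 3.14+;
# it does not fix the multi-threaded fork problem described above.
ctx = multiprocessing.get_context("fork")
p = ctx.Process(target=print, args=("still forking",))
p.start()
p.join()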

Are you willing to submit PR?

Yes I am willing to submit a PR!

Labels

area:providers
kind:bug: This is clearly a bug
priority:high: High priority bug that should be patched quickly but does not require immediate new release
