DagFileProcessorManager IPC bottleneck: child processes block on Variable.get() during parsing #65369

@rahulchheda

Description

Apache Airflow version

3.0.0+

What happened

When DAG files call Variable.get() during parsing, the forked child process blocks on socket.recv() waiting for the parent DagFileProcessorManager to service the IPC request. However, IPC is only serviced once per main loop iteration in _service_processor_sockets() (manager.py line 398). This means child processes can be blocked for the entire duration of the parent's loop — which includes _refresh_dag_bundles(), _collect_results(), _scan_stale_dags(), etc.

With multiple parsing processes running and a non-trivial number of DAG files, the parent loop takes significant time per iteration. During this time, any child process that calls Variable.get() is completely blocked, turning a sub-second DAG parse into a multi-minute wait.

Profiling confirms the bottleneck: cProfile of the dag-processor shows _parse_file() time is dominated by a single socket.recv() call inside CommsDecoder._get_response(), waiting for the parent to respond to the GetVariable IPC request.

This is a regression from Airflow 2.x where Variable.get() was a direct DB query with no IPC dependency.

What you think should happen instead

The dag-processor should continuously service IPC requests from child processes, not just once per main loop iteration.

For comparison, WatchedSubprocess._monitor_subprocess() in the task execution path has a tight loop that services IPC continuously — the dag-processor should have similar behavior.

One approach: run _service_processor_sockets() in a dedicated background thread during the phases of the main loop where child processes may be actively parsing (i.e., between _start_new_processes() and _collect_results()). This ensures child IPC requests are serviced immediately regardless of what the parent's main loop is doing.

How to reproduce

  1. Create multiple DAG files that call Variable.get() during module-level or top-level DAG definition
  2. Run the standalone dag-processor with multiple parsing_processes
  3. Profile the dag-processor — observe that _parse_file() duration is dominated by socket.recv() waiting for IPC response
  4. The more DAG files and parsing processes, the worse the contention becomes

Operating System

Any

Versions of Apache Airflow Providers

N/A

Deployment

Other

Deployment details

Standalone dag-processor with multiple parsing processes

Anything else

Root cause in code:

In _run_parsing_loop() (manager.py), _service_processor_sockets() is called once per iteration, at line 398, immediately after _start_new_processes() (line 396). The gap is not between those two calls; it is everything after them. Once sockets have been serviced, the loop moves on to _collect_results(), _scan_stale_dags(), _cleanup_stale_bundle_versions(), etc., and no IPC is serviced during any of that. If any of those steps takes time (especially _refresh_dag_bundles() on the next iteration), child processes stay blocked.

The problem compounds because _service_processor_sockets() only does a single select() call — it doesn't drain all pending requests, just whatever is ready at that instant.

Contrast with task execution path:

WatchedSubprocess._monitor_subprocess() in supervisor.py has a dedicated tight loop that continuously services IPC for running tasks. The dag-processor lacks this — its IPC servicing is interleaved with many other operations in the main loop.

Related issues: #56446, #42205

Labels

area:core, kind:bug, priority:high
