fix: Service IPC sockets during dag bundle refresh to prevent child process blocking#65370
Open
rahulchheda wants to merge 1 commit intoapache:mainfrom
Open
fix: Service IPC sockets during dag bundle refresh to prevent child process blocking#65370rahulchheda wants to merge 1 commit intoapache:mainfrom
rahulchheda wants to merge 1 commit intoapache:mainfrom
Conversation
…locking Child parsing processes that call Variable.get() during DAG parsing block on socket.recv() waiting for the parent DagFileProcessorManager to service the IPC request. However, IPC is only serviced once per main loop iteration in _service_processor_sockets(). During _refresh_dag_bundles() and other main loop operations, child processes are completely blocked — turning sub-second parses into multi-minute waits. This adds a background thread that continuously services IPC sockets while _refresh_dag_bundles() runs, ensuring child processes get immediate responses to Variable.get() and Connection.get() calls. Closes apache#65369
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Service IPC sockets in a background thread during
_refresh_dag_bundles()so child parsing processes don't block onVariable.get()/Connection.get()calls.Why
In
_run_parsing_loop(), IPC is only serviced once per iteration via_service_processor_sockets(). While the main loop executes_refresh_dag_bundles()and other operations, child processes that callVariable.get()block onsocket.recv()waiting for the parent to respond. With many DAG files and multiple parsing processes, this turns sub-second parses into multi-minute waits.This is a regression from Airflow 2.x where
Variable.get()was a direct DB query with no IPC dependency.For comparison,
WatchedSubprocess._monitor_subprocess()in the task execution path has a tight loop that continuously services IPC — the dag-processor lacks this.How
_ipc_service_thread()context manager that runs_service_processor_sockets()in a daemon thread with a 100ms poll interval_refresh_dag_bundles()with this context manager — the longest-running phase where child processes are likely to be blocked_service_processor_sockets()callThe change is minimal and non-invasive — existing IPC servicing in the main loop is untouched.
Closes #65369
^ Add meaningful description above
Read the Pull Request Contribution Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{pr_number}.significant.md, in newsfragments.