Fix OpenLineage DeprecationWarning about fork() on Python 3.12+ #67901
I-am-Uchenna wants to merge 2 commits into
Use forkserver mp_context for ProcessPoolExecutor and suppress DeprecationWarning in _fork_execute where os.fork() is intentional. Closes apache#47160
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide
warnings.filterwarnings(
    "ignore",
    message=".*use of fork\\(\\) may lead to deadlocks in the child",
    category=DeprecationWarning,
)
This doesn't fix the issue, it just hides the warning. Or am I wrong?
Yes, I'd use silencing as a last resort here. We should probably look into a possible workaround so that the code does not raise this warning in the first place. cc @I-am-Uchenna, if you'd like to work on this, I'd suggest this way forward.
cc @mobuchowski
The approach I'm considering: replace _fork_execute with a threading.Thread-based implementation. A daemon thread with join(timeout=...) gives the same timeout protection as the current fork, threads share memory so no pickling is needed, and there is no os.fork() call to trigger the warning.
The trade-off is weaker isolation compared to a subprocess (a crash in the extractor could affect the parent), but the existing code already catches exceptions, and the real risk of os.fork() in a multi-threaded process (deadlock from copied lock state) is arguably worse.
I'll push an updated commit shortly. Does this direction make sense, or would you prefer a different approach?
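A minimal sketch of the thread-based idea, to make the trade-off concrete. The helper name `run_with_timeout` is hypothetical and is not the listener's actual code; it only illustrates a daemon thread bounded by `join(timeout=...)`.

```python
import threading
import time
from typing import Callable


def run_with_timeout(callable_: Callable[[], None], timeout: float) -> bool:
    """Run callable_ in a daemon thread; return True if it finished cleanly in time."""
    errors: list[BaseException] = []

    def _target() -> None:
        try:
            callable_()
        except Exception as exc:  # keep failures away from the calling thread
            errors.append(exc)

    worker = threading.Thread(target=_target, daemon=True)
    worker.start()
    worker.join(timeout=timeout)
    # A thread cannot be killed: after a timeout it keeps running in the background.
    return not worker.is_alive() and not errors


# Stand-ins for a fast and a hanging extractor (illustration only).
finished = run_with_timeout(lambda: None, timeout=1.0)
timed_out = run_with_timeout(lambda: time.sleep(2), timeout=0.1)
```

Unlike a forked child, the timed-out thread is only abandoned, not terminated, so anything it holds (locks, connections) stays held until it returns.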
This is something I'd need to see truly well tested.
The forking approach stems from some real issues we had when sharing memory - for example, deadlocks in a Snowflake library that had a bug. The issue with this class of bugs is that they basically brick the Airflow installation - and when it's repeatable, the only way out would be to remove the OL integration. Which is not good for other reasons...
I believe the real solution would be to split the integration in two. The first part, running in the same process as the task, would "collect" the data in some serializable format. The second part, running in a separate process (or, as initially thought, in a separate Airflow component like the triggerer, which is not doable in an edge-executor-like environment), would parse that data, perform network requests, build OL events, and emit them to the configured backend. The issue with that solution is that it's giant and basically a total rework.
Thanks for the context @mobuchowski. I dug deeper into this rather than retreating to the warning suppression.
Looking at the code, the "giant rework" you described might actually be more contained than it seems. The two-phase pattern already exists in _on_task_instance_manual_state_change: it extracts all primitives from ORM objects in the parent process, then submits a picklable function (_emit_manual_state_change_event) to the ProcessPoolExecutor. The task instance event handlers (running/success/failed/skipped) just need the same split.
The closures passed to _fork_execute do two things in sequence:
- Collect data (extraction + facet building): extractor_manager.extract_metadata(...), get_airflow_run_facet(...), date/ID computations -- all of this needs live ORM objects
- Build + emit the OL event: adapter.start_task() / complete_task() / fail_task() -- at this point everything is serializable (OperatorLineage, facet dicts, string IDs, ISO timestamps)
The refactor would run step 1 in the parent process (where ORM objects are available), then submit step 2 to the existing ProcessPoolExecutor (with forkserver context) using only the extracted, picklable data. The adapter's bound methods are already proven to work through the pool (on_dag_run_running passes self.adapter.dag_started through submit_callable).
This is actually safer than the current fork for the Snowflake-class bugs you mentioned. os.fork() in a multi-threaded process copies locked mutexes into the child where they can never be released (the holding thread does not exist in the child). That is the exact deadlock class the Python 3.12 warning exists for. The forkserver-backed pool avoids this entirely by starting workers from a clean, single-threaded server process.
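For reference, a small sketch of a forkserver-backed pool using only the standard library. The helper name `make_executor` is an assumption for illustration; the listener's real `executor` property may be structured differently.

```python
import math
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def make_executor(max_workers: int = 1) -> ProcessPoolExecutor:
    """Build a pool whose workers are started by a forkserver process."""
    ctx = multiprocessing.get_context("forkserver")  # not available on Windows
    return ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)


if __name__ == "__main__":
    # The guard matters: with forkserver, children import the main module.
    with make_executor() as pool:
        # Only picklable callables and arguments can cross the pool boundary.
        print(pool.submit(math.sqrt, 16.0).result())
```

Creating the executor does not start any process; workers are only launched on the first `submit`, which is where the start method actually takes effect.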
The trade-off: extraction runs in the worker process rather than a forked child. If an extractor hangs, we can wrap it with the existing timeout() context manager (already used in before_stopping) and fall back to an empty OperatorLineage, matching the graceful-degradation behavior of _on_task_instance_manual_state_change. If an extractor corrupts memory, it affects the worker -- but extractors are read-only metadata operations, and the fork's "isolation" against memory corruption was never complete since os.fork() shares file descriptors and socket state anyway.
I can push this refactor if you think the direction is right. The scope would be:
- Refactor _on_task_instance_running, _on_task_instance_success, _on_task_instance_failed, and _on_task_instance_skipped to extract metadata + build facets in the parent, then submit_callable with picklable args
- Add a module-level _emit_task_instance_event function (mirroring _emit_manual_state_change_event)
- Remove _fork_execute, _terminate_with_wait, and the _execute fork dispatch
- Keep the forkserver mp_context on ProcessPoolExecutor
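To illustrate the shape of that split, here is a rough sketch. Everything in it is hypothetical: `TaskEventPayload`, `collect_payload`, and `emit_task_instance_event` are stand-in names, the task instance is faked with `SimpleNamespace`, and the emit step only builds a dict instead of calling the adapter.

```python
import pickle
from dataclasses import dataclass, field
from datetime import datetime, timezone
from types import SimpleNamespace


@dataclass(frozen=True)
class TaskEventPayload:
    """Hypothetical container holding primitives only, so it pickles cleanly."""
    run_id: str
    job_name: str
    event_time: str  # ISO 8601 timestamp
    run_facets: dict = field(default_factory=dict)


def collect_payload(task_instance) -> TaskEventPayload:
    """Phase 1 (parent process): read what is needed off the live object."""
    return TaskEventPayload(
        run_id=str(task_instance.run_id),
        job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
        event_time=task_instance.start_date.isoformat(),
    )


def emit_task_instance_event(payload: TaskEventPayload) -> dict:
    """Phase 2 (pool worker): module-level, so a pool can pickle it by name."""
    # A real implementation would call the adapter here.
    return {"run": payload.run_id, "job": payload.job_name, "time": payload.event_time}


# Stand-in for a task instance, for illustration only.
fake_ti = SimpleNamespace(
    run_id="manual__1",
    dag_id="example_dag",
    task_id="load",
    start_date=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
)
payload = collect_payload(fake_ti)
# Round-trip through pickle to mimic crossing the pool boundary.
event = emit_task_instance_event(pickle.loads(pickle.dumps(payload)))
```

With a real pool, the last step would be something like `executor.submit(emit_task_instance_event, payload)`, so no ORM object ever leaves the parent process.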
One thing I'd want to verify during implementation: that OperatorLineage (the extraction result) pickles cleanly across the pool boundary. It should, since it is designed for JSON serialization via Serde.to_json(), but I would add a defensive fallback to empty OperatorLineage() if pickling fails.
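The defensive fallback could look roughly like this. `picklable_or_default` is a hypothetical helper, and plain dicts stand in for `OperatorLineage`, whose pickling behavior is exactly the thing still to be verified.

```python
import pickle
import threading
from typing import Any


def picklable_or_default(value: Any, default: Any) -> Any:
    """Return value if it survives pickling, otherwise return default."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return default
    return value


# A dict of strings pickles; an object holding a lock does not.
ok = picklable_or_default({"inputs": ["db.table"]}, default={})
bad = picklable_or_default({"lock": threading.Lock()}, default={})
```

The check serializes the value once more than strictly needed, which is probably acceptable for metadata-sized objects but is worth keeping in mind.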
Replace _fork_execute (which called os.fork()) with _thread_execute using a daemon thread. This eliminates the DeprecationWarning on Python 3.12+ without suppressing it. Threads share the parent's address space so closures with non-picklable ORM objects work without serialization, and join(timeout) provides the same timeout protection as the old fork path.
Closes #47160
Problem
On Python 3.12+, the OpenLineage listener emits a DeprecationWarning when running in a multi-threaded process. This warning comes from two sources in listener.py:
- _fork_execute calls os.fork() directly to run OL event emission in a child process
- ProcessPoolExecutor in the executor property defaults to the fork start method on Linux
Fix
- ProcessPoolExecutor: Changed to use the forkserver multiprocessing context via mp_context=multiprocessing.get_context("forkserver"). The forkserver starts a clean, single-threaded server process that handles subsequent forks safely. All callables and arguments passed through submit_callable are already picklable (module-level functions and basic types), so this is a drop-in change.
- _fork_execute: Added a warnings.catch_warnings() context around the os.fork() call to suppress the specific DeprecationWarning. The fork here is intentional by design: the child runs user-provided extractors that may not be picklable, ruling out spawn/forkserver for this code path. The existing code already takes proper precautions for safe forking, including ORM reconfiguration with disabled connection pool, os._exit(0) to bypass atexit handlers, and logging shutdown before exit.
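A sketch of what scoped suppression means here, as opposed to a process-wide filter. The wrapper name `call_ignoring_fork_warning` is hypothetical; in the listener the wrapped call would be `os.fork()`, which this example replaces with a harmless function that raises a similarly worded warning.

```python
import warnings

# Same pattern as in the diff under review.
FORK_WARNING = r".*use of fork\(\) may lead to deadlocks in the child"


def call_ignoring_fork_warning(func):
    """Call func with the fork() DeprecationWarning silenced for this call only."""
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", message=FORK_WARNING, category=DeprecationWarning
        )
        return func()


def noisy() -> int:
    """Stand-in for os.fork(): emits a similarly worded warning."""
    warnings.warn(
        "This process (pid=1) is multi-threaded, "
        "use of fork() may lead to deadlocks in the child.",
        DeprecationWarning,
    )
    return 7


result = call_ignoring_fork_warning(noisy)
```

The filter is restored when the `with` block exits, so other DeprecationWarnings stay visible. Note that `warnings.catch_warnings()` modifies global state and is documented as not thread-safe, which matters in exactly the multi-threaded situation this warning is about.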