From 62ebbfb6896fbef168511a2b68331a4096adc4ae Mon Sep 17 00:00:00 2001 From: Uchenna <110002944+I-am-Uchenna@users.noreply.github.com> Date: Tue, 2 Jun 2026 14:37:19 +0100 Subject: [PATCH 1/2] Fix OpenLineage DeprecationWarning about fork() on Python 3.12+ Use forkserver mp_context for ProcessPoolExecutor and suppress DeprecationWarning in _fork_execute where os.fork() is intentional. Closes apache#47160 --- .../providers/openlineage/plugins/listener.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 2d4d74e828f09..962501875a786 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -17,8 +17,10 @@ from __future__ import annotations import logging +import multiprocessing import os import sys +import warnings from concurrent.futures import ProcessPoolExecutor from concurrent.futures.process import BrokenProcessPool from datetime import datetime @@ -834,7 +836,17 @@ def _terminate_with_wait(self, process: psutil.Process): def _fork_execute(self, callable, callable_name: str): self.log.debug("Will fork to execute OpenLineage process.") - pid = os.fork() + with warnings.catch_warnings(): + # On Python 3.12+, os.fork() in a multi-threaded process emits a + # DeprecationWarning. The fork here is intentional and the child + # takes precautions (ORM reconfiguration, os._exit) so the warning + # is safe to suppress. + warnings.filterwarnings( + "ignore", + message=".*use of fork\\(\\) may lead to deadlocks in the child", + category=DeprecationWarning, + ) + pid = os.fork() if pid: process = psutil.Process(pid) try: @@ -879,6 +891,7 @@ def executor(self) -> ProcessPoolExecutor: self._executor = ProcessPoolExecutor( max_workers=conf.dag_state_change_process_pool_size(), initializer=_executor_initializer, + mp_context=multiprocessing.get_context("forkserver"), ) return self._executor From 7912a81d3a6494825e07aa4dbb365bd9e5139c5f Mon Sep 17 00:00:00 2001 From: Uchenna <110002944+I-am-Uchenna@users.noreply.github.com> Date: Tue, 2 Jun 2026 16:59:10 +0100 Subject: [PATCH 2/2] Replace os.fork() with threading.Thread in _fork_execute Replace _fork_execute (which called os.fork()) with _thread_execute using a daemon thread. This eliminates the DeprecationWarning on Python 3.12+ without suppressing it. Threads share the parent's address space so closures with non-picklable ORM objects work without serialization, and join(timeout) provides the same timeout protection as the old fork path. --- .../providers/openlineage/plugins/listener.py | 94 ++++++++----------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 962501875a786..c259eb2963e43 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -20,7 +20,7 @@ import multiprocessing import os import sys -import warnings +import threading from concurrent.futures import ProcessPoolExecutor from concurrent.futures.process import BrokenProcessPool from datetime import datetime @@ -821,69 +821,57 @@ def _on_task_instance_manual_state_change( def _execute(self, callable, callable_name: str, use_fork: bool = False): if use_fork: - self._fork_execute(callable, callable_name) + self._thread_execute(callable, callable_name) else: callable() - def _terminate_with_wait(self, process: psutil.Process): - process.terminate() - try: - # Waiting for max 3 seconds to make sure process can clean up before being killed. - process.wait(timeout=3) - except psutil.TimeoutExpired: - # If it's not dead by then, then force kill. - process.kill() - - def _fork_execute(self, callable, callable_name: str): - self.log.debug("Will fork to execute OpenLineage process.") - with warnings.catch_warnings(): - # On Python 3.12+, os.fork() in a multi-threaded process emits a - # DeprecationWarning. The fork here is intentional and the child - # takes precautions (ORM reconfiguration, os._exit) so the warning - # is safe to suppress. - warnings.filterwarnings( - "ignore", - message=".*use of fork\\(\\) may lead to deadlocks in the child", - category=DeprecationWarning, + + def _thread_execute(self, callable, callable_name: str): + """Execute callable in a daemon thread with timeout. + + Replaces the previous ``os.fork()`` approach to avoid the + ``DeprecationWarning`` on Python 3.12+ about forking in + multi-threaded processes (which can also lead to deadlocks + when a thread holds a lock at fork time). + + A daemon thread shares the parent's address space, so the + callable's closures (which capture non-picklable ORM models + and extractors) work without serialization. ``join(timeout)`` + provides the same timeout protection as the old fork path. + """ + self.log.debug("Will execute OpenLineage callable in thread.") + + def _target(): + self.log.debug( + "Executing OpenLineage process - %s - thread %s", + callable_name, + threading.current_thread().name, ) - pid = os.fork() - if pid: - process = psutil.Process(pid) - try: - self.log.debug("Waiting for process %s", pid) - process.wait(conf.execution_timeout()) - except psutil.TimeoutExpired: - self.log.warning( - "OpenLineage process with pid `%s` expired and will be terminated by listener. " - "This has no impact on actual task execution status.", - pid, - ) - self._terminate_with_wait(process) - except BaseException: - # Kill the process directly. - self._terminate_with_wait(process) - self.log.debug("Process with pid %s finished - parent", pid) - else: - setproctitle(getproctitle() + " - OpenLineage - " + callable_name) - if not AIRFLOW_V_3_0_PLUS: - configure_orm(disable_connection_pool=True) - self.log.debug("Executing OpenLineage process - %s - pid %s", callable_name, os.getpid()) try: callable() - self.log.debug("Process with current pid finishes after %s", callable_name) + self.log.debug("Thread finishes after %s", callable_name) except Exception: self.log.warning( - "OpenLineage %s process failed. This has no impact on actual task execution status.", + "OpenLineage %s thread failed. This has no impact on actual task execution status.", callable_name, exc_info=True, ) - finally: - # os._exit(0) bypasses Python's atexit/stdio flush. Explicitly shut down - # logging so buffered records (including any warnings above) are flushed - # before the process exits. Without this, the final log lines are silently - # dropped, making failures invisible. - logging.shutdown() - os._exit(0) + + thread = threading.Thread( + target=_target, + name=f"OpenLineage-{callable_name}", + daemon=True, + ) + thread.start() + thread.join(timeout=conf.execution_timeout()) + if thread.is_alive(): + self.log.warning( + "OpenLineage thread %r did not finish within %s seconds. " + "Continuing without waiting. " + "This has no impact on actual task execution status.", + callable_name, + conf.execution_timeout(), + ) @property def executor(self) -> ProcessPoolExecutor: