diff --git a/lib/_emerge/EbuildMetadataPhase.py b/lib/_emerge/EbuildMetadataPhase.py index a7c9650d74..f4f685e81c 100644 --- a/lib/_emerge/EbuildMetadataPhase.py +++ b/lib/_emerge/EbuildMetadataPhase.py @@ -124,7 +124,7 @@ def _start(self): mydbapi=self.portdb, tree="porttree", fd_pipes=fd_pipes, - returnpid=True, + returnproc=True, ) settings.pop("PORTAGE_PIPE_FD", None) @@ -137,7 +137,7 @@ def _start(self): self._async_wait() return - self._proc = portage.process.Process(retval[0]) + self._proc = retval def _output_handler(self): while True: diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py index 716e94d665..b63afae01c 100644 --- a/lib/_emerge/SpawnProcess.py +++ b/lib/_emerge/SpawnProcess.py @@ -241,7 +241,9 @@ def _pipe(self, fd_pipes): got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe) return (master_fd, slave_fd) - def _spawn(self, args: list[str], **kwargs) -> portage.process.Process: + def _spawn( + self, args: list[str], **kwargs + ) -> portage.process.MultiprocessingProcess: spawn_func = portage.process.spawn if self._selinux_type is not None: diff --git a/lib/portage/package/ebuild/doebuild.py b/lib/portage/package/ebuild/doebuild.py index e10b884e08..1c89af5ac8 100644 --- a/lib/portage/package/ebuild/doebuild.py +++ b/lib/portage/package/ebuild/doebuild.py @@ -19,6 +19,7 @@ import tempfile from textwrap import wrap import time +from typing import Union import warnings import zlib @@ -246,14 +247,21 @@ def _doebuild_spawn(phase, settings, actionmap=None, **kwargs): def _spawn_phase( - phase, settings, actionmap=None, returnpid=False, logfile=None, **kwargs + phase, + settings, + actionmap=None, + returnpid=False, + returnproc=False, + logfile=None, + **kwargs, ): - if returnpid: + if returnproc or returnpid: return _doebuild_spawn( phase, settings, actionmap=actionmap, returnpid=returnpid, + returnproc=returnproc, logfile=logfile, **kwargs, ) @@ -725,7 +733,8 @@ def doebuild( prev_mtimes=None, 
fd_pipes=None, returnpid=False, -): + returnproc=False, +) -> Union[int, portage.process.MultiprocessingProcess, list[int]]: """ Wrapper function that invokes specific ebuild phases through the spawning of ebuild.sh @@ -762,9 +771,15 @@ def doebuild( for example. @type fd_pipes: Dictionary @param returnpid: Return a list of process IDs for a successful spawn, or - an integer value if spawn is unsuccessful. NOTE: This requires the - caller clean up all returned PIDs. + an integer value if spawn is unsuccessful. This parameter is supported + only when mydo is "depend". NOTE: This requires the caller clean + up all returned PIDs. @type returnpid: Boolean + @param returnproc: Return a MultiprocessingProcess instance for a successful spawn, or + an integer value if spawn is unsuccessful. This parameter is supported + only when mydo is "depend". NOTE: This requires the caller to + asynchronously wait for the MultiprocessingProcess instance. + @type returnproc: Boolean @rtype: Boolean @return: 1. 0 for success @@ -867,17 +882,25 @@ def doebuild( writemsg("\n", noiselevel=-1) return 1 - if returnpid and mydo != "depend": + if (returnproc or returnpid) and mydo != "depend": # This case is not supported, since it bypasses the EbuildPhase class # which implements important functionality (including post phase hooks # and IPC for things like best/has_version and die). + if returnproc: + raise NotImplementedError(f"returnproc not implemented for phase {mydo}") warnings.warn( "portage.doebuild() called " "with returnpid parameter enabled. 
This usage will " "not be supported in the future.", - DeprecationWarning, + UserWarning, stacklevel=2, ) + elif returnpid: + warnings.warn( + "The portage.doebuild() returnpid parameter is deprecated and replaced by returnproc", + UserWarning, + stacklevel=1, + ) if mydo == "fetchall": fetchall = 1 @@ -1027,10 +1050,14 @@ def doebuild( # get possible slot information from the deps file if mydo == "depend": - if not returnpid: - raise TypeError("returnpid must be True for depend phase") + if not (returnproc or returnpid): + raise TypeError("returnproc or returnpid must be True for depend phase") return _spawn_phase( - mydo, mysettings, fd_pipes=fd_pipes, returnpid=returnpid + mydo, + mysettings, + fd_pipes=fd_pipes, + returnpid=returnpid, + returnproc=returnproc, ) if mydo == "nofetch": diff --git a/lib/portage/process.py b/lib/portage/process.py index b223ecb887..a33e7b4747 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -19,7 +19,7 @@ from dataclasses import dataclass from functools import lru_cache -from typing import Any, Optional, Callable +from typing import Any, Optional, Callable, Union from portage import os from portage import _encodings @@ -28,6 +28,7 @@ portage.proxy.lazyimport.lazyimport( globals(), + "portage.util._async.ForkProcess:ForkProcess", "portage.util._eventloop.global_event_loop:global_event_loop", "portage.util.futures:asyncio", "portage.util:dump_traceback,writemsg,writemsg_level", @@ -296,12 +297,19 @@ def send_signal(self, sig): class Process(AbstractProcess): """ - An object that wraps OS processes created by spawn. - In the future, spawn will return objects of a different type - but with a compatible interface to this class, in order - to encapsulate implementation-dependent objects like - multiprocessing.Process which are designed to manage - the process lifecycle and need to persist until it exits. + An object that wraps OS processes which do not have an + associated multiprocessing.Process instance. 
Ultimately, + we need to stop using os.fork() to create these processes + because it is unsafe for threaded processes as discussed + in https://github.com/python/cpython/issues/84559. + + Note that if subprocess.Popen is used without pass_fds + or preexec_fn parameters, then it avoids using os.fork() + by instead using posix_spawn. This approach is not used + by spawn because it needs to execute python code prior + to exec, so it instead uses multiprocessing.Process, + which only uses os.fork() when the multiprocessing start + method is fork. """ def __init__(self, pid: int): @@ -461,7 +469,7 @@ def spawn( unshare_mount=False, unshare_pid=False, warn_on_large_env=False, -): +) -> Union[int, MultiprocessingProcess, list[int]]: """ Spawns a given command. @@ -479,8 +487,8 @@ def spawn( @param returnpid: Return the Process IDs for a successful spawn. NOTE: This requires the caller clean up all the PIDs, otherwise spawn will clean them. @type returnpid: Boolean - @param returnproc: Return a Process object for a successful spawn (conflicts with logfile parameter). - NOTE: This requires the caller to asynchronously wait for the Process. + @param returnproc: Return a MultiprocessingProcess instance (conflicts with logfile parameter). + NOTE: This requires the caller to asynchronously wait for the MultiprocessingProcess instance. @type returnproc: Boolean @param uid: User ID to spawn as; useful for dropping privilages @type uid: Integer @@ -578,10 +586,10 @@ def spawn( # Create a tee process, giving it our stdout and stderr # as well as the read end of the pipe. - mypids.extend( + mypids.append( spawn( ("tee", "-i", "-a", logfile), - returnpid=True, + returnproc=True, fd_pipes={0: pr, 1: fd_pipes[1], 2: fd_pipes[2]}, ) ) @@ -626,7 +634,9 @@ def spawn( # fork, so that the result is cached in the main process. 
bool(groups) - pid = _start_fork( + start_func = _start_proc if returnproc or not returnpid else _start_fork + + pid = start_func( _exec_wrapper, args=( binary, @@ -652,7 +662,11 @@ def spawn( close_fds=close_fds, ) - if not isinstance(pid, int): + if returnproc: + # _start_proc returns a MultiprocessingProcess instance. + return pid + + if returnpid and not isinstance(pid, int): raise AssertionError(f"fork returned non-integer: {repr(pid)}") # Add the pid to our local and the global pid lists. @@ -666,15 +680,14 @@ def spawn( # If the caller wants to handle cleaning up the processes, we tell # it about all processes that were created. if returnpid: - if not portage._internal_caller: - warnings.warn( - "The portage.process.spawn returnpid paramenter is deprecated and replaced by returnproc", - UserWarning, - stacklevel=1, - ) + warnings.warn( + "The portage.process.spawn returnpid parameter is deprecated and replaced by returnproc", + UserWarning, + stacklevel=1, + ) return mypids - if returnproc: - return Process(mypids[0]) + + loop = global_event_loop() # Otherwise we clean them up. while mypids: @@ -684,25 +697,22 @@ def spawn( pid = mypids.pop(0) # and wait for it. - retval = os.waitpid(pid, 0)[1] + retval = loop.run_until_complete(pid.wait()) if retval: # If it failed, kill off anything else that # isn't dead yet. for pid in mypids: - # With waitpid and WNOHANG, only check the - # first element of the tuple since the second - # element may vary (bug #337465). - if os.waitpid(pid, os.WNOHANG)[0] == 0: - os.kill(pid, signal.SIGTERM) - os.waitpid(pid, 0) - - # If it got a signal, return the signal that was sent. - if retval & 0xFF: - return (retval & 0xFF) << 8 + waiter = asyncio.ensure_future(pid.wait(), loop) + try: + loop.run_until_complete( + asyncio.wait_for(asyncio.shield(waiter), 0.001) + ) + except (TimeoutError, asyncio.TimeoutError): + pid.terminate() + loop.run_until_complete(waiter) - # Otherwise, return its exit code. 
- return retval >> 8 + return retval # Everything succeeded return 0 @@ -1373,6 +1383,60 @@ def _start_fork( return pid +class _setup_pipes_after_fork: + def __init__(self, target, fd_pipes): + self._target = target + self._fd_pipes = fd_pipes + + def __call__(self, *args, **kwargs): + for fd in set(self._fd_pipes.values()): + os.set_inheritable(fd, True) + _setup_pipes(self._fd_pipes, close_fds=False, inheritable=True) + return self._target(*args, **kwargs) + + +def _start_proc( + target: Callable[..., None], + args: Optional[tuple[Any, ...]] = (), + kwargs: Optional[dict[str, Any]] = {}, + fd_pipes: Optional[dict[int, int]] = None, + close_fds: Optional[bool] = False, +) -> MultiprocessingProcess: + """ + Execute the target function using multiprocess.Process. + If the close_fds parameter is True then NotImplementedError + is raised, since it is risky to forcefully close file + descriptors that have references (bug 374335), and PEP 446 + should ensure that any relevant file descriptors are + non-inheritable and therefore automatically closed on exec. + """ + if close_fds: + raise NotImplementedError( + "close_fds is not supported (since file descriptors are non-inheritable by default for exec)" + ) + + # Manage fd_pipes inheritance for spawn/exec (bug 923755), + # which ForkProcess does not handle because its target + # function does not necessarily exec. + if fd_pipes and multiprocessing.get_start_method() == "fork": + target = _setup_pipes_after_fork(target, fd_pipes) + fd_pipes = None + + proc = ForkProcess( + scheduler=global_event_loop(), + target=target, + args=args, + kwargs=kwargs, + fd_pipes=fd_pipes, + create_pipe=False, # Pipe creation is delegated to the caller (see bug 923750). + ) + proc.start() + + # ForkProcess conveniently holds a MultiprocessingProcess + # instance that is suitable to return here. 
+ return proc._proc + + def find_binary(binary): """ Given a binary name, find the binary in PATH diff --git a/lib/portage/sync/modules/rsync/rsync.py b/lib/portage/sync/modules/rsync/rsync.py index 175c7f2e8e..5d442d2626 100644 --- a/lib/portage/sync/modules/rsync/rsync.py +++ b/lib/portage/sync/modules/rsync/rsync.py @@ -1,4 +1,4 @@ -# Copyright 1999-2023 Gentoo Authors +# Copyright 1999-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import datetime @@ -708,48 +708,47 @@ def _do_rsync(self, syncuri, timestamp, opts): command.append(syncuri.rstrip("/") + "/metadata/timestamp.chk") command.append(tmpservertimestampfile) content = None - pids = [] + proc = None + proc_waiter = None + loop = asyncio.get_event_loop() try: # Timeout here in case the server is unresponsive. The # --timeout rsync option doesn't apply to the initial # connection attempt. try: - if self.rsync_initial_timeout: - portage.exception.AlarmSignal.register(self.rsync_initial_timeout) - - pids.extend( - portage.process.spawn(command, returnpid=True, **self.spawn_kwargs) + proc = portage.process.spawn( + command, returnproc=True, **self.spawn_kwargs + ) + proc_waiter = asyncio.ensure_future(proc.wait(), loop) + future = ( + asyncio.wait_for( + asyncio.shield(proc_waiter), self.rsync_initial_timeout + ) + if self.rsync_initial_timeout + else proc_waiter ) - exitcode = os.waitpid(pids[0], 0)[1] + exitcode = loop.run_until_complete(future) if self.usersync_uid is not None: portage.util.apply_permissions( tmpservertimestampfile, uid=os.getuid() ) content = portage.grabfile(tmpservertimestampfile) finally: - if self.rsync_initial_timeout: - portage.exception.AlarmSignal.unregister() try: os.unlink(tmpservertimestampfile) except OSError: pass - except portage.exception.AlarmSignal: + except (TimeoutError, asyncio.TimeoutError): # timed out print("timed out") # With waitpid and WNOHANG, only check the # first element of the tuple since the second # element may vary 
(bug #337465). - if pids and os.waitpid(pids[0], os.WNOHANG)[0] == 0: - os.kill(pids[0], signal.SIGTERM) - os.waitpid(pids[0], 0) + if proc_waiter and not proc_waiter.done(): + proc.terminate() + loop.run_until_complete(proc_waiter) # This is the same code rsync uses for timeout. exitcode = 30 - else: - if exitcode != os.EX_OK: - if exitcode & 0xFF: - exitcode = (exitcode & 0xFF) << 8 - else: - exitcode = exitcode >> 8 if content: try: @@ -758,7 +757,6 @@ def _do_rsync(self, syncuri, timestamp, opts): ) except (OverflowError, ValueError): pass - del command, pids, content if exitcode == os.EX_OK: if (servertimestamp != 0) and (servertimestamp == timestamp): diff --git a/lib/portage/tests/util/futures/test_retry.py b/lib/portage/tests/util/futures/test_retry.py index a5b56bdc7f..6bd3f4b64a 100644 --- a/lib/portage/tests/util/futures/test_retry.py +++ b/lib/portage/tests/util/futures/test_retry.py @@ -1,4 +1,4 @@ -# Copyright 2018-2023 Gentoo Authors +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from concurrent.futures import Future, ThreadPoolExecutor @@ -9,7 +9,6 @@ import weakref import time -import portage from portage.tests import TestCase from portage.util._eventloop.global_event_loop import global_event_loop from portage.util.backoff import RandomExponentialBackoff @@ -229,16 +228,19 @@ def tearDown(self): @contextlib.contextmanager def _wrap_coroutine_func(self, coroutine_func): + uses_subprocess = isinstance(self._executor, ForkExecutor) parent_loop = global_event_loop() - parent_pid = portage.getpid() pending = weakref.WeakValueDictionary() # Since ThreadPoolExecutor does not propagate cancellation of a # parent_future to the underlying coroutine, use kill_switch to # propagate task cancellation to wrapper, so that HangForever's # thread returns when retry eventually cancels parent_future. 
- def wrapper(kill_switch): - if portage.getpid() == parent_pid: + if uses_subprocess: + wrapper = _run_coroutine_in_subprocess(coroutine_func) + else: + + def wrapper(kill_switch): # thread in main process def done_callback(result): result.cancelled() or result.exception() or result.result() @@ -262,22 +264,19 @@ def start_coroutine(future): else: return future.result().result() - # child process - loop = global_event_loop() - try: - return loop.run_until_complete(coroutine_func()) - finally: - loop.close() - def execute_wrapper(): - kill_switch = threading.Event() + # Use kill_switch for threads because they can't be killed + # like processes. Do not pass kill_switch to subprocesses + # because it is not picklable. + kill_switch = None if uses_subprocess else threading.Event() + wrapper_args = [kill_switch] if kill_switch else [] parent_future = asyncio.ensure_future( - parent_loop.run_in_executor(self._executor, wrapper, kill_switch), + parent_loop.run_in_executor(self._executor, wrapper, *wrapper_args), loop=parent_loop, ) def kill_callback(parent_future): - if not kill_switch.is_set(): + if kill_switch is not None and not kill_switch.is_set(): kill_switch.set() parent_future.add_done_callback(kill_callback) @@ -298,6 +297,19 @@ def kill_callback(parent_future): future.cancelled() or future.exception() or future.result() +class _run_coroutine_in_subprocess: + def __init__(self, coroutine_func): + self._coroutine_func = coroutine_func + + def __call__(self): + # child process + loop = global_event_loop() + try: + return loop.run_until_complete(self._coroutine_func()) + finally: + loop.close() + + class RetryThreadExecutorTestCase(RetryForkExecutorTestCase): def _setUpExecutor(self): self._executor = ThreadPoolExecutor(max_workers=1) diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index cb240d0712..ebcbd94107 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -153,15 
+153,24 @@ def _send_fd_pipes(self): This performs blocking IO, intended for invocation via run_in_executor. """ fd_list = list(set(self._fd_pipes.values())) - self._files.connection.send( - (self._fd_pipes, fd_list), - ) - for fd in fd_list: - multiprocessing.reduction.send_handle( - self._files.connection, - fd, - self.pid, + try: + self._files.connection.send( + (self._fd_pipes, fd_list), ) + for fd in fd_list: + multiprocessing.reduction.send_handle( + self._files.connection, + fd, + self.pid, + ) + except BrokenPipeError as e: + # This case is triggered by testAsynchronousLockWaitCancel + # when the test case terminates the child process while + # this thread is still sending the fd_pipes (bug 923852). + # Even if the child terminated abnormally, then there is + # no harm in suppressing the exception here, since the + # child error should have gone to stderr. + raise asyncio.CancelledError from e # self._fd_pipes contains duplicates that must be closed. for fd in fd_list: diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index a5a6cb3a5b..8f1b8e8275 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2018-2021 Gentoo Authors +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ( @@ -15,9 +15,11 @@ "set_child_watcher", "get_event_loop_policy", "set_event_loop_policy", + "shield", "sleep", "Task", "wait", + "wait_for", ) import types @@ -33,7 +35,9 @@ FIRST_EXCEPTION, Future, InvalidStateError, + shield, TimeoutError, + wait_for, ) import threading diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py index 61ad6aecfb..1e3d010724 100644 --- a/lib/portage/util/futures/executor/fork.py +++ b/lib/portage/util/futures/executor/fork.py @@ -1,4 +1,4 @@ -# Copyright 2018 Gentoo Foundation +# Copyright 2018-2024 Gentoo 
Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ("ForkExecutor",) @@ -40,7 +40,9 @@ def submit(self, fn, *args, **kwargs): """ future = self._loop.create_future() proc = AsyncFunction( - target=functools.partial(self._guarded_fn_call, fn, args, kwargs) + target=functools.partial(self._guarded_fn_call, fn, args, kwargs), + # Directly inherit stdio streams and run in the foreground with no log. + create_pipe=False, ) self._submit_queue.append((future, proc)) self._schedule() diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py index fedb8599d5..6c68ff4106 100644 --- a/lib/portage/util/socks5.py +++ b/lib/portage/util/socks5.py @@ -1,10 +1,9 @@ # SOCKSv5 proxy manager for network-sandbox -# Copyright 2015-2021 Gentoo Authors +# Copyright 2015-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import errno import os -import signal import socket import portage.data @@ -22,7 +21,8 @@ class ProxyManager: def __init__(self): self.socket_path = None - self._pids = [] + self._proc = None + self._proc_waiter = None def start(self, settings): """ @@ -51,9 +51,9 @@ def start(self, settings): spawn_kwargs.update( uid=portage_uid, gid=portage_gid, groups=userpriv_groups, umask=0o077 ) - self._pids = spawn( + self._proc = spawn( [_python_interpreter, server_bin, self.socket_path], - returnpid=True, + returnproc=True, **spawn_kwargs, ) @@ -61,12 +61,19 @@ def stop(self): """ Stop the SOCKSv5 server. 
""" - for p in self._pids: - os.kill(p, signal.SIGINT) - os.waitpid(p, 0) + if self._proc is not None: + self._proc.terminate() + loop = asyncio.get_event_loop() + if self._proc_waiter is None: + self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop) + if loop.is_running(): + self._proc_waiter.add_done_callback(lambda future: future.result()) + else: + loop.run_until_complete(self._proc_waiter) self.socket_path = None - self._pids = [] + self._proc = None + self._proc_waiter = None def is_running(self): """ @@ -80,16 +87,11 @@ async def ready(self): """ Wait for the proxy socket to become ready. This method is a coroutine. """ + if self._proc_waiter is None: + self._proc_waiter = asyncio.ensure_future(self._proc.wait()) while True: - try: - wait_retval = os.waitpid(self._pids[0], os.WNOHANG) - except OSError as e: - if e.errno == errno.EINTR: - continue - raise - - if wait_retval is not None and wait_retval != (0, 0): + if self._proc_waiter.done(): raise OSError(3, "No such process") try: