Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 916566 returnproc migration #1251

Merged
merged 8 commits into from
Feb 7, 2024
4 changes: 2 additions & 2 deletions lib/_emerge/EbuildMetadataPhase.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def _start(self):
mydbapi=self.portdb,
tree="porttree",
fd_pipes=fd_pipes,
returnpid=True,
returnproc=True,
)
settings.pop("PORTAGE_PIPE_FD", None)

Expand All @@ -137,7 +137,7 @@ def _start(self):
self._async_wait()
return

self._proc = portage.process.Process(retval[0])
self._proc = retval

def _output_handler(self):
while True:
Expand Down
4 changes: 3 additions & 1 deletion lib/_emerge/SpawnProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ def _pipe(self, fd_pipes):
got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe)
return (master_fd, slave_fd)

def _spawn(self, args: list[str], **kwargs) -> portage.process.Process:
def _spawn(
self, args: list[str], **kwargs
) -> portage.process.MultiprocessingProcess:
spawn_func = portage.process.spawn

if self._selinux_type is not None:
Expand Down
47 changes: 37 additions & 10 deletions lib/portage/package/ebuild/doebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tempfile
from textwrap import wrap
import time
from typing import Union
import warnings
import zlib

Expand Down Expand Up @@ -246,14 +247,21 @@ def _doebuild_spawn(phase, settings, actionmap=None, **kwargs):


def _spawn_phase(
phase, settings, actionmap=None, returnpid=False, logfile=None, **kwargs
phase,
settings,
actionmap=None,
returnpid=False,
returnproc=False,
logfile=None,
**kwargs,
):
if returnpid:
if returnproc or returnpid:
return _doebuild_spawn(
phase,
settings,
actionmap=actionmap,
returnpid=returnpid,
returnproc=returnproc,
logfile=logfile,
**kwargs,
)
Expand Down Expand Up @@ -725,7 +733,8 @@ def doebuild(
prev_mtimes=None,
fd_pipes=None,
returnpid=False,
):
returnproc=False,
) -> Union[int, portage.process.MultiprocessingProcess, list[int]]:
"""
Wrapper function that invokes specific ebuild phases through the spawning
of ebuild.sh
Expand Down Expand Up @@ -762,9 +771,15 @@ def doebuild(
for example.
@type fd_pipes: Dictionary
@param returnpid: Return a list of process IDs for a successful spawn, or
an integer value if spawn is unsuccessful. NOTE: This requires the
caller clean up all returned PIDs.
an integer value if spawn is unsuccessful. This parameter is
supported only when mydo is "depend". NOTE: This requires the
caller to clean up all returned PIDs.
@type returnpid: Boolean
@param returnproc: Return a MultiprocessingProcess instance for a successful spawn, or
an integer value if spawn is unsuccessful. This parameter is
supported only when mydo is "depend". NOTE: This requires the
caller to asynchronously wait for the MultiprocessingProcess instance.
@type returnproc: Boolean
@rtype: Boolean
@return:
1. 0 for success
Expand Down Expand Up @@ -867,17 +882,25 @@ def doebuild(
writemsg("\n", noiselevel=-1)
return 1

if returnpid and mydo != "depend":
if (returnproc or returnpid) and mydo != "depend":
# This case is not supported, since it bypasses the EbuildPhase class
# which implements important functionality (including post phase hooks
# and IPC for things like best/has_version and die).
if returnproc:
raise NotImplementedError(f"returnproc not implemented for phase {mydo}")
warnings.warn(
"portage.doebuild() called "
"with returnpid parameter enabled. This usage will "
"not be supported in the future.",
DeprecationWarning,
UserWarning,
stacklevel=2,
)
elif returnpid:
warnings.warn(
"The portage.doebuild() returnpid parameter is deprecated and replaced by returnproc",
UserWarning,
stacklevel=1,
)

if mydo == "fetchall":
fetchall = 1
Expand Down Expand Up @@ -1027,10 +1050,14 @@ def doebuild(

# get possible slot information from the deps file
if mydo == "depend":
if not returnpid:
raise TypeError("returnpid must be True for depend phase")
if not (returnproc or returnpid):
raise TypeError("returnproc or returnpid must be True for depend phase")
return _spawn_phase(
mydo, mysettings, fd_pipes=fd_pipes, returnpid=returnpid
mydo,
mysettings,
fd_pipes=fd_pipes,
returnpid=returnpid,
returnproc=returnproc,
)

if mydo == "nofetch":
Expand Down
134 changes: 99 additions & 35 deletions lib/portage/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Optional, Callable
from typing import Any, Optional, Callable, Union

from portage import os
from portage import _encodings
Expand All @@ -28,6 +28,7 @@

portage.proxy.lazyimport.lazyimport(
globals(),
"portage.util._async.ForkProcess:ForkProcess",
"portage.util._eventloop.global_event_loop:global_event_loop",
"portage.util.futures:asyncio",
"portage.util:dump_traceback,writemsg,writemsg_level",
Expand Down Expand Up @@ -296,12 +297,19 @@ def send_signal(self, sig):

class Process(AbstractProcess):
"""
An object that wraps OS processes created by spawn.
In the future, spawn will return objects of a different type
but with a compatible interface to this class, in order
to encapsulate implementation-dependent objects like
multiprocessing.Process which are designed to manage
the process lifecycle and need to persist until it exits.
An object that wraps OS processes which do not have an
associated multiprocessing.Process instance. Ultimately,
we need to stop using os.fork() to create these processes
because it is unsafe for threaded processes as discussed
in https://github.com/python/cpython/issues/84559.

Note that if subprocess.Popen is used without pass_fds
or preexec_fn parameters, then it avoids using os.fork()
by instead using posix_spawn. This approach is not used
by spawn because it needs to execute python code prior
to exec, so it instead uses multiprocessing.Process,
which only uses os.fork() when the multiprocessing start
method is fork.
"""

def __init__(self, pid: int):
Expand Down Expand Up @@ -461,7 +469,7 @@ def spawn(
unshare_mount=False,
unshare_pid=False,
warn_on_large_env=False,
):
) -> Union[int, MultiprocessingProcess, list[int]]:
"""
Spawns a given command.

Expand All @@ -479,8 +487,8 @@ def spawn(
@param returnpid: Return the Process IDs for a successful spawn.
NOTE: This requires the caller clean up all the PIDs, otherwise spawn will clean them.
@type returnpid: Boolean
@param returnproc: Return a Process object for a successful spawn (conflicts with logfile parameter).
NOTE: This requires the caller to asynchronously wait for the Process.
@param returnproc: Return a MultiprocessingProcess instance (conflicts with logfile parameter).
NOTE: This requires the caller to asynchronously wait for the MultiprocessingProcess instance.
@type returnproc: Boolean
@param uid: User ID to spawn as; useful for dropping privilages
@type uid: Integer
Expand Down Expand Up @@ -578,10 +586,10 @@ def spawn(

# Create a tee process, giving it our stdout and stderr
# as well as the read end of the pipe.
mypids.extend(
mypids.append(
spawn(
("tee", "-i", "-a", logfile),
returnpid=True,
returnproc=True,
fd_pipes={0: pr, 1: fd_pipes[1], 2: fd_pipes[2]},
)
)
Expand Down Expand Up @@ -626,7 +634,9 @@ def spawn(
# fork, so that the result is cached in the main process.
bool(groups)

pid = _start_fork(
start_func = _start_proc if returnproc or not returnpid else _start_fork

pid = start_func(
_exec_wrapper,
args=(
binary,
Expand All @@ -652,7 +662,11 @@ def spawn(
close_fds=close_fds,
)

if not isinstance(pid, int):
if returnproc:
# _start_proc returns a MultiprocessingProcess instance.
return pid

if returnpid and not isinstance(pid, int):
raise AssertionError(f"fork returned non-integer: {repr(pid)}")

# Add the pid to our local and the global pid lists.
Expand All @@ -666,15 +680,14 @@ def spawn(
# If the caller wants to handle cleaning up the processes, we tell
# it about all processes that were created.
if returnpid:
if not portage._internal_caller:
warnings.warn(
"The portage.process.spawn returnpid paramenter is deprecated and replaced by returnproc",
UserWarning,
stacklevel=1,
)
warnings.warn(
"The portage.process.spawn returnpid parameter is deprecated and replaced by returnproc",
UserWarning,
stacklevel=1,
)
return mypids
if returnproc:
return Process(mypids[0])

loop = global_event_loop()

# Otherwise we clean them up.
while mypids:
Expand All @@ -684,25 +697,22 @@ def spawn(
pid = mypids.pop(0)

# and wait for it.
retval = os.waitpid(pid, 0)[1]
retval = loop.run_until_complete(pid.wait())

if retval:
# If it failed, kill off anything else that
# isn't dead yet.
for pid in mypids:
# With waitpid and WNOHANG, only check the
# first element of the tuple since the second
# element may vary (bug #337465).
if os.waitpid(pid, os.WNOHANG)[0] == 0:
os.kill(pid, signal.SIGTERM)
os.waitpid(pid, 0)

# If it got a signal, return the signal that was sent.
if retval & 0xFF:
return (retval & 0xFF) << 8
waiter = asyncio.ensure_future(pid.wait(), loop)
try:
loop.run_until_complete(
asyncio.wait_for(asyncio.shield(waiter), 0.001)
)
except (TimeoutError, asyncio.TimeoutError):
pid.terminate()
loop.run_until_complete(waiter)

# Otherwise, return its exit code.
return retval >> 8
return retval

# Everything succeeded
return 0
Expand Down Expand Up @@ -1373,6 +1383,60 @@ def _start_fork(
return pid


class _setup_pipes_after_fork:
def __init__(self, target, fd_pipes):
self._target = target
self._fd_pipes = fd_pipes

def __call__(self, *args, **kwargs):
for fd in set(self._fd_pipes.values()):
os.set_inheritable(fd, True)
_setup_pipes(self._fd_pipes, close_fds=False, inheritable=True)
return self._target(*args, **kwargs)


def _start_proc(
target: Callable[..., None],
args: Optional[tuple[Any, ...]] = (),
kwargs: Optional[dict[str, Any]] = {},
fd_pipes: Optional[dict[int, int]] = None,
close_fds: Optional[bool] = False,
) -> MultiprocessingProcess:
"""
Execute the target function using multiprocess.Process.
If the close_fds parameter is True then NotImplementedError
is raised, since it is risky to forcefully close file
descriptors that have references (bug 374335), and PEP 446
should ensure that any relevant file descriptors are
non-inheritable and therefore automatically closed on exec.
"""
if close_fds:
raise NotImplementedError(
"close_fds is not supported (since file descriptors are non-inheritable by default for exec)"
)

# Manage fd_pipes inheritance for spawn/exec (bug 923755),
# which ForkProcess does not handle because its target
# function does not necessarily exec.
if fd_pipes and multiprocessing.get_start_method() == "fork":
target = _setup_pipes_after_fork(target, fd_pipes)
fd_pipes = None

proc = ForkProcess(
scheduler=global_event_loop(),
target=target,
args=args,
kwargs=kwargs,
fd_pipes=fd_pipes,
create_pipe=False, # Pipe creation is delegated to the caller (see bug 923750).
)
proc.start()

# ForkProcess conveniently holds a MultiprocessingProcess
# instance that is suitable to return here.
return proc._proc


def find_binary(binary):
"""
Given a binary name, find the binary in PATH
Expand Down
Loading
Loading