Skip to content

Commit

Permalink
Rewriting the server module to eliminate races and zombies (#23)
Browse files Browse the repository at this point in the history
* test(server): apply os.setpgrp() as well here
* test(server): Rewrite the test suite
* test: Suppresss bogus warnings
* fix: UnboundLocalError in _fork_posix()
* fix/breaking: Rename fork.fork() to fork.afork() to avoid naming conflicts
  • Loading branch information
achimnol committed Jan 11, 2021
1 parent e664a71 commit f735b70
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 577 deletions.
1 change: 1 addition & 0 deletions changes/23.breaking
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**server:** The `use_threading` argument for `start_server()` is completed deprecated.
1 change: 1 addition & 0 deletions changes/23.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**server:** Completely rewrote the module using the new `fork` module with handling of various edge cases such as async failures of sibiling child processes
7 changes: 5 additions & 2 deletions src/aiotools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from . import (
context,
defer as _defer,
fork as _fork,
func,
iter as _iter,
timer,
taskgroup,
server,
taskgroup,
timer,
)

import pkgutil
Expand All @@ -19,6 +20,7 @@
__all__ = (
*context.__all__,
*_defer.__all__,
*_fork.__all__,
*func.__all__,
*_iter.__all__,
*server.__all__,
Expand All @@ -29,6 +31,7 @@

from .context import * # noqa
from .defer import * # noqa
from .fork import * # noqa
from .func import * # noqa
from .iter import * # noqa
from .taskgroup import * # noqa
Expand Down
50 changes: 35 additions & 15 deletions src/aiotools/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@
c_void_p,
cast,
)
from typing import Callable, Optional, Tuple
from typing import Callable, Tuple

from .compat import get_running_loop

logger = logging.getLogger(__package__)
__all__ = (
'AbstractChildProcess',
'PosixChildProcess',
'PidfdChildProcess',
'afork',
)

logger = logging.getLogger(__name__)

_libc = ctypes.CDLL(None)
_syscall = _libc.syscall
Expand Down Expand Up @@ -57,8 +64,17 @@ class PosixChildProcess(AbstractChildProcess):

def __init__(self, pid: int) -> None:
self._pid = pid
self._terminated = False

def send_signal(self, signum: int) -> None:
if self._terminated:
logger.warning(
"PosixChildProcess(%d).send_signal(%d): "
"The process has already terminated.",
self._pid,
signum,
)
return
os.kill(self._pid, signum)

async def wait(self) -> int:
Expand All @@ -80,6 +96,8 @@ async def wait(self) -> int:
self._returncode = os.WEXITSTATUS(status)
else:
self._returncode = status
finally:
self._terminated = True
return self._returncode


Expand All @@ -90,8 +108,18 @@ def __init__(self, pid: int, pidfd: int) -> None:
self._pidfd = pidfd
self._returncode = None
self._wait_event = asyncio.Event()
self._terminated = False

def send_signal(self, signum: int) -> None:
if self._terminated:
logger.warning(
"PidfdChildProcess(%d, %d).send_signal(%d): "
"The process has already terminated.",
self._pid,
self._pidfd,
signum,
)
return
signal.pidfd_send_signal(self._pidfd, signum) # type: ignore

def _do_wait(self):
Expand Down Expand Up @@ -125,6 +153,7 @@ def _do_wait(self):
finally:
loop.remove_reader(self._pidfd)
os.close(self._pidfd)
self._terminated = True
self._wait_event.set()

async def wait(self) -> int:
Expand All @@ -138,10 +167,6 @@ async def wait(self) -> int:
def _child_main(init_func, init_pipe, child_func: Callable[[], int]) -> int:
if init_func is not None:
init_func()
signal.pthread_sigmask(
signal.SIG_UNBLOCK,
(signal.SIGCHLD,)
)
# notify the parent that the child is ready to execute the requested function.
os.write(init_pipe, b"\0")
os.close(init_pipe)
Expand All @@ -154,14 +179,13 @@ async def _fork_posix(child_func: Callable[[], int]) -> int:
init_event = asyncio.Event()
loop.add_reader(init_pipe[0], init_event.set)

signal.pthread_sigmask(
signal.SIG_BLOCK,
(signal.SIGCHLD,),
)
pid = os.fork()
if pid == 0:
ret = 0
try:
ret = _child_main(None, init_pipe[1], child_func)
except KeyboardInterrupt:
ret = -signal.SIGINT
finally:
os._exit(ret)

Expand Down Expand Up @@ -197,10 +221,6 @@ async def _clone_pidfd(child_func: Callable[[], int]) -> Tuple[int, int]:
)
)
stack_top = c_void_p(cast(stack, c_void_p).value + stack_size) # type: ignore
signal.pthread_sigmask(
signal.SIG_BLOCK,
(signal.SIGCHLD,),
)
ctypes.pythonapi.PyOS_BeforeFork()
# The flag value is CLONE_PIDFD from linux/sched.h
pid = _libc.clone(func, stack_top, 0x1000, 0, byref(fd))
Expand All @@ -217,7 +237,7 @@ async def _clone_pidfd(child_func: Callable[[], int]) -> Tuple[int, int]:
return pid, fd.value


async def fork(child_func: Callable[[], int]) -> Optional[AbstractChildProcess]:
async def afork(child_func: Callable[[], int]) -> AbstractChildProcess:
if _has_pidfd:
pid, pidfd = await _clone_pidfd(child_func)
return PidfdChildProcess(pid, pidfd)
Expand Down

0 comments on commit f735b70

Please sign in to comment.