Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix - uvloop freeze when something other than uvloop invoke process f…
…ork and execute non-python process Problem: Uvloop for each loop register `atFork` handler that is called after fork is executed by forked child. This handler works fine when fork was invoked by uvloop. In case fork is invoked by something else (such as external library) uvloop freeze in this handler because: - GIL is acquired inside `atFork` handler -> in case forked child does not contain python runtime `atFork` handler freeze at obtaining GIL - when compiled in debug mode (`make debug`) cython trace calls are inserted inside compiled `atFork` handler -> in case forked child does not contain python runtime `atFork` handler freeze at providing trace call Solution: This fix solve described problems by implementing `atFork` handler as C function so that forked child can call it safely whether or not contains python runtime.
- Loading branch information
1 parent
1a0d657
commit fde5d14
Showing
5 changed files
with
140 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import asyncio | ||
import ctypes.util | ||
import logging | ||
from concurrent.futures import ThreadPoolExecutor | ||
from threading import Thread | ||
from unittest import TestCase | ||
|
||
import uvloop | ||
|
||
|
||
class ProcessSpawningTestCollection(TestCase): | ||
|
||
def test_spawning_external_process(self): | ||
"""Test spawning external process (using `popen` system call) that | ||
cause loop freeze.""" | ||
|
||
async def run(loop): | ||
event = asyncio.Event(loop=loop) | ||
|
||
dummy_workers = [simulate_loop_activity(loop, event) | ||
for _ in range(5)] | ||
spawn_worker = spawn_external_process(loop, event) | ||
done, pending = await asyncio.wait([spawn_worker] + dummy_workers, | ||
loop=loop) | ||
exceptions = [result.exception() | ||
for result in done if result.exception()] | ||
if exceptions: | ||
raise exceptions[0] | ||
|
||
return True | ||
|
||
async def simulate_loop_activity(loop, done_event): | ||
"""Simulate loop activity by busy waiting for event.""" | ||
while True: | ||
try: | ||
await asyncio.wait_for(done_event.wait(), | ||
timeout=0.1, loop=loop) | ||
except asyncio.TimeoutError: | ||
pass | ||
|
||
if done_event.is_set(): | ||
return None | ||
|
||
async def spawn_external_process(loop, event): | ||
executor = ThreadPoolExecutor() | ||
try: | ||
call = loop.run_in_executor(executor, spawn_process) | ||
await asyncio.wait_for(call, loop=loop, timeout=3600) | ||
finally: | ||
event.set() | ||
executor.shutdown(wait=False) | ||
return True | ||
|
||
BUFFER_LENGTH = 1025 | ||
BufferType = ctypes.c_char * (BUFFER_LENGTH - 1) | ||
|
||
def run_echo(popen, fread, pclose): | ||
fd = popen('echo test'.encode('ASCII'), 'r'.encode('ASCII')) | ||
try: | ||
while True: | ||
buffer = BufferType() | ||
data = ctypes.c_void_p(ctypes.addressof(buffer)) | ||
|
||
# -> this call will freeze whole loop in case of bug | ||
read = fread(data, 1, BUFFER_LENGTH, fd) | ||
if not read: | ||
break | ||
except Exception: | ||
logging.getLogger().exception('read error') | ||
raise | ||
finally: | ||
pclose(fd) | ||
|
||
def spawn_process(): | ||
"""Spawn external process via `popen` system call.""" | ||
|
||
stdio = ctypes.CDLL(ctypes.util.find_library('c')) | ||
|
||
# popen system call | ||
popen = stdio.popen | ||
popen.argtypes = (ctypes.c_char_p, ctypes.c_char_p) | ||
popen.restype = ctypes.c_void_p | ||
|
||
# pclose system call | ||
pclose = stdio.pclose | ||
pclose.argtypes = (ctypes.c_void_p,) | ||
pclose.restype = ctypes.c_int | ||
|
||
# fread system call | ||
fread = stdio.fread | ||
fread.argtypes = (ctypes.c_void_p, ctypes.c_size_t, | ||
ctypes.c_size_t, ctypes.c_void_p) | ||
fread.restype = ctypes.c_size_t | ||
|
||
for iteration in range(1000): | ||
t = Thread(target=run_echo, | ||
args=(popen, fread, pclose), | ||
daemon=True) | ||
t.start() | ||
t.join(timeout=10.0) | ||
if t.is_alive(): | ||
raise Exception('process freeze detected at {}' | ||
.format(iteration)) | ||
|
||
return True | ||
|
||
loop = uvloop.new_event_loop() | ||
proc = loop.run_until_complete(run(loop)) | ||
self.assertTrue(proc) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
|
||
typedef void (*OnForkHandler)(); | ||
|
||
OnForkHandler __forkHandler = NULL; | ||
|
||
/* Auxiliary function to call global fork handler if defined. | ||
Note: Fork handler needs to be in C (not cython) otherwise it would require | ||
GIL to be present, but some forks can exec non-python processes. | ||
*/ | ||
void handleAtFork() { | ||
if (__forkHandler != NULL) { | ||
__forkHandler(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters