diff --git a/asgiref/sync.py b/asgiref/sync.py index a46f7d4c..476604ad 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -61,9 +61,17 @@ def __init__(self, awaitable, force_new_loop=False): except RuntimeError: # There's no event loop in this thread. Look for the threadlocal if # we're inside SyncToAsync - self.main_event_loop = getattr( - SyncToAsync.threadlocal, "main_event_loop", None + main_event_loop_pid = getattr( + SyncToAsync.threadlocal, "main_event_loop_pid", None ) + # We make sure the parent loop is from the same process - if + # they've forked, this is not going to be valid any more (#194) + if main_event_loop_pid and main_event_loop_pid == os.getpid(): + self.main_event_loop = getattr( + SyncToAsync.threadlocal, "main_event_loop", None + ) + else: + self.main_event_loop = None def __call__(self, *args, **kwargs): # You can't call AsyncToSync from a thread with a running event loop @@ -312,6 +320,7 @@ def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs): """ # Set the threadlocal for AsyncToSync self.threadlocal.main_event_loop = loop + self.threadlocal.main_event_loop_pid = os.getpid() # Set the task mapping (used for the locals module) current_thread = threading.current_thread() if AsyncToSync.launch_map.get(source_task) == current_thread: diff --git a/tests/test_sync.py b/tests/test_sync.py index 5a4b3423..b6ebd595 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,6 +1,7 @@ import asyncio import threading import time +import multiprocessing from concurrent.futures import ThreadPoolExecutor from functools import wraps from unittest import TestCase @@ -415,3 +416,32 @@ def sync_func(): assert not asyncio.iscoroutinefunction(sync_to_async) assert asyncio.iscoroutinefunction(sync_to_async(sync_func)) + + +@pytest.mark.asyncio +async def test_multiprocessing(): + """ + Tests that a forked process can use async_to_sync without it looking for + the event loop from the parent process. + """ + + test_queue = multiprocessing.Queue() + + async def async_process(): + test_queue.put(42) + + def sync_process(): + """Runs async_process synchronously""" + async_to_sync(async_process)() + + def fork_first(): + """Forks process before running sync_process""" + fork = multiprocessing.Process(target=sync_process) + fork.start() + fork.join(3) + # Force cleanup in failed test case + if fork.is_alive(): + fork.terminate() + return test_queue.get(True, 1) + + assert await sync_to_async(fork_first)() == 42