Skip to content

Commit

Permalink
Fixed #194: Made async_to_sync work inside a fork
Browse files Browse the repository at this point in the history
sync_to_async was keeping a threadlocal for the main event loop that was
invalid, but still present, after a process fork. This makes
async_to_sync check for the PID for that parent loop as well as the mere
presence of it, fixing this problem and letting it make a new loop in
the subprocess.
  • Loading branch information
andrewgodwin committed Sep 11, 2020
1 parent 4ab9d8e commit 66a6e68
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
13 changes: 11 additions & 2 deletions asgiref/sync.py
Expand Up @@ -61,9 +61,17 @@ def __init__(self, awaitable, force_new_loop=False):
except RuntimeError:
# There's no event loop in this thread. Look for the threadlocal if
# we're inside SyncToAsync
self.main_event_loop = getattr(
SyncToAsync.threadlocal, "main_event_loop", None
main_event_loop_pid = getattr(
SyncToAsync.threadlocal, "main_event_loop_pid", None
)
# We make sure the parent loop is from the same process - if
# they've forked, this is not going to be valid any more (#194)
if main_event_loop_pid and main_event_loop_pid == os.getpid():
self.main_event_loop = getattr(
SyncToAsync.threadlocal, "main_event_loop", None
)
else:
self.main_event_loop = None

def __call__(self, *args, **kwargs):
# You can't call AsyncToSync from a thread with a running event loop
Expand Down Expand Up @@ -312,6 +320,7 @@ def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
"""
# Set the threadlocal for AsyncToSync
self.threadlocal.main_event_loop = loop
self.threadlocal.main_event_loop_pid = os.getpid()
# Set the task mapping (used for the locals module)
current_thread = threading.current_thread()
if AsyncToSync.launch_map.get(source_task) == current_thread:
Expand Down
30 changes: 30 additions & 0 deletions tests/test_sync.py
@@ -1,6 +1,7 @@
import asyncio
import threading
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from unittest import TestCase
Expand Down Expand Up @@ -415,3 +416,32 @@ def sync_func():

assert not asyncio.iscoroutinefunction(sync_to_async)
assert asyncio.iscoroutinefunction(sync_to_async(sync_func))


@pytest.mark.asyncio
async def test_multiprocessing():
"""
Tests that a forked process can use async_to_sync without it looking for
the event loop from the parent process.
"""

test_queue = multiprocessing.Queue()

async def async_process():
test_queue.put(42)

def sync_process():
"""Runs async_process synchronously"""
async_to_sync(async_process)()

def fork_first():
"""Forks process before running sync_process"""
fork = multiprocessing.Process(target=sync_process)
fork.start()
fork.join(3)
# Force cleanup in failed test case
if fork.is_alive():
fork.terminate()
return test_queue.get(True, 1)

assert await sync_to_async(fork_first)() == 42

0 comments on commit 66a6e68

Please sign in to comment.