Skip to content

Commit

Permalink
sdf
Browse files Browse the repository at this point in the history
  • Loading branch information
excitoon committed May 30, 2023
1 parent 9950ac2 commit 33633cc
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 2 deletions.
166 changes: 166 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import unittest
import sys


if sys.version_info >= (3, 8):
IsolatedAsyncioTestCase = unittest.IsolatedAsyncioTestCase

else:
# This is a `unittest.async_case` from Python 3.8.
import asyncio
import inspect

class IsolatedAsyncioTestCase(unittest.TestCase):
# Names intentionally have a long prefix
# to reduce a chance of clashing with user-defined attributes
# from inherited test case
#
# The class doesn't call loop.run_until_complete(self.setUp()) and family
# but uses a different approach:
# 1. create a long-running task that reads self.setUp()
# awaitable from queue along with a future
# 2. await the awaitable object passing in and set the result
# into the future object
# 3. Outer code puts the awaitable and the future object into a queue
# with waiting for the future
# The trick is necessary because every run_until_complete() call
# creates a new task with embedded ContextVar context.
# To share contextvars between setUp(), test and tearDown() we need to execute
# them inside the same task.

# Note: the test case modifies event loop policy if the policy was not instantiated
# yet.
# asyncio.get_event_loop_policy() creates a default policy on demand but never
# returns None
# I believe this is not an issue in user level tests but python itself for testing
# should reset a policy in every test module
# by calling asyncio.set_event_loop_policy(None) in tearDownModule()

def __init__(self, methodName="runTest"):
super().__init__(methodName)
self._asyncioTestLoop = None
self._asyncioCallsQueue = None

async def asyncSetUp(self):
pass

async def asyncTearDown(self):
pass

def addAsyncCleanup(self, func, /, *args, **kwargs):
# A trivial trampoline to addCleanup()
# the function exists because it has a different semantics
# and signature:
# addCleanup() accepts regular functions
# but addAsyncCleanup() accepts coroutines
#
# We intentionally don't add inspect.iscoroutinefunction() check
# for func argument because there is no way
# to check for async function reliably:
# 1. It can be "async def func()" iself
# 2. Class can implement "async def __call__()" method
# 3. Regular "def func()" that returns awaitable object
self.addCleanup(*(func, *args), **kwargs)

def _callSetUp(self):
self.setUp()
self._callAsync(self.asyncSetUp)

def _callTestMethod(self, method):
self._callMaybeAsync(method)

def _callTearDown(self):
self._callAsync(self.asyncTearDown)
self.tearDown()

def _callCleanup(self, function, *args, **kwargs):
self._callMaybeAsync(function, *args, **kwargs)

def _callAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None
ret = func(*args, **kwargs)
assert inspect.isawaitable(ret)
fut = self._asyncioTestLoop.create_future()
self._asyncioCallsQueue.put_nowait((fut, ret))
return self._asyncioTestLoop.run_until_complete(fut)

def _callMaybeAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None
ret = func(*args, **kwargs)
if inspect.isawaitable(ret):
fut = self._asyncioTestLoop.create_future()
self._asyncioCallsQueue.put_nowait((fut, ret))
return self._asyncioTestLoop.run_until_complete(fut)
else:
return ret

async def _asyncioLoopRunner(self, fut):
self._asyncioCallsQueue = queue = asyncio.Queue()
fut.set_result(None)
while True:
query = await queue.get()
queue.task_done()
if query is None:
return
fut, awaitable = query
try:
ret = await awaitable
if not fut.cancelled():
fut.set_result(ret)
except (SystemExit, KeyboardInterrupt):
raise
except (BaseException, asyncio.CancelledError) as ex:
if not fut.cancelled():
fut.set_exception(ex)

def _setupAsyncioLoop(self):
assert self._asyncioTestLoop is None
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
self._asyncioTestLoop = loop
fut = loop.create_future()
self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
loop.run_until_complete(fut)

def _tearDownAsyncioLoop(self):
assert self._asyncioTestLoop is not None
loop = self._asyncioTestLoop
self._asyncioTestLoop = None
self._asyncioCallsQueue.put_nowait(None)
loop.run_until_complete(self._asyncioCallsQueue.join())

try:
# cancel all tasks
to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return

for task in to_cancel:
task.cancel()

loop.run_until_complete(asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))

for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during test shutdown",
"exception": task.exception(),
"task": task,
}
)
# shutdown asyncgens
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
loop.close()

def run(self, result=None):
self._setupAsyncioLoop()
try:
return super().run(result)
finally:
self._tearDownAsyncioLoop()
3 changes: 2 additions & 1 deletion tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import asgiref.sync

import django_threaded_sync_to_async
import tests


class TestExecutor(unittest.IsolatedAsyncioTestCase):
class TestExecutor(tests.IsolatedAsyncioTestCase):
async def testConcurrent(self):
workers = 50
timeout = 0.1
Expand Down
3 changes: 2 additions & 1 deletion tests/test_shared_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import asgiref.sync

import django_threaded_sync_to_async
import tests


class TestSharedExecutor(unittest.IsolatedAsyncioTestCase):
class TestSharedExecutor(tests.IsolatedAsyncioTestCase):
async def testSimple(self):
async with django_threaded_sync_to_async.SharedExecutor("simple_common") as executor:
pass
Expand Down

0 comments on commit 33633cc

Please sign in to comment.