# Cookbook `threaded_async`

### Deploying async code in the background

Entering the context of an `AsyncRunner` creates a new background thread with
its own event loop. Threaded code can create asynchronous tasks on this event
loop and use a blocking thread interface to access their results.

In [1]:
import asyncio

import threaded_async

async def foo() -> int:
  await asyncio.sleep(0.1)
  return 10

with threaded_async.AsyncRunner() as runner:
  # Deploy the coroutine as a task on the loop
  background_task = runner.create_task(foo())
  # Block the current thread waiting for the task to complete.
  print(background_task.wait())

10
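
For intuition, the same pattern can be approximated with the standard library alone. The sketch below is an illustration, not `threaded_async`'s implementation: the helper name `run_on_background_loop` and the 5 second timeout are assumptions made for this example.

```python
import asyncio
import threading


async def foo() -> int:
  await asyncio.sleep(0.1)
  return 10


def run_on_background_loop(coro, timeout: float = 5.0):
  """Runs `coro` on an event loop owned by a helper thread; returns its result."""
  loop = asyncio.new_event_loop()
  thread = threading.Thread(target=loop.run_forever, daemon=True)
  thread.start()
  try:
    # Thread-safe scheduling; returns a concurrent.futures.Future.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    # Block the calling thread until the coroutine completes.
    return future.result(timeout=timeout)
  finally:
    # loop.stop() is not thread-safe, so schedule it on the loop's own thread.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


result = run_on_background_loop(foo())
print(result)
```

The `try`/`finally` plays the role of the context manager: the loop is stopped and its thread joined even if the coroutine fails or times out.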


### Using events

Events can be used for bidirectional communication between threaded code
and async coroutines. The following shows an async function that uses an event
to wait for the main thread to perform initialization of a global variable.

In [2]:
from typing import Optional
import threaded_async

global_var: Optional[int] = None

async def increment_global(global_initialized: threaded_async.Event) -> int:
  await global_initialized.as_async_event().wait()
  assert global_var is not None
  return global_var + 1

with threaded_async.AsyncRunner() as runner:
  global_initialized = threaded_async.Event(runner)
  background_task = runner.create_task(increment_global(global_initialized))
  global_var = 10
  global_initialized.set()
  print(background_task.wait())

11
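
As a rough standard-library analogue (a sketch under assumptions, not how `threaded_async.Event` is implemented), a plain `asyncio.Event` can be set from another thread as long as `set()` is scheduled on the loop's own thread. The helpers `make_event` and `run_event_demo` are hypothetical names used only here.

```python
import asyncio
import threading
from typing import Optional

global_var: Optional[int] = None


async def make_event() -> asyncio.Event:
  # Create the event on the loop's thread so it binds to the right loop
  # on every Python version.
  return asyncio.Event()


async def increment_global(initialized: asyncio.Event) -> int:
  await initialized.wait()
  assert global_var is not None
  return global_var + 1


def run_event_demo() -> int:
  global global_var
  loop = asyncio.new_event_loop()
  thread = threading.Thread(target=loop.run_forever, daemon=True)
  thread.start()
  try:
    initialized = asyncio.run_coroutine_threadsafe(
        make_event(), loop).result(timeout=5)
    future = asyncio.run_coroutine_threadsafe(
        increment_global(initialized), loop)
    global_var = 10
    # asyncio.Event is not thread-safe: run set() on the loop's thread.
    loop.call_soon_threadsafe(initialized.set)
    return future.result(timeout=5)
  finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


print(run_event_demo())
```

The global is assigned before `set()` is scheduled, so the coroutine can only observe the initialized value.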


### Using queues

The `threaded_async.Queue` class provides both threaded and async interfaces
for adding or removing items from the queue. The code below shows an async
worker that receives tasks and reports results to the main thread.

In [3]:
import threaded_async

async def increment_worker(
    in_queue: threaded_async.Queue[int],
    out_queue: threaded_async.Queue[int]):
  """Increment integers from in_queue and put them in output_queue."""
  while True:
    number = await in_queue.get()
    await out_queue.put(number + 1)

with threaded_async.AsyncRunner() as runner:
  in_queue = threaded_async.Queue[int](runner)
  out_queue = threaded_async.Queue[int](runner)
  background_task = runner.create_task(increment_worker(in_queue, out_queue))
  for i in range(10):
    in_queue.put_wait(i)
    print(out_queue.get_wait())
  background_task.cancel()  # Tasks are also auto-canceled when leaving context.

1
2
3
4
5
6
7
8
9
10
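
The blocking half of such a queue can be imitated with the standard library by running `asyncio.Queue` operations on the loop and waiting for them from the calling thread. This is only a sketch: `make_queues`, `call`, and `run_queue_demo` are hypothetical names, and the worker is stopped with a `None` sentinel instead of cancellation to keep shutdown simple.

```python
import asyncio
import threading
from typing import List


async def make_queues():
  # Create the queues on the loop's thread so they bind to the right loop.
  return asyncio.Queue(), asyncio.Queue()


async def increment_worker(in_queue: asyncio.Queue, out_queue: asyncio.Queue):
  """Increments integers from in_queue until it receives None."""
  while True:
    number = await in_queue.get()
    if number is None:
      return
    await out_queue.put(number + 1)


def run_queue_demo(count: int = 10) -> List[int]:
  loop = asyncio.new_event_loop()
  thread = threading.Thread(target=loop.run_forever, daemon=True)
  thread.start()

  def call(coro):
    """Runs a coroutine on the loop and blocks this thread for its result."""
    return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout=5)

  results = []
  try:
    in_queue, out_queue = call(make_queues())
    worker = asyncio.run_coroutine_threadsafe(
        increment_worker(in_queue, out_queue), loop)
    for i in range(count):
      call(in_queue.put(i))  # Blocking put from the calling thread.
      results.append(call(out_queue.get()))  # Blocking get.
    call(in_queue.put(None))  # Ask the worker to exit.
    worker.result(timeout=5)
  finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
  return results


print(run_queue_demo())
```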


### Control inversion

In some circumstances it is useful to treat async code as a client making
requests against the main thread, as if the main thread were a server. We use
the term _control inversion_ for this pattern: from the perspective of the
asynchronous client code, the client appears to be making requests against the
server, but in practice the client code sleeps until the server decides to
handle the request.

In [37]:
import threaded_async
from threaded_async.control_inversion import ExecutionRequest
from threaded_async.threaded_async import Future


class Stub(threaded_async.Client):
  """The interface between async coroutine and main thread."""
  async def increment(self, number: int) -> int:
    return await self.execute(Stub.increment, number)


class MyServer(threaded_async.Server):
  """A server that processes increment requests."""

  def _handle_request(
      self, request: ExecutionRequest[int], future: Future[int]):
    if request.fun == Stub.increment:
      # Handle increment request.
      (number,) = request.args
      future.set_result(number + 1)
    else:
      assert False, f"Unknown function {request.fun}"


async def client_code(client: Stub):
  for i in range(10):
    print(f'Client got: {await client.increment(i)}')


server = MyServer()
with server:
  client = Stub(server)
  server.create_background_task(client_code(client))
  for i in range(3):
    print('Processing new client requests')
    server.process()


Processing new client requests
Client got: 1
Processing new client requests
Client got: 2
Processing new client requests
Client got: 3
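
The mechanism behind this pattern can be sketched with the standard library: the client parks on an asyncio future, and the main thread resolves it whenever it chooses to. The names `pending_requests`, `request_increment`, `serve`, and `run_inversion_demo` are hypothetical, and this is not the `threaded_async.Server` implementation.

```python
import asyncio
import queue
import threading
from typing import List

# Thread-safe mailbox of (number, reply_future) pairs.
pending_requests = queue.Queue()


async def request_increment(number: int) -> int:
  """Client side: post a request, then sleep until the main thread answers."""
  reply = asyncio.get_running_loop().create_future()
  pending_requests.put((number, reply))
  return await reply


async def client_code(log: List[int]) -> None:
  for i in range(3):
    log.append(await request_increment(i))


def serve(loop: asyncio.AbstractEventLoop, n_requests: int) -> None:
  """Server side: the calling thread decides when each request is handled."""
  for _ in range(n_requests):
    number, reply = pending_requests.get(timeout=5)
    # asyncio futures are not thread-safe: resolve them on the loop's thread.
    loop.call_soon_threadsafe(reply.set_result, number + 1)


def run_inversion_demo() -> List[int]:
  loop = asyncio.new_event_loop()
  thread = threading.Thread(target=loop.run_forever, daemon=True)
  thread.start()
  log: List[int] = []
  try:
    done = asyncio.run_coroutine_threadsafe(client_code(log), loop)
    serve(loop, 3)
    done.result(timeout=5)
  finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
  return log


print(run_inversion_demo())
```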


### Error handling

Background tasks re-raise their exceptions in the main thread when `wait()` is
called on them. If any exceptions have still not been retrieved when the
`AsyncRunner` context is exited, they are raised together as an
`ExceptionGroup`.

In [40]:
import threaded_async

async def raises_error():
  raise ValueError('This is an error')

try:
  with threaded_async.AsyncRunner() as runner:
    for _ in range(3):
      runner.create_task(raises_error())
except threaded_async.ExceptionGroup as e:
  print(f'Got the following exceptions: {e.exceptions}')

Got the following exceptions: (ValueError('This is an error'), ValueError('This is an error'), ValueError('This is an error'))
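
The re-raising behaviour resembles what the standard library's `concurrent.futures.Future.result()` does for coroutines scheduled from another thread. The sketch below collects the errors into a plain list; the helper `collect_task_errors` is a hypothetical name, and grouping the errors into a single exception is left out.

```python
import asyncio
import threading
from typing import List


async def raises_error():
  raise ValueError('This is an error')


def collect_task_errors(count: int = 3) -> List[BaseException]:
  loop = asyncio.new_event_loop()
  thread = threading.Thread(target=loop.run_forever, daemon=True)
  thread.start()
  try:
    futures = [
        asyncio.run_coroutine_threadsafe(raises_error(), loop)
        for _ in range(count)
    ]
    errors = []
    for future in futures:
      try:
        future.result(timeout=5)  # Re-raises the coroutine's exception here.
      except ValueError as error:
        errors.append(error)
    return errors
  finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


errors = collect_task_errors()
print(f'Got the following exceptions: {errors}')
```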


### Stuck event loops

An asyncio event loop gets stuck when a coroutine keeps running without
yielding control through `await`. This manifests as an `EventLoopTimeoutError`
and prevents a clean shutdown of the loop.

If `AsyncRunner` detects on exit that the loop is stuck, it tries to bring down
the thread forcefully by calling directly into the Python C API. Note that this
may leave the Python runtime in an inconsistent state.

In [45]:
import asyncio

import threaded_async

async def hang_forever():
  """Sleeps briefly, then spins forever without awaiting, blocking the loop."""
  await asyncio.sleep(0.1)
  while True:
    pass

try:
  with threaded_async.AsyncRunner() as runner:
    runner.create_task(hang_forever()).wait(1.0)
except threaded_async.EventLoopTimeoutError:
  print('Event loop was stuck.')

print(f'Runner shutdown type: {runner.shutdown_type.name}')

Exception in thread Thread-385:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/leo/dev/threaded_async/.venv/lib/python3.8/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/leo/dev/threaded_async/src/threaded_async/threaded_async.py", line 245, in _run_event_loop
    self._event_loop.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/leo/dev/threaded_async/src/threaded_async/threaded_async.py", line 751, in _run
    self._exec_tracker.set_result(await self._aw

Event loop was stuck.
Runner shutdown type: FORCE_STOPPED
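
Detecting a stuck loop can be sketched with the standard library by scheduling a trivial probe coroutine and checking whether it completes in time. This is an assumption about one way to detect the condition, not a description of what `AsyncRunner` does internally. The blocking call is bounded to half a second so the example terminates, and `block_loop`, `probe`, `loop_is_responsive`, and `run_stuck_demo` are hypothetical names.

```python
import asyncio
import concurrent.futures
import threading
import time


async def block_loop(seconds: float) -> None:
  """Blocks the loop's thread without awaiting (bounded so the demo ends)."""
  time.sleep(seconds)


async def probe() -> bool:
  return True


def loop_is_responsive(loop: asyncio.AbstractEventLoop, timeout: float) -> bool:
  """Returns True if the loop runs a trivial coroutine within `timeout`."""
  future = asyncio.run_coroutine_threadsafe(probe(), loop)
  try:
    return future.result(timeout=timeout)
  except concurrent.futures.TimeoutError:
    return False


def run_stuck_demo():
  loop = asyncio.new_event_loop()
  thread = threading.Thread(target=loop.run_forever, daemon=True)
  thread.start()
  try:
    before = loop_is_responsive(loop, timeout=1.0)
    asyncio.run_coroutine_threadsafe(block_loop(0.5), loop)
    time.sleep(0.1)  # Let the blocking coroutine start.
    during = loop_is_responsive(loop, timeout=0.1)
    after = loop_is_responsive(loop, timeout=2.0)
  finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
  return before, during, after


print(run_stuck_demo())
```

A probe like this only detects the problem. Pure Python offers no supported way to interrupt a thread that never yields, which is why a forced shutdown is a last resort.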
