# Introduction to `asyncio`
Tom Manderson

# Who does this guy think he is anyway?

- Final sem software engineering/maths dual student (6th year 😭)
- Professional Python developer for 3.5 years
- Currently a freelance consultant
- Secretly loves Haskell more than Python
- Ex-President of UQCS


# What’s he going on about now?

- Some theory (concurrency, parallelism, asynchrony)
- Asynchronous coroutines
  - What they are
  - How to run them
- Tasks in `asyncio`
  - Using them for concurrency
  - Asyncio weirdness
- Dealing with threading
  - `asyncio` executors
  - Threadsafe async
  - How to wrap a non-async library
- Example funtimes

All these slides are a Jupyter notebook and I will be running the code live! 😱



# Please ask me questions throughout the talk

If asking a question will help further _your_ understanding, it'll undoubtedly help someone else understand better too. No question is too stupid or too complex.

In [1]:
# The topic of this talk
import asyncio

# Types are good, use them
from typing import *

# So we can use pretty pictures
from IPython.display import Image, display, display_html

# For synchronous examples
import requests
import time

# For async examples
import aiohttp

In [2]:
%autoawait on
asyncio.get_event_loop().set_debug(False)

# Concurrency vs Parallelism
_Patting your head while rubbing your tummy_

## Concurrency

Making progress on two processes over the same period of time

## Parallelism

Performing two actions at the same instant

A "process" is made up of multiple actions

# Example

To steal an example from Go, let's burn a bunch of Go programming language manuals.

![A gopher burns books](gopher-book-burning.jpg)

### Concurrency
The gopher puts the books in the fire, and while they are burning goes back and fetches more books.

### Parallelism
The gopher brings a friend along to help shovel the books into the fire.

Quote myself: I'm loath to take anything from Go but this example is a good one

# Synchronous vs Asynchronous
_Useful impatience_

### Synchronous programming
Start doing something, wait for it to finish, then move on to the next thing

### Asynchronous programming
Start doing something, continue with other work, then wait for it to finish when you need the result

# Example

![A gopher burns books](gopher-book-burning.jpg)

### Synchronous programming

1. Load up books
2. Take books to the fire
3. Burn books
4. Go back to book pile

### Asynchronous programming

1. Load up the books
2. Take books to the fire
3. Wait for the fire to be available
4. Put books in the fire
5. Go back to the book pile


Specifically note that adding more gophers makes the synchronous version concurrent, but it's still synchronous

You can add more workers, but the worker uses the same paradigm

# So what's `asyncio`?
- Part of the Python standard library since Python 3.4
- Tolerable since Python 3.5
- Performant since Python 3.6
- Good since Python 3.7
- Standardises asynchronous programming in Python

# Coroutines

- Short for "cooperative routine"
- A process (assume it's a function) that can be paused and resumed
- Used for "non-preemptive multitasking" or "cooperative multitasking"

## Preemptive multitasking

![Dictionary definition of pre-empt](preempt-def.png)

>Preemptive multitasking involves the use of an interrupt mechanism which suspends the currently executing process and invokes a scheduler to determine which process should execute next.
-- Wikipedia: "Preemption (Computing)"

## Cooperative multitasking

- Instead of being preempted, a coroutine yields control to the caller
- Called "cooperative" because the task keeps executing until it agrees to be paused
- Uncooperative tasks can hog all the computing time

The best part? You've probably already seen coroutines in Python!

In [None]:
def fibonacci() -> Generator[int, None, None]:
    x, y = 1, 1
    yield 1
    yield 1
    while True:
        x, y = y, x + y
        yield y

coroutine = fibonacci()

In [None]:
print(next(coroutine))
print(next(coroutine))
print(next(coroutine))
print("The coroutine is in an infinite loop but it agreed to be paused")
print(next(coroutine))

Smash that execute button multiple times - we have the same coroutine and it's still paused
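To make "cooperative" a bit more concrete, here is a minimal sketch of a round-robin scheduler for plain generators. The names `countdown` and `run_round_robin` are made up for this illustration, and this is not how `asyncio` is implemented; it's just the same idea in miniature.

```python
from collections import deque
from typing import Deque, Generator, List


def countdown(name: str, start: int) -> Generator[str, None, None]:
    """Hypothetical task: each `yield` hands control back to the scheduler."""
    for remaining in range(start, 0, -1):
        yield f"{name}:{remaining}"


def run_round_robin(tasks: List[Generator[str, None, None]]) -> List[str]:
    """Resume each generator in turn until every one of them has finished."""
    queue: Deque[Generator[str, None, None]] = deque(tasks)
    log: List[str] = []
    while queue:
        task = queue.popleft()
        try:
            log.append(next(task))  # run the task until its next yield
        except StopIteration:
            continue  # finished, so don't reschedule it
        queue.append(task)  # still has work left: back of the line
    return log


log = run_round_robin([countdown("a", 2), countdown("b", 3)])
print(log)
```

A generator that never reached a `yield` would never give `run_round_robin` a chance to switch, which is exactly the "uncooperative tasks hog everything" problem.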

# and now, for something completely different!

(but not really)

# Actually using `asyncio`!
Let's make an HTTP request!

In [3]:
def display_image(data, **kwargs):
    display(Image(data=data, **kwargs))

image_url = "https://emojipedia-us.s3.dualstack.us-west-1.amazonaws.com/thumbs/160/google/146/t-rex_1f996.png"

## Synchronous version

In [None]:
def fetch_image_sync():
    response = requests.get(image_url)
    return response.content

image_data = fetch_image_sync()
print(type(image_data))
display_image(image_data, width="100px") 

Mention that I've pre-defined `display_image`

# Asynchronous version

In [None]:
def fetch_image_sync():
    response = requests.get(image_url)
    return response.content

async def fetch_image_async() -> bytes:
    with aiohttp.ClientSession() as session:
        response = session.get(image_url)
        return response.read()

image_data_sync = fetch_image_sync()
image_data_async = fetch_image_async()
print('sync:', type(image_data_sync))
print('async:', type(image_data_async))
image_data_async

So uhhhh, how the hell do we actually run the thing...?

In [None]:
coro = fetch_image_async()

# In Python 3.7 it's super easy, you just:
asyncio.run(coro)

# In Python 3.6 it's still kinda easy but less good:
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)

# OR, if you're in another `async` function
async def other_func():
    await coro

**DISCLAIMER:** Due to weirdness with how I've put these slides together (Jupyter Notebook) I don't run async stuff the normal way. Instead, I get to do this:

In [None]:
await fetch_image_async()

## So uhh, that wasn't supposed to happen...

Pretty weird error... Let's review the code

In [None]:
async def fetch_image_async() -> bytes:
    with aiohttp.ClientSession() as session:
        response = session.get(image_url)
        return response.read()

### How it's actually done

In [None]:
async def fetch_image_async() -> bytes:
    async with aiohttp.ClientSession() as session:
        response = await session.get(image_url)
        return await response.read()

image_data_async = await fetch_image_async()
print(type(image_data_async))
display_image(image_data_async, width="100px")

# Takeaways
- async context managers are a thing too, and they're different
  - I didn't show you but async for loops are a thing too
  - `async for x in ...:` is for when producing each item you iterate over itself needs to `await` something
- you have to write `await` a lot
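Since `async for` didn't make it into the demo, here is a small sketch of both async flavours side by side. `Resource` and `ticker` are hypothetical names invented for this example, and the `asyncio.sleep` calls stand in for real I/O.

```python
import asyncio
from typing import AsyncIterator, List


class Resource:
    """Hypothetical async context manager: setup and teardown may await."""

    async def __aenter__(self) -> "Resource":
        await asyncio.sleep(0)  # stand-in for e.g. opening a connection
        self.events: List[str] = ["enter"]
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await asyncio.sleep(0)  # stand-in for e.g. closing a connection
        self.events.append("exit")


async def ticker(count: int) -> AsyncIterator[int]:
    """Async generator: it can await between the values it produces."""
    for i in range(count):
        await asyncio.sleep(0.01)
        yield i


async def main() -> List[str]:
    async with Resource() as resource:
        async for tick in ticker(3):
            resource.events.append(f"tick {tick}")
    return resource.events


events = asyncio.run(main())
print(events)
```

The plain `with` and `for` statements would not work here, because `__aenter__`, `__aexit__` and the generator's next-item step all need to be awaited.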

In [None]:
async def fetch_image_async() -> bytes:
    async with aiohttp.ClientSession() as session:
        response = await session.get(image_url)
        return await response.read()

# Reminder: how to start coroutines
This is a part people forget a lot so I'm just reminding you quickly

In [None]:
# Python 3.7
asyncio.run(coro)

# Python 3.6
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)

# When defining another coroutine
async def other_func():
    await coro
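For something you can paste into a plain script (so outside a notebook, where a loop is already running), here is a rough sketch of the Python 3.7 style. `answer` is a made-up coroutine. It also shows a gotcha worth knowing: a coroutine object can only be run once.

```python
import asyncio


async def answer() -> int:
    await asyncio.sleep(0.01)
    return 42


async def other_func() -> int:
    # Inside a coroutine, just await the other coroutine
    return await answer()


# Calling the function only creates a coroutine object; nothing runs yet
coro = answer()
first = asyncio.run(coro)  # runs it to completion on a fresh event loop
second = asyncio.run(other_func())

# A coroutine object is single-use: running it a second time raises
reuse_error = ""
try:
    asyncio.run(coro)
except RuntimeError as err:
    reuse_error = str(err)

print(first, second, repr(reuse_error))
```

So the three options above are alternatives for the same `coro`, not steps to run one after another.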

# Tasks
- Actually using `asyncio` for concurrency
- Some potentially unintuitive behaviour
- Cats

In [None]:
async def fetch_cats_async() -> None:
    async with aiohttp.ClientSession() as session:
        for cat in [200, 418, 500]:
            start_time = time.time()
            response = await session.get(f"http://http.cat/{cat}")
            cat_pic = await response.read()
            display_image(cat_pic, width="200px")
            print(time.time() - start_time)
start_time = time.time()
await fetch_cats_async()
print(time.time() - start_time)

# Why don't we do it concurrently?

In [None]:
async def fetch_a_cat(session, number) -> None:
    start_time = time.time()
    response = await session.get(f"http://http.cat/{number}")
    cat_pic = await response.read()
    display_image(cat_pic, width="200px")
    print(time.time() - start_time)

async def fetch_cats_async() -> None:
    async with aiohttp.ClientSession() as session:
        coros = [fetch_a_cat(session, cat) for cat in [200, 418, 500]]
        for coro in coros:
            await coro

start = time.time()
await fetch_cats_async()
print('Total', time.time() - start)

`asyncio` doesn't start a coroutine until it is awaited, so the loop above still runs the requests one after another! To start them all straight away you have to wrap each one in a task.

In [None]:
async def fetch_a_cat(session, number) -> None:
    start_time = time.time()
    response = await session.get(f"http://http.cat/{number}")
    cat_pic = await response.read()
    display_image(cat_pic, width="200px")
    print(time.time() - start_time)

async def fetch_cats_async() -> None:
    async with aiohttp.ClientSession() as session:
        coros = [fetch_a_cat(session, cat) for cat in [200, 418, 500]]
        # in Python 3.6 use asyncio.ensure_future instead
        tasks = [asyncio.create_task(coro) for coro in coros]
        for task in tasks:
            await task
        
start = time.time()
await fetch_cats_async()
print('Total', time.time() - start)
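If you want to see the difference without depending on the network, here is a sketch that swaps the HTTP request for `asyncio.sleep`. The function names are invented for the example and the timings are approximate.

```python
import asyncio
import time


async def nap(seconds: float) -> float:
    await asyncio.sleep(seconds)  # stand-in for a network request
    return seconds


async def one_after_another() -> float:
    start = time.perf_counter()
    coros = [nap(0.2) for _ in range(3)]
    for coro in coros:
        await coro  # each nap only starts here
    return time.perf_counter() - start


async def all_at_once() -> float:
    start = time.perf_counter()
    # create_task schedules each nap on the event loop straight away
    tasks = [asyncio.create_task(nap(0.2)) for _ in range(3)]
    for task in tasks:
        await task
    return time.perf_counter() - start


sequential = asyncio.run(one_after_another())
concurrent = asyncio.run(all_at_once())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The first version should take roughly the sum of the naps and the second roughly the longest single nap.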

So now we can start things concurrently, but do we really have to...
1. create the coroutine
2. turn it into a task
3. await the task?

In [None]:
async def fetch_cats_async() -> None:
    async with aiohttp.ClientSession() as session:
        coros = [fetch_a_cat(session, cat) for cat in [200, 418, 500]]
        await asyncio.gather(*coros)
        
start = time.time()
await fetch_cats_async()
print('Total', time.time() - start)

This is the "introducing `asyncio.gather`" slide
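Two properties of `asyncio.gather` are handy to know beyond "it awaits everything": results come back in the order you passed the awaitables, and `return_exceptions=True` turns failures into return values. A small sketch, with `square_later` as a made-up coroutine:

```python
import asyncio
from typing import List


async def square_later(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n * n


async def main() -> List[list]:
    # Results come back in argument order, not completion order
    ordered = await asyncio.gather(
        square_later(2, 0.03), square_later(3, 0.01)
    )
    # return_exceptions=True hands exceptions back as values instead of raising
    mixed = await asyncio.gather(
        square_later(4, 0.01), square_later(-1, 0.01), return_exceptions=True
    )
    return [ordered, mixed]


ordered, mixed = asyncio.run(main())
print(ordered, mixed)
```

Without `return_exceptions=True`, the first exception is raised from the `await asyncio.gather(...)` line instead.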

# Dealing with threading
  - `asyncio` executors
  - Threadsafe async
  - How to wrap a non-async library

In [None]:
def fetch_a_cat_sync(number) -> None:
    start_time = time.time()
    response = requests.get(f"http://http.cat/{number}")
    display_image(response.content, width="200px")
    print(time.time() - start_time)

async def fetch_cats_threadpool() -> None:
    loop = asyncio.get_running_loop()
    # No aiohttp session here: requests does the HTTP work in worker threads
    futures = []
    for cat in [200, 418, 500]:
        # Passing None uses the loop's default thread pool executor
        futures.append(loop.run_in_executor(None, fetch_a_cat_sync, cat))
    for future in futures:
        await future

start = time.time()
await fetch_cats_threadpool()
print('Total', time.time() - start)
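The same pattern works for wrapping any blocking call from a non-async library. Here is a sketch with `time.sleep` standing in for the library; `blocking_lookup` and `lookup` are hypothetical names for this example.

```python
import asyncio
import functools
import time


def blocking_lookup(key: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for a call into a synchronous library."""
    time.sleep(delay)  # blocks the calling thread
    return key.upper()


async def lookup(key: str) -> str:
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional arguments, so keyword
    # arguments have to go through functools.partial
    call = functools.partial(blocking_lookup, key, delay=0.2)
    # None selects the loop's default thread pool executor
    return await loop.run_in_executor(None, call)


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(lookup("a"), lookup("b"), lookup("c"))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The three blocking calls run in separate threads, so the total should be close to one delay rather than three.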

# What is an executor?
- Provided by Python's `concurrent.futures` module since Python 3.2 (available for Python 2 as the `futures` backport)
- Two implementations by default
  - `ThreadPoolExecutor`
  - `ProcessPoolExecutor`
- Provides an abstract `Executor` you can extend yourself
- Built around the concept of "futures"
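Before bringing `asyncio` into it, this is roughly what using an executor on its own looks like. `slow_double` is a made-up function for the sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(n: int) -> int:
    time.sleep(0.05)  # pretend this is slow, blocking work
    return n * 2


with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() returns immediately with a Future for each call
    futures = [pool.submit(slow_double, n) for n in (1, 2, 3)]
    # result() blocks until that particular call has finished
    doubled = [future.result() for future in futures]

print(doubled)
```

Swapping in `ProcessPoolExecutor` keeps the same `submit`/`result` interface, which is the point of the shared `Executor` base class.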

# What's a future?
- AKA a "Promise" in JS land
- It's a marker for something that will eventually be a value
- In `asyncio` they're used as a bridge between coroutines and synchronous code
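Here is a minimal sketch of that bridge: a plain thread hands a value to a waiting coroutine through an `asyncio` future. `background_job` is a hypothetical name. The one thing to be careful about is that `asyncio` futures are not thread-safe, so the other thread has to go through `loop.call_soon_threadsafe`.

```python
import asyncio
import threading


def background_job(loop: asyncio.AbstractEventLoop, future: asyncio.Future) -> None:
    """Runs in a plain thread and reports back to the event loop."""
    result = sum(range(10))
    # Never touch the future directly from another thread:
    # ask the loop to call set_result on its own thread instead
    loop.call_soon_threadsafe(future.set_result, result)


async def main() -> int:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    threading.Thread(target=background_job, args=(loop, future)).start()
    # The coroutine pauses here until the thread delivers a result
    return await future


total = asyncio.run(main())
print(total)
```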

In [None]:
# The interface of concurrent.futures.Future, bodies omitted
class Future:
    def cancel(self): pass
    def cancelled(self): pass
    def running(self): pass
    def done(self): pass
    def result(self, timeout=None): pass
    def exception(self, timeout=None): pass
    def add_done_callback(self, fn): pass
    def set_running_or_notify_cancel(self): pass
    def set_result(self, result): pass
    def set_exception(self, exception): pass

In [None]:
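from concurrent.futures import Executor

def worker(queue):
    # `queue` is a queue.Queue; a None item tells the worker to stop
    for future, fn, args, kwargs in iter(queue.get, None):
        # Returns False when the future was cancelled before it could start
        if not future.set_running_or_notify_cancel():
            continue
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            future.set_exception(err)
            continue
        future.set_result(result)

class OneThreadExecutor(Executor):
    ...  # submit() would put (future, fn, args, kwargs) on the worker's queue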
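One way the single-threaded executor could be filled in, as a sketch rather than the version from the talk: everything in `OneThreadExecutor` here (the queue, the daemon thread, the `None` sentinel on shutdown) is an assumption made for illustration.

```python
import queue
import threading
from concurrent.futures import Executor, Future


def worker(work_queue: "queue.Queue") -> None:
    # A None item on the queue tells the worker to stop
    for future, fn, args, kwargs in iter(work_queue.get, None):
        # Returns False when the future was cancelled before it could start
        if not future.set_running_or_notify_cancel():
            continue
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            future.set_exception(err)
            continue
        future.set_result(result)


class OneThreadExecutor(Executor):
    """Hypothetical executor that runs every job on one background thread."""

    def __init__(self) -> None:
        self._queue: "queue.Queue" = queue.Queue()
        self._thread = threading.Thread(
            target=worker, args=(self._queue,), daemon=True
        )
        self._thread.start()

    def submit(self, fn, *args, **kwargs) -> Future:
        future: Future = Future()
        self._queue.put((future, fn, args, kwargs))
        return future

    def shutdown(self, wait: bool = True, **kwargs) -> None:
        self._queue.put(None)  # sentinel: stop after the queued jobs
        if wait:
            self._thread.join()


with OneThreadExecutor() as executor:
    squares = list(executor.map(pow, [1, 2, 3], [2, 2, 2]))
    failed = executor.submit(int, "not a number")

print(squares, type(failed.exception()).__name__)
```

Only `submit` and `shutdown` are written by hand; `map` and the `with` support come from the `Executor` base class.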

# Questions?

This isn't your last chance; we're going to go over some examples of more complex stuff after this.

# Example time

- Webapp using Sanic
- TCP echo server
- Bridging a callback-based library for Kafka with `asyncio`

In [None]:
from sanic import Sanic
from sanic import response as res

app = Sanic(__name__)


@app.route("/")
async def test(req):
    return res.text("I'm a teapot", status=418)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

In [None]:
import asyncio

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())
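To poke at the echo server you need a client. Here is a self-contained sketch that starts a stripped-down copy of the handler and talks to it from the same event loop. `echo_once` is a made-up helper, and binding to port 0 (let the OS pick a free port) is a choice made for this example rather than something from the server above.

```python
import asyncio


async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def echo_once(host: str, port: int, message: str) -> str:
    """Hypothetical client: send one message and return what comes back."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(100)
    writer.close()
    await writer.wait_closed()
    return data.decode()


async def main() -> str:
    # Port 0 asks the OS for any free port, so this can't clash with 8888
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await echo_once("127.0.0.1", port, "Hello, world!")


reply = asyncio.run(main())
print(reply)
```

Against the real server from the cell above you would call `echo_once("127.0.0.1", 8888, ...)` from a second process instead.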

In [None]:
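import functools

# Assumption: `Producer` is the callback-based producer from confluent_kafka
from confluent_kafka import Producer

class AsyncKafkaProducer(Producer):
    def __init__(self, config, loop: Optional[asyncio.AbstractEventLoop] = None):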
        super().__init__(config)
        if loop is None:
            loop = asyncio.get_event_loop()
        self._poll_handle = loop.call_soon(self._poll_loop)

    def _poll_loop(self):
        loop = asyncio.get_running_loop()
        self.poll(0)
        self._poll_handle = loop.call_later(1, self._poll_loop)

    def _send_callback(self, future: asyncio.Future, err, msg):
        if err is not None:
            future.set_exception(err)
        else:
            future.set_result(None)

    async def send(self, topic: str, message: dict) -> None:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        on_delivery_cb = functools.partial(self._send_callback, fut)
        self.produce(topic, message, callback=on_delivery_cb)
        loop.call_soon(self.poll, 0)
        return await fut

    def __del__(self):
        self._poll_handle.cancel()