# An overview of asyncio
source: https://realpython.com/async-io-python/

### The 10,000-Foot View of Async IO

#### Parallelism: 
- performing multiple operations at the same time. Multiprocessing is a means to effect parallelism, and it entails spreading tasks over a computer’s central processing units (CPUs, or cores). Multiprocessing is well-suited for CPU-bound tasks: tightly bound for loops and mathematical computations usually fall into this category.

- Async IO is NOT parallelism 

#### Concurrency
- multiple tasks have the ability to run in an overlapping manner. (There’s a saying that concurrency does not imply parallelism.)

#### Threading
- Threading is a concurrent execution model whereby multiple threads take turns executing tasks. 
- What’s important to know about threading is that it’s better for IO-bound tasks. While a CPU-bound task is characterized by the computer’s cores continually working hard from start to finish, an IO-bound job is dominated by a lot of waiting on input/output to complete.

To recap the above, concurrency encompasses both multiprocessing (ideal for CPU-bound tasks) and threading (suited for IO-bound tasks). Multiprocessing is a form of parallelism, with parallelism being a specific type (subset) of concurrency. The Python standard library has offered longstanding support for both of these through its multiprocessing, threading, and concurrent.futures packages.


- async IO is not threading, nor is it multiprocessing.
- Asynchronous routines are able to “pause” while waiting on their ultimate result and let other routines run in the meantime.
- Asynchronous code, through the mechanism above, facilitates concurrent execution. To put it differently, asynchronous code gives the look and feel of concurrency.

#### Intuitive async IO explanation
Imagine a chess player who is playing multiple opponents

- Synchronous version: Judit plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.

- Asynchronous version: Judit moves from table to table, making one move at each table. She leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour. (Source)

So, cooperative multitasking is a fancy way of saying that a program’s event loop (more on that later) communicates with multiple tasks to let each take turns running at the optimal time.

Async IO takes long waiting periods in which functions would otherwise be blocking and allows other functions to run during that downtime. (A function that blocks effectively forbids others from running from the time that it starts until the time that it returns.)


### The async/await Syntax and Native Coroutines

- a coroutine is a function that can suspend its execution before reaching return, and it can indirectly pass control to another coroutine for some time.

In [1]:
import time
import asyncio

async def count():
    
    print('One')
    await asyncio.sleep(1)
    print('Two')
    
async def main():
    
    await asyncio.gather(count(), count(), count())


s = time.perf_counter()
#asyncio.run(main()) - script version
await main() # Jupyter version
elapsed = time.perf_counter() - s
print(f" executed in {elapsed:0.2f} seconds.")

One
One
One
Two
Two
Two
 executed in 1.00 seconds.


Talking to each of the calls to count() is a single event loop, or coordinator. When each task reaches await asyncio.sleep(1), the function yells up to the event loop and gives control back to it, saying, “I’m going to be sleeping for 1 second. Go ahead and let something else meaningful be done in the meantime.”

### The Rules of Async IO

- The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid, and you’ll see them later on.

- The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, “Suspend execution of g() until whatever I’m waiting on—the result of f()—is returned. In the meantime, go let something else run.”

In [2]:
async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

- Just like it’s a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.

In [3]:
async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y

SyntaxError: 'yield from' inside async function (1355037374.py, line 9)

Finally, when you use await f(), it’s required that f() be an object that is awaitable. Well, that’s not very helpful, is it? For now, just know that an awaitable object is either (1) another coroutine or (2) an object defining an .__await__() dunder method that returns an iterator. If you’re writing a program, for the large majority of purposes, you should only need to worry about case #1.

Here’s one example of how async IO cuts down on wait time: given a coroutine makerandom() that keeps producing random integers in the range [0, 10], until one of them exceeds a threshold, you want to let multiple calls of this coroutine not need to wait for each other to complete in succession. You can largely follow the patterns from the two scripts above, with slight changes:

In [4]:
import asyncio
import random

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res


random.seed(444)
#r1, r2, r3 = asyncio.run(main())
r1, r2, r3 = await main()
print()
print(f"r1: {r1}, r2: {r2}, r3: {r3}")

[36mInitiated makerandom(0).
[36mmakerandom(0) == 4 too low; retrying.
[91mInitiated makerandom(1).
[91mmakerandom(1) == 4 too low; retrying.
[35mInitiated makerandom(2).
[35mmakerandom(2) == 0 too low; retrying.
[36mmakerandom(0) == 4 too low; retrying.
[91mmakerandom(1) == 7 too low; retrying.
[36mmakerandom(0) == 4 too low; retrying.
[35mmakerandom(2) == 4 too low; retrying.
[36mmakerandom(0) == 8 too low; retrying.
[91m---> Finished: makerandom(1) == 10[0m
[36mmakerandom(0) == 7 too low; retrying.
[36mmakerandom(0) == 8 too low; retrying.
[35mmakerandom(2) == 4 too low; retrying.
[36mmakerandom(0) == 7 too low; retrying.
[36mmakerandom(0) == 1 too low; retrying.
[36mmakerandom(0) == 6 too low; retrying.
[35m---> Finished: makerandom(2) == 9[0m
[36mmakerandom(0) == 3 too low; retrying.
[36mmakerandom(0) == 9 too low; retrying.
[36mmakerandom(0) == 7 too low; retrying.
[36m---> Finished: makerandom(0) == 10[0m

r1: 10, r2: 10, r3: 9


While “making random integers” (which is CPU-bound more than anything) is maybe not the greatest choice as a candidate for asyncio, it’s the presence of asyncio.sleep() in the example that is designed to mimic an IO-bound process where there is uncertain wait time involved. For example, the asyncio.sleep() call might represent sending and receiving not-so-random integers between two clients in a message application.

### Async IO Design Patterns

#### Chaining Coroutines

Pay careful attention to the output, where part1() sleeps for a variable amount of time, and part2() begins working with the results as they become available:

In [10]:
import asyncio
import random
import time

async def part1(n: int) -> str:
    
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    
    await asyncio.gather(*(chain(n) for n in args))


import sys
random.seed(444)
args = [1, 2, 3]
start = time.perf_counter()
#asyncio.run(main(*args))
await main(*args)
end = time.perf_counter() - start
print(f"Program finished in {end:0.2f} seconds.")

part1(1) sleeping for 4 seconds.
part1(2) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(1) == result1-1.
part2(1, 'result1-1') sleeping for 7 seconds.
Returning part1(2) == result2-1.
part2(2, 'result2-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(2, 'result2-1') == result2-2 derived from result2-1.
-->Chained result2 => result2-2 derived from result2-1 (took 8.00 seconds).
Returning part2(1, 'result1-1') == result1-2 derived from result1-1.
-->Chained result1 => result1-2 derived from result1-1 (took 11.01 seconds).
Program finished in 11.01 seconds.


#### Using a Queue

There is an alternative structure that can also work with async IO: a number of producers, which are not associated with each other, add items to a queue. Each producer may add multiple items to the queue at staggered, random, unannounced times. A group of consumers pull items from the queue as they show up, greedily and without waiting for any other signal.

In this design, there is no chaining of any individual consumer to a producer. The consumers don’t know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.

It takes an individual producer or consumer a variable amount of time to put and extract items from the queue, respectively. The queue serves as a throughput that can communicate with the producers and consumers without them talking to each other directly.

Note: While queues are often used in threaded programs because of the thread-safety of queue.Queue(), you shouldn’t need to concern yourself with thread safety when it comes to async IO. (The exception is when you’re combining the two, but that isn’t done in this tutorial.)

One use-case for queues (as is the case here) is for the queue to act as a transmitter for producers and consumers that aren’t otherwise directly chained or associated with each other.

In [13]:
import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    
    return os.urandom(size).hex()

async def randsleep(caller=None) -> None:
    
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
        
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    
    n = random.randint(0, 10)
    
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()


#import argparse
random.seed(444)
#parser = argparse.ArgumentParser()
#parser.add_argument("-p", "--nprod", type=int, default=5)
#parser.add_argument("-c", "--ncon", type=int, default=10)
#ns = parser.parse_args()
start = time.perf_counter()
#asyncio.run(main(**ns.__dict__))
await main(5, 10)
elapsed = time.perf_counter() - start
print(f"Program completed in {elapsed:0.5f} seconds.")

Producer 0 sleeping for 4 seconds.
Producer 2 sleeping for 7 seconds.
Producer 3 sleeping for 4 seconds.
Producer 4 sleeping for 10 seconds.
Consumer 0 sleeping for 7 seconds.
Consumer 1 sleeping for 8 seconds.
Consumer 2 sleeping for 4 seconds.
Consumer 3 sleeping for 7 seconds.
Consumer 4 sleeping for 1 seconds.
Consumer 5 sleeping for 6 seconds.
Consumer 6 sleeping for 9 seconds.
Consumer 7 sleeping for 3 seconds.
Consumer 8 sleeping for 9 seconds.
Consumer 9 sleeping for 7 seconds.
Producer 0 added <ff1e73ad31> to queue.
Producer 0 sleeping for 10 seconds.
Producer 3 added <2a2a128332> to queue.
Producer 3 sleeping for 0 seconds.
Consumer 2 got element <ff1e73ad31> in 0.00057 seconds.
Consumer 2 sleeping for 1 seconds.
Consumer 4 got element <2a2a128332> in 0.00074 seconds.
Consumer 4 sleeping for 0 seconds.
Producer 3 added <e19d97dc32> to queue.
Producer 3 sleeping for 1 seconds.
Consumer 4 got element <e19d97dc32> in 0.00277 seconds.
Consumer 4 sleeping for 9 seconds.
Producer 3

The first few coroutines are helper functions that return a random string, a fractional-second performance counter, and a random integer. A producer puts anywhere from 1 to 5 items into the queue. Each item is a tuple of (i, t) where i is a random string and t is the time at which the producer attempts to put the tuple into the queue.

When a consumer pulls an item out, it simply calculates the elapsed time that the item sat in the queue using the timestamp that the item was put in with.

Keep in mind that asyncio.sleep() is used to mimic some other, more complex coroutine that would eat up time and block all other execution if it were a regular blocking function.