<a href="https://colab.research.google.com/github/baihoneycutt/learn-py/blob/main/asyncio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Section 1 Syntax and Native Coroutines
A coroutine is a specialized version of a Python `generator` function.
> A coroutine can suspend its execution before reaching return

In [9]:
import asyncio # use asyncio
async def c():
    print(f"Start...")
    await asyncio.sleep(1)
    print(f"End...")
async def main():
    await asyncio.gather(c(),c(),c(),c())
if __name__ == "__main__":
    import time
    s = time.perf_counter()
    await main() # asyncio.run(main()) # only for standalone
    elapsed = time.perf_counter() - s
    print(f"{__name__} executed in {elapsed:4f} seconds.") 

Start...
Start...
Start...
Start...
End...
End...
End...
End...
__main__ executed in 1.003738 seconds.


In [12]:
def c():
    print(f"Start...")
    time.sleep(1) # Note: can't just await for asyncio, c() is **NOT** async
    print(f"End...")
def main():
    for _ in range(4):
        c()
if __name__ == "__main__":
    import time
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"{__name__} executed in {elapsed:4f} seconds.") 

Start...
End...
Start...
End...
Start...
End...
Start...
End...
__main__ executed in 4.007882 seconds.


- `async def` results in a **native coroutine** or an **asynchronous generator** (`async with` and `async for` are valid as well).
- `await` hands function control over the event loop (*by suspending the associated `surrounding` coroutine*). 
> **To Clarify:** <br>
\- If an *`await f()`* is encountered within the scope of *`g()`*, the execution of `g()` is suspended till the result of `f()` is returned. <br>
\- Something else meanwhile runs
```python
async def g():
    \# Pause and back to g() when f() is ready
    await f()
    return r
```

In [18]:
import asyncio
import random
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
) # ANSI colors
async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res

if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = await main() # r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

[36mInitiated makerandom(0).
[36mmakerandom(0) == 4 too low; retrying.
[91mInitiated makerandom(1).
[91mmakerandom(1) == 4 too low; retrying.
[35mInitiated makerandom(2).
[35mmakerandom(2) == 0 too low; retrying.
[36mmakerandom(0) == 4 too low; retrying.
[91mmakerandom(1) == 7 too low; retrying.
[36mmakerandom(0) == 4 too low; retrying.
[35mmakerandom(2) == 4 too low; retrying.
[36mmakerandom(0) == 8 too low; retrying.
[91m---> Finished: makerandom(1) == 10[0m
[36mmakerandom(0) == 7 too low; retrying.
[36mmakerandom(0) == 8 too low; retrying.
[35mmakerandom(2) == 4 too low; retrying.
[36mmakerandom(0) == 7 too low; retrying.
[36mmakerandom(0) == 1 too low; retrying.
[36mmakerandom(0) == 6 too low; retrying.
[35m---> Finished: makerandom(2) == 9[0m
[36mmakerandom(0) == 3 too low; retrying.
[36mmakerandom(0) == 9 too low; retrying.
[36mmakerandom(0) == 7 too low; retrying.
[36m---> Finished: makerandom(0) == 10[0m

r1: 10, r2: 10, r3: 9


In [21]:
print (u"\u001b[40m A \u001b[41m B \u001b[42m C \u001b[43m D \u001b[0m")
print (u"\u001b[44m A \u001b[45m B \u001b[46m C \u001b[47m D \u001b[0m")
print (u"\u001b[40;1m A \u001b[41;1m B \u001b[42;1m C \u001b[43;1m D \u001b[0m")
print (u"\u001b[44;1m A \u001b[45;1m B \u001b[46;1m C \u001b[47;1m D \u001b[0m")

[40m A [41m B [42m C [43m D [0m
[44m A [45m B [46m C [47m D [0m
[40;1m A [41;1m B [42;1m C [43;1m D [0m
[44;1m A [45;1m B [46;1m C [47;1m D [0m


In [22]:
import sys
for i in range(0, 16):
    for j in range(0, 16):
        code = str(i * 16 + j)
        sys.stdout.write(u"\u001b[48;5;" + code + "m " + code.ljust(4))
    print (u"\u001b[0m")

[48;5;0m 0   [48;5;1m 1   [48;5;2m 2   [48;5;3m 3   [48;5;4m 4   [48;5;5m 5   [48;5;6m 6   [48;5;7m 7   [48;5;8m 8   [48;5;9m 9   [48;5;10m 10  [48;5;11m 11  [48;5;12m 12  [48;5;13m 13  [48;5;14m 14  [48;5;15m 15  [0m
[48;5;16m 16  [48;5;17m 17  [48;5;18m 18  [48;5;19m 19  [48;5;20m 20  [48;5;21m 21  [48;5;22m 22  [48;5;23m 23  [48;5;24m 24  [48;5;25m 25  [48;5;26m 26  [48;5;27m 27  [48;5;28m 28  [48;5;29m 29  [48;5;30m 30  [48;5;31m 31  [0m
[48;5;32m 32  [48;5;33m 33  [48;5;34m 34  [48;5;35m 35  [48;5;36m 36  [48;5;37m 37  [48;5;38m 38  [48;5;39m 39  [48;5;40m 40  [48;5;41m 41  [48;5;42m 42  [48;5;43m 43  [48;5;44m 44  [48;5;45m 45  [48;5;46m 46  [48;5;47m 47  [0m
[48;5;48m 48  [48;5;49m 49  [48;5;50m 50  [48;5;51m 51  [48;5;52m 52  [48;5;53m 53  [48;5;54m 54  [48;5;55m 55  [48;5;56m 56  [48;5;57m 57  [48;5;58m 58  [48;5;59m 59  [48;5;60m 60  [48;5;61m 61  [48;5;62m 62  [48;5;63m 63  [0m
[48;5;64m 64  [48;5;65m 65  

In [36]:
print("\033[35m"f"test")

[35mtest


# Async IO patterns
## Chaining Corroutines
A coroutine object is awaitable, so another coroutine can `await` -> smaller manageable recyclable .....

In [None]:
import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2({n, arg}) == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    import sys
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end} seconds.")

In [2]:
import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

usage: ipykernel_launcher.py [-h] [-p NPROD] [-c NCON]
ipykernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-437d860e-0ba0-4d31-af12-aca038a72b98.json


SystemExit: ignored

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [13]:
loop = asyncio.get_event_loop()