Base code obtained from https://realpython.com/async-io-python/

# The *asyncio* package and *async/await*

## The *async/await* syntax and native coroutines

A coroutine is a function that **can suspend its execution before reaching return**, and can indirectly pass control to another coroutine for some time.
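
A minimal sketch of that behaviour, using a hypothetical `greet()` coroutine. Calling a coroutine function only creates a coroutine object, and nothing inside it runs until an event loop drives it (here via `asyncio.run()`). At the `await`, `greet()` suspends itself before reaching `return`. It is written as a standalone script; inside Jupyter it needs the `nest_asyncio.apply()` call from the next cell.

```python
import asyncio


async def greet(name: str) -> str:
    # Hypothetical example coroutine: it suspends at the await below,
    # giving the event loop a chance to run other coroutines meanwhile
    await asyncio.sleep(0.01)
    return f"Hello, {name}"


coro = greet("world")             # Only creates a coroutine object; no code runs yet
print(asyncio.iscoroutine(coro))  # True
result = asyncio.run(coro)        # The event loop drives the coroutine to its return
print(result)                     # Hello, world
```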

In [7]:
import asyncio
import time
import random
import sys
import os
import itertools as it
import argparse

# Jupyter already runs an event loop; nest_asyncio patches asyncio so asyncio.run() can still be called from a notebook
import nest_asyncio
nest_asyncio.apply()

In [2]:
async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

In [3]:
if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"Async code executed in {elapsed:0.2f} seconds.")

One
One
One
Two
Two
Two
Async code executed in 1.05 seconds.


In [4]:
def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for _ in range(3):
        count()

In [5]:
if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Synchronous code executed in {elapsed:0.2f} seconds.")

One
Two
One
Two
One
Two
Synchronous code executed in 3.01 seconds.


## The rules of *asyncio*

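The keyword ***await* passes control back to the event loop**: it suspends the surrounding coroutine until the awaited object produces a result, and the event loop can run other tasks in the meantime.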

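To see control passing back to the event loop, the sketch below runs two hypothetical `worker()` coroutines with different sleep times and records the order of events in a `log` list. Each `await asyncio.sleep()` suspends its worker, so the second worker starts before the first one finishes, and the worker with the shorter sleep finishes first.

```python
import asyncio

log = []  # Records the order in which events happen


async def worker(name: str, delay: float) -> None:
    log.append(f"{name} start")
    # await suspends this worker and hands control back to the event loop
    await asyncio.sleep(delay)
    log.append(f"{name} end")


async def run_workers() -> None:
    # Worker A sleeps longer, so worker B finishes first
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))


asyncio.run(run_workers())
print(log)  # ['A start', 'B start', 'B end', 'A end']
```
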
In [6]:
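async def f():
    await asyncio.sleep(0.1)  # Hypothetical placeholder for some awaitable work
    return "done"

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r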

In [7]:
# ANSI colors
c = ("\033[0m",   # End of color
     "\033[36m",  # Cyan
     "\033[91m",  # Red
     "\033[35m")  # Magenta

In [8]:
async def makerandom(idx: int,
                     threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}) with threshold {threshold}.")
    i = random.randint(0, 10)

    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying...")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)

    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

In [9]:
async def main():
    res = await asyncio.gather(*(makerandom(i, 10-i-1) for i in range(3)))
    return res

In [10]:
if __name__ == "__main__":
    random.seed(444)

    r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

Initiated makerandom(0) with threshold 9.
makerandom(0) == 4 too low; retrying...
Initiated makerandom(1) with threshold 8.
makerandom(1) == 4 too low; retrying...
Initiated makerandom(2) with threshold 7.
makerandom(2) == 0 too low; retrying...
makerandom(0) == 4 too low; retrying...
makerandom(1) == 7 too low; retrying...
makerandom(0) == 4 too low; retrying...
makerandom(2) == 4 too low; retrying...
makerandom(0) == 8 too low; retrying...
---> Finished: makerandom(1) == 10
makerandom(0) == 7 too low; retrying...
makerandom(0) == 8 too low; retrying...
makerandom(2) == 4 too low; retrying...
makerandom(0) == 7 too low; retrying...
makerandom(0) == 1 too low; retrying...
makerandom(0) == 6 too low; retrying...
---> Finished: makerandom(2) == 9
makerandom(0) == 3 too low; retrying...
makerandom(0) == 9 too low; retrying...
makerandom(0) == 7 too low; retrying...
--

# *Async IO* Design Patterns

## Chaining coroutines

The runtime of *main()* is roughly equal to the longest runtime among the chains it gathers together and schedules, not the sum of their runtimes.

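One way to check this claim is to time chains with fixed delays instead of random ones. In this sketch the hypothetical `step()` and `fixed_chain()` coroutines mirror `part1()`/`part2()` and `chain()`: each chain runs its two steps one after the other, while `gather()` runs the three chains concurrently. The total should come out close to the slowest chain (about 0.3 s) rather than the sum of all steps (0.65 s).

```python
import asyncio
import time


async def step(delay: float) -> float:
    # Stand-in for part1()/part2(): sleep for a fixed time instead of a random one
    await asyncio.sleep(delay)
    return delay


async def fixed_chain(d1: float, d2: float) -> float:
    # The two steps of a chain run sequentially
    first = await step(d1)
    second = await step(d2)
    return first + second


async def run_chains() -> list:
    # The chains themselves run concurrently
    return await asyncio.gather(fixed_chain(0.1, 0.1),
                                fixed_chain(0.2, 0.1),
                                fixed_chain(0.05, 0.1))


start = time.perf_counter()
chain_times = asyncio.run(run_chains())
elapsed = time.perf_counter() - start
print(f"Chain durations: {chain_times}, total: {elapsed:0.2f} s")
```
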
In [11]:
async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds...")
    await asyncio.sleep(i)

    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")

    return result

In [12]:
async def part2(n: int,
                arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds...")
    await asyncio.sleep(i)

    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")

    return result

In [13]:
async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"--> Chained result{n} => {p2} (took {end:0.2f} seconds).")

In [14]:
async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

In [15]:
if __name__ == "__main__":
    random.seed(444)

    args = [1, 2, 3]
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start

    print(f"Program finished in {end:0.2f} seconds.")

part1(1) sleeping for 4 seconds...
part1(2) sleeping for 4 seconds...
part1(3) sleeping for 0 seconds...
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds...
Returning part1(1) == result1-1.
part2(1, 'result1-1') sleeping for 7 seconds...
Returning part1(2) == result2-1.
part2(2, 'result2-1') sleeping for 4 seconds...
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
--> Chained result3 => result3-2 derived from result3-1 (took 4.02 seconds).
Returning part2(2, 'result2-1') == result2-2 derived from result2-1.
--> Chained result2 => result2-2 derived from result2-1 (took 8.04 seconds).
Returning part2(1, 'result1-1') == result1-2 derived from result1-1.
--> Chained result1 => result1-2 derived from result1-1 (took 11.04 seconds).
Program finished in 11.04 seconds.


## Using a queue

There is an alternative structure that can also work with async IO: a number of producers, which are not associated with each other, add items to a queue. Each producer may add multiple items to the queue at staggered, random, unannounced times. A group of consumers pull items from the queue as they show up, greedily and without waiting for any other signal.

In this design, there is no chaining of any individual consumer to a producer. The consumers don’t know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.

It takes an individual producer or consumer a variable amount of time to put items into and extract items from the queue, respectively. The queue acts as a buffer through which producers and consumers communicate without talking to each other directly.

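A stripped-down, deterministic version of the same pattern may make the mechanics easier to follow. In this sketch (all names hypothetical) a single producer puts six numbers on an `asyncio.Queue`, three consumers take them off as they arrive, `q.join()` waits until every item has been marked with `task_done()`, and the consumers, which loop forever, are then cancelled.

```python
import asyncio


async def producer(q: asyncio.Queue, n_items: int) -> None:
    for item in range(n_items):
        await asyncio.sleep(0.01)  # Items arrive at staggered times
        await q.put(item)


async def consumer(name: str, q: asyncio.Queue, seen: list) -> None:
    while True:  # Consumers never know how many items will come
        item = await q.get()
        seen.append((name, item))
        q.task_done()  # Lets q.join() know this item has been processed


async def run_pipeline() -> list:
    q: asyncio.Queue = asyncio.Queue()
    seen: list = []
    consumers = [asyncio.create_task(consumer(f"c{i}", q, seen)) for i in range(3)]
    await producer(q, n_items=6)
    await q.join()     # Wait until every queued item has been processed
    for task in consumers:
        task.cancel()  # The consumers would otherwise wait forever
    await asyncio.gather(*consumers, return_exceptions=True)
    return seen


seen = asyncio.run(run_pipeline())
print(sorted(item for _, item in seen))  # [0, 1, 2, 3, 4, 5]
```
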
In [18]:
async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

In [19]:
async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

In [20]:
async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Each producer adds its n items one after another
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

In [21]:
async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

In [22]:
async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

In [23]:
if __name__ == "__main__":
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=2)
    parser.add_argument("-c", "--ncon", type=int, default=5)
    ns = parser.parse_args(args=[])
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

Producer 0 sleeping for 4 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 7 seconds.
Consumer 2 sleeping for 4 seconds.
Consumer 3 sleeping for 4 seconds.
Consumer 4 sleeping for 8 seconds.
Producer 0 added <72d801cf9f> to queue.
Producer 0 sleeping for 10 seconds.
Consumer 0 got element <72d801cf9f> in 0.00074 seconds.
Consumer 0 sleeping for 7 seconds.
Producer 0 added <465c9fe8e2> to queue.
Producer 0 sleeping for 8 seconds.
Consumer 3 got element <465c9fe8e2> in 0.00182 seconds.
Consumer 3 sleeping for 4 seconds.
Producer 0 added <f0305b4405> to queue.
Producer 0 sleeping for 7 seconds.
Consumer 2 got element <f0305b4405> in 0.00168 seconds.
Consumer 2 sleeping for 1 seconds.
Producer 0 added <c52bf7ce81> to queue.
Consumer 1 got element <c52bf7ce81> in 0.00217 seconds.
Consumer 1 sleeping for 6 seconds.
Program completed in 29.04754 seconds.


# *Async IO*'s Roots in Generators

## *Async for*, Async Generators, and Async Comprehensions

Along with plain *async/await*, Python also supports *async for*, which iterates over an asynchronous iterator. The purpose of an asynchronous iterator is to let it call asynchronous code at each step of the iteration.

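The asynchronous iterator protocol behind *async for* consists of an `__aiter__()` method and an `__anext__()` coroutine that raises `StopAsyncIteration` when the iterator is exhausted. The hypothetical `Countdown` class below is a sketch of an async iterator written by hand rather than as an async generator; the `await` inside `__anext__()` is where asynchronous work could happen on each step.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator that yields start, start-1, ..., 1."""

    def __init__(self, start: int) -> None:
        self.current = start

    def __aiter__(self) -> "Countdown":
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration  # Ends the async for loop
        await asyncio.sleep(0.01)     # Asynchronous work done on every step
        value = self.current
        self.current -= 1
        return value


async def consume_countdown() -> list:
    values = []
    async for n in Countdown(3):
        values.append(n)
    return values


values = asyncio.run(consume_countdown())
print(values)  # [3, 2, 1]
```
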
In [24]:
async def mygen(u: int = 10):
    # Yield powers of 2
    i = 0
    while i < u:
        yield 2 ** i
        i += 1
        await asyncio.sleep(0.1)

In [27]:
async def main():
    # No concurrency here: the second comprehension starts only after the first has drained its generator
    g = [i async for i in mygen()]
    f = [j async for j in mygen() if not (j // 3 % 5)]
    return g, f

In [26]:
g, f = asyncio.run(main())
g,f

([1, 2, 4, 8, 16, 32, 64, 128, 256, 512], [1, 2, 16, 32, 256, 512])
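
Because the two comprehensions in `main()` run one after the other, their waits add up. A sketch of one way to overlap them, assuming each comprehension is wrapped in its own coroutine (the hypothetical `collect()` below) and the wrappers are handed to `asyncio.gather()`. The `powers()` generator is a shortened variant of `mygen()`; the concurrent run should take roughly half as long as the sequential one.

```python
import asyncio
import time


async def powers(u: int = 5, delay: float = 0.05):
    # Shortened variant of mygen(): yield powers of 2, pausing after each one
    for i in range(u):
        yield 2 ** i
        await asyncio.sleep(delay)


async def collect() -> list:
    # Drain one generator with an async comprehension
    return [x async for x in powers()]


async def run_sequential() -> tuple:
    # Like main(): the second comprehension starts after the first has finished
    return await collect(), await collect()


async def run_concurrent() -> tuple:
    # gather() interleaves the two comprehensions on the event loop
    first, second = await asyncio.gather(collect(), collect())
    return first, second


timings = {}
for runner in (run_sequential, run_concurrent):
    start = time.perf_counter()
    lists = asyncio.run(runner())
    timings[runner.__name__] = time.perf_counter() - start
    print(f"{runner.__name__}: {lists} in {timings[runner.__name__]:0.2f} s")
```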

# Asynchronous Requests code

The following code builds a web-scraping URL collector that performs these steps:
1. Read a sequence of URLs from a local file, urls.txt.
2. Send GET requests for the URLs and decode the resulting content. If this fails, stop there for a URL.
3. Search for the URLs within href tags in the HTML of the responses.
4. Write the results to foundurls.txt.
5. Do all of the above as asynchronously and concurrently as possible. (Use aiohttp for the requests, and aiofiles for the file-appends. These are two primary examples of IO that are well-suited for the async IO model.)
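
Step 3 can be tried in isolation, without any network access. The sketch below applies the same `HREF_RE` pattern defined in the code that follows to a made-up HTML snippet and resolves each match against a hypothetical base URL with `urllib.parse.urljoin()`, which is what `parse()` does for every fetched page. Relative links become absolute, and links that are already absolute pass through unchanged.

```python
import re
import urllib.parse

HREF_RE = re.compile(r'href="(.*?)"')

# Hypothetical page content and base URL, for illustration only
html = '<a href="/about">About</a> <a href="https://example.org/x">X</a>'
base = "https://example.com/index.html"

links = {urllib.parse.urljoin(base, link) for link in HREF_RE.findall(html)}
print(sorted(links))  # ['https://example.com/about', 'https://example.org/x']
```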

In [40]:
import asyncio
import logging
import re
import sys
import urllib.error
import urllib.parse
import aiofiles
import aiohttp

from typing import IO
from aiohttp import ClientSession

In [41]:
logging.basicConfig(format = "%(asctime)s %(levelname)s:%(name)s: %(message)s",
                    level = logging.DEBUG,
                    datefmt = "%H:%M:%S",
                    stream = sys.stderr,)

logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

In [42]:
async def fetch_html(url: str,
                     session: ClientSession,
                     **kwargs) -> str:

    # GET request wrapper to fetch page HTML
    resp = await session.request(method = "GET",
                                 url = url,
                                 **kwargs)
    resp.raise_for_status()

    logger.info(f"Got response [{resp.status}] for URL: {url}")
    html = await resp.text()

    return html

In [43]:
async def parse(url: str,
                session: ClientSession,
                **kwargs) -> set:

    # Find HREFs in the HTML of the url
    found = set()
    try:
        html = await fetch_html(url = url,
                                session = session,
                                **kwargs)
    except(aiohttp.ClientError,
           aiohttp.http_exceptions.HttpProcessingError,) as e:
        logger.error(f"aiohttp exception for {url} [{getattr(e, 'status', None)}]: {getattr(e, 'message', None)}")

        return found

    except Exception as e:
        logger.exception(f"Non-aiohttp exception occurred: {getattr(e, '__dict__', {})}")
        return found

    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            
            except(urllib.error.URLError, ValueError):
                logger.exception(f"Error parsing URL: {link}")
                pass

            else:
                found.add(abslink)

        logger.info(f"Found {len(found)} links for {url}")
        
        return found

In [44]:
async def write_one(file: IO,
                    url: str,
                    **kwargs) -> None:

    # Write the found HREFs from url to file
    res = await parse(url = url,
                      **kwargs)

    if not res:
        return None
    
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info(f"Wrote results for source URL: {url}")

In [45]:
async def bulk_crawl_and_write(file: IO,
                               urls: set,
                               **kwargs) -> None:

    # Crawl and write concurrently to file for multiple urls
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(write_one(file = file,
                                   url = url,
                                   session = session,
                                   **kwargs))
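        # Gather once, after the loop has collected every coroutine; awaiting
        # inside the loop re-awaits finished coroutines and raises RuntimeError
        await asyncio.gather(*tasks)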

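A `RuntimeError: cannot reuse already awaited coroutine`, like the one in the output at the end of this section, is what happens when `asyncio.gather()` is awaited inside the loop that builds the task list: from the second iteration on, the list passed to `gather()` includes coroutines that were already awaited. A minimal sketch of both versions, using a hypothetical `work()` coroutine in place of `write_one()`:

```python
import asyncio


async def work(n: int) -> int:
    # Hypothetical stand-in for write_one()
    await asyncio.sleep(0.01)
    return n * 2


async def gather_once() -> list:
    coros = [work(n) for n in range(3)]
    # Correct: build the full list first, then gather it exactly once
    return await asyncio.gather(*coros)


async def gather_in_loop() -> None:
    coros = []
    for n in range(3):
        coros.append(work(n))
        # Bug: on the second pass, coros[0] has already been awaited
        await asyncio.gather(*coros)


doubled = asyncio.run(gather_once())
print(doubled)  # [0, 2, 4]

try:
    asyncio.run(gather_in_loop())
except RuntimeError as exc:
    error_message = str(exc)
    print(error_message)  # cannot reuse already awaited coroutine
```
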
In [48]:
if __name__ == "__main__":
    assert sys.version_info >= (3, 7)  # asyncio.run() needs Python 3.7+

    with open("urls.txt") as infile:
        urls = set(map(str.strip, infile))

    outpath = "foundurls.txt"
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file = outpath,
                                     urls = urls))

11:11:13 INFO:areq: Got response [200] for URL: https://regex101.com/
11:11:13 INFO:areq: Found 76 links for https://regex101.com/
11:11:13 INFO:areq: Wrote results for source URL: https://regex101.com/


RuntimeError: cannot reuse already awaited coroutine

11:11:13 ERROR:areq: aiohttp exception for https://www.bloomberg.com/markets/economics [None]: None
