# Async IO

In [None]:
# pip install --upgrade pip aiohttp aiofiles

**Two keywords:** *async* and *await*

In [None]:
# In Jupyter Notebook
import nest_asyncio
nest_asyncio.apply()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())


In [3]:
import asyncio
import time

## Asynchronous version

In [16]:
async def count(i):
    print(f"{i}: One")
    await asyncio.sleep(1)
    print(f"{i}: Two")

In [14]:
async def main():
    await asyncio.gather(count(1), count(2), count(3))

In [17]:
# Original code:
# -----------------------------
# s = time.perf_counter()
# asyncio.run(main())
# elapsed = time.perf_counter() - s
# print(f"Code executed in {elapsed:0.2f} seconds.")
# -----------------------------


# Handling for environments with an already running event loop
try:
    import nest_asyncio
    nest_asyncio.apply()
except ImportError as ie:
    print(f'Error: {ie}')

s = time.perf_counter()

# Get the current event loop and run `main`
loop = asyncio.get_event_loop()

# Since I am using Jupyter, this will work fine
loop.run_until_complete(main())

elapsed = time.perf_counter() - s
print(f"Code executed in {elapsed:0.4f} seconds.")

1: One
2: One
3: One
1: Two
2: Two
3: Two
Code executed in 1.0033 seconds.


## Synchronous version

In [19]:
def count(i):
    print(f"{i}: One")
    time.sleep(1)
    print(f"{i}: Two")

def main():
    for i in range(3):
        count(i)

s = time.perf_counter()
main()
elapsed = time.perf_counter() - s
print(f"Code executed in {elapsed:0.2f} seconds.")

0: One
0: Two
1: One
1: Two
2: One
2: Two
Code executed in 3.01 seconds.


---------

In [2]:
import asyncio
import random

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)


async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)

    # Here c[0] is used to Reset to default — so the rest of the terminal output remains unaffected.
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 3) for i in range(3)))
    return res


try:
    import nest_asyncio
    nest_asyncio.apply()
except ImportError as ie:
    print(f'Error: {ie}')

random.seed(444)
r1, r2, r3 = asyncio.run(main())
print()
print(f"r1: {r1}, r2: {r2}, r3: {r3}")

[36mInitiated makerandom(0).
[36mmakerandom(0) == 4 too low; retrying.
[91mInitiated makerandom(1).
[91mmakerandom(1) == 4 too low; retrying.
[35mInitiated makerandom(2).
[35mmakerandom(2) == 0 too low; retrying.
[36mmakerandom(0) == 4 too low; retrying.
[91m---> Finished: makerandom(1) == 7[0m
[36mmakerandom(0) == 4 too low; retrying.
[35mmakerandom(2) == 4 too low; retrying.
[36m---> Finished: makerandom(0) == 8[0m
[35m---> Finished: makerandom(2) == 10[0m

r1: 8, r2: 7, r3: 10


### chained asynchronous execution

In [6]:
import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

random.seed(42)
args = [1, 2, 3]
start = time.perf_counter()
asyncio.run(main(*args))
end = time.perf_counter() - start
print(f"Program finished in {end:0.2f} seconds.")

part1(1) sleeping for 10 seconds.
part1(2) sleeping for 1 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(2) == result2-1.
part2(2, 'result2-1') sleeping for 3 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(2, 'result2-1') == result2-2 derived from result2-1.
-->Chained result2 => result2-2 derived from result2-1 (took 4.00 seconds).
Returning part1(1) == result1-1.
part2(1, 'result1-1') sleeping for 3 seconds.
Returning part2(1, 'result1-1') == result1-2 derived from result1-1.
-->Chained result1 => result1-2 derived from result1-1 (took 13.00 seconds).
Program finished in 13.00 seconds.


### Asynchronous producer-consumer demo

In [3]:

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()


random.seed(444)

nprod = 5
ncon = 7

async def wrapper():
    start = time.perf_counter()
    await main(nprod=nprod, ncon=ncon)
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

await wrapper()

Producer 0 sleeping for 4 seconds.
Producer 2 sleeping for 7 seconds.
Producer 3 sleeping for 4 seconds.
Producer 4 sleeping for 10 seconds.
Consumer 0 sleeping for 7 seconds.
Consumer 1 sleeping for 8 seconds.
Consumer 2 sleeping for 4 seconds.
Consumer 3 sleeping for 7 seconds.
Consumer 4 sleeping for 1 seconds.
Consumer 5 sleeping for 6 seconds.
Consumer 6 sleeping for 9 seconds.
Producer 0 added <aa9f87dd18> to queue.
Producer 0 sleeping for 3 seconds.
Producer 3 added <170ceafb11> to queue.
Producer 3 sleeping for 9 seconds.
Consumer 2 got element <aa9f87dd18> in 0.00037 seconds.
Consumer 2 sleeping for 7 seconds.
Consumer 4 got element <170ceafb11> in 0.00050 seconds.
Consumer 4 sleeping for 10 seconds.
Producer 2 added <4bdb6e8395> to queue.
Producer 2 sleeping for 0 seconds.
Consumer 0 got element <4bdb6e8395> in 0.00027 seconds.
Consumer 0 sleeping for 1 seconds.
Producer 2 added <cfb0c90a1a> to queue.
Producer 2 sleeping for 0 seconds.
Consumer 3 got element <cfb0c90a1a> in 0

# Full Program: Asynchronous Requests

In [None]:
"""
Asynchronously, get links embedded in multiple pages' HMTL.
"""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))