# 8. Asynchronous I/O
## Async/Await - Coroutines

- An `async` function, defined with `async def`, is called a **coroutine**
- coroutines are implemented like generators: both can pause execution and resume later
- `await` is similar to a `yield` statement
    - execution of the current function is paused while other code runs
    - once the `await`/`yield` resolves, execution resumes
    - event loops are responsible for pausing/resuming
- Python 2.7
    - future-based concurrency got strange when we tried to use coroutines as actual functions
    - generators couldn't return values
        - third-party libraries had to work around this
- Python 3.4+
    - easier coroutine creation
    - third party libraries have to deal with this awkward transition
        - e.g.) `tornado`'s `gen` module
- in most fully concurrent code, the main entry point mostly just sets up and starts the event loop
    - this assumes your entire program is concurrent
    - the `asyncio` event loop
        - `loop.run_until_complete(coro: Coroutine)`
        - `loop.run_forever()`
        - `asyncio.run(coro)` (Python 3.7+): creates a loop, runs `coro` to completion, then closes the loop
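
The points above can be seen in a minimal sketch that uses only the standard library. The names `fetch` and `main`, and the sleep durations, are illustrative assumptions; `asyncio.sleep` stands in for a real I/O call.

```python
import asyncio


async def fetch(name, delay, log):
    """Stand-in for an I/O call: `await` hands control back to the event loop."""
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # paused here; other coroutines may run meanwhile
    log.append(f"{name} end")
    return name.upper()


async def main():
    log = []
    # Calling an async function only creates a coroutine object; nothing runs yet.
    coro = fetch("a", 0.05, log)
    assert asyncio.iscoroutine(coro) and log == []
    # gather schedules both coroutines on the loop and waits for both results.
    results = await asyncio.gather(coro, fetch("b", 0.01, log))
    return results, log


# asyncio.run creates the event loop, runs main() to completion, then closes the loop.
results, log = asyncio.run(main())
print(results)
print(log)
```

Both coroutines start before either one ends, which is the interleaving that the serial crawler below lacks.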

## Objectives

- will analyze a web crawler that fetches data from an HTTP server that has latency built in
    - represents the general response-time latency when dealing with I/O
    - the server can support multiple connections at a time
        - true for most services
        - but if the service cannot handle multiple connections, the crawler will perform only as fast as the serial case
- will first create a serial crawler as a naive Python solution, and then
- will build up to a full `aiohttp` solution by iterating through `gevent` and `tornado`
- finally will combine async I/O tasks with CPU tasks in order to effectively hide any time spent on I/O

## Server

In [1]:
import json
import time
from collections import defaultdict

from tornado import gen, httpserver, ioloop, options, web

options.define("port", default=8082, help="Port to serve on")


class AddMetric(web.RequestHandler):
    metric_data = defaultdict(list)

    async def get(self):
        if self.get_argument("flush", False):
            # write the collected timings to disk; `with` closes the file handle
            with open("metric_data.json", "w+") as f:
                json.dump(self.metric_data, f)
        else:
            name = self.get_argument("name")
            try:
                delay = int(self.get_argument("delay", 1024))
            except ValueError:
                raise web.HTTPError(400, reason="Invalid value for delay")

            start = time.time()
            await gen.sleep(delay / 1000.0)
            self.write(".")
            self.finish()
            end = time.time()
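            self.metric_data[name].append(
                {"start": start, "end": end, "dt": end - start}
            )


if __name__ == "__main__":
    # assumed wiring: the crawlers in the following sections request /add on this port
    options.parse_command_line()
    application = web.Application([(r"/add", AddMetric)])
    http_server = httpserver.HTTPServer(application)
    http_server.listen(options.options.port)
    ioloop.IOLoop.current().start()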

## Serial Crawler

- takes a list of URLs
- fetches them
- sums the total length of the content from the pages
- will use a custom http server taking two parameters: `name` and `delay`
    - `delay`: how long the server should pause before responding, in milliseconds
    - `name`: a name needed for logging


In [1]:
import random
import string

import requests


def generate_urls(base_url, num_urls):
    """
    We add random characters to the end of the URL to break any caching
    mechanisms in the requests library or the server
    """
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def run_experiment(base_url, num_iter=1000):
    response_size = 0
    for url in generate_urls(base_url, num_iter):
        response = requests.get(url)
        response_size += len(response.text)
    return response_size

In [2]:
import time

delay = 100
num_iter = 1000
base_url = f"http://127.0.0.1:8082/add?name=serial&delay={delay}&"

start = time.time()
result = run_experiment(base_url, num_iter)
end = time.time()
print(f"Result: {result}, Time: {end - start}")

Result: 1000, Time: 101.58819913864136


- serial: there is no interleaving of our requests
- each request takes 100 ms, and there are 1000 requests: the run time is expected to be about 100 sec, which matches the measured 101.6 sec
    

## Gevent

- follows the paradigm of having async functions return futures
    - most of the logic in the code can stay the same as in the serial version
- monkey-patches the standard I/O functions to be async
    - most of the time we can simply use the standard I/O packages and just monkey-patch them

- provides two mechanisms to enable async programming
    - it patches the standard library with async I/O
    - it has a `Greenlet` object that can be used for concurrent execution
- A **greenlet** is a type of coroutine
    - can be thought of as a thread
    - however, all greenlets run on the same physical thread
    - we have an event loop on a single CPU that is able to switch between them during I/O wait
- gevent tries to make event loop handling as transparent as possible via the use of `wait` functions
- `wait` will start an event loop and run it as long as is needed for all greenlets to finish
    - consequently, most gevent code will run serially
    - then at some point, we set up many greenlets to do a concurrent task and start the event loop with `wait`
    - while `wait` is executing, all queued-up concurrent tasks will run until completion (or some stopping condition)
    - afterwards, the rest of the code will run serially again

- futures are created with `gevent.spawn`
    - which takes a function and the arguments to that function
    - then launches a greenlet responsible for running that function
    - the greenlet can be thought of as a future
        - since once the function completes, its return value will be contained within the greenlet's `value` field
- patching of Python standard modules can make it harder to control the subtleties of what is going on
- one thing to ensure when doing async I/O is not to open too many files/connections at once
    - could overload the remote server or
    - slow down our process by having to context-switch too many times
- to limit the number of open files, we use a semaphore as a locking mechanism
- we can then wait on the futures with `gevent.iwait`, which takes a sequence of futures and iterates over them as they become ready, or
- with `gevent.wait`, which blocks execution of our program until all requests are done
- the semaphore handles grouping the requests

In [1]:
import random
import string
import urllib.error
import urllib.parse
import urllib.request
from contextlib import closing

import gevent
from gevent import monkey
from gevent.lock import Semaphore

monkey.patch_socket()


def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def download(url, semaphore):
    # hold a semaphore slot for the duration of the request
    with semaphore:
        with closing(urllib.request.urlopen(url)) as data:
            return data.read()


def chunked_requests(urls, chunk_size=100):
    """
    Given an iterable of urls, this function will yield back the contents of the
    URLs. The requests will be batched up in "chunk_size" batches using a
    semaphore
    """
    # allow at most chunk_size downloads to run at the same time
    semaphore = Semaphore(chunk_size)
    # spawn one greenlet per URL; the semaphore decides when each may proceed
    requests = [gevent.spawn(download, u, semaphore) for u in urls]
    # yield each greenlet as soon as it finishes
    for response in gevent.iwait(requests):
        yield response


def run_experiment(base_url, num_iter=1000):
    urls = generate_urls(base_url, num_iter)
    response_futures = chunked_requests(urls, 100)
    # the downloaded bytes are stored in each finished greenlet's `value`
    response_size = sum(len(r.value) for r in response_futures)
    return response_size


import time

delay = 100
num_iter = 1000
base_url = f"http://127.0.0.1:8082/add?name=gevent&delay={delay}&"

start = time.time()
result = run_experiment(base_url, num_iter)
end = time.time()
print(f"Result: {result}, Time: {end - start}")


Result: 1000, Time: 1.1517233848571777


- N.B. here `gevent` is used only to make our I/O requests async; we do no non-I/O computation while in I/O wait
    - there is still a massive speed-up by launching more requests while waiting for previous ones
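
The size of that speed-up follows from simple arithmetic. The helper below is a rough lower-bound model, not a measurement: `expected_seconds` is a hypothetical name, and it assumes every request takes exactly the server delay and ignores network and scheduling overhead.

```python
import math


def expected_seconds(num_requests, delay_ms, max_in_flight=1):
    """Lower bound on wall time when at most `max_in_flight` requests overlap."""
    rounds = math.ceil(num_requests / max_in_flight)
    return rounds * delay_ms / 1000.0


serial = expected_seconds(1000, 100)        # one request at a time
chunked = expected_seconds(1000, 100, 100)  # semaphore of 100, as in the gevent run
print(serial, chunked, serial / chunked)
```

The model gives 100 sec and 1 sec, close to the measured 101.6 sec (serial) and 1.15 sec (`gevent`).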

## Tornado

- another package for async I/O in Python
- originally developed at FriendFeed and open-sourced by Facebook, primarily for HTTP clients/servers
- originally used a system of callbacks, then switched to coroutines
- currently, can use either native `async`/`await` (Python 3.5+) or the `tornado.gen` module

In [7]:
import asyncio
import random
import string

from tornado.httpclient import AsyncHTTPClient

# cap the client at 100 simultaneous requests (the curl client requires pycurl)
AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=100
)


def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


async def run_experiment(base_url, num_iter=1000):
    http_client = AsyncHTTPClient()
    urls = generate_urls(base_url, num_iter)
    response_sum = 0
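    # start every fetch; each call returns a future right away
    tasks = [http_client.fetch(url) for url in urls]
    # iterate over the futures in the order in which they complete
    for task in asyncio.as_completed(tasks):
        # suspend this coroutine until the response is ready
        response = await task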
        response_sum += len(response.body)
    return response_sum


import time

delay = 100
num_iter = 1000
run_func = run_experiment(
    f"http://127.0.0.1:8082/add?name=tornado&delay={delay}&",
    num_iter)

start = time.time()
# top-level await works in IPython/Jupyter; in a script, use asyncio.run(run_func)
result = await run_func
end = time.time()
print(f"Result: {result}, Time: {end - start}")

Result: 1000, Time: 1.1086230278015137


- an important difference between `tornado` and `gevent` is when the event loop runs
    - `gevent` - the event loop runs only while `iwait` is executing
    - `tornado` - the event loop runs the entire time and controls the complete execution flow of the program
- `tornado` - ideal for a mostly I/O-bound application where most of it should be async
- `gevent` - ideal for a mainly CPU-bound program that sometimes involves heavy I/O
    - e.g.) a program that does a lot of computation over a dataset and then must send the results back to the database
    - databases usually have HTTP APIs, so we can even use `grequests`!

- another difference is the way the internals change the request call graphs
- `gevent` - very uniform call graph shape; new requests are issued the second a slot in the semaphore opens up
- `tornado` - "stop-and-go" call graph shape; the internal mechanism that limits open connections does not react fast enough to finishing requests, which is suboptimal
- for all libraries that use `asyncio` to run the event loop, we can replace the backend event loop implementation
    - e.g.) `uvloop`
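
Why the call-graph shape matters can be seen in a toy scheduling model. This is a sketch under stated assumptions, not the actual internals of `gevent` or `tornado`: the function names and the request durations (in milliseconds) are made up, and "stop-and-go" is approximated as fixed batches that must finish completely before the next batch starts.

```python
import heapq


def makespan_slot_refill(durations, limit):
    """A new request starts the moment any of `limit` slots frees up."""
    slots = [0] * limit  # time at which each slot becomes free
    heapq.heapify(slots)
    finish = 0
    for d in durations:
        start = heapq.heappop(slots)  # earliest free slot
        end = start + d
        finish = max(finish, end)
        heapq.heappush(slots, end)
    return finish


def makespan_batches(durations, limit):
    """Stop-and-go: the next batch starts only after the slowest request of the batch."""
    total = 0
    for i in range(0, len(durations), limit):
        total += max(durations[i:i + limit])
    return total


durations_ms = [100, 300, 100, 300, 100, 100]
print(makespan_slot_refill(durations_ms, 2))  # 500
print(makespan_batches(durations_ms, 2))      # 700
```

With uneven response times, refilling a slot as soon as it opens finishes sooner than waiting for a whole batch, because slow requests no longer hold up idle slots.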

## aiohttp

- Python 3.4 added the `asyncio` module to the standard library; it is quite low-level
- `aiohttp` is the first popular library built upon the new `asyncio` library
- provides both HTTP client and server functionality, with an API similar to that of `tornado`
- is part of a larger project, `aio-libs`

In [26]:
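# asyncio http scraper
import asyncio
import random
import string

import aiohttp


def generate_urls(base_url, num_urls):
    # append the random suffix as an extra query parameter; joining it with
    # os.path.join would send `delay=100/<suffix>`, which the server rejects
    sep = "" if base_url.endswith("&") else "&"
    for i in range(num_urls):
        yield base_url + sep + "".join(random.sample(string.ascii_lowercase, 10))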


def chunked_http_client(num_chunks):
    """
    Returns a function that can fetch from a URL,
    ensuring that only "num_chunks" of simultaneous connections are made.
    """
    # as in the `gevent` example, we must use a semaphore to limit the number of requests
    semaphore = asyncio.Semaphore(num_chunks)

    # return a new coroutine that will asynchronously download files and respect the semaphore locking
    async def http_get(url, client_session):
        nonlocal semaphore
        async with semaphore:
            async with client_session.request("GET", url) as response:
                return await response.content.read()
    

    return http_get


async def run_experiment(base_url, num_iter=1000):
    urls = generate_urls(base_url, num_iter)
    http_client = chunked_http_client(100)
    responses_sum = 0

    async with aiohttp.ClientSession() as client_session:
        # calling http_client creates one coroutine per URL without running it yet
        # to keep track of progress, we save them into a list
        tasks = [http_client(url, client_session) for url in urls]

        # as with `gevent`, we can wait for futures to become ready and iterate over them
        for future in asyncio.as_completed(tasks):
            data = await future
            responses_sum += len(data)

    return responses_sum


In [32]:
import time

delay = 100
num_iter = 1000

start = time.time()
result = await run_experiment(
    f"http://127.0.0.1:8082/add?name=asyncio&delay={delay}&", num_iter
)
end = time.time()
print(f"Result: {result}, Time: {end - start}")

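Result: 97000, Time: 0.24190711975097656

- N.B. this recorded output is not a valid benchmark: 97000 is 1000 responses of 97 bytes each, which is the size of the server's `400: Invalid value for delay` error page, not of the one-byte `.` body
    - the run that produced it joined the random suffix with `os.path.join`, so the query became `delay=100/<suffix>` and the server rejected every request without sleeping
    - a valid run returns `Result: 1000` and cannot finish in under about 1 sec (1000 requests, at most 100 in flight, 100 ms each)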


- different types of `async` calls and `await`
    - `async with`
        - `async` context manager, used to get access to shared resources in a concurrent-friendly way
        - we allow other coroutines to run while waiting to acquire the resources we are requesting
        - consequently, sharing things such as open semaphore slots / already open connections to our host can be done more efficiently than we experienced with `tornado`
    - `async def`
    - `await`
- well-behaved call graph
    - faster than both `gevent` and `tornado` even though each call takes slightly longer
    - this is due to a faster resumption of coroutines paused by the semaphore or waiting for the HTTP client
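
The `async with` behavior described above can be observed without a network. In this standard-library sketch, `limited_call` and the `state` dictionary are hypothetical names, and `asyncio.sleep` stands in for the HTTP request; the counter records how many calls hold a semaphore slot at once.

```python
import asyncio


async def limited_call(i, semaphore, state):
    # `async with` suspends this coroutine (it does not block the thread)
    # until a semaphore slot is free, so other coroutines keep running.
    async with semaphore:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP request
        state["in_flight"] -= 1
        return i


async def main(num_calls=20, limit=5):
    semaphore = asyncio.Semaphore(limit)
    state = {"in_flight": 0, "peak": 0}
    tasks = [limited_call(i, semaphore, state) for i in range(num_calls)]
    results = []
    for future in asyncio.as_completed(tasks):
        results.append(await future)
    return sorted(results), state["peak"]


results, peak = asyncio.run(main())
print(results, peak)
```

All 20 calls complete, but the peak number in flight never exceeds the semaphore size of 5.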

- aiohttp vs tornado
    - with aiohttp we are very much in control of the event loop and the various subtleties
        - e.g.) we manually acquire the client session
        - e.g.) we manually read from the connection
    - very useful for real-world applications
        - can easily add time-outs
        - can add functions such as post-process trigger
        - for a web server, such control allows us to write "defensive" code
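
As one example of such defensive code, a time-out can be wrapped around any awaitable with the standard library's `asyncio.wait_for`. This is a sketch, assuming a hypothetical `slow_fetch` coroutine in place of a real HTTP read; `aiohttp` has its own time-out configuration, which is not used here.

```python
import asyncio


async def slow_fetch(delay):
    """Hypothetical stand-in for reading an HTTP response."""
    await asyncio.sleep(delay)
    return b"."


async def fetch_with_timeout(delay, timeout, default=b""):
    try:
        # wait_for cancels the inner coroutine if it exceeds `timeout` seconds
        return await asyncio.wait_for(slow_fetch(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return default  # defensive fallback instead of propagating the error


async def main():
    fast = await fetch_with_timeout(delay=0.01, timeout=1.0)
    slow = await fetch_with_timeout(delay=1.0, timeout=0.05)
    return fast, slow


fast, slow = asyncio.run(main())
print(fast, slow)
```

The fast call returns its body, while the slow call is cancelled after 50 ms and yields the fallback value.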