# asyncio: Practical examples

## A Full Program: Asynchronous Requests

You’ve made it this far, and now it’s time for the fun and painless part. In this section, you’ll build a web-scraping URL collector, areq.py, using aiohttp, a blazingly fast async HTTP client/server framework. (We just need the client part.) Such a tool could be used to map connections between a cluster of sites, with the links forming a directed graph.

> Note: You may be wondering why Python’s requests package isn’t compatible with async IO. requests is built on top of urllib3, which in turn uses Python’s http and socket modules. By default, socket operations are blocking. This means that Python won’t like await requests.get(url) because .get() is not awaitable. In contrast, almost everything in aiohttp is an awaitable coroutine, such as session.request() and response.text(). It’s a great package otherwise, but you’re doing yourself a disservice by using requests in asynchronous code.

The high-level program structure will look like this:
- Read a sequence of URLs from a local file, urls.txt.
- Send GET requests for the URLs and decode the resulting content. If this fails, stop there for a URL.
- Search for the URLs within href attributes in the HTML of the responses.
- Write the results to foundurls.txt.
- Do all of the above as asynchronously and concurrently as possible. (Use aiohttp for the requests, and aiofiles for the file-appends. These are two primary examples of IO that are well-suited for the async IO model.)

Here are the contents of urls.txt. It’s not huge, and contains mostly highly trafficked sites:

    $ cat urls.txt
    https://regex101.com/
    https://docs.python.org/3/this-url-will-404.html
    https://www.nytimes.com/guides/
    https://www.mediamatters.org/
    https://1.1.1.1/
    https://www.politico.com/tipsheets/morning-money
    https://www.bloomberg.com/markets/economics
    https://www.ietf.org/rfc/rfc2616.txt

The second URL in the list should return a 404 response, which you’ll need to handle gracefully. If you’re running an expanded version of this program, you’ll probably need to deal with much hairier problems than this, such as server disconnections and endless redirects.

The requests themselves should be made using a single session, to take advantage of reuse of the session’s internal connection pool.

Let’s take a look at the full program. We’ll walk through things step-by-step after:

In [None]:
#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HTML."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")

# Logger used inside aiohttp; disable it so it doesn't get in the way of our own logging
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """
    # Don't do any try/except here.  If either the request or reading
    # of bytes raises, let that be handled by caller.
    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status() # raise if status >= 400
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text() # For bytes: resp.read()
    
    # Don't close session; let caller decide when to do that.
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        # May be raised from other libraries, such as chardet or yarl.
        # logger.exception will show the full traceback.
        logger.exception(
            "Non-aiohttp exception occurred:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        # This portion is not really async, but it is the request/response
        # IO cycle that eats the largest portion of time.
        for link in HREF_RE.findall(html):
            try:
                # Ensure we return an absolute path
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)  # see also: return_exceptions=True

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    # Header - just a single, initial row-write
    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

This script is longer than our initial toy programs, so let’s break it down.

The constant HREF_RE is a regular expression to extract what we’re ultimately searching for, href attribute values within HTML:

In [1]:
import re
HREF_RE = re.compile(r'href="(.*?)"')
HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')

<re.Match object; span=(9, 39), match='href="https://realpython.com/"'>

The coroutine fetch_html() is a wrapper around a GET request to make the request and decode the resulting page HTML. It makes the request, awaits the response, and raises right away if the status code is 400 or higher:

In [None]:
resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()

If the status is okay, fetch_html() returns the page HTML (a str). Notably, there is no exception handling done in this function. The logic is to propagate that exception to the caller and let it be handled there:

In [None]:
html = await resp.text()

We await session.request() and resp.text() because they’re awaitable coroutines. The request/response cycle would otherwise be the long-tailed, time-hogging portion of the application, but with async IO, fetch_html() lets the event loop work on other readily available jobs such as parsing and writing URLs that have already been fetched.

Next in the chain of coroutines comes parse(), which waits on fetch_html() for a given URL, and then extracts all of the href values from that page’s HTML, making sure that each is valid and formatting it as an absolute path.

Admittedly, the second portion of parse() is blocking, but it consists of a quick regex match and ensuring that the links discovered are made into absolute paths.
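
To make that second portion concrete, here is a small standalone sketch of the same two steps using only the standard library. The helper name extract_links, the sample HTML string, and the base URL are made up for illustration; HREF_RE is the same pattern the script defines.

```python
import re
import urllib.parse

HREF_RE = re.compile(r'href="(.*?)"')


def extract_links(base_url: str, html: str) -> set:
    """Return every href value in `html`, resolved against `base_url`."""
    found = set()
    for link in HREF_RE.findall(html):
        # urljoin leaves absolute URLs alone and resolves relative ones.
        found.add(urllib.parse.urljoin(base_url, link))
    return found


# Hypothetical page content, not taken from any real site.
sample_html = '<a href="/about">About</a> <a href="https://example.org/x">X</a>'
links = extract_links("https://example.com/docs/", sample_html)
```

Both steps are plain synchronous calls, which is why they briefly hold the event loop while they run.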

In this specific case, this synchronous code should be quick and inconspicuous. But just remember that any line within a given coroutine will block other coroutines unless that line uses yield, await, or return. If the parsing were a more intensive process, you might want to consider running this portion in its own process with loop.run_in_executor().
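
As a rough sketch of that idea, the snippet below hands a synchronous parsing function to loop.run_in_executor(). The names extract_hrefs and parse_off_loop are hypothetical. Passing None as the executor uses the loop’s default thread pool; to get a separate process, you would pass a concurrent.futures.ProcessPoolExecutor instead.

```python
import asyncio
import re
from concurrent.futures import Executor
from typing import Optional

HREF_RE = re.compile(r'href="(.*?)"')


def extract_hrefs(html: str) -> list:
    """Synchronous stand-in for a heavier parsing step (hypothetical)."""
    return HREF_RE.findall(html)


async def parse_off_loop(html: str, executor: Optional[Executor] = None) -> list:
    """Run the blocking parse in an executor so the event loop stays free."""
    loop = asyncio.get_running_loop()
    # executor=None means "use the loop's default ThreadPoolExecutor".
    return await loop.run_in_executor(executor, extract_hrefs, html)
```

A thread pool is enough to keep the loop responsive; a process pool is what helps when the parsing is genuinely CPU-bound.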

Next, the coroutine write_one() takes a file object and a single URL, and waits on parse() to return a set of the parsed URLs, writing each to the file asynchronously along with its source URL through use of aiofiles, a package for async file IO.

Lastly, bulk_crawl_and_write() serves as the main entry point into the script’s chain of coroutines. It uses a single session, and a task is created for each URL that is ultimately read from urls.txt.
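
The comment next to asyncio.gather() in the script mentions return_exceptions=True. Here is a hedged illustration of what that flag changes, using a made-up fake_fetch() coroutine in place of real requests: instead of the first exception propagating out of gather(), each exception is returned in the result list alongside the successful values.

```python
import asyncio


async def fake_fetch(url: str) -> str:
    """Hypothetical stand-in for a request; fails for URLs containing '404'."""
    await asyncio.sleep(0)
    if "404" in url:
        raise ValueError(f"bad URL: {url}")
    return f"ok: {url}"


async def crawl(urls: list) -> list:
    # Exceptions are collected as results rather than raised.
    return await asyncio.gather(
        *(fake_fetch(url) for url in urls), return_exceptions=True
    )


results = asyncio.run(crawl(["https://example.com/a", "https://example.com/404"]))
```

Results come back in the same order as the inputs, so you can pair each URL with its outcome afterwards.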

Here are a few additional points that deserve mention:
- The default ClientSession has an adapter with a maximum of 100 open connections. To change that, pass an instance of aiohttp.TCPConnector to ClientSession. You can also specify limits on a per-host basis.
- You can specify max timeouts for both the session as a whole and for individual requests.
- This script also uses async with, which works with an asynchronous context manager. I haven’t devoted a whole section to this concept because the transition from synchronous to asynchronous context managers is fairly straightforward. The latter has to define `.__aenter__()` and `.__aexit__()` rather than `.__exit__()` and `.__enter__()`. As you might expect, async with can only be used inside a coroutine function declared with async def.
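
To illustrate the last point, here is a minimal asynchronous context manager. The class name Tracked and its events list are invented for this sketch; the only requirement from the language is the pair of `.__aenter__()` and `.__aexit__()` methods.

```python
import asyncio


class Tracked:
    """Toy async context manager that records when it is entered and exited."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        await asyncio.sleep(0)  # stand-in for async setup, e.g. opening a connection
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # stand-in for async teardown
        self.events.append("exit")
        return False  # do not suppress exceptions raised in the body


async def demo() -> list:
    async with Tracked() as resource:
        resource.events.append("body")
    return resource.events
```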

If you’d like to explore a bit more, the companion files for this tutorial up at GitHub have comments and docstrings attached as well.

Here’s the execution in all of its glory, as areq.py gets, parses, and saves results for the URLs in urls.txt in under a second:

    $ python3 areq.py

That’s not too shabby! As a sanity check, you can check the line-count on the output. In my case, it’s 626, though keep in mind this may fluctuate:

    $ wc -l foundurls.txt
         626 foundurls.txt

    $ head -n 3 foundurls.txt
    source_url  parsed_url
    https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
    https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos

<div class="alert alert-primary" role="alert">
<p><strong>Next Steps</strong>: If you’d like to up the ante, make this webcrawler recursive. You can use <a href="https://github.com/aio-libs/aioredis"><code>aio-redis</code></a> to keep track of which URLs have been crawled within the tree to avoid requesting them twice, and connect links with Python’s <code>networkx</code> library.</p>
<p>Remember to be nice. Sending 1000 concurrent requests to a small, unsuspecting website is bad, bad, bad. There are ways to limit how many concurrent requests you’re making in one batch, such as in using the <a href="https://stackoverflow.com/q/40836800/7954504">semaphore</a> objects of <code>asyncio</code> or using a pattern <a href="https://www.artificialworlds.net/blog/2017/05/31/python-3-large-numbers-of-tasks-with-limited-concurrency/">like this one</a>. If you don’t heed this warning, you may get a massive batch of <code>TimeoutError</code> exceptions and only end up hurting your own program.</p>
</div>

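One way to cap concurrency, sketched here under the assumption that a short asyncio.sleep() stands in for the real request, is to wrap each request in an asyncio.Semaphore. The names polite_fetch, crawl, and the stats dictionary are hypothetical and exist only to show the peak number of simultaneous "requests".

```python
import asyncio


async def polite_fetch(url: str, semaphore: asyncio.Semaphore, stats: dict) -> str:
    # Only `limit` coroutines can be inside this block at the same time.
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for the actual HTTP request
        stats["active"] -= 1
        return url


async def crawl(urls: list, limit: int = 3) -> tuple:
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(polite_fetch(url, semaphore, stats) for url in urls)
    )
    return results, stats["peak"]
```

All tasks are still created up front; the semaphore only limits how many are in the request phase at once.
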
# asyncio: We Did It Wrong

https://www.roguelynn.com/words/asyncio-we-did-it-wrong/

<p>But it’s easy to get lulled into a false sense of security. This ain’t helpful. We’re led to believe that we’re able to do a lot with the structured <code>async</code>/<code>await</code> API layer. Some tutorials, while great for the developer getting their toes wet, try to illustrate <a href="https://medium.com/archsaber/a-simple-introduction-to-pythons-asyncio-595d9c9ecf8c#f57b">real</a> <a href="http://markuseliasson.se/article/introduction-to-asyncio/">world</a> <a href="https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/">examples</a>, but are actually just beefed-up “hello, world”s. Some <a href="https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html">even</a> <a href="http://stackabuse.com/python-async-await-tutorial/">misuse</a> <a href="https://pymotw.com/3/asyncio/futures.html">parts</a> of <code>asyncio</code>’s interface, allowing one to <a href="https://www.transceptor.technology/single-post/2016/08/19/Part-1-Avoiding-callback-hell-in-Python">easily fall</a> into the depths of <a href="http://callbackhell.com/">callback hell</a>. Some get you easily up and running with <code>asyncio</code>, but then you may not realize it’s not correct or exactly what you want, or only gets you part of the way there. While <a href="https://medium.com/python-pandemonium/asyncio-coroutine-patterns-beyond-await-a6121486656f">there are tutorials</a> that do improve upon the basic Hello, World “use” case, often they don’t go far enough. It’s often <em>still</em> just a web crawler. I’m not sure about others, but I’m not building web crawlers at Spotify.</p>

It’s not the fault of anyone though; asynchronous programming is difficult. Whether you use asyncio, Twisted, Tornado, or Golang, Erlang, Haskell, whatever, it’s just difficult. I myself have even fallen into this false sense of ease that the asyncio community builds where once I import it, everything will just fall into place as it should. I do believe asyncio is quite user-friendly, but I did underestimate the inherent complexity concurrent programming brings.

The past couple of services we built at Spotify were perfect use cases for asyncio: a chaos-monkey-like service for restarting instances in Google Cloud, and an event-driven host name generation service for DNS. Sure, we needed to make a lot of HTTP requests that should be non-blocking much like web crawlers. But these services also had to react to messages from a pub/sub, measure the progress of actions initiated from those messages, handle any incomplete actions or other external errors, take care of pub/sub message lease management, measure SLIs, and send metrics. And we needed to use non-asyncio-friendly dependencies as well. This quickly got difficult.

So having lived through that and survived, allow me to provide you a real-world example that actually comes from the real world. As I mentioned, one of the asyncio services we built is similar to a chaos monkey to do periodic hard restarts of our entire fleet of instances. We’ll build a simplified version, dubbing it “Mayhem Mandrill” which will listen for a pub/sub message as a trigger to go ahead and restart a host based off of that message. As we build this service, I’ll point out potential traps to avoid. This will essentially become the type of resource that past Lynn would have wanted a year or two ago.

https://pypi.org/project/attrs/

## Initial Setup with asyncio

To recap from the intro, we are building a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

### Foundations for a Pub/Sub

There are a lot of choices for pub/sub-like technologies out there; I’m most familiar with Google Cloud Pub/Sub. But for our purposes, we’ll simulate a pub/sub with asyncio, inspired by this official-looking tutorial using asyncio.Queues.
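
As a minimal sketch of what such a simulation could look like, assume a publish(queue, n) coroutine that emits n messages and a consume(queue) coroutine that drains them, matching how main() calls them later on. Everything else here is an assumption made for illustration: the dataclass stands in for the attrs-based PubSubMessage used later, the None sentinel is just one way to signal that the publisher is done, and demo() exists only to run the pair end to end.

```python
import asyncio
import random
import string
from dataclasses import dataclass


@dataclass
class PubSubMessage:
    message_id: int
    instance_name: str


async def publish(queue: asyncio.Queue, n: int) -> None:
    """Put `n` fake restart requests on the queue, then a sentinel."""
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = "".join(random.choices(choices, k=4))
        msg = PubSubMessage(message_id=x, instance_name=f"cattle-{host_id}")
        await queue.put(msg)
    await queue.put(None)  # sentinel: nothing more to publish


async def consume(queue: asyncio.Queue) -> list:
    """Pull messages one by one until the sentinel arrives."""
    consumed = []
    while True:
        msg = await queue.get()
        if msg is None:
            break
        consumed.append(msg)
    return consumed


async def demo(n: int = 5) -> list:
    queue = asyncio.Queue()
    await publish(queue, n)
    return await consume(queue)
```

Note that this version publishes everything first and only then consumes, one message at a time.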

### Running an asyncio-based Service

So far, we don’t have a running service; it’s merely a pipeline or a batch job right now.

Also note that this asyncio.run is new as of 3.7:

In [None]:
# <---snip--->
def main():
    queue = asyncio.Queue()
    asyncio.run(publish(queue, 5))
    asyncio.run(consume(queue))

But this is a service, so we don’t want it to just run once, but continually consume from a publisher. And unfortunately, there isn’t a decent way to start a long-running service that is not an HTTP server in Python 3.7. So we’ll stick with the pre-3.7 boilerplate.

With that, we’ll create tasks out of the two coroutines using `loop.create_task`, which schedules them on the loop, and then start the loop, telling it to run forever.

In [None]:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")

When running with this updated code, we see that all messages are published and then consumed. Then we hang because there is no more work to be done; we only published 5 messages, after all. To stop the “hanging” process, we must interrupt it (via ^C or sending a signal like kill -15 <pid>):

That’s nice and …ugly. You may notice that we never get to the log line of "Successfully shutdown the Mayhem service."; we actually are not closing the loop either. Let’s add a quick fix.

### Running the event loop defensively

Our main function is just typical boilerplate code to get the service running. We have a queue instance, set up the loop, schedule the publish & consume tasks, and start then close the event loop:

In [None]:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info("Successfully shutdown the Mayhem service.")

We’ve been stopping our service with CTRL-C, a.k.a. SIGINT, a.k.a. KeyboardInterrupt. So let’s wrap our running of the service in a try/except/finally around that interrupt signal:

In [None]:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue, 5))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

And running it again, sending CTRL-C after it is done consuming.

This is clean enough for now, but later on we’ll build off of this by adding a graceful shutdown in part 2 as well as add proper exception handling in part 3.

## True Concurrency with asyncio

### We’re still blocking

I’ve seen quite a few tutorials that make use of async and await in a way that, while it does not block the event loop, still iterates through tasks serially, effectively not adding any concurrency.

As our code so far was adapted from this popular tutorial, we are still sequentially processing each item we produce and consume. **The event loop itself isn’t blocked**; if we had other tasks/coroutines going on, they of course wouldn’t be blocked.

This might seem obvious to some, but it definitely isn’t to all. We are essentially blocking ourselves; first we produce all the messages, one by one. Then we consume them, one by one. The loops we have (for x in range(1, n+1) in publish(), and while True in consume()) block ourselves from moving onto the next message while we await to do something.

While this is technically a working example of a pub/sub-like queue with asyncio, it’s not what we want. Whether we are building an event-driven service (like this walkthrough), or a pipeline/batch job, we’re not taking advantage of the concurrency that asyncio can provide.

### Actually being concurrent

To reiterate our goal here: we want to build an event-driven service that consumes from a pub/sub, and processes messages as they come in. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

To help facilitate this, we’ll also need to build a service that actually runs forever. We’re not going to have a preset number of messages; we need to react whenever we’re told to restart an instance. The triggering event to publish a restart request message could be an on-demand request from a service owner, or a scheduled gradually rolling restart of the fleet.

#### Concurrent publisher

Let’s first create a mock publisher that will always be publishing restart request messages, and therefore never indicate that it’s done. This also means we’re not providing a set number of messages to publish, so we have to rework that a bit, too. Here I’m adding a while True loop since we don’t want a finite number of messages to create. I’m also adding the creation of a unique ID for each message produced.

And finally – one of the key parts: I’m no longer using await with queue.put(msg). Instead, I’m using asyncio.create_task(queue.put(msg)):

In [None]:
# <-- snip -->
import uuid
# <-- snip -->

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f"Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

The asyncio.create_task call will actually schedule the coroutine on the loop without blocking the rest of the while loop. It can be thought of as a “fire and forget” mechanism. If we left the await in here, everything after it within the scope of the publish coroutine function would be blocked. This isn’t much of an issue in our current setup. However, it could be if we limited the size of the queue, because then that await could be waiting on space to free up in the queue. So using create_task tells the loop to put the message on the queue as soon as it gets a chance, and allows us to continue on publishing messages.
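
One caveat with “fire and forget”: the asyncio documentation notes that the event loop only keeps weak references to tasks, so a task nobody else references can be garbage-collected before it finishes. The sketch below follows the workaround suggested there, which is to hold a strong reference until the task is done. The helper name fire_and_forget and the background_tasks set are hypothetical names chosen for this example.

```python
import asyncio

# Strong references to in-flight tasks so they are not garbage-collected.
background_tasks = set()


def fire_and_forget(coro) -> asyncio.Task:
    """Schedule `coro` on the running loop and keep it alive until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference automatically once the task completes.
    task.add_done_callback(background_tasks.discard)
    return task


async def demo() -> tuple:
    queue = asyncio.Queue()
    task = fire_and_forget(queue.put("msg"))
    tracked_while_pending = len(background_tasks)
    await task  # only awaited here so the demo can inspect the result
    await asyncio.sleep(0)  # give any remaining callbacks a chance to run
    return tracked_while_pending, len(background_tasks), queue.get_nowait()
```

In the publisher, this would mean calling fire_and_forget(queue.put(msg)) wherever asyncio.create_task(queue.put(msg)) is used.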

Running for a few messages, then killing it, we see:

Let’s ignore that final ERROR line for now; we’ll be addressing it in exception handling.

So we’re happily creating and publishing messages, but it’s probably hard to see how this is concurrent right now. Let’s add multiple producers to help see this fact. I’ll temporarily update the publish coroutine function to take in a publisher_id to make it clear that we have multiple publishers:

In [None]:
async def publish(queue, publisher_id):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f"[{publisher_id}] Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

and then create multiple coroutines:

In [None]:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    coros = [publish(queue, i) for i in range(1, 4)]

    try:
        for coro in coros:
            loop.create_task(coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

So now it’s a bit easier to see the concurrency with the out-of-order publisher IDs:

For the rest of the walkthrough, I’ll remove the multiple publishers; I just wanted to easily convey that it’s now concurrent, not just non-blocking.

I will also switch the log level of the publisher logs to debug so we can focus on the meat of the service since the publish coroutine function is merely meant to simulate an external pub/sub-like system.

#### Concurrent consumer

Now comes the time to add concurrency to the consumer bit. For this, the goal is to constantly consume messages from the queue and create non-blocking work based off of a newly-consumed message; in this case, to restart an instance.

The tricky part is the consumer needs to be written in a way that the consumption of a new message from the queue is separate from the work on the message itself. In other words, we have to simulate being “event-driven” by regularly pulling for a message in the queue since there’s no way to trigger work based off of a new message available in the queue (a.k.a. push-based).

Let’s first update the PubSubMessage class definition to add a boolean attribute for easier testing in the future:

In [None]:
@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

Now let’s add a coroutine that mocks the restart work that needs to be done on any consumed message:

In [None]:
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

We’ll stick with our while True loop and await for the next message on the queue, and then create a task (and - not obviously - schedule it on the loop) out of restart_host rather than just await it.

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        asyncio.create_task(restart_host(msg))

Then adding it to our main section:

In [None]:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

Running this, we see (note: I’ve put the "Published message PubSubMessage(...)" log line in publish to debug so we can see the concurrent consumer work a little easier):

Nice. We’re now pulling for messages whenever they’re available, and able to restart hosts but not block consuming messages.

### Concurrent work

We may want to do more than one thing per message. For example, we’d like to store the message in a database for potentially replaying later as well as initiate a restart of the given host.

Like we did with adding the restart_host functionality, let’s update the PubSubMessage class definition to add another boolean attribute for easier testing in the future:

In [None]:
@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

And now let’s define a save coroutine function:

In [None]:
async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")

Within the consume coroutine function, we could just await on both coroutines serially:

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        # sequential awaits may **not** be what you want
        await save(msg)
        await restart_host(msg)

And running the script with this looks like:

We can see that although it doesn’t block the event loop, await save(msg) blocks await restart_host(msg), which blocks the consumption of future messages. But, perhaps we don’t need to await these two coroutines one right after another. These two tasks don’t necessarily need to depend on one another – completely side-stepping the potential concern/complexity of “should we restart a host if we fail to add the message to the database”.

So let’s treat them as such. Instead of awaiting them, we can make use of asyncio.create_task again to have them scheduled on the loop, basically chucking them over to the loop for it to execute when it next can.

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        asyncio.create_task(save(msg))
        asyncio.create_task(restart_host(msg))

Running with this approach, we can see save doesn’t unnecessarily block restart_host:

Yay! Note that we still do use await with the msg = await queue.get() because we can’t do anything further until we actually have a message. But restart_host and save do not need to block each other, nor do they need to block the loop from consuming another message.
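
To see the difference in ordering without a running service, here is a small self-contained comparison. The sleep durations and the events list are assumptions chosen so the interleaving is easy to read; save is deliberately slower than restart_host.

```python
import asyncio


async def save(events: list) -> None:
    events.append("save started")
    await asyncio.sleep(0.05)  # simulated slow database write
    events.append("save finished")


async def restart_host(events: list) -> None:
    events.append("restart started")
    await asyncio.sleep(0.01)  # simulated quicker restart call
    events.append("restart finished")


async def sequential() -> list:
    events = []
    await save(events)          # restart_host cannot start until this returns
    await restart_host(events)
    return events


async def concurrent() -> list:
    events = []
    tasks = [
        asyncio.create_task(save(events)),
        asyncio.create_task(restart_host(events)),
    ]
    # Waiting here only so the demo can inspect the events afterwards.
    await asyncio.gather(*tasks)
    return events
```

In the sequential version the restart only starts after the save has finished; in the concurrent version both start right away and the quicker restart finishes first.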

### Aside: When you want sequential work

As an aside, sometimes you want your work to happen serially.

For instance, maybe you only want to restart hosts that have an uptime of more than 7 days, so you await another coroutine to check a host’s last restart date:

In [None]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        # potentially what you want
        last_restart = await last_restart_date(msg)
        if today - last_restart > max_days:
            await restart_host(msg)

Needing code to be sequential, to have steps or dependencies, doesn’t mean that it can’t still be asynchronous. The await last_restart_date(msg) will yield to the loop, but that doesn’t mean that restart_host of that msg will be the next thing that the loop executes. It actually will not be available to execute until the last_restart_date coroutine is done. The await just allows other work that has been scheduled on the loop to happen outside of consume.
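
The snippet above leaves today, max_days, and last_restart_date undefined. A runnable sketch of the same dependency might look like the following, where the LAST_RESTARTS lookup table, the hostnames, and the dates are all invented for illustration.

```python
import asyncio
import datetime

MAX_UPTIME = datetime.timedelta(days=7)

# Hypothetical data; a real service would query an API or database.
LAST_RESTARTS = {
    "cattle-old1.example.net": datetime.date(2024, 1, 1),
    "cattle-new1.example.net": datetime.date(2024, 1, 9),
}


async def last_restart_date(hostname: str) -> datetime.date:
    await asyncio.sleep(0)  # stand-in for an I/O-bound lookup
    return LAST_RESTARTS[hostname]


async def restart_host(hostname: str, restarted: list) -> None:
    await asyncio.sleep(0)  # stand-in for the restart call
    restarted.append(hostname)


async def maybe_restart(hostname: str, today: datetime.date, restarted: list) -> None:
    # The restart depends on the lookup, so these two awaits must be sequential.
    last_restart = await last_restart_date(hostname)
    if today - last_restart > MAX_UPTIME:
        await restart_host(hostname, restarted)


async def demo() -> list:
    restarted = []
    today = datetime.date(2024, 1, 10)
    # Different hosts are still handled concurrently with each other.
    await asyncio.gather(*(maybe_restart(h, today, restarted) for h in LAST_RESTARTS))
    return restarted
```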

### Message Cleanup

We’ve pulled a message from the queue, and fanned out work based off of that message. Now we need to perform any finalizing work on that message; for example, we need to acknowledge that we’re done with the message so it isn’t re-delivered by mistake.

We’ll first separate out the pulling of the message from the creating work off of it:

In [None]:
async def handle_message(msg):
    asyncio.create_task(save(msg))
    asyncio.create_task(restart_host(msg))

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

And let’s add yet another boolean attribute to our PubSubMessage class:

In [None]:
@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

And now let’s define some cleanup behavior:

In [None]:
def cleanup(msg):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

We could go back to the sequential awaits since that’s a very direct way to manipulate the ordering:

In [None]:
async def handle_message(msg):
    await save(msg)
    await restart_host(msg)
    cleanup(msg)

But then we lose the concurrency that we had. What we want instead is a task that wraps the two coroutines, save and restart_host, since we have to wait for both to finish before cleanup can happen.

We can make use of asyncio.gather, which returns a future-like object, to which we can attach the callback of cleanup via add_done_callback.

In [None]:
# <--snip-->
import functools
# <--snip-->

def cleanup(msg, fut):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")


async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))

    callback = functools.partial(cleanup, msg)
    # add_done_callback requires a non-async func
    g_future.add_done_callback(callback)
    await g_future


async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

# <--snip-->

So once both the save(msg) and restart_host(msg) coroutines are complete, cleanup will be called.
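One way to check that ordering is a stripped-down, runnable sketch. The `save` and `restart_host` bodies below are hypothetical stand-ins that only sleep, and the `log` list is an assumption added to record the order of events; it is not part of the service.

```python
import asyncio
import functools

log = []

async def save(msg):
    # Hypothetical stand-in for the real save(); it only sleeps.
    await asyncio.sleep(0.02)
    log.append("saved")

async def restart_host(msg):
    # Hypothetical stand-in for the real restart_host(); it only sleeps.
    await asyncio.sleep(0.01)
    log.append("restarted")

def cleanup(msg, fut):
    # fut is the finished gather future, passed in by add_done_callback.
    log.append("acked")

async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))
    g_future.add_done_callback(functools.partial(cleanup, msg))
    await g_future

asyncio.run(handle_message("msg-1"))
print(log)
```

The "acked" entry should come last, after both coroutines have logged their completion.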

I personally have an allergy to callbacks. Also, perhaps we need cleanup to be non-blocking. Another approach is to just await it:

In [None]:
async def cleanup(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)

Great, look how clean that is!

asyncio is pretty easy to use, but being easy to use doesn’t automatically mean you’re using it correctly. You can’t just throw async and await keywords around blocking code. It’s a shift in mental paradigm: you need to think both about what work can be queued up and fired off, and about where your code might still need to be sequential.

Having sequential code – “first A, then B, then C” – may seem like it’s blocking when it’s not. Sequential code can still be asynchronous. I might have to call customer service for something, and wait to be taken off hold to talk to them, but while I wait, I can put the phone on speaker and pet my super needy cat. I might be single-threaded as a person, but I can multi-task like CPUs, just like my code.

### Graceful Shutdowns with asyncio

Often, you’ll want your service to gracefully shut down if it receives a POSIX signal of some sort, e.g. clean up open database connections, stop consuming messages, finish responding to current requests while not accepting new requests, etc. So, if we happen to restart an instance of our own service, we should clean up the “mess” we’ve made before exiting.

We’ve been catching the commonly known KeyboardInterrupt exception, like many other tutorials and libraries. But there are many common signals that a service should expect and handle. A few typical ones are (descriptions from man signal):
- SIGHUP - Hangup detected on controlling terminal or death of controlling process
- SIGQUIT - Quit from keyboard (via ^\)
- SIGTERM - Termination signal
- SIGINT - Interrupt program

There’s also SIGKILL (i.e. the familiar kill -9) and SIGSTOP, although the standard is that they can’t be caught, blocked, or ignored.
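As a quick illustration, assuming a POSIX system and code running in the main thread, trying to install a handler for either of those two signals is refused by the operating system. The `handler` function and the `uncatchable` list are made up for this sketch.

```python
import signal

def handler(signum, frame):
    # Never actually invoked in this sketch.
    print(f"Got signal {signum}")

uncatchable = []
for sig in (signal.SIGKILL, signal.SIGSTOP):
    try:
        signal.signal(sig, handler)
    except OSError:
        # The OS rejects handlers for these two signals.
        uncatchable.append(sig.name)

print(uncatchable)
```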

Currently, if we quit our service via `^\` or send a signal via something like `pkill -TERM -f <script path>`, our service doesn’t get a chance to clean up:

    $ python part-1/mayhem_10.py
    19:08:25,553 INFO: Consumed PubSubMessage(instance_name='cattle-npww')
    19:08:25,554 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-npww')
    19:08:25,655 INFO: Consumed PubSubMessage(instance_name='cattle-rm7n')
    19:08:25,655 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-rm7n')
    19:08:25,790 INFO: Saved PubSubMessage(instance_name='cattle-rm7n') into database
    19:08:25,831 INFO: Saved PubSubMessage(instance_name='cattle-npww') into database
    [1]    78851 terminated  python part-1/mayhem_10.py

We see that we don’t reach the finally clause.

### Using a Signal Handler

It should also be pointed out that – even if we were to only ever expect a KeyboardInterrupt / SIGINT signal – it could happen outside the catching of the exception, potentially causing the service to end up in an incomplete or otherwise unknown state:

In [None]:
def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()                                # <-- could happen here or earlier

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")                        # <-- could happen here 
    finally:
        loop.close()                                               # <-- could happen here 
        logging.info("Successfully shutdown the Mayhem service.")  # <-- could happen here 

So, instead of catching KeyboardInterrupt, let’s attach some signal handlers to the loop.

First, we should define the shutdown behavior we want when a signal is caught:

In [None]:
async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
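    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    for task in tasks:
        task.cancel()

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    # return_exceptions=True keeps the CancelledError of each cancelled
    # task from propagating here, so the lines below still run
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")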
    loop.stop()

Here I’m just closing the simulated database connections, returning messages to pub/sub as not acknowledged (so they can be redelivered and not dropped), and finally cancelling the tasks. We don’t necessarily need to cancel pending tasks; we could just collect them and allow them to finish. We may also want to take this opportunity to flush any collected metrics so they’re not lost.
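If we wanted to let outstanding tasks finish rather than cancel them outright, one possible shape is a grace period followed by cancellation of whatever is still running. This is only a sketch: `drain`, `grace_period`, and `work` are hypothetical names, not part of the service.

```python
import asyncio

async def drain(tasks, grace_period=1.0):
    """Give tasks a chance to finish; cancel whatever is still pending."""
    if not tasks:
        return set(), set()
    done, pending = await asyncio.wait(tasks, timeout=grace_period)
    for task in pending:
        task.cancel()
    # return_exceptions=True collects CancelledError instead of raising it.
    await asyncio.gather(*pending, return_exceptions=True)
    return done, pending

async def work(delay):
    # Hypothetical unit of work; it only sleeps.
    await asyncio.sleep(delay)
    return delay

async def main():
    quick = asyncio.create_task(work(0.01))
    slow = asyncio.create_task(work(10))
    done, pending = await drain([quick, slow], grace_period=0.2)
    return quick in done, slow.cancelled()

result = asyncio.run(main())
print(result)
```

The quick task completes within the grace period, while the slow one is cancelled once the period runs out. How long a grace period is reasonable depends on how long the service's work normally takes.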

Let’s hook this up to the main event loop now. We can also remove the KeyboardInterrupt catch, since that’s now taken care of by adding signal.SIGINT as a handled signal.

In [None]:
# <-- snip -->
import signal
# <-- snip -->

def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

Side note: You might have noticed that within the lambda closure, I bound s immediately. Without that, we’d run into an apparently common gotcha in Python-land: late binding.
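A small sketch of the gotcha, independent of asyncio and using plain strings in place of the signal objects: a lambda that only refers to the loop variable looks it up when called, so every closure sees the last value, whereas a default argument captures the value at definition time.

```python
names = ("SIGHUP", "SIGTERM", "SIGINT")

late, bound = [], []
for s in names:
    late.append(lambda: s)        # s is looked up when the lambda is called
    bound.append(lambda s=s: s)   # s is captured now, as a default argument

late_results = [f() for f in late]
bound_results = [f() for f in bound]

print(late_results)   # ['SIGINT', 'SIGINT', 'SIGINT']
print(bound_results)  # ['SIGHUP', 'SIGTERM', 'SIGINT']
```

Without the `s=s` default, every handler registered in the loop would report the last signal in the tuple, regardless of which one actually arrived.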

So now when I run the script and, in another terminal, run `pkill -TERM -f "python part-2/mayhem_1.py"` (or -HUP or -INT), we see the following: