# Concurrent web requests with asyncio and aiohttp

__aiohttp__ is a library that uses non-blocking sockets to make web requests and returns coroutines for those requests, which we can then await for a result. <br>
There is also __requests__, but it only uses blocking sockets!

In [3]:
import asyncio
import socket
from types import TracebackType
from typing import Optional, Type

class ConnectedSocket:
    """Async context manager that accepts one connection on a listening socket.

    ``async with ConnectedSocket(server_socket) as connection`` awaits a client
    connection via ``loop.sock_accept`` and yields the accepted socket. The
    accepted connection is closed when the ``async with`` block exits, whether
    it exits normally or because of an exception.
    """

    def __init__(self, server_socket: socket.socket):
        # Filled in by __aenter__ once a client has been accepted
        self._connection: Optional[socket.socket] = None
        # The listening socket; expected to be non-blocking for loop.sock_accept
        self._server_socket = server_socket

    async def __aenter__(self) -> socket.socket:
        print('Entering context manager, waiting for connection')
        # get_running_loop() is the recommended way to get the loop inside a coroutine
        loop = asyncio.get_running_loop()
        connection, _address = await loop.sock_accept(self._server_socket)
        self._connection = connection
        print('Accepted a connection')
        return self._connection

    async def __aexit__(self,
                        exc_type: Optional[Type[BaseException]],
                        exc_val: Optional[BaseException],
                        exc_tb: Optional[TracebackType]) -> None:
        print('Exiting context manager')
        # Guard in case __aexit__ is reached without a successful accept
        if self._connection is not None:
            self._connection.close()
        print('Closed connection')
        # Returning None (falsy) lets any exception raised in the body propagate


async def main():
    """Accept one client on 127.0.0.1:8000, print the first chunk it sends, then clean up."""
    loop = asyncio.get_running_loop()
    server_address = ('127.0.0.1', 8000)

    # `with` guarantees the listening socket is closed even if accepting or
    # reading raises; previously it was never closed.
    with socket.socket() as server_socket:
        # SO_REUSEADDR so the port can typically be rebound right after a previous run
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.setblocking(False)
        server_socket.bind(server_address)
        server_socket.listen()

        # ConnectedSocket accepts a connection and closes it automatically on exit
        async with ConnectedSocket(server_socket) as connection:
            data = await loop.sock_recv(connection, 1024)
            print(data)
    
await main()

Entering context manager, waiting for connection
Accepted a connection
b'\x1b[A'
Exiting context manager
Closed connection


aiohttp uses sessions, which keep many connections open so that they can be recycled. This is known as connection pooling, an important concept that aids the performance of our aiohttp-based applications. <br>
We will typically create one session per application. <br>
A session object has methods for making any number of web requests, such as GET, PUT, and POST.

In [12]:
import aiohttp
import asyncio
from util.async_timed import async_timed
from aiohttp import ClientSession

@async_timed()
async def fetch_status(session: ClientSession, url: str) -> int:
    """Send a GET request for ``url`` through ``session`` and return the HTTP status code."""
    response_context = session.get(url)
    async with response_context as response:
        status_code = response.status
    return status_code

@async_timed()
async def main():
    """Open one ClientSession, fetch example.com's status once and print it."""
    url = 'https://www.example.com'
    # One session per application; its connection pool is shared by all requests
    # (by default a session allows up to 100 connections, which is configurable).
    async with aiohttp.ClientSession() as session:
        status = await fetch_status(session, url)
        print(f"Status for {url} was {status}")

await main()

starting <function main at 0x000002337C132020> with args () {}'
starting <function fetch_status at 0x000002337C132D40> with args (<aiohttp.client.ClientSession object at 0x000002337BFF6E50>, 'https://www.example.com') {}'
finished <function fetch_status at 0x000002337C132D40> in 0.7393 second(s)
Status for https://www.example.com was 200
finished <function main at 0x000002337C132020> in 0.7403 second(s)


We can set timeouts for our requests by using __ClientTimeout__. By default, the total timeout is 5 minutes, but we can change this value.

In [26]:
import aiohttp
import asyncio
from util.async_timed import async_timed
from aiohttp import ClientSession

@async_timed()
async def fetch_status(session: ClientSession, url: str) -> int:
    """GET ``url`` with a 10 ms per-request timeout and return the HTTP status code.

    The ``timeout`` passed to ``session.get`` is used for this request instead
    of the session-level timeout. If the request does not finish in time,
    asyncio.TimeoutError is raised.
    """
    # total=.01 seconds is 10 ms (an earlier comment wrongly said 100 ms)
    ten_millis = aiohttp.ClientTimeout(total=.01)

    async with session.get(url, timeout=ten_millis) as result:
        return result.status

@async_timed()
async def main():
    """Fetch example.com's status through a session with tight timeouts and print it."""
    url = 'https://www.example.com'
    # Session-wide limits: 1 s for the whole request, 0.1 s to obtain a connection
    session_timeout = aiohttp.ClientTimeout(total=1, connect=.1)

    async with aiohttp.ClientSession(timeout=session_timeout) as session:
        status = await fetch_status(session, url)
        print(f"Status for {url} was {status}")

# We will get asyncio.TimeoutError if our requests exceed the time limits we defined
try:
    await main()
except asyncio.TimeoutError as f:
    print(f"Timeout error")

starting <function main at 0x000002337C8FFA60> with args () {}'
starting <function fetch_status at 0x000002337C8FF2E0> with args (<aiohttp.client.ClientSession object at 0x000002337C98C490>, 'https://www.example.com') {}'
finished <function fetch_status at 0x000002337C8FF2E0> in 0.0000 second(s)
finished <function main at 0x000002337C8FFA60> in 0.0010 second(s)
Timeout error


In [None]:
# This will not work concurrently: each task is created and immediately awaited, so it runs like synchronous code
"""
async def main() -> None:
    delay_times = [3, 3, 3]
    [await asyncio.create_task(delay(seconds)) for seconds in delay_times]
"""

# To fix it firstly we need to create all tasks then await all of them like this
"""
async def main() -> None:
    delay_times = [3, 3, 3]
    tasks = [asyncio.create_task(delay(seconds)) for seconds in delay_times]
    [await task for task in tasks]
 """
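# Still, this code is not perfect: if one task raises an exception, we stop awaiting the rest. They are not cancelled (they keep running until the event loop shuts them down), and their results or exceptions are never observed. asyncio offers better tools for this.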

We can use __gather__ instead of these list comprehensions; it also lets us control how exceptions are handled, which addresses the problem above.

In [1]:
import asyncio
import aiohttp
from aiohttp import ClientSession
from chapter_04 import fetch_status
from util.async_timed import async_timed

# Let's fetch the status 1000 times
@async_timed()
async def main():
    """Fire 1000 concurrent GET requests at example.com and print every status code."""
    target = 'https://www.example.com'
    async with aiohttp.ClientSession() as session:
        # Build all coroutines first; gather runs them concurrently and returns
        # their results in the same order they were passed in.
        coroutines = [fetch_status(session, target) for _ in range(1000)]
        status_codes = await asyncio.gather(*coroutines)
        print(status_codes)

# Code is working concurrently
await main()

starting <function main at 0x0000016E4B299940> with args () {}'


  asyncio.sleep(delay)


[200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200,

Very important: the awaitables we pass in may not complete in a deterministic order. A nice feature of gather is that, regardless of when
our awaitables complete, we are guaranteed the results will be returned in the order
we passed them in.

Working with exceptions with gather: <br>
* return_exceptions=False — This is the default value for gather. In this case, if any of our coroutines throws an exception, our gather call will also throw that exception when we await it. However, even though one of our coroutines failed, our other coroutines are not canceled and will continue to run as long as we handle the exception, or the exception does not result in the event loop stopping and canceling the tasks.
* return_exceptions=True — In this case, gather will return any exceptions as part of the result list it returns when we await it. The call to gather will not throw any exceptions itself, and we’ll be able to handle all exceptions as we wish.

In [8]:
# What happens if we use a wrong URL:
@async_timed()
async def main():
    """Fetch a valid and an invalid URL concurrently and print the results or exceptions."""
    async with aiohttp.ClientSession() as session:
        # 'python://' is not an HTTP(S) scheme, so that request fails with an exception
        urls = ['https://www.example.com', 'python://www.example.com']
        requests = [fetch_status(session, url) for url in urls]
        # return_exceptions=True: failures come back as exception objects in the
        # result list (in input order) instead of being raised by gather itself.
        status_codes = await asyncio.gather(*requests, return_exceptions=True)
        print(status_codes)

# We get an AssertionError from the wrong URL
# Important: even if we get exceptions, all tasks keep running anyway
await main()

starting <function main at 0x000001C84D126480> with args () {}'
[200, AssertionError()]
finished <function main at 0x000001C84D126480> in 0.7804 second(s)


Disadvantages of __gather__:
* With gather it is hard to cancel all tasks if one of them raises an exception.
* All tasks must finish before we can do anything with the results.

Processing requests as they complete:

In [9]:
async def fetch_status(session: ClientSession, url: str, delay: int = 0) -> int:
    """Sleep for ``delay`` seconds, then GET ``url`` and return the HTTP status code."""
    # The artificial delay lets the examples below control when each request finishes
    await asyncio.sleep(delay)
    response_context = session.get(url)
    async with response_context as response:
        status_code = response.status
    return status_code
    
@async_timed()
async def main():
    """Show that asyncio.as_completed yields each result as soon as its request finishes."""
    async with aiohttp.ClientSession() as session:
        fetchers = [fetch_status(session, 'https://www.example.com', 1),
                    fetch_status(session, 'https://www.example.com', 1),
                    fetch_status(session, 'https://www.example.com', 10)]

        # With gather we would get nothing until all three finish (~10 s):
        #     results = await asyncio.gather(*fetchers)
        #     finished <function main at 0x0000016E4B77E7A0> in 10.1961 second(s)

        # as_completed yields awaitables in completion order, so the two 1 s
        # requests are printed without waiting for the 10 s one.
        for finished_task in asyncio.as_completed(fetchers):
            print(await finished_task)

await main()

starting <function main at 0x0000016E4E354720> with args () {}'
200
200
200
finished <function main at 0x0000016E4E354720> in 10.1944 second(s)


In [None]:
@async_timed()
async def main():
    """Consume requests with as_completed under a 2 s timeout, then cancel leftovers."""
    async with aiohttp.ClientSession() as session:
        # Create the tasks ourselves so we keep handles to them and can cancel
        # whatever is still running once the timeout has fired.
        fetchers = [asyncio.create_task(fetch_status(session, 'https://www.example.com', 10)),
                    asyncio.create_task(fetch_status(session, 'https://www.example.com', 10)),
                    asyncio.create_task(fetch_status(session, 'https://www.example.com', 1))]

        # as_completed yields awaitables in completion order; once `timeout`
        # seconds have passed, awaiting any remaining one raises TimeoutError.
        for finished_task in asyncio.as_completed(fetchers, timeout=2):
            try:
                print(await finished_task)
            except asyncio.TimeoutError:
                print("TimeoutError")

        # as_completed does not cancel the underlying tasks on timeout, so they
        # would keep running after the session is closed. Cancel them explicitly
        # and wait for them to finish; return_exceptions=True absorbs the
        # resulting CancelledErrors.
        for task in fetchers:
            task.cancel()
        await asyncio.gather(*fetchers, return_exceptions=True)

# The order of completed tasks is unpredictable (it is completion order, not input order)
await main()

### __Important__: Even if we get an exception, the remaining tasks will continue to run, for both gather and as_completed

In [17]:
# We can partly solve the problem of tasks still running after an exception by using wait
@async_timed()
async def main():
    """Start four 1-second requests as tasks, wait for all of them, print their statuses."""
    async with aiohttp.ClientSession() as session:
        url = 'https://www.example.com'
        tasks = [asyncio.create_task(fetch_status(session, url, 1)) for _ in range(4)]

        # No return_when given, so wait() uses ALL_COMPLETED and returns only once
        # every task has finished.
        done, pending = await asyncio.wait(tasks)

        print(f"Done task count: {len(done)}")
        print(f"Pending task count: {len(pending)}")

        for task in done:
            print(await task)
    
await main()

starting <function main at 0x0000016E4B77EB60> with args () {}'
Done task count: 4
Pending task count: 0
200
200
200
200
finished <function main at 0x0000016E4B77EB60> in 1.7736 second(s)


In [20]:
import logging

# We can partly solve the problem of tasks still running after an exception by using wait
@async_timed()
async def main():
    """Wait for a good and a bad request, then report each outcome without raising."""
    async with aiohttp.ClientSession() as session:
        good = asyncio.create_task(fetch_status(session, 'https://www.example.com', 1))
        bad = asyncio.create_task(fetch_status(session, 'python://www.example.com', 1))

        done, pending = await asyncio.wait([good, bad])

        print(f"Done task count: {len(done)}")
        print(f"Pending task count: {len(pending)}")

        for task in done:
            # Inspect the task instead of awaiting it, so a failed task does not
            # raise here; exception() is None when the task succeeded.
            error = task.exception()
            if error is not None:
                # logging.error("Error", exc_info=error)
                print("Error")
                continue
            print(task.result())

await main()

starting <function main at 0x0000016E4E356A20> with args () {}'
Done task count: 2
Pending task count: 0
200
Error
finished <function main at 0x0000016E4E356A20> in 1.7361 second(s)


ALL_COMPLETED (the default) works like gather with return_exceptions=True: if we get exceptions, we will not see them until all tasks end. <br>
FIRST_EXCEPTION returns done and pending immediately when any task raises an exception.

In [25]:
import logging

@async_timed()
async def main():
    """Stop waiting at the first failure, report finished tasks, and cancel the rest."""
    async with aiohttp.ClientSession() as session:
        fetchers = [asyncio.create_task(fetch_status(session, 'python://www.example.com', 1)),
                    asyncio.create_task(fetch_status(session, 'https://www.example.com', 3)),
                    asyncio.create_task(fetch_status(session, 'https://www.example.com', 5))]

        # FIRST_EXCEPTION: return as soon as any task raises (or when all finish)
        done, pending = await asyncio.wait(fetchers, return_when=asyncio.FIRST_EXCEPTION)

        print(f"Done task count: {len(done)}")
        print(f"Pending task count: {len(pending)}")

        for done_task in done:
            # Use exception()/result() rather than `await done_task`, which would
            # re-raise the task's exception here.
            if done_task.exception() is None:
                print(done_task.result())
            else:
                # logging.error("Error", exc_info=done_task.exception())
                print("Error")

        for pending_task in pending:
            pending_task.cancel()
        # cancel() only requests cancellation; wait until the cancelled tasks have
        # actually finished so none is still running when the session closes.
        # return_exceptions=True absorbs the resulting CancelledErrors.
        await asyncio.gather(*pending, return_exceptions=True)

# Here we get exception and cancel all pending tasks
await main()

starting <function main at 0x0000016E4E3565C0> with args () {}'
Done task count: 1
Pending task count: 2
Error
finished <function main at 0x0000016E4E3565C0> in 1.0173 second(s)


We can make __wait__ behave like __as_completed__ by using FIRST_COMPLETED

In [27]:
@async_timed()
async def main():
    """Handle tasks as they finish by repeatedly calling wait(FIRST_COMPLETED)."""
    async with aiohttp.ClientSession() as session:
        remaining = [asyncio.create_task(fetch_status(session, 'https://www.example.com', delay))
                     for delay in (1, 3, 5)]

        # Each wait() returns as soon as at least one task finishes; keep looping
        # on whatever is still pending until nothing is left.
        while remaining:
            done, remaining = await asyncio.wait(remaining, return_when=asyncio.FIRST_COMPLETED)

            print(f"Done task count: {len(done)}")
            print(f"Pending task count: {len(remaining)}")

            for task in done:
                if task.exception() is not None:
                    # logging.error("Error", exc_info=task.exception())
                    print("Error")
                else:
                    print(task.result())

# Now it works like as_completed, but we also get information about the done and pending tasks
await main()

starting <function main at 0x0000016E4E355580> with args () {}'
Done task count: 1
Pending task count: 2
200
Done task count: 1
Pending task count: 1
200
Done task count: 1
Pending task count: 0
200
finished <function main at 0x0000016E4E355580> in 5.1975 second(s)


In [29]:
@async_timed()
async def main():
    """Give the tasks 3 s to finish, report the ones that did, and cancel the rest."""
    async with aiohttp.ClientSession() as session:
        fetchers = [asyncio.create_task(fetch_status(session, 'https://www.example.com', 1)),
                    asyncio.create_task(fetch_status(session, 'https://www.example.com', 3)),
                    asyncio.create_task(fetch_status(session, 'https://www.example.com', 5))]

        # With a timeout, wait() does not raise: after 3 s it simply returns, and
        # tasks that have not finished yet end up in `pending`.
        done, pending = await asyncio.wait(fetchers, return_when=asyncio.ALL_COMPLETED, timeout=3)

        print(f"Done task count: {len(done)}")
        print(f"Pending task count: {len(pending)}")

        for done_task in done:
            # Use exception()/result() rather than `await done_task`, which would
            # re-raise the task's exception here.
            if done_task.exception() is None:
                print(done_task.result())
            else:
                # logging.error("Error", exc_info=done_task.exception())
                print("Error")

        for pending_task in pending:
            pending_task.cancel()
        # cancel() only requests cancellation; wait until the cancelled tasks have
        # actually finished so none is still running when the session closes.
        # return_exceptions=True absorbs the resulting CancelledErrors.
        await asyncio.gather(*pending, return_exceptions=True)

# Applying a timeout, after which we cancel all tasks that are still pending at that moment
await main()

starting <function main at 0x0000016E4E3565C0> with args () {}'
Done task count: 1
Pending task count: 2
200
finished <function main at 0x0000016E4E3565C0> in 3.0033 second(s)


* Our own asynchronous context managers: __async with__
* __aiohttp__ library to make asynchronous web requests
* __gather__, __as_completed__, __wait__, working with exceptions, timeouts and cancelling tasks