# Making your server go Brrrrrrrr

## (Async)

# What does Async Mean?

To understand what async mean we first need to know what sync means.

### Synchronous operations

For an operation to be synchronous that means that when I start doing that operation I am sure that the am working on it.

### Asynchronous operations

On the contrary Async operations are operations where the following happens:
1. I am just given the instruction of doing it
2. I am not expected to reply in real time
3. I should be able to proceses it at my own time

![AsyncvsSync](AsyncvsSync.png)

# What is Async Programming

Async programming is the concept of writing code that takes the function calls and replies to them later after processing, thus enabling I/O bound operations to be replied to later.

Let's note that async programming is single threaded so you can always assume that at a time only one operation is executing.

### I/O Bound Operations

I/O Bound operations are the operations where the processing happnes outside of your current machine hence your machine is just idley sitting waiting for a response.

### Python async programming
Example:
```python
import asyncio

async def fn():
    #do something

async def run():
    task = fn()
    await fn()

asyncio.run(run())
```

Let's look at the above code and figure out what happens when task = fn() happens this doesn't mean that we called fn() and it's executing behind the scenes it just means that fn() was registered to be executed later.

### (A)Wait a minute what does await do?

Does await actually tell the thread to start executing fn() and it just executes it ... Nope.

What does await do? Await yields control back to an orchesterator that orchestrator looks through the registered functions and chooses which one to execute, but before going through that let's look at some examples.

In [7]:
import httpx
import asyncio

URL = "https://jsonplaceholder.typicode.com/todos/1"

"""
Example:
    CPU bound code:
        Is any code that runs on your machine
        
    I/O bound code:
        Code that isn't run on your machine is I/O bound code

    Normal sync flow:
        1. send request -> CPU bound (blocking code)
        2. process the request -> I/O bound (Non-blocking code)
        3. receive the response -> CPU bound (blocking code)

    Async flow:
    
        10 requests
        10 times 1. send the request
    
        for request in requests:
            await get_response()

    Sync order of execution:
    
        1. fetch_data(1)
        2. client.get(URL)
        3. return response.json()

        4. fetch_data(2)
        5. client.get(URL)
        6. return response.json()
    
        7. fetch_data(3)
        8. client.get(URL)
        9. return response.json()

    Async order of execution:
        1. fetch_data(1)
        2. fetch_data(2)
        3. fetch_data(3)
        
        4. client.get(URL)
        5. client.get(URL) Non-blocking code
        6. client.get(URL)
        
        ...

"""


async def fetch_data(index):
    print(f"Start async task: {index}")
    async with httpx.AsyncClient() as client:
        response = await client.get(URL)
        print(f"Finished async task: {index}")
        return response.json()


async def run():
    await asyncio.gather(fetch_data(1), fetch_data(2), fetch_data(3))

await run()

Start async task: 1
Start async task: 2
Start async task: 3
Finished async task: 1
Finished async task: 3
Finished async task: 2


# Event Loop & Task Queue

### Event Loop

The event loop is the resource that is responsible for choosing which registered function to get executed and at it's core it's just a while loop, it can be as easy as the following code:

```python
    while tasks:
        task = tasks.pop(0)
        task()
```

### Task Queue

Ideally you would want some data structure to maintain the tasks registered so that the event loop would execute them and that can be also as easy as an array holding your tasks which gets registered when you're calling an async function

![event_loop](event_loop.png)


Don't believe me? Let's write our own event_loop, task_queue and use them to create our own async programming.

In [8]:
from collections import deque
import time

tasks = deque()


def event_loop():
    while tasks:
        task, start, args = tasks.popleft()
        if time.time() < start:
            tasks.append((task, start, args))
        else:
            task(*args)


def add_task(
    task,
    *args,
    callback: int = 0,
):
    tasks.append((task, time.time() + callback, args))

# Thread Blocking vs Non-Blocking code

How good your code performs might some time depends on how you're calling your functions

If you're not writing concurrently bad functions it might show and disallow the event loop to help you because for example there's always one function in the task queue because your functions are blocking the thread from preforming from registering multiple functions at once.

So you're forcing your async code to become sync while adding the overhead of it being async.

![event_loop_guard](event_loop_guard.jpg)

In [12]:
### Non-Blocking Code
"""
    Event loop
        [task1(2), task2(2)]

"""


def task1(cnt: int):
    print(f"Task 1 {cnt}")
    if cnt == 5:
        return
    # I/O bound
    add_task(task1, cnt + 1)  # await task1(cnt + 1)


def task2(cnt: int):
    print(f"Task 2 {cnt}")
    if cnt == 5:
        return
    add_task(task2, cnt + 1)  # await task2(cnt+1)


add_task(task1, 1)
add_task(task2, 1)
event_loop()

# """
#     task1(1)
#     task1(2)
#     task1(3)
#     task1(4)
#     task1(5)
    
#     task2(1)
#     task2(2)
#     task2(3)
#     task2(4)
#     task2(5)

# """

Task 1 1
Task 2 1
Task 1 2
Task 2 2
Task 1 3
Task 2 3
Task 1 4
Task 2 4
Task 1 5
Task 2 5


In [13]:
### Blocking Code

def task1(cnt: int):
    for i in range(cnt):
        # I/O blocking
        print(f"Task 1 {i + 1}")


def task2(cnt: int):
    for i in range(cnt):
        # I/O blocking
        print(f"Task 2 {i + 1}")


add_task(task1, 5)
add_task(task2, 5)
event_loop()

Task 1 1
Task 1 2
Task 1 3
Task 1 4
Task 1 5
Task 2 1
Task 2 2
Task 2 3
Task 2 4
Task 2 5


# Implemeting Sleep function (Blocking vs Non-Blocking)

There are two ways to implement the sleep function the first one is time.sleep() and the second one is asyncio.sleep()

### time.sleep()

![time_sleep](time_sleep.jpg)

time.sleep() is a thread level sleep function that forces the whole thread to sleep disallowing the thread from registering other functions in the event loop

### asyncio.sleep()

![asyncio_sleep](asyncio_sleep.jpg)

asyncio.sleep() is a function level sleep function that forces only the function calling the sleep to sleep while the rest of the thread keeps on registering functions



Let's look at how different their implementations are



In [14]:
### Blocking sleep

import time


def task1():
    print("Let's annoy the other task")
    start_time = time.time()
    delay = 5
    while time.time() < start_time + delay:
        pass


def task2():
    print("Let's wait :(")


add_task(task1)
add_task(task2)
event_loop()

Let's annoy the other task
Let's wait :(


In [16]:
### Non-Blocking sleep
def task3():
    print("Failed to annoy the other task :(")


def task1():
    print("Let's annoy the other task")
    add_task(task3, callback=5)


def task2():
    print("I didn't wait :)")


add_task(task1)
add_task(task2)
event_loop()


Let's annoy the other task
I didn't wait :)
Failed to annoy the other task :(


## Coroutines & Generators

- **Generators** are functions that can be paused & resumed by using `yield` within them, but you need to call them yourself.
- **Coroutines** are special functions in Python that can be paused and resumed, making them sophisticated generators that integrate correctly with the event loop.

The differnece is the generators are getting executed by you explicitly in the code, while the coroutines get executed by the event loop.

In [19]:
### Generators

def task():
    for i in range(1, 10):
        yield i


t = task()
print(t)
for i in range(1, 10):
    print(next(t))

<generator object task at 0x11853bed0>
1
2
3
4
5
6
7
8
9


In [22]:
### Coroutines

async def fn():
    print("Here's your co-routine sir/ma'am")


async def run():
    a = fn()
    print(a)
    time.sleep(5)
    await a


await run()

<coroutine object fn at 0x118525000>
Here's your co-routine sir/ma'am


# Futures & Tasks

- When you use `asyncio.create_task()`, it creates a **task**, which is essentially a wrapper over the coroutine.  
- When you create the task along with the coroutine, a **future** is created that waits for the coroutine to be awaited so that its value is set.  


In [25]:
### Futures

async def run():
    loop = asyncio.get_event_loop()

    future = loop.create_future()
    print(future)
    future.set_result(50)
    print(future)
    res = await future

    print(res)

await run()

<Future pending>
<Future finished result=50>
50


![servers](servers.jpg)

# Servers

For you to up a server you need one thing only a socket

A server socket is a network endpoint that listens for incoming connections from clients. It acts as a communication gateway, allowing multiple clients to establish connections and exchange data over a network.

The socket allows you to do three key functions to do I/O bound operations with other servers/clients.

### accept

socket.accept() is a function that returns the client_socket and client_address that wants to connect to this server.

When for example someone somewhere is doing server.connect(IP, PORT).

### send

socket.send() is a function that sends a message to a certain socket

### recv

socket.recv() is a function that receives a message from a certain socket

Do you think we can write an async server using the above functions? Yes we can.

In [None]:
import socket
import select
import queue

HOST = "127.0.0.1"
PORT = 5000
BUFFER_SIZE = 1 << 10

# TCP Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((HOST, PORT))
server_socket.listen(5)

# select lists
read_sockets = [server_socket]
write_sockets = []
message_queues = {}


def delete_socket(sock):
    read_sockets.remove(sock)
    if sock in write_sockets:
        write_sockets.remove(sock)
    sock.close()
    del message_queues[sock]


def accept():
    client_socket, _ = server_socket.accept()
    read_sockets.append(client_socket)
    message_queues[client_socket] = queue.Queue()


def recv(sock):
    return sock.recv(BUFFER_SIZE)


def send(sock, msg):
    sock.send(msg)


print(f"Server listening on {HOST}:{PORT}")

while True:
    # Wait for sockets to be ready
    readable, writable, _ = select.select(read_sockets, write_sockets, [])

    for sock in readable:
        if sock == server_socket:
            accept()
        else:
            data = recv(sock)  # Receive data from client
            if data:
                print(f"Received : {data.decode()} from {sock.getpeername()}")
                response = f"Echo : {data.decode()}"

                message_queues[sock].put(
                    response.encode()
                )  # pushing data to the client

                if sock not in write_sockets:
                    write_sockets.append(sock)
            else:
                # Client disconnected
                print(f"Client {sock.getpeername()} disconnected")

                delete_socket(sock)

    for sock in writable:
        try:
            next_message = message_queues[sock].get_nowait()
        except queue.Empty:
            write_sockets.remove(sock)
        else:
            send(sock, next_message)
