## Importing Libraries

In [1]:
import json
import matplotlib.pyplot as plt

import asyncio
from time import perf_counter
import aiohttp
import requests
from time import sleep
from tqdm import tqdm
from random import randint, shuffle

### <font size="5">Initializing the default configuration</font> 

In [2]:
LOAD_BALANCER_URL = "http://localhost:8000"
payload = {
    "N":6,
    "schema":{"columns":["Stud_id","Stud_name","Stud_marks"],
    "dtypes":["Number","String","String"]},
    "shards":[  {"Stud_id_low":0, "Shard_id": "sh1", "Shard_size":4096},
                {"Stud_id_low":4096, "Shard_id": "sh2", "Shard_size":4096},
                {"Stud_id_low":8192, "Shard_id": "sh3", "Shard_size":4096},
                {"Stud_id_low":12288, "Shard_id": "sh4", "Shard_size":4096},
                {"Stud_id_low":16384, "Shard_id": "sh5", "Shard_size":4096},
                {"Stud_id_low":20480, "Shard_id": "sh6", "Shard_size":4096}],

    "servers":{ "Server0":["sh1","sh2", "sh3", "sh4", "sh5"],
                "Server1":["sh2", "sh3", "sh4", "sh5","sh6"],
                "Server2":["sh3", "sh4", "sh5","sh6","sh1"],
                "Server3":["sh4", "sh5","sh6","sh1","sh2"],
                "Server4":["sh5", "sh6","sh1","sh2","sh3"],
                "Server5":["sh6","sh1","sh2","sh3", "sh4"],
                "Server6":["sh1","sh2","sh3", "sh4", "sh5"],
                "Server7":["sh2","sh3", "sh4", "sh5","sh6"],
                "Server8":["sh3", "sh4","sh6","sh1"],
                "Server9":["sh5","sh6","sh1","sh2"],
               
                
                
                }
}

response = requests.post(LOAD_BALANCER_URL+"/init", json=payload)
print(response.text)



{"message":"value of N and number of shards/servers don't match!","status":"failure"}


In [3]:
# Making 10,000 asynchronus write requests
REQ_COUNT = 10_000
MIN_STUDENT_ID, MAX_STUDENT_ID = 0, 24575
async def write(session, payload):
    async with session.post(LOAD_BALANCER_URL+"/write", json=payload) as r:
        if r.status != 200:
            r.raise_for_status()
            print("Error in write request")
        res=await r.text()
        return res
    
async def write_all(session, payload_list):
    tasks = []
    for payload in payload_list:
        task = asyncio.create_task(write(session, payload))
        tasks.append(task)
        
    print(f"<+> Sent all {len(payload_list)} write requests ")
    res = await asyncio.gather(*tasks)
    return res

shard_count = 6
block_size = MAX_STUDENT_ID//shard_count   
start = perf_counter()
timeout = aiohttp.ClientTimeout(total=60*30)
async with aiohttp.ClientSession(timeout=timeout) as session:
    payload_list = []
    i = 0
    while i <= MAX_STUDENT_ID:
        student_id = i
        student_name = "student_"+str(i)
        student_marks = i%100
        payload = { "data": [ {"Stud_id": student_id,"Stud_name": student_name, "Stud_marks": student_marks} ]
                    }
        payload_list.append(payload)
        
        if len(payload_list) == REQ_COUNT:
            break
        
        i+=1
        if i%block_size == REQ_COUNT//shard_count:
            i= (1+i//block_size)*block_size
        
    shuffle(payload_list)
    htmls = await write_all(session, payload_list)
end = perf_counter()
print(f"Time taken to complete {REQ_COUNT} requests: {end-start:.2f} seconds")

<+> Sent all 10000 write requests 


ClientResponseError: 500, message='Internal Server Error', url=URL('http://localhost:8000/write')

In [None]:
# Making 10,000 asynchronus read requests

REQ_COUNT = 10000
async def read(session, payload):
    async with session.post(LOAD_BALANCER_URL+"/read", json=payload) as r:
        if r.status != 200:
            r.raise_for_status()
            print("Error in read request")
        res=await r.text()
        return res
    
async def read_all(session, payload_list):
    tasks = []
    for payload in payload_list:
        task = asyncio.create_task(read(session, payload))
        tasks.append(task)
        
    print(f"<+> Sent all {len(payload_list)} read requests ")
    res = await asyncio.gather(*tasks)
    return res

shard_count = 4
block_size = MAX_STUDENT_ID//shard_count   
start = perf_counter()
async with aiohttp.ClientSession() as session:
    payload_list = []
    
    for i in range(REQ_COUNT):
        low = randint(MIN_STUDENT_ID, MAX_STUDENT_ID)
        high = randint(low, min(MAX_STUDENT_ID, low + randint(1, 100)))
        
        payload = { "Stud_id": {"low":low, "high":high}   }
        payload_list.append(payload)
        
        if len(payload_list) == REQ_COUNT:
            break
        
        
    htmls = await read_all(session, payload_list)
end = perf_counter()
print(f"Time taken to complete {REQ_COUNT} requests: {end-start:.2f} seconds")

<+> Sent all 10000 read requests 
Time taken to complete 10000 requests: 75.85 seconds
