In [29]:
import socket
import psutil as p
import yaml
import os
import pynvml
import threading
import speedtest
import math
# ML Specific
import pandas as pd
import numpy as np


In [30]:
# Read YAML file
with open("config.yaml", 'r') as stream:
    config_loaded = yaml.safe_load(stream)
    
######################################## Connections ################################
# format: [(ip,port,connection)i] for i=onode_count
onodes_connections = []
# format: connection_object
snode_connection = None
server_connection = None
    
# Connecting to server's ip address
server_connection = socket.socket()
host = config_loaded["server"]["ip"]
port = config_loaded["server"]["port"]
o_nodes_count = config_loaded["kazaa"]["o_node_per"]

In [31]:
# Global Fields
status = ''
onodes = []
onodes_threads = [None]*o_nodes_count
snodes = []
self_kazaa_constant = 0
onodes_kazaa_constant = [0]*o_nodes_count

username = input("Input your username: ")
print('Waiting for connection response')
try:
    server_connection.connect((host, port))
    print("Connected")
except socket.error as e:
    print(str(e))

Waiting for connection response
Connected


In [32]:
# socket send consistent method 
# it sends the string, waits for the length of string client received, compares this length to the actual length. If both are equal
# it sends ack else, it resends the same txt again.
def socket_send(connection, txt):
    ack = False
    print(">>sending",txt+"...")
    while(not ack):
        connection.send(str.encode(txt))
        len_txt = int(connection.recv(1024).decode('utf-8'))
        if len_txt == len(txt):
            connection.send(str.encode("ack"))
            ack = True
        else:
            connection.send(str.encode("resending"))

# socket receive consistent method 
# it receives the string, then sends its size back to the server and then it waits for an acknowledgement, If ack is received it 
# breaks out of the loop
def socket_rcv(connection, size=1024):
    ack = False
    while(not ack):
        txt = connection.recv(size).decode('utf-8')
        connection.send(str.encode(str(len(txt))))
        ack = True if connection.recv(size).decode('utf-8') == "ack" else False
    print(">>received", txt+"...")
    return txt

def memory():
    mem = p.virtual_memory()
    return (mem.total, mem.available)

def cpu():
    return (100-p.cpu_percent(), p.cpu_count(logical=True))

def gpu_memory():
    try:
        pynvml.nvmlInit()
        # 1 here is the GPU id
        handle = pynvml.nvmlDeviceGetHandleByIndex(1)
        meminfo = pynvml.nvmlDeviceGetMemoryInfo(handle)
        return meminfo.free
    except:
        return 0

def download_speed():
    try:
        st = speedtest.Speedtest()
        return st.download()/8
    except:
        return 0
   
def receive_rows(connection):
    print("starting receiving rows")
    row_count_str = socket_rcv(connection)
    if row_count_str.startswith("rcv:"):
        row_count = int(row_count_str[len("rcv:"):])
        headers = socket_rcv(connection)[len("rcv:"):].split(":")
        print("row_count:", row_count, "headers:", headers)
        data = []
        for i in range(row_count):
            data_rcvd = socket_rcv(connection, 32768)
            while(data_rcvd.count(":") > 3):
                socket_send(connection, "resend")
                data_rcvd = socket_rcv(connection, 32768)
            data.append(data_rcvd.split(":"))
            socket_send(connection, "rcvd")
            print("Rcvd row:", data[-1][:-1])
    return pd.DataFrame(data, columns=headers)

def send_rows_onode(connection, df, division, index, headers):
    start = division[index - 1] if index > 0 else 0
    end = division[index]
    socket_send(connection, "rcv:"+str(end - start))
    socket_send(connection, "rcv:"+headers)
    for i, row in df[end:start].iterrows():
        socket_send(connection, str(row[0])+":"+str(row[1])+":"+str(row[2])+":"+str(row[3]))
        ack = socket_rcv(connection)
        while(ack != "rcvd"):
            socket_send(connection, str(row[0])+":"+str(row[1])+":"+str(row[2])+":"+str(row[3]))
            ack = socket_rcv(connection)

def ensemble_distribution():
    # receive data from server
    df = receive_rows(server_connection)
    print("data rcvd from server for",df.shape[0],"rows with headers:\n",df.columns)
    division = []
    for i in onodes_kazaa_constant:
        division.append(math.ceil((df.shape[0]*i)/sum(onodes_kazaa_constant)))
    # also adding self division
    division.append((df.shape[0]*self_kazaa_constant)/sum(onodes_kazaa_constant))
    division = list(np.cumsum(division))
    # headers
    headers = ":".join(list(df.columns))
    
    # data for regional ordinary nodes
    index = 0
    print("Sending data...")
    for onode_connection in onodes_connections:
        send_rows_onode(onode_connection[2], df, division, index, headers)
        print("Sent all data to", onode_connection[0], "at port", onode_connection[1])
        index += 1
        
    # data for yourself
    df = df[division[-2]:division[-1]]
            
# here snode is sender and onode is receiver
def snode_onode_socket(connection, id):
    print("Waiting for kazaa_constant of",id,"ordinary node.")
    onodes_kazaa_constant[id] = float(socket_rcv(connection)[len("k_const:"):])
    
    if((id+1) == o_nodes_count):
        ensemble_distribution()
        
# here onode is sender and snode is receiver
def onode_snode_socket(connection):
    socket_send(connection, "k_const:"+str(self_kazaa_constant))
    df = receive_rows(connection)
    print("df received from supernode of my region has shape: ", df.shape)
 
def send_system_info():
    messages_to_send = 5
    socket_send(server_connection, "rcv:"+str(messages_to_send))
    # download speed in bytes
    socket_send(server_connection, "dsp:"+str(download_speed()))
    # available GPU ram in bytes
    socket_send(server_connection, "gpu:"+str(gpu_memory()))
    # available ram in bytes
    socket_send(server_connection, "ram:"+str(memory()[1]))
    # cpu free in percentage
    socket_send(server_connection, "cpu:"+str(cpu()[0]))
    # cpu cores in count
    socket_send(server_connection, "cor:"+str(cpu()[1]))
print("All defined perfectly")
   

All defined perfectly


In [33]:
# first check response received i.e "Server is working"
res = socket_rcv(server_connection)
print(res)

# sending system info
print("\n" + "/"*40 + " waiting for ack " + "/"*40)
send_system_info()


>>received Server is working:...
Server is working:

//////////////////////////////////////// waiting for ack ////////////////////////////////////////
>>sending rcv:5...
>>sending dsp:1440062.8193552135...
>>sending gpu:0...
>>sending ram:8990572544...
>>sending cpu:68.4...
>>sending cor:4...


In [34]:
# receive acknowledge
res = socket_rcv(server_connection)
print(res)

>>received ack: received following system info:  
Download_speed = 1440062.8193552135B 
Free_GPU_RAM = 0.0B 
Free_RAM = 8990572544.0B 
Free_CPU = 68.4 
CPU_Cores = 4...
ack: received following system info:  
Download_speed = 1440062.8193552135B 
Free_GPU_RAM = 0.0B 
Free_RAM = 8990572544.0B 
Free_CPU = 68.4 
CPU_Cores = 4


In [35]:
# receive status in kazaa architecture
print("\n" + "/"*40 + " waiting for status " + "/"*40)
status = socket_rcv(server_connection)[len("status:"):]
status
          


//////////////////////////////////////// waiting for status ////////////////////////////////////////
>>received status:s...


's'

In [36]:
self_kazaa_constant = socket_rcv(server_connection)[len("k_const:"):]
self_kazaa_constant

>>received k_const:0.8216999999999999...


'0.8216999999999999'

In [37]:
print("You are appointed as " + ("Super node" if status=="s" else "Ordinary node") + " with kazaa constant as", self_kazaa_constant)

You are appointed as Super node. with kazaa constant as 0.8216999999999999
