In [4]:
#Definition of communication functions

import socket, struct
import numpy as np
from io import BytesIO
import time

def receiveAll(socket, n):
    """Read exactly ``n`` bytes from ``socket`` and return them as ``bytes``.

    ``socket.recv`` is called repeatedly, each time asking only for the bytes
    that are still missing, until ``n`` bytes have been collected.

    Args:
        socket: object exposing ``recv(size)``.
        n: number of bytes to read.

    Returns:
        The collected bytes.

    Raises:
        ConnectionError: if ``recv`` returns an empty result before ``n``
            bytes have arrived.
    """
    received = bytearray()
    remaining = n
    while remaining > 0:
        piece = socket.recv(remaining)
        if not piece:
            raise ConnectionError("Socket Closed!")
        received += piece
        remaining = n - len(received)
    return bytes(received)

def receiveData(socket):
    """Receive one length-prefixed NumPy array from ``socket``.

    The frame is an 8-byte network-order unsigned integer (``"!Q"``) giving
    the payload size, followed by that many bytes which are handed to
    ``np.load``. Pickled content is refused (``allow_pickle=False``).

    Returns:
        Whatever ``np.load`` yields for the received payload.
    """
    header = receiveAll(socket, 8)
    (size,) = struct.unpack("!Q", header)
    serialized = receiveAll(socket, size)
    return np.load(BytesIO(serialized), allow_pickle=False)

def sendData(socket, data):
    """Send ``data`` over ``socket`` as a length-prefixed ``np.save`` payload.

    ``data`` is converted to a C-contiguous float64 array, serialised with
    ``np.save`` (pickling disabled) and written as two ``sendall`` calls:
    first the 8-byte network-order payload length (``"!Q"``), then the payload.

    Args:
        socket: object exposing ``sendall(bytes)``.
        data: array-like convertible to float64.
    """
    array = np.ascontiguousarray(data, dtype=np.float64)
    with BytesIO() as stream:
        np.save(stream, array, allow_pickle=False)
        encoded = stream.getvalue()
    header = struct.pack("!Q", len(encoded))
    socket.sendall(header)
    socket.sendall(encoded)

def receiveMessage(socket):
    """Receive one length-prefixed text message from ``socket``.

    Reads an 8-byte network-order unsigned length (``"!Q"``), then that many
    bytes, and returns them decoded as UTF-8.
    """
    (length,) = struct.unpack("!Q", receiveAll(socket, 8))
    raw = receiveAll(socket, length)
    return raw.decode("utf-8")

def sendMessage(socket, message):
    """Send ``message`` over ``socket`` as length-prefixed UTF-8 text.

    Two ``sendall`` calls are made: the 8-byte network-order length of the
    encoded text (``"!Q"``), followed by the encoded text itself.
    """
    encoded = message.encode("utf-8")
    header = struct.pack("!Q", len(encoded))
    for part in (header, encoded):
        socket.sendall(part)

In [5]:
def findBestCPU(totalCores, availableCores, coreSpeeds, jobFeatures):
    """Pick a site at random and return it as a one-hot vector.

    Only the shape of ``totalCores`` is used; it must be two-dimensional and
    its first dimension is taken as the number of sites. ``availableCores``,
    ``coreSpeeds`` and ``jobFeatures`` are accepted but not read.

    Returns:
        A 1-D float array with one entry per site: 1 at the index drawn with
        ``np.random.randint`` and 0 everywhere else.
    """
    # Two-target unpacking keeps the requirement that totalCores is 2-D.
    site_count, _ = np.shape(totalCores)
    chosen = np.random.randint(0, site_count)
    return (np.arange(site_count) == chosen).astype(np.float64)

In [6]:
# Address the server socket is bound to (IPv4 loopback).
host = "127.0.0.1"
# TCP port the server socket is bound to.
port = 5555

def openSocket(host, port):
    """Listen on ``host:port`` and block until one client connects.

    Args:
        host: IPv4 address to bind to.
        port: TCP port to bind to.

    Returns:
        ``(server, connection, address)``: the listening socket, the accepted
        client socket, and the client's address as returned by ``accept``.

    Raises:
        OSError: if binding, listening or accepting fails. The listening
            socket is closed before the error propagates.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        print(f"Server listening on {host}:{port}")
        connection, address = server.accept()
    except BaseException:
        # BaseException so that interrupting the kernel while blocked in
        # accept() also releases the listening socket instead of leaking it.
        server.close()
        raise
    print(f"Connected from {address}")
    return server, connection, address

def closeSocket(server, connection):
    """Close ``connection`` and then ``server``.

    ``server.close()`` is always called, even when ``connection.close()``
    raises; in that case the error is re-raised afterwards.
    """
    try:
        connection.close()
    except BaseException:
        server.close()
        raise
    server.close()


server, connection, address = openSocket(host, port)

# The whole session runs inside try/finally so that both sockets are closed
# however the loop ends (peer disconnect, protocol error, kernel interrupt).
try:
    sendMessage(connection, "CONN")
    print("Connection confirmed.")
    n = 0  # running job counter, used only in the progress message

    while True:
        n += 1
        # After confirmation, dispatcher waits for a job submission
        try:
            message = receiveMessage(connection)
        except Exception as error:
            # Report a failed read as a disconnect, chaining the real cause.
            # Exception (not a bare except) lets KeyboardInterrupt through.
            raise ConnectionError("Disconnected!") from error
        if message != "SBMT":
            # Carrying on would read whatever follows as array frames and
            # desynchronise the protocol, so stop here instead.
            raise ConnectionError(f"Expected 'SBMT' but received {message!r}.")
        sendMessage(connection, "WAIT")

        # C++ sends number of total cores after the WAIT message
        totalCores = receiveData(connection)
        sendMessage(connection, "CNFM")
        print("Cores received.")
        print(totalCores)

        # C++ sends number of available cores after confirmation
        availableCores = receiveData(connection)
        sendMessage(connection, "CNFM")
        print("Available cores received.")
        print(availableCores)

        # C++ sends core speeds
        coreSpeeds = receiveData(connection)
        sendMessage(connection, "CNFM")
        print("Cores speeds received.")
        print(coreSpeeds)

        # C++ sends job features
        jobFeatures = receiveData(connection)
        print("Job features received")
        print(jobFeatures)
        sendMessage(connection, "CNFM")

        message = receiveMessage(connection)
        print(message)
        if message != "WAIT":
            raise ConnectionError("Failed connection.")

        # Python picks a site and returns a one-hot array with one entry per
        # site: all 0 except a 1 at the chosen site.
        bestCPU = findBestCPU(totalCores, availableCores, coreSpeeds, jobFeatures)
        print("Best CPU decided")

        # C++ waits to receive the array after the last feature confirmation
        sendData(connection, bestCPU)
        print("Site is submitted")
        print(bestCPU)
        print(f"JOB {n} SUBMITTED")
        # Python waits for the next job submission
finally:
    closeSocket(server, connection)

Server listening on 127.0.0.1:5555
Connected from ('127.0.0.1', 46814)
Connection confirmed.
Cores received.
[[20 26  0 ...  0  0  0]
 [35 35 35 ... 47 47 47]
 [39 39 39 ... 40 40 40]
 ...
 [33 33 33 ... 27 27 27]
 [38 38 38 ... 20 20 20]
 [31 31 31 ... 28 28 28]]
Available cores received.
[[20 26  0 ...  0  0  0]
 [35 35 35 ... 47 47 47]
 [39 39 39 ... 40 40 40]
 ...
 [33 33 33 ... 27 27 27]
 [38 38 38 ... 20 20 20]
 [31 31 31 ... 28 28 28]]
Cores speeds received.
[[30000000. 20000000.        0. ...        0.        0.        0.]
 [30000000. 30000000. 30000000. ... 20000000. 20000000. 20000000.]
 [30000000. 30000000. 30000000. ... 20000000. 20000000. 20000000.]
 ...
 [30000000. 30000000. 30000000. ... 20000000. 20000000. 20000000.]
 [30000000. 30000000. 30000000. ... 20000000. 20000000. 20000000.]
 [30000000. 30000000. 30000000. ... 20000000. 20000000. 20000000.]]
Job features received
[[8.00000000e+00 0.00000000e+00 1.33766837e+12 0.00000000e+00]]
WAIT
Best CPU decided
Site is submit

ConnectionError: Disconnected!