In [None]:
import socket
import json
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCRequestHandler
class RequestHandler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler that only accepts calls on the ``/RPC2`` path."""

    # Only URL paths listed here are served by the handler.
    rpc_paths = ("/RPC2",)
from multiprocessing.managers import BaseManager
from multiprocessing import Process

In [2]:
class LamportHandler:
    """State and message handling for one participant in a Lamport-style
    mutual exclusion protocol.

    The handler keeps a logical clock (``time``), a queue of pending requests
    (``requests``), a table recording which peers have replied
    (``reply_table``) and the replicated values (``values``) that the
    critical section mutates.
    """

    def __init__(self, nodes, id):
        """Create the handler.

        Args:
            nodes: mapping of peer IP address -> port; one ``Node`` is built
                per entry.
            id: identifier of this participant, sent as ``"sender"`` /
                ``"id"`` in outgoing messages.
        """
        self.nodes = [Node(ip, port) for ip, port in nodes.items()]
        self.requests = []
        self.id = id
        self.reply_table = {}
        self.reset_reply_table()
        self.values = {
            "x": 0,
            "y": 0
        }
        self.time = 0

    def reset_reply_table(self):
        """Mark every known peer as "has not replied yet"."""
        # Rebuilt from scratch so entries for ids that are not known peers
        # do not linger between rounds.
        self.reply_table = {node.id: 0 for node in self.nodes}

    def _enqueue(self, req):
        """Insert ``req`` keeping the queue ordered by (timestamp, sender).

        All participants must order the queue the same way, otherwise two of
        them can each believe their own request is at the head.
        """
        self.requests.append(req)
        self.requests.sort(
            key=lambda r: (r.get("time") or 0, str(r.get("sender")))
        )

    def request_mutual_exclusion(self, req):
        """Timestamp a local request, queue it and broadcast it to all peers.

        Args:
            req: dict describing the operation; ``"time"`` and ``"sender"``
                are filled in here.
        """
        self.increment_timer()
        req["time"] = self.time
        req["sender"] = self.id
        self._enqueue(req)
        print(self.requests)
        self.broadcast(req)

    def broadcast(self, msg):
        """JSON-encode ``msg`` once and send it to every peer.

        Args:
            msg: a JSON-serialisable object (not an already encoded string).
        """
        payload = json.dumps(msg)
        for node in self.nodes:
            node.send_message(payload)

    def external_request(self, req):
        """Handle a request received from a peer.

        Advances the logical clock past the request's timestamp, queues the
        request and sends a reply to the sender. Requests from unknown
        senders are reported and dropped, because no reply can be delivered.
        """
        remote_time = req.get("time") or 0
        # Lamport clock receive rule: jump past the larger of both clocks.
        self.time = max(remote_time, self.time) + 1

        sender = req.get("sender")
        node = self.get_node(sender)
        if not node:
            print(f"Invalid node {sender}")
            return

        self._enqueue(req)
        msg = {"reply": "reply", "id": self.id}
        node.send_message(json.dumps(msg))

    def get_node(self, id):
        """Return the peer whose ``id`` equals ``id``, or ``None``."""
        for node in self.nodes:
            if node.id == id:
                return node

        return None

    def reply(self, id):
        """Record a reply from peer ``id`` and try to enter the critical section."""
        self.reply_table[id] = 1
        print(self.reply_table)
        self.check_critic_section()

    def check_critic_section(self):
        """Run the critical section if every peer replied and our own request
        is at the head of the queue; afterwards broadcast a release."""
        if any(replied == 0 for replied in self.reply_table.values()):
            return

        # Nothing queued: there is no request that could be served.
        if not self.requests:
            return

        if self.requests[0].get("sender") == self.id:
            self.critic_section()
            self.release()

    def critic_section(self):
        """Pop the request at the head of the queue and apply its operation.

        Supported actions are ``ADD`` and ``MULTIPLY`` (case-insensitive);
        any other action is reported and otherwise ignored.
        """
        if not self.requests:
            return

        print("In the critic section")
        request = self.requests.pop(0)
        action = request["action"].upper()
        value = int(request["value"])
        target = request["target"]

        print(f"Applying {action} to {target} value {value}")

        operations = {"ADD": self.add, "MULTIPLY": self.multiply}
        operation = operations.get(action)
        if operation is None:
            print(f"Unsupported action {action}")
            return
        operation(target, value)

    def release_request(self, release):
        """Handle a release message from a peer.

        If the releasing peer owns the request at the head of the queue, that
        request is applied locally; then the handler re-checks whether its own
        request can now run.
        """
        releaser = release.get("id")

        if self.requests and releaser == self.requests[0].get("sender"):
            self.critic_section()

        self.check_critic_section()

    def release(self):
        """Tell every peer that this participant left the critical section."""
        msg = {"release": "release", "id": self.id}
        # broadcast() does the JSON encoding; encoding here as well would
        # deliver a JSON string instead of an object.
        self.broadcast(msg)
        self.reset_reply_table()

    def increment_timer(self):
        """Advance the logical clock by one tick."""
        self.time += 1

    def add(self, target, value):
        """Add ``value`` to the stored value named ``target`` (case-insensitive)."""
        key = target.lower()
        self.values[key] = self.values[key] + value

    def multiply(self, target, value):
        """Multiply the stored value named ``target`` (case-insensitive) by ``value``."""
        key = target.lower()
        self.values[key] = self.values[key] * value

    def get(self, target):
        """Return the stored value named ``target`` (case-insensitive)."""
        return self.values[target.lower()]


In [3]:
class Node:
    """Address of a peer plus a helper to send it one message over TCP."""

    def __init__(self, ip, port):
        """Store the peer address.

        Args:
            ip: dotted IPv4 address; its fourth component becomes ``id``.
            port: TCP port the peer is contacted on.
        """
        self.id = ip.split('.')[3]
        self.ip = ip
        self.port = port

    def send_message(self, msg):
        """Open a TCP connection to the peer, send ``msg`` plus a newline, close.

        Args:
            msg: text to send; it is UTF-8 encoded and newline-terminated.

        Raises:
            OSError: if connecting or sending fails. The socket is closed
                in that case as well.
        """
        print(f"Sending message to {self.id} -> {msg}")
        # The context manager closes the socket even when connect/send raise.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.connect((self.ip, self.port))
            # sendall() keeps sending until the whole payload is written;
            # send() may stop after a partial write.
            client.sendall((msg + '\n').encode('utf-8'))

In [4]:
def start_rpc_server(lamport_handler, host="192.168.100.241", port=8000):
    """Serve the XML-RPC front end for ``lamport_handler`` until interrupted.

    Exposes ``ping``, ``add``, ``multiply`` and ``get``. ``add`` and
    ``multiply`` only submit a mutual-exclusion request; the operation itself
    is applied later by the handler.

    Args:
        lamport_handler: object providing ``request_mutual_exclusion(payload)``
            and ``get(target)``.
        host: address to bind the XML-RPC server to.
        port: TCP port to bind the XML-RPC server to.
    """

    class MyFuncs:
        """Functions published over XML-RPC."""

        def ping(self):
            return True

        def add(self, target, value):
            payload = {"target": target, "value": value, "action": "ADD"}
            lamport_handler.request_mutual_exclusion(payload)
            # The server is created without allow_none, so a None result
            # cannot be marshalled and the client would receive a Fault.
            return True

        def multiply(self, target, value):
            payload = {"target": target, "value": value, "action": "MULTIPLY"}
            lamport_handler.request_mutual_exclusion(payload)
            # Same reason as add(): return a marshallable value.
            return True

        def get(self, target):
            return lamport_handler.get(target)

    with SimpleXMLRPCServer((host, port), requestHandler=RequestHandler) as server:
        server.register_introspection_functions()
        server.register_instance(MyFuncs())

        print("Starting XML RPC Server")

        # Run the server's main loop
        server.serve_forever()


In [5]:

def _read_message(conn, bufsize=1024):
    """Read from ``conn`` until a newline or EOF and return the decoded first line."""
    chunks = []
    while True:
        chunk = conn.recv(bufsize)
        if not chunk:
            break
        chunks.append(chunk)
        if b"\n" in chunk:
            break
    return b"".join(chunks).split(b"\n", 1)[0].decode('utf-8')


def start_socket(lamport_handler, host="192.168.100.241", port=9002):
    """Accept peer connections and dispatch each JSON message to the handler.

    One newline-terminated JSON object is read per connection. Depending on
    which keys it carries it is routed to ``external_request`` (``"action"``),
    ``reply`` (``"reply"``) and/or ``release_request`` (``"release"``).
    Malformed payloads are reported and skipped so the accept loop keeps
    running. The loop ends on ``KeyboardInterrupt``.

    Args:
        lamport_handler: object providing ``external_request``, ``reply`` and
            ``release_request``.
        host: address to listen on.
        port: TCP port to listen on.
    """
    print("Starting socket")
    try:
        # Both the listening socket and every accepted connection are closed
        # by their context managers, also when an error occurs.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as soc:
            soc.bind((host, port))
            soc.listen(5)
            while True:
                conn, addr = soc.accept()
                with conn:
                    print ("Got connection from", addr)
                    try:
                        msg = json.loads(_read_message(conn))
                    except ValueError as exc:
                        # Covers invalid JSON, empty payloads and bad UTF-8.
                        print(f"Ignoring malformed message from {addr}: {exc}")
                        continue

                if not isinstance(msg, dict):
                    print(f"Ignoring non-object message from {addr}: {msg!r}")
                    continue

                action = msg.get("action", None)
                if action:
                    lamport_handler.external_request(msg)

                reply = msg.get("reply")
                if reply:
                    id = msg.get("id")
                    lamport_handler.reply(id)

                release = msg.get("release", None)
                if release:
                    lamport_handler.release_request(msg)

    except KeyboardInterrupt:
        print('Exiting')

In [None]:
def LamportHandlerProxy():
    """Factory returning the ``LamportHandler`` for this notebook's node.

    The handler is created with a single peer entry and the identifier "210".
    """
    # NOTE(review): start_socket in this notebook listens on port 9002 by
    # default while this peer entry uses 8002 — confirm which one is intended.
    return LamportHandler({"192.168.100.241": 8002}, "210")

def start_server():
    """Start the shared handler plus the XML-RPC and socket worker processes.

    A ``BaseManager`` hosts one ``LamportHandler`` so both worker processes
    operate on the same state through a proxy. The call blocks until both
    workers exit or a ``KeyboardInterrupt`` arrives; in either case any
    still-running worker is terminated and the manager is shut down.
    """
    # NOTE(review): the output recorded with this cell stops after 'test_2',
    # i.e. inside Manager.start() — presumably a process start-method
    # limitation when run from a notebook; confirm.
    BaseManager.register('LamportHandler', LamportHandlerProxy)
    print('test_1')
    manager = BaseManager()
    print('test_2')
    manager.start()
    print('test_3')

    workers = []
    try:
        lamport_handler = manager.LamportHandler()
        print('test')

        rpc_server = Process(target=start_rpc_server, args=(lamport_handler,))
        socket_process = Process(target=start_socket, args=(lamport_handler,))
        workers = [rpc_server, socket_process]

        rpc_server.start()
        print("test")
        socket_process.start()
        rpc_server.join()
        socket_process.join()
    except KeyboardInterrupt:
        print('Exiting')
    finally:
        # Do not leave child processes or the manager process behind.
        for worker in workers:
            if worker.is_alive():
                worker.terminate()
                worker.join()
        manager.shutdown()
# Entry point: start the servers only when this module is run as the main
# program, not when it is imported.
if __name__ == '__main__': 
    start_server()

test_1
test_2
