In [1]:
import socket
import threading
import json
import random
import time
import hashlib
import datetime

MESSAGE_SIZE = 1024
MAX_PEERS = 4
MAX_MSG_PER_PEER = 10
MSG_INTERVAL = 5
LIVENESS_CHECK = 13
OUTPUT_FILE = "output.txt"
seed_nodes = []

In [2]:
class peerNode:
    def __init__(self, p_host, p_port, config_data):
        self.p_host = p_host
        self.p_port = p_port
        self.p_address = (self.p_host, self.p_port)
        self.chosen_peers = set()
        self.chosen_seeds = []
        self.config_data = config_data
        self.msg_lst = {} # A dictionary of message hashes. As hashes are one to one functions, each hash corresponds to a single value
        self.msg_cnt = 0 
        self.timestamp = 0.0
#------------------------------------------------------------------------------------------
    # def start(self):
    #     threading.Thread(target=self.broadcast_msg).start()
    #     threading.Thread(target=self.liveness).start()
#------------------------------------------------------------------------------------------    
    def choose_nodes(self):
        n = len(seed_nodes)
        return (n // 2) + 1
#------------------------------------------------------------------------------------------    
    def connect_to_seeds(self):
        self.chosen_seeds = random.sample(seed_nodes, self.choose_nodes())
        for s_host, s_port in self.chosen_seeds:
            try:
                seed_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                seed_socket.connect((s_host, s_port))
                seed_socket.sendall(f"REGISTER {self.p_host} {self.p_port}".encode())
                # seed_socket.close()
                print(f"The peer connected to seed node at {s_host}:{s_port}")
            except Exception as e:
                print(f"Failed to connect to the seed node at {s_host}:{s_port}, {e}")
#------------------------------------------------------------------------------------------
    def get_peer_lists(self):
        connected_peers = set()
        for (s_host, s_port) in self.chosen_seeds:
            try:
                seed_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                seed_socket.connect((s_host, s_port))
                seed_socket.sendall("GET PEER LIST".encode())
                peer_list = seed_socket.recv(MESSAGE_SIZE).decode().split(",")
                print(f"The peer list sent by the seed is : {peer_list}")
                connected_peers = connected_peers.union(set(peer_list))
                # seed_socket.close()
                print(f"Connected peers to seed {s_host}:{s_port} are : {connected_peers}")
            except Exception as e:
                print(f"Failed to get the peer nodes from the seed at {s_host}:{s_port}, {e}")
        return connected_peers
# #------------------------------------------------------------------------------------------
    def connect_to_peers(self):
        connected_peers = self.get_peer_lists()
        connected_peers = list(connected_peers)
        chosen_peers = random.sample(connected_peers, min(len(connected_peers), MAX_PEERS))
        chosen_peers.remove(f'{self.p_host}:{self.p_port}')
        print(f"The chosen peers are : {chosen_peers}")
        for peer_details in chosen_peers:
            # print(peer_details)
            peer_details = peer_details.split(":")
            print(f"The peer details are : {peer_details}")
            peer_host, peer_port = peer_details[0], int(peer_details[1])
            try:
                peer_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                peer_socket.connect((peer_host, peer_port))
                self.chosen_peers.add((peer_host, peer_port))
                print(f"Connected to peer at {peer_host}:{peer_port}")
                threading.Thread(target=self.listen_peers, args=(peer_socket, )).start()
                # threading.Thread(target=self.broadcast_msg, args=(peer_socket, )).start()
            except Exception as e:
                print(f"Error connecting to peer {peer_host}:{peer_port}, {e}")
        else:
            pass

    def listen_peers(self, nbr_sock):
        try:
            while True:
                req = nbr_sock.recv(MESSAGE_SIZE)
                if not req:
                    continue
                if req[0] == "GOSSIP":
                    threading.Thread(target=self.broadcast_msg, args=(self, req[1]))
                elif req[0] == "LIVENESS_CHECK":
                    req = json.dumps(["LIVENESS_REPLY", datetime.now().time().strftime("%Y-%m-%d %H:%M:%S") , req[2], self.id])
                    nbr_sock.sendall(req.encode())
        except Exception as e:
            print(e)


# #------------------------------------------------------------------------------------------    
    def gossip_msg(self):
        self.timestamp = float(time.time())
        return f"{self.timestamp}:{self.p_host}:{self.msg_cnt+1}"
# #------------------------------------------------------------------------------------------
    def broadcast_msg(self, msg):
        # if self.msg_cnt < MAX_MSG_PER_PEER: # A node stops after it has generated 10 messages
            # message = self.gossip_msg()
            message_hash = hashlib.sha256(msg.encode()).hexdigest()
            if message_hash in self.msg_lst:
                return
            if message_hash not in self.msg_lst:
                self.msg_lst[message_hash] = set() # we have created a set as a particular node can send the message to the connected node at most once
            for (frnd_host, frnd_port) in self.chosen_peers:
                if self.msg_cnt < MAX_MSG_PER_PEER:
                    try:
                        frnd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        frnd_socket.connect((frnd_host, frnd_port))
                        frnd_socket.sendall(msg.encode())
                        self.msg_cnt += 1
                        # After the node has broadcated a message it needs to stop for 5 seconds to broadcst the next message.
                        time.sleep(MSG_INTERVAL)
                        self.msg_lst[message_hash].add((frnd_host, frnd_port))
                        print(f"Message broadcasted to {frnd_host}:{frnd_port}")
                        with open(OUTPUT_FILE, "a") as file:
                            file.write(f"Broadcasted message to {frnd_host}:{frnd_port}: {msg}\n")
                    except Exception as e:
                        print(f"Failed to broadcast message to {frnd_host}:{frnd_port}")
                        with open(OUTPUT_FILE, "a") as file:
                            file.write(f"Failed to broadcast message to {frnd_host}:{frnd_port}: {msg}\n")

            
            
# #------------------------------------------------------------------------------------------
    def liveness(self):
        consec_fails = 0
        while consec_fails<3:
            for (frnd_host, frnd_port) in self.chosen_peers:
                try:
                    frnd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    frnd_socket.connect((frnd_host, frnd_port))
                    req = json.dumps(["LIVENESS_CHECK", datetime.now().time().strftime("%Y-%m-%d %H:%M:%S") ,self.id])
                    frnd_socket.sendall(req.encode())
                    resp = frnd_socket.recv(MESSAGE_SIZE).decode()
                    if resp[0] == "LIVENESS_REPLY":
                        consec_fails = 0 # reset the number of fails
                        print(f"Peer {frnd_host}:{frnd_port}")
                    # frnd_socket.close()
                except Exception as e:
                    consec_fails+=1
                    if consec_fails >= 3:
                        self.notify_seed(frnd_host, frnd_port)
                        self.chosen_peers.remove((frnd_host, frnd_port))
                        print(f"Peer {frnd_host}:{frnd_port} is DEAD...")
            time.sleep(LIVENESS_CHECK) # Wait for 13 seconds to check the liveness of the next Node.
# #------------------------------------------------------------------------------------------
#     def notify_seed(self, peer_host, peer_port):
#         for (s_host, s_port) in self.chosen_seeds:
#             try:
#                 seed_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#                 seed_sock.connect((s_host, s_port))
#                 print(f"The peer at {peer_host}:{peer_port} is dead!")
#                 seed_sock.sendall("DEAD NODE".encode())
#                 confirmation = seed_sock.recv(MESSAGE_SIZE).decode()
#                 print(f"Sent dead node message to seed node {s_host}:{s_port}")
#                 if confirmation == "REMOVED":
#                     print("The seed confirms that the dead node has been removed!")
#             except Exception as e:
#                 print(f"Error notifying seed node {peer_host}:{peer_port} about dead node, {e}")

In [3]:
def main():
    with open('./config_file.json') as config_file:
        config_data = json.load(config_file)
    global seed_nodes

    seed_addresses = config_data["Seed_addresses"]
    for seed_info in seed_addresses:
        host = seed_info.get("Host")
        port = seed_info.get("Port")
        seed_nodes.append((host, port))


    peer = peerNode("127.0.0.1", 54380, config_data)
    peer.connect_to_seeds()
    peer.get_peer_lists()
    peer.connect_to_peers()
    # peer.start()

In [4]:
if __name__ == "__main__":
    main()

The peer connected to seed node at 127.0.0.1:12345
The peer connected to seed node at 127.0.0.1:12348
The peer connected to seed node at 127.0.0.1:12346
The peer list sent by the seed is : ['127.0.0.1:54380']
Connected peers to seed 127.0.0.1:12345 are : {'127.0.0.1:54380'}
The peer list sent by the seed is : ['127.0.0.1:54380']
Connected peers to seed 127.0.0.1:12348 are : {'127.0.0.1:54380'}
The peer list sent by the seed is : ['127.0.0.1:54380']
Connected peers to seed 127.0.0.1:12346 are : {'127.0.0.1:54380'}
The peer list sent by the seed is : ['127.0.0.1:54380']
Connected peers to seed 127.0.0.1:12345 are : {'127.0.0.1:54380'}
The peer list sent by the seed is : ['127.0.0.1:54380']
Connected peers to seed 127.0.0.1:12348 are : {'127.0.0.1:54380'}
The peer list sent by the seed is : ['127.0.0.1:54380']
Connected peers to seed 127.0.0.1:12346 are : {'127.0.0.1:54380'}
The chosen peers are : []
