In [10]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
%cd /content/drive/MyDrive/distributed_app

/content/drive/MyDrive/distributed_app


In [12]:
import socket
import threading
import time
import sys

class Node:
    def __init__(self, node_type, host, port, peers):

        self.node_type = node_type
        self.host = host
        self.port = port
        self.peers = peers
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connections = []

    def start(self):

        listen_thread = threading.Thread(target=self.listen)
        listen_thread.start()

        time.sleep(2)

        self.connect_to_peers()

    def listen(self):
        self.socket.bind((self.host, self.port))
        self.socket.listen(5)
        # print(f"[{self.node_type.upper()}] Listening on {self.host}:{self.port}")

        while True:
            conn, addr = self.socket.accept()
            self.connections.append(conn)
            # print(f"[{self.node_type.upper()}] Accepted connection from {addr}")
            threading.Thread(target=self.receive_data, args=(conn,)).start()

    def connect_to_peers(self):
        """Connect to predefined peers."""
        for peer_type, (peer_host, peer_port) in self.peers.items():
            try:
                peer_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                peer_socket.connect((peer_host, peer_port))
                self.connections.append(peer_socket)
                #print(f"[{self.node_type.upper()}] Connected to {peer_type.upper()} node at {peer_host}:{peer_port}")
            except socket.error as e:
                print(f"[{self.node_type.upper()}] Failed to connect to {peer_type.upper()}. Error: {e}")

    def send_data(self, data):

        for conn in self.connections:
            try:
                conn.sendall(data.encode())
                print(f"[{self.node_type.upper()}] Sent data: {data}")
            except socket.error as e:
                print(f"[{self.node_type.upper()}] Error sending data: {e}")

    def receive_data(self, conn):
        """Receive data from a specific connection."""
        while True:
            try:
                data = conn.recv(1024).decode()
                if data:
                    print(f"[{self.node_type.upper()}] Received data: {data}")
                    self.process_instruction(data)
            except socket.error as e:
                print(f"[{self.node_type.upper()}] Error receiving data: {e}")
                break

    def process_instruction(self, instruction):
        print(f"[{self.node_type.upper()}] Processing instruction: {instruction}")

class NodeA(Node):
    def process_instruction(self, instruction):
        """Process 'a' type instructions."""
        if instruction.startswith("a"):
            print(f"[A] Processing: {instruction}")
            time.sleep(1)
            self.send_data(f"Completed {instruction} by A")
        else:
            print(f"[A] Ignored instruction: {instruction}")

class NodeB(Node):
    def process_instruction(self, instruction):
        """Process 'b' type instructions."""
        if instruction.startswith("b"):
            print(f"[B] Processing: {instruction}")
            time.sleep(1)
            self.send_data(f"Completed {instruction} by B")
        else:
            print(f"[B] Ignored instruction: {instruction}")

class NodeC(Node):
    def process_instruction(self, instruction):
        """Process 'c' type instructions."""
        if instruction.startswith("c"):
            print(f"[C] Processing: {instruction}")
            time.sleep(1)
            self.send_data(f"Completed {instruction} by C")
        else:
            print(f"[C] Ignored instruction: {instruction}")

def read_instructions_from_file(filename):
    """Read instructions from a file and return them as a list."""
    with open(filename, 'r') as file:
        instructions = file.read().strip().split()
    return instructions

def main():

    filename = "instructions.txt"  # The file containing instructions
    instructions = read_instructions_from_file(filename)
    print(f'This is the instructions {instructions}')

    # they are all on localhost but different ports
    node_a_host, node_a_port = '127.0.0.1', 5000
    node_b_host, node_b_port = '127.0.0.1', 5001
    node_c_host, node_c_port = '127.0.0.1', 5002


    node_a_peers = {'b': (node_b_host, node_b_port), 'c': (node_c_host, node_c_port)}
    node_b_peers = {'a': (node_a_host, node_a_port), 'c': (node_c_host, node_c_port)}
    node_c_peers = {'a': (node_a_host, node_a_port), 'b': (node_b_host, node_b_port)}


    node_a = NodeA('a', node_a_host, node_a_port, node_a_peers)
    node_b = NodeB('b', node_b_host, node_b_port, node_b_peers)
    node_c = NodeC('c', node_c_host, node_c_port, node_c_peers)


    threading.Thread(target=node_a.start).start()
    threading.Thread(target=node_b.start).start()
    threading.Thread(target=node_c.start).start()


    time.sleep(2)

    for instruction in instructions:
        if instruction.startswith("a"):
            node_a.process_instruction(instruction)
        elif instruction.startswith("b"):
            node_b.process_instruction(instruction)
        elif instruction.startswith("c"):
            node_c.process_instruction(instruction)
        time.sleep(0.5)

if __name__ == "__main__":
    main()

Exception in thread Thread-84 (listen):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Exception in thread Thread-86 (listen):
Traceback (most recent call last):
Exception in thread Thread-88 (listen):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
  File "<ipython-input-12-b63a352257c0>", line 26, in listen
  File "<ipython-input-12-b63a352257c0>", line 26, in listen
OSError: [Errno 98] Address already in use    self._target(*self._args, **self._kwargs)
  File "<ipython-input-12-

This is the instructions ['a1', 'a2', 'c1', 'b1', 'c2', 'a3', 'b2', 'b3', 'a4', 'c3', 'b4']
[B] Accepted connection from ('127.0.0.1', 34990)
[C] Accepted connection from ('127.0.0.1', 57330)
[A] Accepted connection from ('127.0.0.1', 41664)
[A] Processing: a1
[C] Accepted connection from ('127.0.0.1', 57338)
[B] Accepted connection from ('127.0.0.1', 34998)[A] Accepted connection from ('127.0.0.1', 41674)

[A] Sent data: Completed a1 by A
[A] Sent data: Completed a1 by A
[C] Received data: Completed a1 by A
[C] Ignored instruction: Completed a1 by A
[B] Received data: Completed a1 by A
[B] Ignored instruction: Completed a1 by A
[A] Processing: a2
[A] Sent data: Completed a2 by A
[A] Sent data: Completed a2 by A
[B] Received data: Completed a2 by A
[B] Ignored instruction: Completed a2 by A
[C] Received data: Completed a2 by A
[C] Ignored instruction: Completed a2 by A
[C] Processing: c1
[C] Sent data: Completed c1 by C
[C] Sent data: Completed c1 by C
[A] Received data: Completed c1 b