In [1]:
import time
import socket
from queue import Queue 
from threading import Thread, Condition
from ipywidgets import widgets
from IPython.display import display
import random

In [2]:
class EXP_Output:
    """Serialise experiment log lines through one background printer thread.

    Every entity in the notebook logs through a shared instance of this class.
    ``print`` only enqueues the line, so callers never block on output; the
    single consumer thread writes the lines to stdout in queue order.
    """

    def __init__(self):
        # Lines go to plain stdout (an earlier draft rendered them into a
        # widgets.Output box instead).
        self.Queue_Print = Queue()

        # daemon=True: loop_Print never returns, so a non-daemon thread would
        # keep the interpreter alive forever once all other work is done.
        self.thread = Thread(target=self.loop_Print, daemon=True)
        self.thread.start()

    def print(self, msg):
        """Queue ``msg`` (a str) for printing, prefixed with the local HH:MM:SS time."""
        text_out = time.strftime("%H:%M:%S", time.localtime()) + "> "
        self.Queue_Print.put(text_out+msg)

    def loop_Print(self):
        """Consumer loop: print queued lines one at a time, forever."""
        while True:
            msg = self.Queue_Print.get()
            try:
                print(msg)
            finally:
                # Always balance the get() so Queue_Print.join() cannot hang
                # if writing to stdout fails.
                self.Queue_Print.task_done()

In [3]:
class ALE_TextInput:
    """Text box and submit button whose input is handed over through a queue.

    Clicking the button while the module-level ``thread_running`` flag is True
    enqueues 20 copies of the typed text, each suffixed with its index
    (0..19). Typing ``end`` additionally clears ``thread_running`` before the
    copies are enqueued. The text box is emptied after every click.
    """

    def __init__(self, exp_out):

        self.exp_out = exp_out
        self.Queue_User = Queue()

        # Free-text entry field
        self.Text = widgets.Text(
            value='',
            placeholder='Type something and press enter',
            description='To send:',
            disabled=False,
        )

        # Button that triggers the submission
        self.Botton = widgets.Button(
            description='Submit',
            disabled=False,
            button_style='',
            tooltip='Click me to submit a string',
        )

        def on_submit_button_clicked(b):
            """Click handler; ``b`` is the clicked button and is not used."""
            global thread_running

            typed = self.Text.value

            if thread_running == True:
                # "end" switches the global run flag off for all loops
                if typed == "end":
                    thread_running = False

                for suffix in map(str, range(20)):
                    self.Queue_User.put(typed + suffix)

            # Empty the input field whether or not anything was queued
            self.Text.value = ''

        # Wire the handler to the button, then render both widgets
        self.Botton.on_click(on_submit_button_clicked)
        display(self.Text, self.Botton)

    def get(self):
        """Return the next queued user message; blocks while the queue is empty."""
        return self.Queue_User.get()

In [4]:
class ALE_TR:
    """Application layer entity that relays messages between two neighbours.

    ``upper_Tx`` must provide ``get()``; ``lower_TR`` must provide
    ``send(msg)`` and ``receive()``. Both loops run for as long as the
    module-level ``thread_running`` flag is True and log every message
    through ``exp_out``.
    """

    def __init__(self, name, upper_Tx, lower_TR, exp_out):

        self.Name = name
        self.Upper_Tx = upper_Tx    # source of outgoing messages: get()
        self.Lower_TR = lower_TR    # lower layer: send(msg) / receive()
        self.exp_out = exp_out

    def loop_Tx(self):
        """Pull messages from the upper source and pass them to the lower layer."""
        self.exp_out.print(self.Name + ": loop_Tx starting")

        count = 0
        while thread_running == True:
            count += 1

            # get() is where this loop waits for the next message
            outgoing = self.Upper_Tx.get()
            self.exp_out.print(f"{self.Name} Tx: message {count}: {outgoing!s}")

            self.Lower_TR.send(outgoing)

    def loop_Rx(self):
        """Pull messages from the lower layer and log them."""
        self.exp_out.print(self.Name + ": loop_Rx starting")

        count = 0
        while thread_running == True:
            count += 1

            # receive() is where this loop waits for the next message
            incoming = self.Lower_TR.receive()
            self.exp_out.print(f"{self.Name} Rx: message {count}: {incoming!s}")

In [5]:
# states of the DLE state machine (column index into DLE_TR_FSM.Procedure)
STATE_READY_TO_SEND = 0 # ready to send a packet to the DLE entity in another node through lower layer
STATE_WAITING_ACK   = 1 # waiting for ACK from the DLE entity in another node

# events of the DLE state machine (row index into DLE_TR_FSM.Procedure)
EVENT_UPPER_TX  = 0 # upper layer wants to send a packet
EVENT_LOWER_DAT = 1 # lower layer forwards an incoming data packet
EVENT_LOWER_ACK = 2 # lower layer forwards an incoming ACK packet
EVENT_TIMEOUT   = 3 # timer timeout

class DLE_TR_FSM:
    """Stop-and-wait data link entity with retransmission on timeout.

    Frame format on the lower layer (all strings):
        data frame: '0' + sequence digit + payload
        ACK frame:  '1' + sequence digit
    Sequence digits run '0'..'9' and wrap back to '0'.

    Four loops are meant to run in separate threads while the module-level
    ``thread_running`` flag is True:
        loop_Tx    - takes one upper-layer message at a time and raises EVENT_UPPER_TX
        loop_Rx    - classifies frames from the lower layer into DAT / ACK events
        loop_FSM   - dispatches events through ``Procedure[event][state]``
        loop_timer - raises EVENT_TIMEOUT when an ACK does not arrive in time

    Parameters
    ----------
    name : str
        Prefix used in log lines.
    lower_TR :
        Lower layer; must provide ``send(msg)`` and ``receive()``.
    exp_out :
        Logger; must provide ``print(msg)``.
    timeout_ticks : int, optional
        Value the retransmission counter is loaded with (default 10). The
        timeout fires on the tick after the counter reaches 0, i.e. after
        roughly ``(timeout_ticks + 1) * tick_seconds``.
    tick_seconds : float, optional
        Length of one timer tick in seconds (default 0.5).
    """

    def __init__(self, name, lower_TR, exp_out, timeout_ticks=10, tick_seconds=0.5):

        self.Name      = name
        self.Lower_TR  = lower_TR   # must provide send(msg) and receive()
        self.Queue_Tx  = Queue()    # messages handed down by the upper layer
        self.Queue_Rx  = Queue()    # payloads delivered to the upper layer
        self.exp_out   = exp_out

        # event queue consumed by loop_FSM
        self.Queue_FSM = Queue()

        # TxOp is True when loop_Tx may take the next upper-layer message
        self.cv_TxOp = Condition()
        self.TxOp    = True

        # initial state
        self.State = STATE_READY_TO_SEND

        # dispatch table: Procedure[event][state] -> handler(msg)
        self.Procedure = [[self.FSM_upper_Tx, self.FSM_abnormal],      # EVENT_UPPER_TX
                          [self.FSM_lower_Rx, self.FSM_lower_Rx],      # EVENT_LOWER_DAT
                          [self.FSM_abnormal, self.FSM_lower_Rx_ack],  # EVENT_LOWER_ACK
                          [self.FSM_abnormal, self.FSM_timeout]]       # EVENT_TIMEOUT

        # retransmission timer; all timer fields are guarded by cv_Timer
        self.cv_Timer      = Condition()
        self.timer_active  = False
        self.timer_counter = 0
        self.timeout_ticks = timeout_ticks
        self.tick_seconds  = tick_seconds
        self._timer_epoch  = 0      # bumped on every start/stop to invalidate stale ticks

        # transmission: last payload sent (kept for retransmission) and its sequence digit
        self.tx_buffer  = ''
        self.tx_seq_num = '0'

        # reception: sequence digit expected on the next incoming data frame
        self.tx_ack_num = '0'

    @staticmethod
    def _next_seq(seq):
        """Return the sequence digit following ``seq``, wrapping '9' to '0'."""
        return '0' if seq == '9' else chr(ord(seq) + 1)

    def _start_timer(self):
        """Arm (or re-arm) the retransmission timer and wake loop_timer."""
        with self.cv_Timer:
            self.timer_active  = True
            self.timer_counter = self.timeout_ticks
            self._timer_epoch += 1
            self.cv_Timer.notify()

    def _stop_timer(self):
        """Disarm the retransmission timer; a tick already in flight is discarded."""
        with self.cv_Timer:
            self.timer_active = False
            self._timer_epoch += 1

    def loop_Tx(self):
        """Feed upper-layer messages into the FSM, one per acknowledged frame."""

        global thread_running
        c = 0
        self.exp_out.print(self.Name + ": loop_Tx starting")

        while (thread_running == True):

            # wait for permission to send, then consume it
            with self.cv_TxOp:
                while (self.TxOp != True):
                    self.cv_TxOp.wait()
                self.TxOp = False

            # Block on the queue WITHOUT holding cv_TxOp, so the FSM thread is
            # never stalled behind an idle upper layer.
            msg = self.Queue_Tx.get()

            c = c + 1
            text_out = self.Name + " Tx: message " + str(c) + ": " + str(msg)
            self.exp_out.print(text_out)

            self.event_add(EVENT_UPPER_TX, msg)

    def loop_Rx(self):
        """Turn frames from the lower layer into DAT / ACK events for the FSM."""

        global thread_running
        c = 0
        self.exp_out.print(self.Name + ": loop_Rx starting")

        while (thread_running == True):

            c = c + 1

            # get message from a lower layer
            # this thread is blocked here
            msg = self.Lower_TR.receive()

            text_out = self.Name + " Rx: message " + str(c) + ": " + str(msg)
            self.exp_out.print(text_out)

            # An empty frame has no type character; indexing it would raise
            # and silently end this thread.
            if not msg:
                self.exp_out.print(self.Name + " Rx: empty message ignored")
                continue

            # first character is the frame type, the rest goes to the FSM
            msg_type = msg[0]
            if (msg_type == '0'):
                self.event_add(EVENT_LOWER_DAT, msg[1:])
            elif (msg_type == '1'):
                self.event_add(EVENT_LOWER_ACK, msg[1:])
            else:
                text_out = self.Name + " Rx: message type unknown " + msg_type
                self.exp_out.print(text_out)

    def receive(self):
        """Return the next delivered payload; blocks while none is available."""
        return self.Queue_Rx.get()

    def send(self, msg):
        """Queue ``msg`` for transmission."""
        self.Queue_Tx.put(msg)

    def event_add(self, ev_type, msg):
        """Queue an FSM event: one byte of event type followed by the message bytes.

        ``msg`` may be ``str`` (encoded with the default UTF-8) or ``bytes``.
        """

        # preparing an event
        if (isinstance(msg, str)):
            msg = msg.encode()

        # the event is a sequence of bytes
        event = ev_type.to_bytes(1, "big")+msg

        # add event to queue
        self.Queue_FSM.put(event)

    def loop_FSM(self):
        """Consume events and run the handler selected by (event type, state)."""

        global thread_running
        self.exp_out.print(self.Name + ": loop_FSM starting")

        while (thread_running == True):

            # get the next event
            event = self.Queue_FSM.get()

            ev_type = event[0]

            text_out = self.Name + " FSM: state: " + str(self.State)+" event type: " + str(ev_type)
            self.exp_out.print(text_out)

            # process the event
            msg = event[1:].decode('utf-8') # message becomes a string
            self.Procedure[ev_type][self.State](msg)

    def FSM_abnormal(self, msg):
        """Handler for event/state combinations that should not occur: log only."""

        text_out = self.Name + " FSM: error! " + msg
        self.exp_out.print(text_out)

    def FSM_upper_Tx(self, msg):
        """Send a new data frame and start waiting for its ACK."""

        text_out = self.Name + " FSM: to send: " + msg
        self.exp_out.print(text_out)

        # buffer the message (for retransmission)
        self.tx_buffer = msg

        # data frame: type '0' + sequence digit + payload
        msg = '0' + self.tx_seq_num + msg
        self.Lower_TR.send(msg)

        # waiting for ACK
        self.State = STATE_WAITING_ACK

        self._start_timer()

    def FSM_lower_Rx(self, msg):
        """Handle an incoming data frame (``msg`` = sequence digit + payload).

        A frame carrying the expected sequence digit is delivered to
        ``Queue_Rx``; any other frame is only logged. Either way the received
        sequence digit is acknowledged.
        """

        if not msg:
            # no sequence digit to read or acknowledge
            self.FSM_abnormal("empty data frame")
            return

        # get the sequence number
        seq = msg[0]
        msg = msg[1:]

        # received a new data packet
        text_out = self.Name + " FSM: received: " + msg
        self.exp_out.print(text_out)

        if (seq == self.tx_ack_num):

            # put the message in a receiving queue
            self.Queue_Rx.put(msg)

            self.tx_ack_num = self._next_seq(self.tx_ack_num)

        else:
            text_out  = self.Name + " FSM: received received frame " + seq
            text_out += " but expected " + self.tx_ack_num
            self.exp_out.print(text_out)

        # to send an ACK
        self.Lower_TR.send('1'+seq)

    def FSM_lower_Rx_ack(self, msg):
        """Handle an incoming ACK (``msg`` starts with the acknowledged sequence digit)."""

        if not msg:
            self.FSM_abnormal("empty ACK frame")
            return

        ack = msg[0] # get the ack number
        if (ack != self.tx_seq_num):

            # the received ack does not match the seq
            text_out = "ACK # " + ack + " does not match local seq # " + self.tx_seq_num
            self.exp_out.print(text_out)
            return

        # Stop the timer under its lock so a tick already in progress cannot
        # raise a timeout for a frame that has just been acknowledged.
        self._stop_timer()

        # inc the seq
        self.tx_seq_num = self._next_seq(self.tx_seq_num)

        # ready to send the next upper layer packet
        with self.cv_TxOp:
            self.State = STATE_READY_TO_SEND
            self.TxOp  = True
            self.cv_TxOp.notify()

    def FSM_timeout(self, msg):
        """Retransmit the buffered frame and restart the timer (``msg`` is unused)."""

        text_out = self.Name + " FSM: to resend frame " + self.tx_seq_num + " " + self.tx_buffer
        self.exp_out.print(text_out)

        # same frame as before: type '0' + sequence digit + buffered payload
        msg = '0' + self.tx_seq_num + self.tx_buffer
        self.Lower_TR.send(msg)

        self._start_timer()

    def loop_timer(self):
        """Count down while the timer is armed and raise EVENT_TIMEOUT at zero."""

        global thread_running
        self.exp_out.print(self.Name + ": loop_Timer starting")

        while (thread_running == True):

            with self.cv_Timer:
                while (self.timer_active == False):
                    self.cv_Timer.wait()
                epoch = self._timer_epoch

            # Sleep outside the lock so starting or stopping the timer never
            # has to wait for a tick to finish.
            time.sleep(self.tick_seconds)

            with self.cv_Timer:
                # The timer was stopped or re-armed during the sleep: this
                # tick belongs to an old run and must not count.
                if (self.timer_active == False) or (epoch != self._timer_epoch):
                    continue

                if (self.timer_counter == 0):
                    # disarm first, then add an event for timeout
                    self.timer_active = False
                    self.event_add(EVENT_TIMEOUT, 'x')

                else:
                    self.timer_counter = self.timer_counter - 1

In [6]:
class PLE_TR:
    """Physical layer entity: moves strings over a datagram socket.

    Outgoing messages are queued by ``send`` and transmitted by ``loop_Tx``
    to ``AP_Tx``, with randomly injected loss and duplication. ``loop_Rx``
    binds the socket to ``AP_Rx``, delays each received datagram by a random
    0-2 seconds and queues it for ``receive``. Both loops run while the
    module-level ``thread_running`` flag is True.
    """

    def __init__(self, name, Socket, AP_Tx, AP_Rx, exp_out):

        self.Name = name
        self.exp_out = exp_out

        # socket and the two addresses it works with
        self.Socket = Socket
        self.AP_Tx = AP_Tx    # destination passed to sendto()
        self.AP_Rx = AP_Rx    # address passed to bind()

        # hand-over queues towards / from the layer above
        self.Queue_Tx = Queue()
        self.Queue_Rx = Queue()

    def loop_Tx(self):
        """Transmit queued messages, dropping or duplicating some at random."""
        self.exp_out.print(self.Name + ": loop_Tx starting")

        while thread_running == True:

            # wait for the next message to transmit
            outgoing = self.Queue_Tx.get()
            self.exp_out.print(self.Name + " Tx: message: " + str(outgoing))

            payload = str.encode(outgoing)

            # first draw: below 0.1 the message is dropped
            if random.random() < 0.1:
                self.exp_out.print("packet loss")
                continue

            # second draw (only for messages that were not dropped):
            # below 0.05 the message is sent twice
            duplicated = random.random() < 0.05

            self.Socket.sendto(payload, self.AP_Tx)
            if duplicated:
                self.Socket.sendto(payload, self.AP_Tx)
                self.exp_out.print("Transmission Duplicated")

    def loop_Rx(self):
        """Receive datagrams, delay them at random and queue the decoded text."""
        self.exp_out.print(self.Name + ": loop_Rx starting")

        # the socket is bound here, not in __init__
        self.Socket.bind(self.AP_Rx)

        while thread_running == True:

            # recvfrom() is where this loop waits; bufferSize is a module-level value
            raw, addr = self.Socket.recvfrom(bufferSize)

            # random delay between 0 and 2 seconds before the message is passed on
            time.sleep(random.uniform(0, 2))

            msg = raw.decode('utf-8')
            self.exp_out.print(self.Name + " Rx: from " + str(addr) + ": " + str(msg))

            self.Queue_Rx.put(msg)

    def send(self, msg):
        """Queue ``msg`` for transmission by ``loop_Tx``."""
        self.Queue_Tx.put(msg)

    def receive(self):
        """Return the next received message; blocks while none is available."""
        return self.Queue_Rx.get()


In [7]:
class MLE_TR:
    """Measurement layer entity: sends numbered, timestamped probe packets
    through a lower layer and reports delay, loss and duplication.

    Packet format: ``"<experiment_id>,<message_id>,<send time in ns>"``.

    Parameters
    ----------
    lower_layer :
        Must provide ``send(msg)`` and ``receive()``.
    experiment_id : int, optional
        Written into every packet sent (default 1). It is not checked on
        reception.
    total_messages : int, optional
        Number of probe packets sent per button click and number of distinct
        message IDs ``loop_Rx`` waits for (default 100). Must be at least 1.
    """

    def __init__(self, lower_layer, experiment_id=1, total_messages=100):
        if total_messages < 1:
            raise ValueError("total_messages must be at least 1")

        self.lower_layer = lower_layer
        self.experiment_id = experiment_id
        # One shared count for sender and receiver, so the two cannot drift apart.
        self.total_messages = total_messages
        self.sent_messages = {}       # message_id -> send timestamp (ns)
        self.received_messages = {}   # message_id -> number of times received

        self.button = widgets.Button(description="Run Measurment Experiment")
        self.button.on_click(self.send_messages_callback)
        display(self.button)

    def send_messages_callback(self, b):
        """
        Callback function triggered by the button click (``b`` is unused).
        Sends ``total_messages`` packets, each with a unique ID (starting
        at 1) and a nanosecond timestamp.
        """

        for msg_id in range(1, self.total_messages + 1):
            timestamp = time.time_ns()
            packet = f"{self.experiment_id},{msg_id},{timestamp}" # Construct the packet format
            self.sent_messages[msg_id] = timestamp
            self.lower_layer.send(packet)
            print(f"Sent: {packet}")

    def loop_Rx(self):
        """
        Loop that continuously receives packets from the lower layer.
        For each received packet, it:
            - Parses the packet to extract the data
            - Calculates the delay between when the packet was sent and received
            - Counts how often each message ID was received

        Packets that cannot be parsed are reported and skipped. Once
        ``total_messages`` distinct message IDs have been received, it prints:
            - The average delay over all received packets, duplicates included
            - The loss rate
            - The duplicate rate

        NOTE(review): the loop only ends when every expected ID has been
        seen, so the printed loss rate cannot be above 0 and a lost packet
        makes this method block forever. Reporting real loss would need a
        receive timeout, which ``lower_layer.receive()`` does not offer.
        """
        delays = []
        total_expected = self.total_messages

        while True:
            packet = self.lower_layer.receive()
            try:
                experiment_id, message_id, sent_time = packet.split(',')
                message_id = int(message_id)
                sent_time = int(sent_time)
            except Exception as e:
                print(f"Error - {e} - parsing packet: ", packet)
                continue

            # Compute the delay in milliseconds using the current timestamp
            current_time = time.time_ns()
            delay_ms = (current_time - sent_time) / 1e6
            delays.append(delay_ms)

            # Count how many times this message ID is received (for duplicate detection)
            self.received_messages[message_id] = self.received_messages.get(message_id, 0) + 1
            print(f"Received: {packet}, Delay: {delay_ms:.2f} ms")

            # Stop once enough distinct IDs have been seen. ">=" rather than
            # "==" so an unexpected extra ID cannot push the count past the
            # target and keep the loop running forever.
            if len(self.received_messages) >= total_expected:
                break

        # Calculate average delay across all received packets
        avg_delay = sum(delays) / len(delays) if delays else 0

        # Compute loss rate based on missing unique message IDs
        loss_rate = (total_expected - len(self.received_messages)) / total_expected

        # Compute duplicate rate by summing up extra receptions beyond the first for each message
        duplicate_count = sum(count - 1 for count in self.received_messages.values() if count > 1)
        duplicate_rate = duplicate_count / total_expected

        print(f"Average Delay: {avg_delay:.2f} ms")
        print(f"Loss Rate: {loss_rate * 100:.2f}%")
        print(f"Duplicate Rate: {duplicate_rate * 100:.2f}%")

In [8]:
# Global run flag polled by every loop_* method; it stays False until the
# threads are about to be started.
thread_running = False
# Buffer size passed to Socket.recvfrom() in PLE_TR.loop_Rx
bufferSize = 1024

# (0) create the shared output object used by all entities for logging
exp_out = EXP_Output()

# (1) create the physical layer entity for Alice
# PLE_TR takes (name, Socket, AP_Tx, AP_Rx, exp_out): it sends to the remote
# address and binds to the local one.
AP_local_1  = ("127.0.0.1", 30000)
AP_remote_1 = ("127.0.0.1", 31111)
Socket_1    = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
PLE_1       = PLE_TR("PLE_Alice", Socket_1, AP_remote_1, AP_local_1, exp_out)

# (2) create the data link layer entity for Alice, on top of PLE_1
DLE_1 = DLE_TR_FSM("DLE_Alice", PLE_1, exp_out)

# (3) create the application layer entities for Alice:
# ALE_0 displays the input widgets, ALE_1 relays between ALE_0 and DLE_1
ALE_0 = ALE_TextInput(exp_out)
ALE_1 = ALE_TR("ALE_Alice", ALE_0, DLE_1, exp_out)

Text(value='', description='To send:', placeholder='Type something and press enter')

Button(description='Submit', style=ButtonStyle(), tooltip='Click me to submit a string')

In [9]:
# (4) create the physical layer entity for Bob
# The addresses mirror Alice's: Bob binds to 31111 and sends to 30000.
AP_local_2  = ("127.0.0.1", 31111)
AP_remote_2 = ("127.0.0.1", 30000)
Socket_2    = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
PLE_2       = PLE_TR("PLE_Bob", Socket_2, AP_remote_2, AP_local_2, exp_out)

# (5) create the data link layer entity for Bob, on top of PLE_2
DLE_2 = DLE_TR_FSM("DLE_Bob", PLE_2, exp_out)

# (6) create the application layer entities for Bob:
# ALE_3 displays the input widgets, ALE_2 relays between ALE_3 and DLE_2
ALE_3 = ALE_TextInput(exp_out)
ALE_2 = ALE_TR("ALE_Bob", ALE_3, DLE_2, exp_out)

Text(value='', description='To send:', placeholder='Type something and press enter')

Button(description='Submit', style=ButtonStyle(), tooltip='Click me to submit a string')

In [None]:
# (7) create the measurement layer entity on top of Alice's data link layer;
# constructing it displays the button that starts the experiment.
# NOTE(review): ALE_1.loop_Rx also calls DLE_1.receive(), so if MLE_1.loop_Rx
# is run as well the two would compete for the same packets - confirm intent.
MLE_1 = MLE_TR(DLE_1, experiment_id=1)

Button(description='Run Measurment Experiment', style=ButtonStyle())

Sent: 1,1,1742843977193503000
Sent: 1,2,1742843977193555000
Sent: 1,3,1742843977193564000
Sent: 1,4,1742843977193570000
Sent: 1,5,1742843977193574000
Sent: 1,6,1742843977193579000
Sent: 1,7,1742843977193583000
Sent: 1,8,1742843977193588000
Sent: 1,9,1742843977193592000
Sent: 1,10,1742843977193598000
Sent: 1,11,1742843977193602000
Sent: 1,12,1742843977193607000
Sent: 1,13,1742843977193611000
Sent: 1,14,1742843977193615000
Sent: 1,15,1742843977193619000
Sent: 1,16,1742843977193624000
Sent: 1,17,1742843977193628000
Sent: 1,18,1742843977193632000
Sent: 1,19,1742843977193635000
Sent: 1,20,1742843977193640000
Sent: 1,21,1742843977193644000
Sent: 1,22,1742843977193648000
Sent: 1,23,1742843977193653000
Sent: 1,24,1742843977193657000
Sent: 1,25,1742843977193661000
Sent: 1,26,1742843977193665000
Sent: 1,27,1742843977193669000
Sent: 1,28,1742843977193673000
Sent: 1,29,1742843977193677000
Sent: 1,30,1742843977193681000
Sent: 1,31,1742843977193686000
Sent: 1,32,1742843977193690000
Sent: 1,33,174284

In [10]:
# start the loops of all entities
# every loop blocks at some point inside its body (queue get, socket
# receive or condition wait) until there is work for it

# Alice: application (t1, t2), data link (t3, t4), physical (t5, t6),
# plus the data link FSM (f1) and retransmission timer (f2)
t1_1 = Thread(target = ALE_1.loop_Tx, args = ()) 
t2_1 = Thread(target = ALE_1.loop_Rx, args = ()) 
t3_1 = Thread(target = DLE_1.loop_Tx, args = ())
t4_1 = Thread(target = DLE_1.loop_Rx, args = ())
t5_1 = Thread(target = PLE_1.loop_Tx, args = ()) 
t6_1 = Thread(target = PLE_1.loop_Rx, args = ())
f1_1 = Thread(target = DLE_1.loop_FSM, args = ())
f2_1 = Thread(target = DLE_1.loop_timer, args = ())


# Bob: same set of loops
t1_2 = Thread(target = ALE_2.loop_Tx, args = ()) 
t2_2 = Thread(target = ALE_2.loop_Rx, args = ()) 
t3_2 = Thread(target = DLE_2.loop_Tx, args = ())
t4_2 = Thread(target = DLE_2.loop_Rx, args = ())
t5_2 = Thread(target = PLE_2.loop_Tx, args = ()) 
t6_2 = Thread(target = PLE_2.loop_Rx, args = ()) 
f1_2 = Thread(target = DLE_2.loop_FSM, args = ())
f2_2 = Thread(target = DLE_2.loop_timer, args = ())

# The flag must be True before the threads start: each loop tests it on
# entry and would return immediately otherwise.
thread_running = True

t1_1.start()
t2_1.start()
t3_1.start()
t4_1.start()
t5_1.start()
t6_1.start()
t1_2.start()
t2_2.start()
t3_2.start()
t4_2.start()
t5_2.start()
t6_2.start()
f1_1.start()
f1_2.start()
f2_1.start()
f2_2.start()

15:16:07> ALE_Alice: loop_Tx starting
15:16:07> ALE_Alice: loop_Rx starting
15:16:07> DLE_Alice: loop_Tx starting
15:16:07> DLE_Alice: loop_Rx starting
15:16:07> PLE_Alice: loop_Tx starting
15:16:07> PLE_Alice: loop_Rx starting
15:16:07> ALE_Bob: loop_Tx starting
15:16:07> ALE_Bob: loop_Rx starting
15:16:07> DLE_Bob: loop_Tx starting
15:16:07> DLE_Bob: loop_Rx starting
15:16:07> PLE_Bob: loop_Tx starting
15:16:07> PLE_Bob: loop_Rx starting
15:16:07> DLE_Alice: loop_FSM starting
15:16:07> DLE_Bob: loop_FSM starting
15:16:07> DLE_Alice: loop_Timer starting
15:16:07> DLE_Bob: loop_Timer starting
15:19:37> DLE_Alice Tx: message 1: 1,1,1742843977193503000
15:19:37> DLE_Alice FSM: state: 0 event type: 0
15:19:37> DLE_Alice FSM: to send: 1,1,1742843977193503000
15:19:37> PLE_Alice Tx: message: 001,1,1742843977193503000
15:19:38> PLE_Bob Rx: from ('127.0.0.1', 30000): 001,1,1742843977193503000
15:19:38> DLE_Bob Rx: message 1: 001,1,1742843977193503000
15:19:38> DLE_Bob FSM: state: 0 event type