In [1]:
import time
import socket
from queue import Queue 
from threading import Thread, Condition
from ipywidgets import widgets
from IPython.display import display
import random

In [2]:
class EXP_Output:
    def __init__(self):
        #self.out = widgets.Output(layout={'border': '1px solid black'})
        #display(self.out)
        #with self.out:
        #print("Experiment outputs")

        self.Queue_Print = Queue()
        self.thread = Thread(target=self.loop_Print)
        self.thread.start()

    def print(self, msg):
        text_out = time.strftime("%H:%M:%S", time.localtime()) + "> "
        self.Queue_Print.put(text_out+msg)

    def loop_Print(self):
        while True:
            msg = self.Queue_Print.get()
            #with self.out:
            print(msg)
            self.Queue_Print.task_done()

In [3]:
# state of DLE
STATE_READY_TO_SEND = 0 # ready to send a packet to the DLE entity in another node through lower layer
STATE_WAITING_ACK   = 1 # waiting for ACK from the DLE entity in another node

# events of DLE
EVENT_UPPER_TX  = 0 # upper layer wants to send a packet
EVENT_LOWER_DAT = 1 # lower layer forwards an incoming data packet
EVENT_LOWER_ACK = 2 # lower layer forwards an incoming data packet
EVENT_TIMEOUT   = 3 # timer timeout
        
class DLE_TR_FSM:
    
    def __init__(self, name, lower_TR, exp_out):
        
        self.Name      = name
        self.Lower_TR  = lower_TR   # must provide send(msg) and receive()
        self.Queue_Tx  = Queue()
        self.Queue_Rx  = Queue()
        self.exp_out   = exp_out
        
        # create a queue for Finite State Machine
        self.Queue_FSM = Queue()
        
        # create a flag = condition to start sending
        self.cv_TxOp = Condition()
        self.TxOp    = True

        # init state to STATE_WAITING_UPPER
        self.State = STATE_READY_TO_SEND
        
        self.Procedure = [[self.FSM_upper_Tx, self.FSM_abnormal],
                          [self.FSM_lower_Rx, self.FSM_lower_Rx],
                          [self.FSM_abnormal, self.FSM_lower_Rx_ack],
                          [self.FSM_abnormal, self.FSM_timeout]] 
        

        # new features for retransmission
        # timer
        self.cv_Timer      = Condition()
        self.timer_active  = False
        self.timer_counter = 0
        
        # transmission 
        self.tx_buffer  = ''
        self.tx_seq_num = '0'
        
        # reception
        self.tx_ack_num = '0'
        
        
    def loop_Tx(self):

        global thread_running
        c = 0
        self.exp_out.print(self.Name + ": loop_Tx starting")
    
        while (thread_running == True):

            # waiting for the ready to sent signal
            with self.cv_TxOp: 
                while (self.TxOp != True): 
                    self.cv_TxOp.wait()
            
                self.TxOp = False
                
                c = c + 1

                # get a message from queue
                msg = self.Queue_Tx.get()
                text_out = self.Name + " Tx: message " + str(c) + ": " + str(msg)
                self.exp_out.print(text_out)
            
                self.event_add(EVENT_UPPER_TX, msg) 

    def loop_Rx(self):
        
        global thread_running
        c = 0
        self.exp_out.print(self.Name + ": loop_Rx starting")
    
        while (thread_running == True):

            c = c + 1

            # get message from a lower layer
            # this thread is blocked here
            msg = self.Lower_TR.receive()
            
            text_out = self.Name + " Rx: message " + str(c) + ": " + str(msg)
            self.exp_out.print(text_out)
            
            # add an event for FSM, assuming that msg is a sequence of bytes
            msg_type = msg[0]
            if (msg_type == '0'):
                self.event_add(EVENT_LOWER_DAT, msg[1:])
            elif (msg_type == '1'):
                self.event_add(EVENT_LOWER_ACK, msg[1:])
            else:
                text_out = self.Name + " Rx: message type unknown " + msg_type
                self.exp_out.print(text_out)

    def receive(self):
        return self.Queue_Rx.get()
    
    def send(self, msg):
        self.Queue_Tx.put(msg)
        
    def event_add(self, ev_type, msg):
        
        # preparing an event
        if (isinstance(msg, str)):
            msg = msg.encode()
        
        # the event is a sequence of bytes
        event = ev_type.to_bytes(1, "big")+msg
        
        # add event to queue
        self.Queue_FSM.put(event)        
        

    def loop_FSM(self):
        
        global thread_running
        self.exp_out.print(self.Name + ": loop_FSM starting")
        
        while (thread_running == True):
            
            # get the next event
            event = self.Queue_FSM.get()
            
            ev_type = event[0]
            
            text_out = self.Name + " FSM: state: " + str(self.State)+" event type: " + str(ev_type)
            self.exp_out.print(text_out)
            
            # process the event
            msg = event[1:].decode('utf-8') # message becomes a string
            self.Procedure[ev_type][self.State](msg)
        
    def FSM_abnormal(self, msg):
        
        text_out = self.Name + " FSM: error! " + msg
        self.exp_out.print(text_out)
    
    def FSM_upper_Tx(self, msg):   
        
        text_out = self.Name + " FSM: to send: " + msg
        self.exp_out.print(text_out)

        # buffer the message (for retransmission)
        self.tx_buffer = msg
        
        # prepare to send a data message
        msg = '0' + self.tx_seq_num + msg     # inicating a new packet and its sequence number
        self.Lower_TR.send(msg)

        # waiting for ACK
        self.State = STATE_WAITING_ACK
        
        # start timer
        with self.cv_Timer:
            self.timer_active  = True
            self.timer_counter = 10
            self.cv_Timer.notify()    
    
    def FSM_lower_Rx(self, msg):

        # get the sequence number
        seq = msg[0]
        msg = msg[1:]
        
        # received a new data packet        
        text_out = self.Name + " FSM: received: " + msg
        self.exp_out.print(text_out)

        if (seq == self.tx_ack_num):
            
            # put the message in a receiving queue
            self.Queue_Rx.put(msg)
            
            self.tx_ack_num = chr(ord(self.tx_ack_num) + 1)
            if (self.tx_ack_num == chr(ord('9') + 1)):
                self.tx_ack_num = '0'
        
        else:
            text_out  = self.Name + " FSM: received received frame " + seq
            text_out += " but expected " + self.tx_ack_num
            self.exp_out.print(text_out)
            

        # to send an ACK
        self.Lower_TR.send('1'+seq)

                
    def FSM_lower_Rx_ack(self, msg):
        
        ack = msg[0] # get the ack number
        if (ack != self.tx_seq_num):
            
            # the received ack does not match the seq
            text_out = "ACK # " + ack + " does not match local seq # " + self.tx_seq_num
            self.exp_out.print(text_out)
            return

        # stop timer
        self.timer_active = False
        
        # inc the seq
        self.tx_seq_num = chr(ord(self.tx_seq_num) + 1)
        if (self.tx_seq_num == chr(ord('9') + 1)):
                self.tx_seq_num = '0'
        
        # ready to send the next upper layer packet
        with self.cv_TxOp:
            self.State = STATE_READY_TO_SEND
            self.TxOp  = True
            self.cv_TxOp.notify()
            
    def FSM_timeout(self, msg):
        
        text_out = self.Name + " FSM: to resend frame " + self.tx_seq_num + " " + self.tx_buffer
        self.exp_out.print(text_out)
        
        # prepare to send a data message
        msg = '0' + self.tx_seq_num + self.tx_buffer     # inicating a new packet and its sequence number
        self.Lower_TR.send(msg)
        
        # start timer
        with self.cv_Timer:
            self.timer_active  = True
            self.timer_counter = 10
            self.cv_Timer.notify()

    def loop_timer(self):
        
        global thread_running
        self.exp_out.print(self.Name + ": loop_Timer starting")
        
        while (thread_running == True):
            
            with self.cv_Timer: 
                while (self.timer_active == False): 
                    self.cv_Timer.wait() 
            
                time.sleep(0.5)

                if (self.timer_counter == 0):
                    # add an event for timeout
                    self.event_add(EVENT_TIMEOUT, 'x')
                    self.timer_active = False

                else:
                    self.timer_counter = self.timer_counter - 1

In [4]:
class PLE_TR:

    def __init__(self, name, Socket, AP_Tx, AP_Rx, exp_out):
        
        self.Name      = name
        self.Socket    = Socket
        self.AP_Tx     = AP_Tx
        self.AP_Rx     = AP_Rx
        self.Queue_Tx  = Queue()
        self.Queue_Rx  = Queue()
        self.exp_out   = exp_out
        
    def loop_Tx(self):
        
        global thread_running
        self.exp_out.print(self.Name + ": loop_Tx starting")
    
        while (thread_running == True):
        
            # get a message from queue
            msg = self.Queue_Tx.get()

            prefix = f"{self.Name} Tx"
            text_out = f"{prefix}: message: " + str(msg)
            self.exp_out.print(text_out)
            
            # sending the message using socket
            msg_bytes = str.encode(msg)
            if (random.random() < 0.15):  # 3.a.1) 15% packet loss probability
                self.exp_out.print(f"{prefix}: packet loss")
            elif random.random() < 0.05:
                # 3.a.2) duplicating the transmission
                self.exp_out.print(f"{prefix}: transmission duplication")
                self.Socket.sendto(msg_bytes, self.AP_Tx)
                self.Socket.sendto(msg_bytes, self.AP_Tx)
            else:
                # sending the message using socket
                self.Socket.sendto(msg_bytes, self.AP_Tx)

    def loop_Rx(self):
        
        global thread_running
        global bufferSize
        self.exp_out.print(self.Name + ": loop_Rx starting")
    
        # binding the socket with the IP and port
        self.Socket.bind(self.AP_Rx)
        
        while (thread_running == True):
        
            # get a message from socket, this thread is blocked here
            msg_addr = self.Socket.recvfrom(bufferSize)

            # 3.b) the delay should be in range of 0 to 3 seconds
            #delay = random.uniform(0, 3)
            #time.sleep(delay)
            #time.sleep(0.1)
    
            msg  = msg_addr[0].decode('utf-8')
            addr = msg_addr[1]
            
            text_out = self.Name + " Rx: from " + str(addr) + ": " + str(msg)
            self.exp_out.print(text_out)
            
            self.Queue_Rx.put(msg)
            
    def send(self, msg):
        self.Queue_Tx.put(msg)
        
    def receive(self):
        return self.Queue_Rx.get()

In [5]:
class MLE_TR:
    
    def __init__(self, name, lower_TR, exp_out):
        
        self.Queue_User = Queue()
        self.Lower_TR = lower_TR
        self.Name = name
        self.Exp_ID = 0
        self.exp_out = exp_out

        # create a button object
        self.Button =  widgets.Button(
                            description='Start',
                            disabled=False,
                            button_style='', 
                            tooltip='Click me to submit a string',
                        )

        # Define a function to handle the click action on the button
        def on_submit_button_clicked(b):

            global thread_running
            
            #msg = self.Text.value
            #self.exp_out.print("ALE_0: received input: "+str(msg))
            prefix = f"{self.Name} Tx"
            self.exp_out.print(f"{prefix}: Sending 100 messages")
                
            if not thread_running:
                return

            self.Exp_ID += 1
            for i in range(100):
                msg = f"{self.Exp_ID}:{i + 1}:{time.time_ns()}"
                self.Lower_TR.send(msg)
                self.exp_out.print(f"{prefix}: Sending {msg}")

        # Attach the event to the button
        self.Button.on_click(on_submit_button_clicked)
        
        # Display the widget
        display(self.Button)

    def loop_Rx(self):
        
        global thread_running
        c = 0
        prefix = f"{self.Name} Rx"
        self.exp_out.print(f"{prefix}: loop_Rx starting")
        
        delay = 0
        
        msg_lost = 0
        loss_rate = 0
        
        msg_duplicated = 0
        duplication_rate = 0
    
        while thread_running:

            c = c + 1

            # get message from a lower layer
            # this thread is blocked here
            msg = self.Lower_TR.receive()
            received = time.time_ns()
            
            text_out = f"{prefix}: message#{c} {msg}"
            self.exp_out.print(text_out)

            # Parse received message
            msg_parsed = msg.split(':')
            msg_id = int(msg_parsed[0].strip())
            msg_i = int(msg_parsed[1].strip())
            msg_time = int(msg_parsed[2].strip())

            # Detect delay
            msg_delay = received - msg_time
            delay = (delay + msg_delay) / 2
            self.exp_out.print(f"{prefix}: Current delay: {delay}")
            
            # Detect loss rate
            if msg_i > c:
                msg_lost += msg_i - c
                self.exp_out.print(f"{prefix}: lacket poss: expected {c} but got {msg_i}, lost {msg_i - c} messages (total {msg_lost})")
                c = msg_i  # the inbetween got lost, so we increment to the one received, next loop we expect next msg
            loss_rate = msg_lost / c
            self.exp_out.print(f"{prefix}: Current loss rate: {(loss_rate * 100):.2f}%")

            # Detect duplication rate
            if c > msg_i:
                msg_duplicated += 1
                self.exp_out.print(f"{prefix}: Message {msg_i} already received, expected {c} (total {msg_duplicated} duplicated)")
                c -= 1
            duplication_rate = msg_duplicated / c
            self.exp_out.print(f"{prefix}: Current duplication rate: {(duplication_rate * 100):.2f}%")
        
    # to get a message from user queue. this function can block the thread
    def get(self):
        return self.Queue_User.get()

In [6]:
thread_running = False
bufferSize = 1024

# (0) creating an output object
exp_out = EXP_Output()

# (1) create physical layer entities
AP_local_1  = ("127.0.0.1", 30000)
AP_remote_1 = ("127.0.0.1", 31111)
Socket_1    = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
PLE_1       = PLE_TR("PLE_Alice", Socket_1, AP_remote_1, AP_local_1, exp_out)

# (2) create date link layer entities
DLE_1 = DLE_TR_FSM("DLE_Alice", PLE_1, exp_out)

# (3) create measurement layer entities
MLE_1 = MLE_TR("MLE_Alice", DLE_1, exp_out)

Button(description='Start', style=ButtonStyle(), tooltip='Click me to submit a string')

18:22:21> DLE_Alice: loop_Tx starting
18:22:21> DLE_Alice: loop_Rx starting
18:22:21> PLE_Alice: loop_Tx starting
18:22:21> PLE_Alice: loop_Rx starting
18:22:21> MLE_Alice Rx: loop_Rx starting
18:22:21> DLE_Alice: loop_FSM starting
18:22:21> DLE_Alice: loop_Timer starting
18:22:21> DLE_Bob: loop_Tx starting
18:22:21> DLE_Bob: loop_Rx starting
18:22:21> PLE_Bob: loop_Tx starting
18:22:21> PLE_Bob: loop_Rx starting
18:22:21> MLE_Bob Rx: loop_Rx starting
18:22:21> DLE_Bob: loop_FSM starting
18:22:21> DLE_Bob: loop_Timer starting
18:22:25> MLE_Alice Tx: Sending 100 messages
18:22:25> MLE_Alice Tx: Sending 1:1:1760566945749732131
18:22:25> MLE_Alice Tx: Sending 1:2:1760566945749742961
18:22:25> MLE_Alice Tx: Sending 1:3:1760566945749746711
18:22:25> MLE_Alice Tx: Sending 1:4:1760566945749749461
18:22:25> MLE_Alice Tx: Sending 1:5:1760566945749751861
18:22:25> MLE_Alice Tx: Sending 1:6:1760566945749754021
18:22:25> MLE_Alice Tx: Sending 1:7:1760566945749756331
18:22:25> MLE_Alice Tx: Sending

In [7]:
# (4) create physical layer entities
AP_local_2  = ("127.0.0.1", 31111)
AP_remote_2 = ("127.0.0.1", 30000)
Socket_2    = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
PLE_2       = PLE_TR("PLE_Bob", Socket_2, AP_remote_2, AP_local_2, exp_out)

# (5) create date link layer entities
DLE_2 = DLE_TR_FSM("DLE_Bob", PLE_2, exp_out)

# (6) create measurement layer entities
MLE_2 = MLE_TR("MLE_Bob", DLE_2, exp_out)

Button(description='Start', style=ButtonStyle(), tooltip='Click me to submit a string')

In [8]:
# start the loops of all entities
# all loops must be blocked at a certain position

t3_1 = Thread(target = DLE_1.loop_Tx, args = ())
t4_1 = Thread(target = DLE_1.loop_Rx, args = ())
t5_1 = Thread(target = PLE_1.loop_Tx, args = ()) 
t6_1 = Thread(target = PLE_1.loop_Rx, args = ())
t7_1 = Thread(target = MLE_1.loop_Rx, args = ())
f1_1 = Thread(target = DLE_1.loop_FSM, args = ())
f2_1 = Thread(target = DLE_1.loop_timer, args = ())

t3_2 = Thread(target = DLE_2.loop_Tx, args = ())
t4_2 = Thread(target = DLE_2.loop_Rx, args = ())
t5_2 = Thread(target = PLE_2.loop_Tx, args = ()) 
t6_2 = Thread(target = PLE_2.loop_Rx, args = ())
t7_2 = Thread(target = MLE_2.loop_Rx, args = ()) 
f1_2 = Thread(target = DLE_2.loop_FSM, args = ())
f2_2 = Thread(target = DLE_2.loop_timer, args = ())

thread_running = True

t3_1.start()
t4_1.start()
t5_1.start()
t6_1.start()
t7_1.start()
f1_1.start()
f2_1.start()

t3_2.start()
t4_2.start()
t5_2.start()
t6_2.start()
t7_2.start()
f1_2.start()
f2_2.start()


Exception in thread Thread-9 (loop_Rx):
Traceback (most recent call last):
  File [35m"/usr/lib64/python3.13/threading.py"[0m, line [35m1043[0m, in [35m_bootstrap_inner[0m
    [31mself.run[0m[1;31m()[0m
    [31m~~~~~~~~[0m[1;31m^^[0m
  File [35m"/home/dvt/.local/lib/python3.13/site-packages/ipykernel/ipkernel.py"[0m, line [35m772[0m, in [35mrun_closure[0m
    [31m_threading_Thread_run[0m[1;31m(self)[0m
    [31m~~~~~~~~~~~~~~~~~~~~~[0m[1;31m^^^^^^[0m
  File [35m"/usr/lib64/python3.13/threading.py"[0m, line [35m994[0m, in [35mrun[0m
    [31mself._target[0m[1;31m(*self._args, **self._kwargs)[0m
    [31m~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/tmp/ipykernel_16238/413082734.py"[0m, line [35m47[0m, in [35mloop_Rx[0m
    [31mself.Socket.bind[0m[1;31m(self.AP_Rx)[0m
    [31m~~~~~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^[0m
[1;35mOSError[0m: [35m[Errno 98] Address already in use[0m
Exception in thread Thread-16 (loop_R