In [1]:
import time
import socket
from queue import Queue 
from threading import Thread, Condition
from ipywidgets import widgets
from IPython.display import display
import random

In [2]:
class EXP_Output:
    """Funnel log lines from many threads through a single printer thread.

    Callers hand a message to :meth:`print`; it is time-stamped and queued.
    A background thread (started by the constructor) pops queued lines one
    at a time and writes them with the built-in ``print``, so lines coming
    from different threads are never interleaved mid-line.
    """

    def __init__(self):
        # Lines wait here until the printer thread writes them out.
        self.Queue_Print = Queue()

        # loop_Print never returns, so the thread must be a daemon;
        # otherwise it would keep the interpreter alive forever at shutdown.
        self.thread = Thread(target=self.loop_Print, daemon=True)
        self.thread.start()

    def print(self, msg):
        """Queue ``msg`` for output, prefixed with the local time ``HH:MM:SS> ``.

        ``msg`` is passed through ``str()`` so non-string values can be logged.
        """
        text_out = time.strftime("%H:%M:%S", time.localtime()) + "> "
        self.Queue_Print.put(text_out + str(msg))

    def loop_Print(self):
        """Printer-thread body: block on the queue and print each line forever."""
        while True:
            msg = self.Queue_Print.get()
            try:
                # Inside a method, ``print`` is the built-in, not EXP_Output.print.
                print(msg)
            finally:
                # Always balance get() so Queue_Print.join() cannot hang.
                self.Queue_Print.task_done()

In [3]:
class ALE_TextInput:
    """Text box plus Submit button that feeds typed strings into a queue.

    Each click on the button (while the module-level ``thread_running`` flag
    is true) enqueues twenty copies of the typed text, suffixed ``0`` .. ``19``.
    Typing ``end`` additionally clears ``thread_running``.
    Consumers pull the queued strings with :meth:`get`.
    """

    def __init__(self, exp_out):
        self.exp_out = exp_out
        self.Queue_User = Queue()

        # Entry field the user types into.
        self.Text = widgets.Text(
            value='',
            placeholder='Type something and press enter',
            description='To send:',
            disabled=False,
        )

        # Button that submits whatever is currently in the field.
        self.Botton = widgets.Button(
            description='Submit',
            disabled=False,
            button_style='',
            tooltip='Click me to submit a string',
        )

        # Wire the click handler, then render both widgets in the cell output.
        self.Botton.on_click(self._handle_submit)
        display(self.Text, self.Botton)

    def _handle_submit(self, b):
        """Click callback; ``b`` is the argument supplied by ``on_click`` (unused)."""
        global thread_running

        typed = self.Text.value

        if thread_running == True:
            if typed == "end":
                thread_running = False

            for suffix in map(str, range(20)):
                self.Queue_User.put(typed + suffix)

        # Empty the field so the next entry starts clean.
        self.Text.value = ''

    def get(self):
        """Return the next queued user string; blocks while the queue is empty."""
        return self.Queue_User.get()

In [4]:
class ALE_TR:
    """Top-layer relay between a message source and the layer beneath it.

    ``upper_Tx`` must offer a blocking ``get()``; ``lower_TR`` must offer
    ``send(msg)`` and a blocking ``receive()``; ``exp_out`` must offer
    ``print(text)``. Both loops run until the module-level ``thread_running``
    flag stops comparing equal to ``True``; the flag is only re-checked after
    the blocking call of the current iteration has returned.
    """

    def __init__(self, name, upper_Tx, lower_TR, exp_out):
        self.Name = name
        self.exp_out = exp_out
        self.Upper_Tx = upper_Tx
        self.Lower_TR = lower_TR

    def loop_Tx(self):
        """Forward every message obtained from ``Upper_Tx`` to ``Lower_TR``, logging each."""
        self.exp_out.print(self.Name + ": loop_Tx starting")
        count = 0

        while thread_running == True:
            count += 1
            payload = self.Upper_Tx.get()
            self.exp_out.print(
                self.Name + " Tx: message " + str(count) + ": " + str(payload)
            )
            self.Lower_TR.send(payload)

    def loop_Rx(self):
        """Log every message delivered by ``Lower_TR.receive()``."""
        self.exp_out.print(self.Name + ": loop_Rx starting")
        count = 0

        while thread_running == True:
            count += 1
            payload = self.Lower_TR.receive()
            self.exp_out.print(
                self.Name + " Rx: message " + str(count) + ": " + str(payload)
            )

In [5]:
# FSM states of DLE_TR_FSM. The value is the column index into
# DLE_TR_FSM.Procedure, so the numbering must stay 0, 1.
STATE_READY_TO_SEND, STATE_WAITING_ACK = range(2)

# FSM event codes. The value is the row index into DLE_TR_FSM.Procedure and is
# also packed into the first byte of every queued event, so the numbering must
# stay 0, 1, 2, 3.
#   EVENT_UPPER_TX  - a message from the upper layer is ready to be sent
#   EVENT_LOWER_DAT - a frame of type '0' arrived from the lower layer
#   EVENT_LOWER_ACK - a frame of type '1' arrived from the lower layer
#   EVENT_TIMEOUT   - the timer loop counted down to zero
EVENT_UPPER_TX, EVENT_LOWER_DAT, EVENT_LOWER_ACK, EVENT_TIMEOUT = range(4)
        
class DLE_TR_FSM:
    """Stop-and-wait link entity driven by a two-state finite state machine.

    Frames exchanged with ``lower_TR`` are strings:

    * data frame: ``'0' + <sequence digit> + <payload>``
    * ack frame:  ``'1' + <sequence digit>``

    Sequence digits cycle ``'0'`` .. ``'9'``. Only one data frame is in flight
    at a time; it is re-sent whenever the timer expires before the matching
    ack arrives.

    Four loops are meant to run in separate threads: ``loop_Tx``, ``loop_Rx``,
    ``loop_FSM`` and ``loop_timer``. All of them poll the module-level
    ``thread_running`` flag. ``lower_TR`` must provide ``send(msg)`` and a
    blocking ``receive()``; ``exp_out`` must provide ``print(text)``.
    """

    # Retransmission timer: loop_timer ticks every TIMER_TICK_SECONDS; the
    # counter is loaded with TIMER_TICKS and a timeout fires on the tick that
    # finds it at zero.
    TIMER_TICKS = 10
    TIMER_TICK_SECONDS = 0.5

    def __init__(self, name, lower_TR, exp_out):
        self.Name      = name
        self.Lower_TR  = lower_TR
        self.Queue_Tx  = Queue()   # payloads handed in through send()
        self.Queue_Rx  = Queue()   # payloads delivered through receive()
        self.exp_out   = exp_out

        self.Queue_FSM = Queue()   # encoded events consumed by loop_FSM

        # TxOp is True while no frame is outstanding; guarded by cv_TxOp.
        self.cv_TxOp = Condition()
        self.TxOp    = True

        self.State = STATE_READY_TO_SEND

        # Dispatch table: Procedure[event][state] -> handler(msg).
        self.Procedure = [[self.FSM_upper_Tx, self.FSM_abnormal],
                          [self.FSM_lower_Rx, self.FSM_lower_Rx],
                          [self.FSM_abnormal, self.FSM_lower_Rx_ack],
                          [self.FSM_abnormal, self.FSM_timeout]]

        # Timer state; every access goes through cv_Timer.
        self.cv_Timer      = Condition()
        self.timer_active  = False
        self.timer_counter = 0

        self.tx_buffer  = ''    # payload of the outstanding frame (for resends)
        self.tx_seq_num = '0'   # sequence digit of the outstanding / next frame

        self.tx_ack_num = '0'   # sequence digit expected on the next data frame

    @staticmethod
    def _next_seq(seq):
        """Return the sequence digit following ``seq``, wrapping '9' back to '0'."""
        return '0' if seq == '9' else chr(ord(seq) + 1)

    def _timer_start(self):
        """Arm (or re-arm) the retransmission timer and wake loop_timer."""
        with self.cv_Timer:
            self.timer_active  = True
            self.timer_counter = self.TIMER_TICKS
            self.cv_Timer.notify()

    def _timer_tick(self):
        """Advance the timer by one tick; queue EVENT_TIMEOUT when it expires."""
        with self.cv_Timer:
            if not self.timer_active:
                # Cancelled (ack received) while loop_timer was sleeping.
                return
            if self.timer_counter > 0:
                self.timer_counter = self.timer_counter - 1
                return
            # Disarm BEFORE publishing the event: the FSM thread re-arms the
            # timer in FSM_timeout and that re-arm must not be overwritten.
            self.timer_active = False
        self.event_add(EVENT_TIMEOUT, 'x')

    def loop_Tx(self):
        """Feed queued payloads to the FSM, one at a time.

        Waits until no frame is outstanding (``TxOp``), then takes the next
        payload from ``Queue_Tx`` and raises EVENT_UPPER_TX for it.
        """
        c = 0
        self.exp_out.print(self.Name + ": loop_Tx starting")

        while (thread_running == True):
            with self.cv_TxOp:
                while (self.TxOp != True):
                    self.cv_TxOp.wait()
                self.TxOp = False

            # Block on the queue only after releasing cv_TxOp, so the FSM
            # thread can never stall on that lock while we wait for input.
            c = c + 1
            msg = self.Queue_Tx.get()
            text_out = self.Name + " Tx: message " + str(c) + ": " + str(msg)
            self.exp_out.print(text_out)
            self.event_add(EVENT_UPPER_TX, msg)

    def loop_Rx(self):
        """Classify frames from the lower layer and raise the matching event."""
        c = 0
        self.exp_out.print(self.Name + ": loop_Rx starting")

        while (thread_running == True):
            c = c + 1
            msg = self.Lower_TR.receive()
            text_out = self.Name + " Rx: message " + str(c) + ": " + str(msg)
            self.exp_out.print(text_out)

            if not msg:
                # An empty frame has no type character; indexing it would
                # raise IndexError and terminate this thread.
                self.exp_out.print(self.Name + " Rx: empty message ignored")
                continue

            msg_type = msg[0]
            if (msg_type == '0'):
                self.event_add(EVENT_LOWER_DAT, msg[1:])
            elif (msg_type == '1'):
                self.event_add(EVENT_LOWER_ACK, msg[1:])
            else:
                text_out = self.Name + " Rx: message type unknown " + msg_type
                self.exp_out.print(text_out)

    def receive(self):
        """Return the next in-order payload; blocks while none is available."""
        return self.Queue_Rx.get()

    def send(self, msg):
        """Queue ``msg`` for transmission."""
        self.Queue_Tx.put(msg)

    def event_add(self, ev_type, msg):
        """Queue an FSM event encoded as one type byte followed by the message bytes.

        ``msg`` may be ``str`` (encoded with the default codec) or ``bytes``.
        """
        if (isinstance(msg, str)):
            msg = msg.encode()

        event = ev_type.to_bytes(1, "big") + msg
        self.Queue_FSM.put(event)

    def loop_FSM(self):
        """Pop events and dispatch each to ``Procedure[event][state]``."""
        self.exp_out.print(self.Name + ": loop_FSM starting")

        while (thread_running == True):
            event = self.Queue_FSM.get()
            ev_type = event[0]   # indexing bytes yields the int event code
            text_out = self.Name + " FSM: state: " + str(self.State) + " event type: " + str(ev_type)
            self.exp_out.print(text_out)
            msg = event[1:].decode('utf-8')
            self.Procedure[ev_type][self.State](msg)

    def FSM_abnormal(self, msg):
        """Handler for an event that is not expected in the current state."""
        text_out = self.Name + " FSM: error! " + msg
        self.exp_out.print(text_out)

    def FSM_upper_Tx(self, msg):
        """Send ``msg`` as a new data frame, enter WAITING_ACK and start the timer."""
        text_out = self.Name + " FSM: to send: " + msg
        self.exp_out.print(text_out)

        # Keep the payload so FSM_timeout can re-send it.
        self.tx_buffer = msg

        msg = '0' + self.tx_seq_num + msg
        self.Lower_TR.send(msg)

        self.State = STATE_WAITING_ACK
        self._timer_start()

    def FSM_lower_Rx(self, msg):
        """Handle a data frame body (``<sequence digit> + <payload>``).

        An in-sequence payload is delivered to ``Queue_Rx``; any other frame
        is only logged. In both cases the received sequence digit is
        acknowledged, so a sender whose earlier ack was lost can move on.
        """
        if not msg:
            # No sequence digit to inspect or acknowledge.
            self.exp_out.print(self.Name + " FSM: data frame without sequence number ignored")
            return

        seq = msg[0]
        msg = msg[1:]

        text_out = self.Name + " FSM: received: " + msg
        self.exp_out.print(text_out)

        if (seq == self.tx_ack_num):
            self.Queue_Rx.put(msg)
            self.tx_ack_num = self._next_seq(self.tx_ack_num)
        else:
            text_out  = self.Name + " FSM: received received frame " + seq
            text_out += " but expected " + self.tx_ack_num
            self.exp_out.print(text_out)

        self.Lower_TR.send('1' + seq)

    def FSM_lower_Rx_ack(self, msg):
        """Handle an ack body; on a match, stop the timer and free the Tx slot."""
        if not msg:
            # No sequence digit to compare against.
            self.exp_out.print(self.Name + " FSM: ack frame without sequence number ignored")
            return

        ack = msg[0]
        if (ack != self.tx_seq_num):
            text_out = "ACK # " + ack + " does not match local seq # " + self.tx_seq_num
            self.exp_out.print(text_out)
            return

        # Cancel under the lock so loop_timer sees the change consistently.
        with self.cv_Timer:
            self.timer_active = False

        self.tx_seq_num = self._next_seq(self.tx_seq_num)

        with self.cv_TxOp:
            self.State = STATE_READY_TO_SEND
            self.TxOp  = True
            self.cv_TxOp.notify()

    def FSM_timeout(self, msg):
        """Re-send the outstanding frame and restart the timer (``msg`` is unused)."""
        text_out = self.Name + " FSM: to resend frame " + self.tx_seq_num + " " + self.tx_buffer
        self.exp_out.print(text_out)

        msg = '0' + self.tx_seq_num + self.tx_buffer
        self.Lower_TR.send(msg)

        self._timer_start()

    def loop_timer(self):
        """Timer thread body: sleep one tick at a time while the timer is armed."""
        self.exp_out.print(self.Name + ": loop_Timer starting")

        while (thread_running == True):
            with self.cv_Timer:
                while (self.timer_active == False):
                    self.cv_Timer.wait()

            time.sleep(self.TIMER_TICK_SECONDS)
            self._timer_tick()

In [6]:
class PLE_TR:
    """Bottom-layer transceiver over a datagram socket with injected impairments.

    Outgoing strings are queued by :meth:`send` and written to the socket by
    ``loop_Tx``, which randomly drops or duplicates them. ``loop_Rx`` binds
    the socket, reads datagrams, delays each by a random time and queues the
    decoded text for :meth:`receive`. Both loops poll the module-level
    ``thread_running`` flag; ``loop_Rx`` also reads the module-level
    ``bufferSize``.

    Parameters
    ----------
    name : str
        Prefix used in log lines.
    Socket :
        Object providing ``sendto(bytes, addr)``, ``bind(addr)`` and
        ``recvfrom(size)``.
    AP_Tx :
        Destination address passed to ``sendto``.
    AP_Rx :
        Local address passed to ``bind``.
    exp_out :
        Logger providing ``print(text)``.
    loss_prob : float, default 0.15
        Probability that an outgoing message is silently dropped.
    dup_prob : float, default 0.05
        Probability that an outgoing message is sent twice.
    max_delay : float, default 3
        Upper bound, in seconds, of the uniform random delay applied to every
        received datagram.
    """

    def __init__(self, name, Socket, AP_Tx, AP_Rx, exp_out,
                 loss_prob=0.15, dup_prob=0.05, max_delay=3):

        self.Name      = name
        self.Socket    = Socket
        self.AP_Tx     = AP_Tx
        self.AP_Rx     = AP_Rx
        self.Queue_Tx  = Queue()
        self.Queue_Rx  = Queue()
        self.exp_out   = exp_out

        # Channel impairment settings (defaults reproduce the former constants).
        self.loss_prob = loss_prob
        self.dup_prob  = dup_prob
        self.max_delay = max_delay

    def loop_Tx(self):
        """Transmit queued messages, applying random loss and duplication."""
        self.exp_out.print(self.Name + ": loop_Tx starting")

        while (thread_running == True):
            msg = self.Queue_Tx.get()
            text_out = self.Name + " Tx: message: " + str(msg)
            self.exp_out.print(text_out)
            msg_bytes = str.encode(msg)

            # One draw in [0, 1) decides the fate of this message:
            # [0, loss) -> dropped, [loss, loss + dup) -> sent twice, else sent once.
            rand_val = random.random()

            if rand_val < self.loss_prob:
                self.exp_out.print("packet loss")
            elif rand_val < self.loss_prob + self.dup_prob:
                self.exp_out.print("packet duplication")
                # Send original packet
                self.Socket.sendto(msg_bytes, self.AP_Tx)
                # Send duplicate packet
                self.Socket.sendto(msg_bytes, self.AP_Tx)
            else:
                self.Socket.sendto(msg_bytes, self.AP_Tx)

    def loop_Rx(self):
        """Bind the socket, then receive, delay, decode and queue datagrams."""
        self.exp_out.print(self.Name + ": loop_Rx starting")

        self.Socket.bind(self.AP_Rx)

        while (thread_running == True):
            msg_addr = self.Socket.recvfrom(bufferSize)

            # Random delay between 0 and max_delay seconds
            delay_time = random.uniform(0, self.max_delay)
            time.sleep(delay_time)
            self.exp_out.print(f"delayed for {delay_time:.2f} seconds")

            # errors='replace': a datagram that is not valid UTF-8 must not
            # raise and terminate the receive thread.
            msg  = msg_addr[0].decode('utf-8', errors='replace')
            addr = msg_addr[1]
            text_out = self.Name + " Rx: from " + str(addr) + ": " + str(msg)
            self.exp_out.print(text_out)
            self.Queue_Rx.put(msg)

    def send(self, msg):
        """Queue the string ``msg`` for transmission by ``loop_Tx``."""
        self.Queue_Tx.put(msg)

    def receive(self):
        """Return the next received string; blocks while none is available."""
        return self.Queue_Rx.get()

In [7]:
# Run flag polled by every protocol loop; it stays False until the worker
# threads are launched.
thread_running = False

# Size argument handed to recvfrom() by PLE_TR.loop_Rx.
bufferSize = 1024

# One logger shared by every layer of both stacks.
exp_out = EXP_Output()

# --- Alice: listens on port 30000, transmits to port 31111 -----------------
AP_local_1 = ("127.0.0.1", 30000)
AP_remote_1 = ("127.0.0.1", 31111)
Socket_1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Build the stack bottom-up; each layer is given the layer beneath it.
PLE_1 = PLE_TR("PLE_Alice", Socket_1, AP_remote_1, AP_local_1, exp_out)
DLE_1 = DLE_TR_FSM("DLE_Alice", PLE_1, exp_out)

# ALE_0 renders the input widgets and acts as the message source for ALE_1.
ALE_0 = ALE_TextInput(exp_out)
ALE_1 = ALE_TR("ALE_Alice", ALE_0, DLE_1, exp_out)

Text(value='', description='To send:', placeholder='Type something and press enter')

Button(description='Submit', style=ButtonStyle(), tooltip='Click me to submit a string')

22:09:40> ALE_Alice: loop_Tx starting
22:09:40> ALE_Alice: loop_Rx starting
22:09:40> DLE_Alice: loop_Tx starting
22:09:40> DLE_Alice: loop_Rx starting
22:09:40> PLE_Alice: loop_Tx starting
22:09:40> PLE_Alice: loop_Rx starting
22:09:40> ALE_Bob: loop_Tx starting
22:09:40> ALE_Bob: loop_Rx starting
22:09:40> DLE_Bob: loop_Tx starting
22:09:40> DLE_Bob: loop_Rx starting
22:09:40> PLE_Bob: loop_Tx starting
22:09:40> PLE_Bob: loop_Rx starting
22:09:40> DLE_Alice: loop_FSM starting
22:09:40> DLE_Bob: loop_FSM starting
22:09:40> DLE_Alice: loop_Timer starting
22:09:40> DLE_Bob: loop_Timer starting
22:09:58> ALE_Bob Tx: message 1: Bob to Alice0
22:09:58> ALE_Bob Tx: message 2: Bob to Alice1
22:09:58> ALE_Bob Tx: message 3: Bob to Alice2
22:09:58> ALE_Bob Tx: message 4: Bob to Alice3
22:09:58> ALE_Bob Tx: message 5: Bob to Alice4
22:09:58> ALE_Bob Tx: message 6: Bob to Alice5
22:09:58> ALE_Bob Tx: message 7: Bob to Alice6
22:09:58> ALE_Bob Tx: message 8: Bob to Alice7
22:09:58> ALE_Bob Tx: me

In [8]:
# --- Bob: listens on port 31111, transmits to port 30000 -------------------
# (the two addresses are the mirror image of Alice's)
AP_local_2 = ("127.0.0.1", 31111)
AP_remote_2 = ("127.0.0.1", 30000)
Socket_2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Build the stack bottom-up; each layer is given the layer beneath it.
PLE_2 = PLE_TR("PLE_Bob", Socket_2, AP_remote_2, AP_local_2, exp_out)
DLE_2 = DLE_TR_FSM("DLE_Bob", PLE_2, exp_out)

# ALE_3 renders the input widgets and acts as the message source for ALE_2.
ALE_3 = ALE_TextInput(exp_out)
ALE_2 = ALE_TR("ALE_Bob", ALE_3, DLE_2, exp_out)

Text(value='', description='To send:', placeholder='Type something and press enter')

Button(description='Submit', style=ButtonStyle(), tooltip='Click me to submit a string')

In [9]:
# --- Alice's workers: one thread per protocol loop -------------------------
t1_1 = Thread(target=ALE_1.loop_Tx)
t2_1 = Thread(target=ALE_1.loop_Rx)
t3_1 = Thread(target=DLE_1.loop_Tx)
t4_1 = Thread(target=DLE_1.loop_Rx)
t5_1 = Thread(target=PLE_1.loop_Tx)
t6_1 = Thread(target=PLE_1.loop_Rx)
f1_1 = Thread(target=DLE_1.loop_FSM)
f2_1 = Thread(target=DLE_1.loop_timer)

# --- Bob's workers ---------------------------------------------------------
t1_2 = Thread(target=ALE_2.loop_Tx)
t2_2 = Thread(target=ALE_2.loop_Rx)
t3_2 = Thread(target=DLE_2.loop_Tx)
t4_2 = Thread(target=DLE_2.loop_Rx)
t5_2 = Thread(target=PLE_2.loop_Tx)
t6_2 = Thread(target=PLE_2.loop_Rx)
f1_2 = Thread(target=DLE_2.loop_FSM)
f2_2 = Thread(target=DLE_2.loop_timer)

# The loops test this flag on entry, so it must be raised before any start().
thread_running = True

# Launch order: Alice's Tx/Rx loops, Bob's Tx/Rx loops, then the FSM loops,
# then the timer loops.
for _worker in (
    t1_1, t2_1, t3_1, t4_1, t5_1, t6_1,
    t1_2, t2_2, t3_2, t4_2, t5_2, t6_2,
    f1_1, f1_2,
    f2_1, f2_2,
):
    _worker.start()