In [1]:
import time
import socket
from queue import Queue 
from threading import Thread, Condition
import statistics

In [2]:
from ipywidgets import widgets
from IPython.display import display

class ALE_TextInput:
    
    def __init__(self):
        
        self.Queue_User = Queue()
        self.Text = widgets.Text()
        display(self.Text)
        
        def input_handler(sender):
    
            global thread_running
    
            msg = self.Text.value
            print(msg)
            self.Text.value = ""
            if msg=='end':
                thread_running = False

            #self.Queue_User.put(msg)
            for i in range (10):
                self.Queue_User.put("{}-{}".format(msg, i))
    
        
        self.Text.continuous_update = False

        self.Text.observe(input_handler, 'value')
        
    # to get a message from user queue. this function can block the thread
    def get(self):
        return self.Queue_User.get()

In [3]:
class ALE_TR:
    
    def __init__(self, name, upper_Tx, lower_TR):
        
        self.Name     = name
        self.Upper_Tx = upper_Tx  # must provide get()
        self.Lower_TR = lower_TR  # must provide send(msg), and receive()
        
    def loop_Tx(self):

        global thread_running
        c = 0
    
        while (thread_running == True):

            c = c + 1

            # get text from Upper_Tx, which must provide a get method
            # this thread is blocked here
            msg = self.Upper_Tx.get()
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s Tx: message %d: %s"%(self.Name, c, msg))

            # add the text to the queue
            self.Lower_TR.send(msg)
            
    def loop_Rx(self):
        
        global thread_running
        c = 0
    
        while (thread_running == True):

            c = c + 1

            # get message from a lower layer
            # this thread is blocked here
            msg = self.Lower_TR.receive()
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s Rx: message %d: %s"%(self.Name, c, msg))

In [4]:
# state of DLE
STATE_READY_TO_SEND = 0 # ready to send a packet to the DLE entity in another node through lower layer
STATE_WAITING_ACK   = 1 # waiting for ACK from the DLE entity in another node

# events of DLE
EVENT_UPPER_TX  = 0 # upper layer wants to send a packet
EVENT_LOWER_DAT = 1 # lower layer forwards an incoming data packet
EVENT_LOWER_ACK = 2 # lower layer forwards an incoming data packet
EVENT_TIMEOUT   = 3 # timer timeout
        
class DLE_TR_FSM:
    
    def __init__(self, name, lower_TR):
        
        self.Name      = name
        self.Lower_TR  = lower_TR   # must provide send(msg) and receive()
        self.Queue_Tx  = Queue()
        self.Queue_Rx  = Queue()
        
        # create a queue for Finite State Machine
        self.Queue_FSM = Queue()
        
        # create a flag = condition to start sending
        self.cv_TxOp = Condition()
        self.TxOp    = True

        # init state to STATE_WAITING_UPPER
        self.State = STATE_READY_TO_SEND
        
        self.Procedure = [[self.FSM_upper_Tx, self.FSM_abnormal],
                          [self.FSM_lower_Rx, self.FSM_lower_Rx],
                          [self.FSM_abnormal, self.FSM_lower_Rx_ack],
                          [self.FSM_abnormal, self.FSM_timeout]]
        
        # timer
        self.cv_Timer      = Condition()
        self.timer_active  = False
        self.timer_counter = 0
        
        # transmission 
        self.tx_buffer  = 0
        self.tx_seq_num = 0
        
        # reception
        self.tx_ack_num = 0
        
        
    def loop_Tx(self):

        global thread_running
        c = 0
    
        while (thread_running == True):

            # waiting for the ready to sent signal
            with self.cv_TxOp: 
                while (self.TxOp != True): 
                    self.cv_TxOp.wait()
            
                self.TxOp = False
                
                c = c + 1

                # get a message from queue
                msg = self.Queue_Tx.get()
                print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
                print("%s Tx: message %d: %s"%(self.Name, c, msg))
            
                self.event_add(EVENT_UPPER_TX, msg) 
            
    def loop_Rx(self):
        
        
        
        global thread_running
        c = 0
    
        while (thread_running == True):

            c = c + 1

            # get message from a lower layer
            # this thread is blocked here
            msg = self.Lower_TR.receive()
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s Rx: message %d: %s"%(self.Name, c, msg))
            
            # add an event for FSM, assuming that msg is a sequence of bytes
            msg_type = msg[0]
            if (msg_type == 0):
                self.event_add(EVENT_LOWER_DAT, msg[1:])
            elif (msg_type == 1):
                self.event_add(EVENT_LOWER_ACK, msg[1:])
            else:
                print("%s Rx: message type unknown %d"%(self.Name, msg_type))
            
    def receive(self):
        return self.Queue_Rx.get()
    
    def send(self, msg):
        self.Queue_Tx.put(msg)
        
    def event_add(self, ev_type, msg):
        
        # preparing an event
        if (isinstance(msg, str)):
            msg = msg.encode()
        
        # the event is a sequence of bytes
        event = ev_type.to_bytes(1, "big")+msg
        
        # add event to queue
        self.Queue_FSM.put(event)
        
    def loop_FSM(self):
        
        global thread_running
        
        while (thread_running == True):
            
            # get the next event
            event = self.Queue_FSM.get()
            
            ev_type = event[0]
            
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s FSM: event type: %d"%(self.Name, ev_type))
            
            # process the event
            msg = event[1:]
            self.Procedure[ev_type][self.State](msg)

    def FSM_abnormal(self, msg):
        
        print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
        print("%s FSM: error! %s"%(self.Name, msg))
    
    def FSM_upper_Tx(self, msg):   
        
        print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
        print("%s FSM: to send frame %d: %s"%(self.Name, self.tx_seq_num, msg.decode('utf-8')))

        # buffer the message
        self.tx_buffer = msg

        # waiting for ACK
        self.State = STATE_WAITING_ACK
        
        # prepare to send a data message
        msg = bytes([0, self.tx_seq_num]) + self.tx_buffer    # inicating a new packet with sequence number
        self.Lower_TR.send(msg)

        # start timer
        with self.cv_Timer:
            self.timer_active  = True
            self.timer_counter = 10
            self.cv_Timer.notify()       
        
        
    
    def FSM_lower_Rx(self, msg):

        # received a new data packet
        seq = msg[0]              # get the sequence number
        msg = msg[1:].decode()
        print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
        print("%s FSM: received frame %d: %s"%(self.Name, seq, msg))

        if (seq == self.tx_ack_num):
            
            # put the message in a receiving queue
            self.Queue_Rx.put(msg)
            
            self.tx_ack_num = self.tx_ack_num + 1
            if (self.tx_ack_num == 256):
                self.tx_ack_num = 0
                
        else:
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s FSM: received frame %d but expected %d"%(self.Name, seq, self.tx_ack_num))

        # to send an ACK
        self.Lower_TR.send(bytes([1, seq]))

                
    def FSM_lower_Rx_ack(self, msg):

        # received an ACK                
        
        ack = msg[0] # get the ack number
        if (ack != self.tx_seq_num):
            
            # the received ack does not match the seq
            print("ACK # %d does not match local seq # %d"%(ack, self.tx_seq_num))
            return
        
        # stop timer
        self.timer_active  = False
        
        # inc the seq
        self.tx_seq_num = self.tx_seq_num + 1
        if (self.tx_seq_num == 256):
                self.tx_seq_num = 0
        
        # ready to send the next upper layer packet
        with self.cv_TxOp:
            self.State = STATE_READY_TO_SEND
            self.TxOp  = True
            self.cv_TxOp.notify()
            
    def FSM_timeout(self, msg):
        
        print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
        print("%s FSM: to resend frame %d: %s"%(self.Name, self.tx_seq_num, self.tx_buffer.decode('utf-8')))

        # prepare to send a data message
        msg = bytes([0, self.tx_seq_num]) + self.tx_buffer    # inicating a new packet with sequence number
        self.Lower_TR.send(msg)

        # start timer
        with self.cv_Timer:
            self.timer_active  = True
            self.timer_counter = 10
            self.cv_Timer.notify()

    def loop_timer(self):
        
        global thread_running
        
        while (thread_running == True):
            
            with self.cv_Timer: 
                while (self.timer_active == False): 
                    self.cv_Timer.wait() 
            
                time.sleep(0.5)

                if (self.timer_counter == 0):
                    # add an event for timeout
                    self.event_add(EVENT_TIMEOUT, bytes([0]))
                    self.timer_active = False

                else:
                    self.timer_counter = self.timer_counter - 1

In [5]:
import random

class PLE_TR:

    def __init__(self, name, Socket, AP_Tx, AP_Rx):
        
        self.Name      = name
        self.Socket    = Socket
        self.AP_Tx     = AP_Tx
        self.AP_Rx     = AP_Rx
        self.Queue_Tx  = Queue()
        self.Queue_Rx  = Queue()
        
    def loop_Tx(self):
        
        global thread_running
    
        while (thread_running == True):
        
            # get a message from queue
            msg = self.Queue_Tx.get()
            
            # Original Implementation (Modified in Section 3 Part 1, Subpart 1)
            #time.sleep(1) 
            
            # Modification that adds a random delay between 0 and 2 seconds
            randDelay = random.uniform(0, 2)
            time.sleep(randDelay)
        
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s Tx: %s"%(self.Name, msg))
            
            # Original Implementation (Modified in Section 3 Part 1, Subpart 2)
            #if (random.random() < 0.2): 
            #   print("packet lost")

            # Now the probability was changed from 20% (0.2) to 10% (0.1)
            if (random.random() < 0.1): 
                print("packet lost")
            else:
                # Modification implementing probability of duplicating packets of 5% 
                #(Section 3 Part 1 Subpart 3)
                if (random.random() < 0.05):
                    print("packet duplicated")
                    self.Socket.sendto(msg, self.AP_Tx)
                
                
                # sending the message using socket
                self.Socket.sendto(msg, self.AP_Tx)

    def loop_Rx(self):
        
        global thread_running
        global bufferSize
    
        # binding the socket with the IP and port
        self.Socket.bind(self.AP_Rx)
        
        while (thread_running == True):
        
            # get a message from socket, this thread is blocked here
            msg_addr = self.Socket.recvfrom(bufferSize)
    
            msg  = msg_addr[0]
            addr = msg_addr[1]
            
            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s Rx from %s: %s"%(self.Name, addr, msg))        
            self.Queue_Rx.put(msg)
            
    def send(self, msg):
        if (isinstance(msg, str)):
            msg = msg.encode()
        self.Queue_Tx.put(msg)
        
    def receive(self):
        return self.Queue_Rx.get()

In [6]:
thread_running = False
bufferSize = 1024

# (1) create physical layer entities
AP_local_1  = ("127.0.0.1", 30000)
AP_remote_1 = ("127.0.0.1", 31111)
Socket_1    = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
PLE_1       = PLE_TR("PLE_Alice", Socket_1, AP_remote_1, AP_local_1)

# (2) create date link layer entities
DLE_1 = DLE_TR_FSM("DLE_Alice", PLE_1)

# (3) create application layer entities
ALE_0 = ALE_TextInput()
ALE_1 = ALE_TR("ALE_Alice", ALE_0, DLE_1)

Text(value='')

In [7]:
# (4) create physical layer entities
AP_local_2  = ("127.0.0.1", 31111)
AP_remote_2 = ("127.0.0.1", 30000)
Socket_2    = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
PLE_2       = PLE_TR("PLE_Bob", Socket_2, AP_remote_2, AP_local_2)

# (5) create date link layer entities
DLE_2 = DLE_TR_FSM("DLE_Bob", PLE_2)

# (6) create application layer entities
ALE_3 = ALE_TextInput()
ALE_2 = ALE_TR("ALE_Bob", ALE_3, DLE_2)


Text(value='')

In [8]:
# start the loops of all entities
# all loops must be blocked at a certain position

t1_1 = Thread(target = ALE_1.loop_Tx, args = ()) 
t2_1 = Thread(target = ALE_1.loop_Rx, args = ()) 
t3_1 = Thread(target = DLE_1.loop_Tx, args = ())
t4_1 = Thread(target = DLE_1.loop_Rx, args = ())
t5_1 = Thread(target = PLE_1.loop_Tx, args = ()) 
t6_1 = Thread(target = PLE_1.loop_Rx, args = ())
f1_1 = Thread(target = DLE_1.loop_FSM, args = ())
f2_1 = Thread(target = DLE_1.loop_timer, args = ())

t1_2 = Thread(target = ALE_2.loop_Tx, args = ()) 
t2_2 = Thread(target = ALE_2.loop_Rx, args = ()) 
t3_2 = Thread(target = DLE_2.loop_Tx, args = ())
t4_2 = Thread(target = DLE_2.loop_Rx, args = ())
t5_2 = Thread(target = PLE_2.loop_Tx, args = ()) 
t6_2 = Thread(target = PLE_2.loop_Rx, args = ()) 
f1_2 = Thread(target = DLE_2.loop_FSM, args = ())
f2_2 = Thread(target = DLE_2.loop_timer, args = ())



thread_running = True

t1_1.start()
t2_1.start()
t3_1.start()
t4_1.start()
t5_1.start()
t6_1.start()
t1_2.start()
t2_2.start()
t3_2.start()
t4_2.start()
t5_2.start()
t6_2.start()
f1_1.start()
f1_2.start()
f2_1.start()
f2_2.start()



In [9]:
print(t1_1.is_alive())
print(t2_1.is_alive())
print(t3_1.is_alive())
print(t4_1.is_alive())
print(t5_1.is_alive())
print(t6_1.is_alive())

print(t1_2.is_alive())
print(t2_2.is_alive())
print(t3_2.is_alive())
print(t4_2.is_alive())
print(t5_2.is_alive())
print(t6_2.is_alive())

print(f1_1.is_alive())
print(f1_2.is_alive())
print(f2_1.is_alive())
print(f2_2.is_alive())



True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True


In [10]:
class MLE_TR(PLE_TR):
    def __init__(self, name, Socket, AP_Tx, AP_Rx):
        super().__init__(name, Socket, AP_Tx, AP_Rx)

        # Create the button using ipywidgets (Section 4 Part 2) 
        self.button = widgets.Button(description="Measure")
        self.button.on_click(self.buttonClick)
        display(self.button)
        
        # Variables that will receive packets and duplicate packets
        self.receivedPacket = []
        self.duplicatedPacketSet = set()
    #Callback Function Click Event (Section 4 Part 3)
    def buttonClick(self, button):
        # This method sends 100 messages using a lower layer interface 
        #(Section 4 Part 3 SubPart 1)
        for i in range(100):
            # Create a packet with experiment ID, message ID and the current time with nanosecond accuracy using time.time_ns() function
            # (Section 4 Part 3 SubPart 2)
            experimentID = 1
            messageID = i
            timestamp = time.time_ns()
            packet = f"Exp:{experimentID}, Msg:{messageID}, Time:{timestamp}"
            
            # Send the packet through the bottom layer interface 
            self.send(packet)
            
            # Generate Experiment Output (Sec 4 Part 5 & 6)
            bob_mle.measurementOutputGenerator()

    def loop_Rx(self):
        global thread_running
        global bufferSize

        # binding the socket with the IP and port
        self.Socket.bind(self.AP_Rx)

        while (thread_running == True):
            # get a message from socket, this thread is blocked here
            msg_addr = self.Socket.recvfrom(bufferSize)

            msg  = msg_addr[0]
            addr = msg_addr[1]

            print(time.strftime("%H:%M:%S", time.localtime()), end=' ')
            print("%s Rx from %s: %s"%(self.Name, addr, msg))
            self.Queue_Rx.put(msg)

            # Process the received packet by sending it to the parsePacket() function so that it is divided in the correct way
            experimentID, messageID, sentTimenNS = self.parsePacket(msg)
            
            # Verify and add to the list if the packet is duplicate or not in the packet list
            # This is done for Part 4 (Section 4 Part 4)
            if (experimentID, messageID) not in self.receivedPacket:
                self.receivedPacket.append((experimentID, messageID, sentTimenNS, time.time_ns()))
            else:
                self.duplicatedPacketSet.add((experimentID, messageID))
                
    # Method in charge of dividing the packets to perform the calculations for the results
    def parsePacket(self, packet):
        packet_parts = packet.decode().split(", ")
        experimentID = int(packet_parts[0].split(":")[1])
        messageID = int(packet_parts[1].split(":")[1])
        sentTimenNS = int(packet_parts[2].split(":")[1])

        return experimentID, messageID, sentTimenNS
    
    # Analyzes lost, duplicated packets and delay to return the required result in experiments
    def analyzer(self):
        delays = []
        lostPackets = 0
        duplicatedPacketSet = 0

        for i in range(100):
            successfulPackets = [p for p in self.receivedPacket if p[1] == i]

            if len(successfulPackets) == 0:
                lostPackets += 1
            else:
                sentTimenNS = successfulPackets[0][2]
                receivedTimeNS = successfulPackets[0][3]
                delays.append((receivedTimeNS - sentTimenNS) / 1e6)  # Convertir a milisegundos

                if len(successfulPackets) > 1:
                    duplicatedPacketSet += len(successfulPackets) - 1
        
        #Store Results
        if len(delays) > 0:
            averageDelay = statistics.mean(delays)
        else:
            averageDelay = 0

        averageLossRate = lostPackets / 100
        averageDuplicationRate = duplicatedPacketSet / 100

        results = { 'averageDelay': averageDelay, 'averageLossRate': averageLossRate, 'averageDuplicationRate': averageDuplicationRate }

        return results
    
    # Function in charge of monitoring the process of receiving packets   
    def measurementOutputGenerator(self, packetCount=90, delaySeconds=10):
        #Start Timer
        startTimer = time.time()
        
        # Check if a minimum of packets received has been met
        # A certain amount of time has passed
        while True:
            elapsedTime = time.time() - startTimer
            successPacketCount = len(self.receivedPacket)
            
            if successPacketCount >= packetCount or elapsedTime >= delaySeconds:
                break
            
            # Wait for 1 second before checking again
            time.sleep(1)
            
        #Analyzes the results of packet receipt, saves it and returns it once called
        results = self.analyzer()
        self.resultsOutput(results)
        return results
    
    #Prints out the results
    def resultsOutput(self, results):
        print("Measurement Output:")
        print(f"Average delay (ms): {results['averageDelay']}")
        print(f"Average message loss rate: {results['averageLossRate']}")
        print(f"Average message duplication rate: {results['averageDuplicationRate']}")
        print(" ")
    
        
#Tester Entities (MLE)        
AP_local_mle_1 = ("127.0.0.1", 32000)
AP_local_mle_2 = ("127.0.0.1", 32111)
AP_remote_mle_2 = ("127.0.0.1", 32000)
AP_remote_mle_1 = ("127.0.0.1", 32111)


# Create new sockets for alice_mle and bob_mle
alice_mle_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
bob_mle_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

# Call class MLE_TR
alice_mle = MLE_TR("MLE_Alice", alice_mle_socket, AP_remote_mle_1, AP_local_mle_1)
bob_mle = MLE_TR("MLE_Bob", bob_mle_socket, AP_remote_mle_2, AP_local_mle_2)

# Start threads for MLE_Alice and MLE_Bob
thread_running = True

# start the loops of all entities
# all loops must be blocked at a certain position
alice_mle_thread_tx = Thread(target=alice_mle.loop_Tx, args = ())
alice_mle_thread_rx = Thread(target=alice_mle.loop_Rx, args = ())
bob_mle_thread_tx = Thread(target=bob_mle.loop_Tx, args = ())
bob_mle_thread_rx = Thread(target=bob_mle.loop_Rx, args = ())

alice_mle_thread_tx.start()
alice_mle_thread_rx.start()
bob_mle_thread_tx.start()
bob_mle_thread_rx.start()

# Check if Alice and Bob's transmitters and receivers are working
print(alice_mle_thread_tx.is_alive())
print(alice_mle_thread_rx.is_alive())
print(bob_mle_thread_tx.is_alive())
print(bob_mle_thread_rx.is_alive())

time.sleep(5)


# Run the generator
bob_mle.measurementOutputGenerator()

Button(description='Measure', style=ButtonStyle())

Button(description='Measure', style=ButtonStyle())

True
True
True
True
Measurement Output:
Average delay (ms): 0
Average message loss rate: 1.0
Average message duplication rate: 0.0
 


{'averageDelay': 0, 'averageLossRate': 1.0, 'averageDuplicationRate': 0.0}

21:47:38 MLE_Alice Tx: b'Exp:1, Msg:0, Time:1679881657590673700'
21:47:38 MLE_Bob Rx from ('127.0.0.1', 32000): b'Exp:1, Msg:0, Time:1679881657590673700'
Measurement Output:
Average delay (ms): 1064.6514
Average message loss rate: 0.99
Average message duplication rate: 0.0
 
21:47:47 MLE_Alice Tx: b'Exp:1, Msg:1, Time:1679881667608542700'
21:47:47 MLE_Bob Rx from ('127.0.0.1', 32000): b'Exp:1, Msg:1, Time:1679881667608542700'
Measurement Output:
Average delay (ms): 575.20665
Average message loss rate: 0.98
Average message duplication rate: 0.0
 
21:47:58 MLE_Alice Tx: b'Exp:1, Msg:2, Time:1679881677612981500'
21:47:58 MLE_Bob Rx from ('127.0.0.1', 32000): b'Exp:1, Msg:2, Time:1679881677612981500'
Measurement Output:
Average delay (ms): 547.4080666666666
Average message loss rate: 0.97
Average message duplication rate: 0.0
 
21:48:07 MLE_Alice Tx: b'Exp:1, Msg:3, Time:1679881687673218400'
21:48:07 MLE_Bob Rx from ('127.0.0.1', 32000): b'Exp:1, Msg:3, Time:1679881687673218400'
Measuremen