-
Notifications
You must be signed in to change notification settings - Fork 0
/
gbn.py
140 lines (120 loc) · 5.11 KB
/
gbn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import config
import threading
import time
import udt
import util
# Go-Back-N reliable transport protocol.
class GoBackN:
NO_PREV_ACK_MSG = "Don't have previous ACK to send, will wait for server to timeout."
# "msg_handler" is used to deliver messages to application layer
def __init__(self, local_port, remote_port, msg_handler):
util.log("Starting up `Go Back N` protocol ... ")
self.network_layer = udt.NetworkLayer(local_port, remote_port, self)
self.msg_handler = msg_handler
self.sender_base = 0
self.next_sequence_number = 0
self.set_timer()
self.window = [b'']*config.WINDOW_SIZE
self.expected_sequence_number = 0
self.receiver_last_ack = b''
self.is_receiver = True
self.sender_lock = threading.Lock()
def set_timer(self):
self.timer = threading.Timer((config.TIMEOUT_MSEC/1000.0), self._timeout)
# "send" is called by application. Return true on success, false otherwise.
def send(self, msg):
self.is_receiver = False
if self.next_sequence_number < (self.sender_base + config.WINDOW_SIZE):
self._send_helper(msg)
return True
else:
util.log("Window is full. App data rejected.")
time.sleep(1)
return False
# Helper fn for thread to send the next packet
def _send_helper(self, msg):
self.sender_lock.acquire()
packet = util.make_packet(msg, config.MSG_TYPE_DATA, self.next_sequence_number)
packet_data = util.extract_data(packet)
self.window[self.next_sequence_number%config.WINDOW_SIZE] = packet
util.log("Sending data: " + util.pkt_to_string(packet_data))
self.network_layer.send(packet)
if self.sender_base == self.next_sequence_number:
if self.timer.is_alive(): self.timer.cancel()
self.set_timer()
self.timer.start()
self.next_sequence_number += 1
self.sender_lock.release()
return
# "handler" to be called by network layer when packet is ready.
def handle_arrival_msg(self):
msg = self.network_layer.recv()
msg_data = util.extract_data(msg)
if(msg_data.is_corrupt):
if(self.is_receiver):
if self.expected_sequence_number == 0:
util.log("Packet received is corrupted. " + self.NO_PREV_ACK_MSG)
return
self.network_layer.send(self.receiver_last_ack)
util.log("Received corrupted data. Resending ACK: "
+ util.pkt_to_string(util.extract_data(self.receiver_last_ack)))
return
# If ACK message, assume its for sender
if msg_data.msg_type == config.MSG_TYPE_ACK:
self.sender_lock.acquire()
self.sender_base = msg_data.seq_num + 1
if(self.sender_base == self.next_sequence_number):
util.log("Received ACK with seq # matching the end of the window: "
+ util.pkt_to_string(msg_data) + ". Cancelling timer.")
self.timer.cancel()
else:
util.log("Received ACK: " + util.pkt_to_string(msg_data)
+ ". There are messages in-flight. Restarting the timer.")
if self.timer.is_alive(): self.timer.cancel()
self.set_timer()
self.timer.start()
self.sender_lock.release()
# If DATA message, assume its for receiver
else:
assert msg_data.msg_type == config.MSG_TYPE_DATA
util.log("Received DATA: " + util.pkt_to_string(msg_data))
if msg_data.seq_num == self.expected_sequence_number:
self.msg_handler(msg_data.payload)
ack_pkt = util.make_packet(b'', config.MSG_TYPE_ACK, self.expected_sequence_number)
self.network_layer.send(ack_pkt)
self.receiver_last_ack = ack_pkt
self.expected_sequence_number += 1
util.log("Sent ACK: " + util.pkt_to_string(util.extract_data(ack_pkt)))
else:
if self.expected_sequence_number == 0:
util.log("Packet received is out of order. " + self.NO_PREV_ACK_MSG)
return
util.log("DATA message had unexpected sequence #"
+ str(int(msg_data.seq_num)) + ". Resending ACK message with sequence # "
+ str(int(self.expected_sequence_number-1)) + ".")
self.network_layer.send(self.receiver_last_ack)
return
# Cleanup resources.
def shutdown(self):
if not self.is_receiver: self._wait_for_last_ACK()
if self.timer.is_alive(): self.timer.cancel()
util.log("Connection shutting down...")
self.network_layer.shutdown()
def _wait_for_last_ACK(self):
while self.sender_base < self.next_sequence_number-1:
util.log("Waiting for last ACK from receiver with sequence # "
+ str(int(self.next_sequence_number-1)) + ".")
time.sleep(1)
def _timeout(self):
util.log("Timeout! Resending all packets in window. Resending packets with seq #s "
+ str(self.sender_base) + "-" + str(self.next_sequence_number-1) +".")
self.sender_lock.acquire()
if self.timer.is_alive(): self.timer.cancel()
self.set_timer()
for i in range(self.sender_base,self.next_sequence_number):
pkt = self.window[(i%config.WINDOW_SIZE)]
self.network_layer.send(pkt)
util.log("Resending packet: " + util.pkt_to_string(util.extract_data(pkt)))
self.timer.start()
self.sender_lock.release()
return