-
Notifications
You must be signed in to change notification settings - Fork 1
/
PacketBuffer.cpp
161 lines (131 loc) · 5.9 KB
/
PacketBuffer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include "PacketBuffer.h"
using namespace std;
/// Builds a buffer that starts out of shutdown with a packet count of zero.
/// @param consoleMutex mutex used to initialise the member of the same name;
///        the buffer locks it around its console output.
PacketBuffer::PacketBuffer(std::mutex& consoleMutex)
    : consoleMutex(consoleMutex),
      shutdownFlag(false),
      numberOfPackets(0)
{
}
void PacketBuffer::addPacket(unique_ptr<ENetPacket> packet) {
{
unique_lock<mutex> lock(bufferMutex);
if(shutdownFlag){
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout << "Packet Buffer is in shutdown. Packets cannot be queued for service in shutdown.\n" << endl;
}
// TODO: deal with packets already in the system. Don't want data loss
return;
}
packetQueue.push(std::move(packet));
numberOfPackets++;
{
std::lock_guard<std::mutex> guard(consoleMutex);
// printf("Packet received in Output Buffer.\n\ttick = %zu\n", GetOD_Packet(packetQueue.back()->data)->tick()->tick_number());
printf("Packet received in Output Buffer.\n\ttick = %zu\n", GetOD_Packet(packetQueue.back()->data)->client_tick()->tick_number());
}
}
buffer_Condition.notify_all();
}
void PacketBuffer::addBufferHandler(unique_ptr<BufferHandler> packet) {
{
unique_lock<mutex> lock(bufferMutex);
if(shutdownFlag){
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout << "Input Buffer is in shutdown. Packets cannot be queued for service in shutdown.\n" << endl;
}
// TODO: deal with packets already in the system. Don't want data loss
return;
}
packetQueueIn.push(std::move(packet));
numberOfPackets++;
{
std::lock_guard<std::mutex> guard(consoleMutex);
printf("Packet received in Input Buffer.\n\ttick = %zu\n", packetQueueIn.back()->getPacketView()->client_tick()->tick_number());
}
}
buffer_Condition.notify_all();
}
vector<unique_ptr<BufferHandler>> PacketBuffer::removePacketInstant() {
unique_lock<mutex> lock(bufferMutex); // lock the buffer
// enter wait state and unlock lock until the packetQueue is notified, then check if it satisfies the lambda function if not
// go back to waiting. This approach prevents random wakeups as even if it is woken up randomly it will not proceed unless it
// can
buffer_Condition.wait(lock, [this] {
return (!packetQueue.empty() || shutdownFlag.load());
});
if(packetQueueIn.empty() && shutdownFlag.load()){
cout << "empty or shutdown in packet buffer instant method" << endl;
vector<std::unique_ptr<BufferHandler>> temp2;
return temp2;
}
vector<unique_ptr<BufferHandler>> packetList;
packetList.reserve(packetQueueIn.size());
while(!packetQueueIn.empty()){
auto packet = std::move(packetQueueIn.front()); // pull out the packet
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout <<"Number of packets(pulling from buffer in transmitter): " <<numberOfPackets << endl;
cout <<"Tick no. in transmitter: " <<packet->getPacketView()->client_tick()->tick_number() << endl;
}
packetQueueIn.pop();
packetList.push_back(std::move(packet));
numberOfPackets--;
}
// cout << numberOfPackets << endl;
return std::move(packetList);
}
/*
* In removePacket() a lock based on the bufferMutex mutex is acquired, once acquired if the queue is empty the thread enters a wait
* state based upon the buffer_Condition condition variable and it gives up the lock. This condition variable is passed the lock. Thus when this thread is
* notified to wake up by cv.notify_one() or cv.notify_all() it will wake up and try to reacquire the lock (handled by the wait funciton)
* , and once it reacquires the lock it will begin the service the request.
*/
vector<unique_ptr<ENetPacket>> PacketBuffer::removePacketWait() {
unique_lock<mutex> lock(bufferMutex); // lock the buffer
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout << "transmitter is attempting to pull from buffer" << endl;
}
// enter wait state and unlock lock until the packetQueue is notified, then check if it satisfies the lambda function if not
// go back to waiting. This approach prevents random wakeups as even if it is woken up randomly it will not proceed unless it
// can
buffer_Condition.wait(lock, [this] {
return (!packetQueue.empty() || shutdownFlag.load());
});
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout << "transmitter got the green light" << endl;
}
if(packetQueue.empty() || shutdownFlag.load()){
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout << "Packet Buffer is in shutDown. " << shutdownFlag.load() << endl << "All existing packets have been serviced." << endl;
}
vector<std::unique_ptr<ENetPacket>> temp2;
return temp2; //
}
// cout << numberOfPackets << endl;
vector<unique_ptr<ENetPacket>> packetList;
packetList.reserve(packetQueue.size());
while(!packetQueue.empty()){
auto packet = std::move(packetQueue.front()); // pull out the packet
{
std::lock_guard<std::mutex> guard(consoleMutex);
cout <<"Number of packets(pulling from buffer in transmitter): " <<numberOfPackets << endl;
cout <<"Tick no. in transmitter: " <<GetOD_Packet(packet->data)->client_tick()->tick_number() << endl;
}
packetQueue.pop();
packetList.push_back(std::move(packet));
numberOfPackets--;
}
// cout << numberOfPackets << endl;
return std::move(packetList);
}
void PacketBuffer::notifyAll() {
buffer_Condition.notify_all();
}
void PacketBuffer::shutdown() {
shutdownFlag.store(true);
buffer_Condition.notify_all(); // wake all threads waiting of this shutdown command
}
/// Reports the current value of numberOfPackets, the counter that the add*
/// methods increment and the remove* methods decrement for both queues.
///
/// bufferMutex is not taken here.
/// NOTE(review): if numberOfPackets is a plain int rather than an atomic, this
/// read can race with the writers — confirm its declared type in PacketBuffer.h.
/// @return the packet count at the moment of the call.
int PacketBuffer::getNumberOfPackets() const
{
    const int currentCount = numberOfPackets;
    return currentCount;
}