/
splitter_ems.cc
304 lines (270 loc) · 13 KB
/
splitter_ems.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
//
// splitter_ems.cc
// P2PSP
//
// This code is distributed under the GNU General Public License (see
// THE_GNU_GENERAL_PUBLIC_LICENSE.txt for extending this information).
// Copyright (C) 2016, the P2PSP team.
// http://www.p2psp.org
//
// EMS: Endpoint Masquerading Set of Rules
//
#include "splitter_ems.h"
#include "common_nts.h"
#include "common.h"
#include "../util/trace.h"
#include <cassert>
#include <thread>
#include <random>
namespace p2psp {
using namespace std;
using namespace boost::asio;
// Default constructor: builds the NTS base first, then sets up peer_pairs_
// (the public endpoint -> private endpoint table used by the EMS rules) with
// Splitter_DBS::GetHash. The leading 0 is presumably the initial bucket
// count of the underlying hash map -- confirm against splitter_ems.h.
Splitter_EMS::Splitter_EMS()
    : Splitter_NTS(),
      peer_pairs_(0, &Splitter_DBS::GetHash) {
  INFO("Initialized EMS");
}
// Nothing to release explicitly; members clean up after themselves.
Splitter_EMS::~Splitter_EMS() = default;
// Sends the list of team members to an arriving peer over its TCP socket.
//
// Mostly identical to the NTS variant; the EMS addition is that a listed peer
// whose public address equals the public address of the receiving peer is
// announced with its *private* endpoint (looked up in peer_pairs_), because
// both peers then sit behind the same NAT.
//
// Wire format, in the order written below:
//   1. uint16 count of entries that follow,
//   2. one message per entry: peer id, uint32 IPv4 address, uint16 port,
//      uint16 port step.
//
// @param peer_serve_socket  TCP connection to the receiving peer; its remote
//                           address is the "target address" used for the
//                           same-NAT comparison.
// @param peer               UDP endpoint of the receiving peer; used to avoid
//                           sending the peer its own entry.
void Splitter_EMS::SendTheListOfPeers2(
    const std::shared_ptr<ip::tcp::socket> &peer_serve_socket,
    const ip::udp::endpoint& peer) {
  const ip::address target_address =
      peer_serve_socket->remote_endpoint().address();

  // Never step past the end of peer_list_ if the monitor counter is ahead of
  // the list (it is incremented before the monitor is actually inserted).
  std::size_t monitor_count = 0;
  if (this->number_of_monitors_ > 0) {
    monitor_count = std::min<std::size_t>(
        this->peer_list_.size(),
        static_cast<std::size_t>(this->number_of_monitors_));
  }

  // Send all peers except the monitor peers with their peer ID
  // plus all peers currently being incorporated
  uint16_t number_of_other_peers = static_cast<uint16_t>(
      this->peer_list_.size() - monitor_count
      + this->incorporating_peers_.size());
  if (Common_NTS::Contains(this->peers_, peer) && number_of_other_peers > 0) {
    // Then peer is also in this->incorporating_peers_
    // Do not send the peer endpoint to itself
    --number_of_other_peers;
  }
  INFO("Sending the list of peers except monitors (" << number_of_other_peers
      << " peers)");

  std::ostringstream msg_str;
  Common_NTS::Write<uint16_t>(msg_str, number_of_other_peers);
  peer_serve_socket->send(buffer(msg_str.str()));

  // Appends address, port and port step of one listed peer to `out`.
  // The private endpoint is used only when the listed peer shares the
  // target's public address AND a private endpoint was recorded for it;
  // otherwise the public address and last known source port are sent.
  // Using find() (not operator[]) avoids inserting and announcing a
  // default-constructed 0.0.0.0:0 endpoint for unknown peers.
  auto append_endpoint = [this, &target_address](
      std::ostringstream& out,
      const ip::udp::endpoint& public_endpoint,
      const PeerInfo& info) {
    auto pair_iter = peer_pairs_.end();
    if (public_endpoint.address() == target_address) {
      pair_iter = peer_pairs_.find(public_endpoint);
    }
    if (pair_iter != peer_pairs_.end()) {
      const ip::udp::endpoint& private_endpoint = pair_iter->second;
      INFO("target peer at" << target_address.to_string() << " is in a private network with peer sent as("
          << private_endpoint.address().to_string() << "," << std::to_string(private_endpoint.port()) << ")");
      Common_NTS::Write<uint32_t>(out,
          static_cast<uint32_t>(private_endpoint.address().to_v4().to_ulong()));
      Common_NTS::Write<uint16_t>(out, private_endpoint.port());
    } else {
      INFO("sent peer("
          << public_endpoint.address().to_string() << "," << std::to_string(info.last_source_port_) << ")");
      Common_NTS::Write<uint32_t>(out,
          static_cast<uint32_t>(public_endpoint.address().to_v4().to_ulong()));
      Common_NTS::Write<uint16_t>(out, info.last_source_port_);
    }
    // Also send the port step of the existing peer, in case
    // it is behind a sequentially allocating NAT
    Common_NTS::Write<uint16_t>(out, info.port_step_);
  };

  // Regular (non-monitor) team members.
  for (auto member_iter = this->peer_list_.begin() + monitor_count;
      member_iter != this->peer_list_.end(); ++member_iter) {
    const PeerInfo& member_info = this->peers_[*member_iter];
    msg_str.str(std::string());
    msg_str << member_info.id_;
    append_endpoint(msg_str, *member_iter, member_info);
    peer_serve_socket->send(buffer(msg_str.str()));
  }

  // Send the peers currently being incorporated
  for (const auto& incorporating : this->incorporating_peers_) {
    const std::string& peer_id = incorporating.first;
    // Do not send the peer endpoint to itself
    if (Common_NTS::Contains(this->peers_, peer)
        && peer_id == this->peers_[peer].id_) {
      continue;
    }
    INFO("Sending peer " << peer_id << " to " << peer);
    const ip::udp::endpoint& incorporating_endpoint =
        incorporating.second.peer_;
    const PeerInfo& incorporating_info = this->peers_[incorporating_endpoint];
    msg_str.str(std::string());
    msg_str << peer_id;
    append_endpoint(msg_str, incorporating_endpoint, incorporating_info);
    peer_serve_socket->send(buffer(msg_str.str()));
  }
}
// Removes a peer from the team and from the EMS bookkeeping.
//
// If the peer is found among the leading number_of_monitors_ entries of
// peer_list_ the monitor counter is decremented. Removal from the list is
// delegated to Splitter_LRS::RemovePeer; afterwards the peer's entries in
// peers_ and peer_pairs_ are erased.
//
// @param peer  UDP endpoint of the peer to remove.
void Splitter_EMS::RemovePeer(const ip::udp::endpoint& peer) {
  // Work on a copy: if the caller passed a reference to an element stored in
  // one of the containers modified below, the reference would dangle after
  // the first erase. (Defensive; callers are not visible from this file.)
  const ip::udp::endpoint removed_peer = peer;

  // The monitor counter can be larger than the current list (it is bumped
  // before a monitor is inserted), so clamp it before forming an iterator;
  // begin() + n with n > size() is undefined behaviour.
  std::size_t monitor_count = 0;
  if (number_of_monitors_ > 0) {
    monitor_count = std::min<std::size_t>(
        peer_list_.size(), static_cast<std::size_t>(number_of_monitors_));
  }
  const auto monitors_begin = peer_list_.begin();
  const auto monitors_end = monitors_begin + monitor_count;
  if (std::find(monitors_begin, monitors_end, removed_peer) != monitors_end) {
    number_of_monitors_--;
  }

  Splitter_LRS::RemovePeer(removed_peer);

  try {
    this->peers_.erase(removed_peer);
    peer_pairs_.erase(removed_peer);
  } catch (const std::exception& e) {
    // Best effort: a failure to erase bookkeeping must not abort removal.
    TRACE(e.what());
  }
}
// Handles a peer that has just connected over TCP.
//
// Mostly identical to the NTS variant; the EMS addition is that the peer
// first sends a 7-byte message which is read here:
//   bytes 0-3  private (local) IPv4 address, network byte order
//   bytes 4-5  private (local) port, network byte order
//   byte  6    signalling character; 'M' marks the peer as a monitor
// The private endpoint is stored in peer_pairs_, keyed by the peer's public
// endpoint (address and port of the TCP connection).
//
// Afterwards the configuration and a freshly generated peer id are sent.
// Monitor peers are incorporated immediately and their socket is closed;
// other peers are parked in arriving_peers_ until incorporation continues
// elsewhere.
//
// @param serve_socket  Accepted TCP connection to the arriving peer.
void Splitter_EMS::HandleAPeerArrival(
    std::shared_ptr<boost::asio::ip::tcp::socket> serve_socket) {
  // This method implements the NAT traversal algorithms.
  const boost::asio::ip::tcp::endpoint new_peer_tcp =
      serve_socket->remote_endpoint();
  const boost::asio::ip::udp::endpoint new_peer(new_peer_tcp.address(),
      new_peer_tcp.port());
  INFO("Accepted connection from peer " << new_peer);

  boost::array<char, 7> buf;
  read((*serve_socket), boost::asio::buffer(buf));

  // Decode byte by byte instead of casting the buffer to in_addr*/short*:
  // no alignment or aliasing assumptions, and no trip through inet_ntoa(),
  // which returns a pointer to a shared static buffer.
  const unsigned char *raw = reinterpret_cast<const unsigned char *>(buf.data());
  boost::asio::ip::address_v4::bytes_type address_bytes;
  for (std::size_t i = 0; i < 4; ++i) {
    address_bytes[i] = raw[i];
  }
  const boost::asio::ip::address_v4 private_address(address_bytes);
  const unsigned short private_port =
      static_cast<unsigned short>((raw[4] << 8) | raw[5]);
  const char sig = buf[6];

  const boost::asio::ip::udp::endpoint private_endpoint(private_address,
      private_port);
  TRACE("peer local endpoint = (" << private_endpoint.address().to_string() << ","
      << std::to_string(private_endpoint.port()) << ")");

  // Assign instead of emplace so that a reconnect from the same public
  // endpoint replaces a stale private endpoint rather than keeping it.
  peer_pairs_[new_peer] = private_endpoint;

  TRACE("Contents of Signalling message: "<<sig);
  if (sig == 'M') {
    // The counter is left untouched only when it is exactly 1 and no peer is
    // in the list yet (presumably the initial value already accounts for the
    // first monitor -- TODO confirm); in every other case it grows by one.
    const bool keep_count =
        (number_of_monitors_ == 1) && (this->peer_list_.size() < 1);
    if (!keep_count) {
      number_of_monitors_++;
    }
    TRACE("The number of monitors increased to "<<number_of_monitors_);
  }

  this->SendConfiguration(serve_socket);

  // Send the generated ID to peer
  const std::string peer_id = this->GenerateId();
  INFO("Sending ID " << peer_id << " to peer " << new_peer);
  serve_socket->send(boost::asio::buffer(peer_id));

  std::unique_lock<std::mutex> lock(arriving_incorporating_peers_mutex_);
  const bool incorporate_as_monitor =
      this->peer_list_.size() < static_cast<unsigned int>(this->number_of_monitors_)
      || sig == 'M';
  if (incorporate_as_monitor) {
    // Directly incorporate the monitor peer into the team.
    // The source ports are all set to the same, as the monitor peers
    // should be publicly accessible
    //
    // Fields are assigned by name: mixing a positional initializer with
    // designated ones is not valid standard C++.
    PeerInfo monitor_info{};
    monitor_info.id_ = peer_id;
    monitor_info.port_step_ = 0;
    monitor_info.last_source_port_ = new_peer.port();
    this->peers_[new_peer] = monitor_info;

    this->SendNewPeer(peer_id, new_peer,
        std::vector<uint16_t>(this->number_of_monitors_, new_peer.port()), 0);
    this->InsertPeer(new_peer);
    serve_socket->close();
  } else {
    this->arriving_peers_[peer_id] = ArrivingPeerInfo{serve_socket,
        new_peer.address(), 0,
        std::vector<uint16_t>(this->number_of_monitors_, 0),
        std::chrono::steady_clock::now()};
    // Splitter will continue with IncorporatePeer() as soon as the
    // arriving peer has sent UDP packets to splitter and monitor
  }
}
// Announces a newly arriving peer to every team member and to every peer
// that is currently being incorporated.
//
// Mostly identical to the NTS variant; the EMS addition is that a receiver
// whose public address equals the new peer's public address is told the new
// peer's *private* address (from peer_pairs_) instead of the public one.
//
// Messages are queued with EnqueueMessage(3, ...). Layout per receiver:
//   monitors:        id, uint32 address, uint16 source port seen by monitor
//   other members:   id, uint32 address, uint16 min known source port,
//                    uint16 port step, uint16 peer number
//                    [, uint16 extra listen port if the receiver's own port
//                    step is non-zero]
//   incorporating:   id, uint32 address, uint16 min known source port,
//                    uint16 port step, uint16 peer_list_ size + 1
//
// @param peer_id                   Id of the new peer.
// @param new_peer                  Public UDP endpoint of the new peer.
// @param source_ports_to_monitors  Source ports the new peer used towards the
//                                  monitors, indexed by monitor position.
// @param port_step                 Port step to announce for the new peer.
void Splitter_EMS::SendNewPeer(const std::string& peer_id,
    const ip::udp::endpoint& new_peer,
    const std::vector<uint16_t>& source_ports_to_monitors, uint16_t port_step) {
  // Recreate this->extra_socket_, listening to a random port
  // TODO: is the extra_socket recreated too often?
  if (this->extra_socket_) {
    this->extra_socket_->close();
  }
  this->extra_socket_.reset(new ip::udp::socket(this->io_service_));
  this->extra_socket_->open(ip::udp::v4());
  try {
    this->extra_socket_->bind(ip::udp::endpoint(ip::udp::v4(), 0));
  } catch (const std::exception& e) {
    ERROR(e.what());
  }
  // Do not block the thread forever:
  this->extra_socket_->set_option(socket_base::linger(true, 1));
  const uint16_t extra_listen_port =
      this->extra_socket_->local_endpoint().port();
  DEBUG("Listening to the extra port " << extra_listen_port);
  DEBUG("Sending [send hello to " << new_peer << ']');

  // The peers start port prediction at the minimum known source port,
  // counting up using their peer_number
  std::vector<uint16_t> source_ports(source_ports_to_monitors);
  source_ports.push_back(new_peer.port());
  const uint16_t min_known_source_port =
      *std::min_element(source_ports.begin(), source_ports.end());

  // EMS check: address of the new peer as it should be announced to
  // `receiver`. The private address is used only if the receiver shares the
  // new peer's public address and a private endpoint is on record; find()
  // avoids inserting (and announcing) a default 0.0.0.0 entry otherwise.
  // NOTE(review): only the address is swapped; the announced port is still
  // min_known_source_port, as in the original -- confirm this is intended.
  auto announced_address = [this, &new_peer](
      const ip::udp::endpoint& receiver) -> ip::address {
    if (new_peer.address() == receiver.address()) {
      const auto pair_iter = peer_pairs_.find(new_peer);
      if (pair_iter != peer_pairs_.end()) {
        return pair_iter->second.address();
      }
    }
    return new_peer.address();
  };

  // Send packets to all peers;
  unsigned int peer_number = 0;
  for (auto member_iter = this->peer_list_.begin();
      member_iter != this->peer_list_.end(); ++member_iter, ++peer_number) {
    std::ostringstream msg_str;
    msg_str << peer_id;
    if (peer_number < static_cast<unsigned int>(this->number_of_monitors_)) {
      // Send only the endpoint of the peer to the monitor,
      // as the arriving peer and the monitor already communicated.
      // The port vector may have been sized for an earlier, smaller monitor
      // count; fall back to the peer's own port rather than read past it.
      const uint16_t port_seen_by_monitor =
          (peer_number < source_ports_to_monitors.size())
              ? source_ports_to_monitors[peer_number]
              : new_peer.port();
      Common_NTS::Write<uint32_t>(msg_str,
          static_cast<uint32_t>(new_peer.address().to_v4().to_ulong()));
      Common_NTS::Write<uint16_t>(msg_str, port_seen_by_monitor);
    } else {
      // Send all information necessary for port prediction to the
      // existing peers
      const ip::address sent_address = announced_address(*member_iter);
      Common_NTS::Write<uint32_t>(msg_str,
          static_cast<uint32_t>(sent_address.to_v4().to_ulong()));
      Common_NTS::Write<uint16_t>(msg_str, min_known_source_port);
      Common_NTS::Write<uint16_t>(msg_str, port_step);
      // Splitter is "peer number 0", thus add 1
      Common_NTS::Write<uint16_t>(msg_str, peer_number+1);
      if (this->peers_[*member_iter].port_step_ != 0) {
        // Send the port of this->extra_socket_ to determine the
        // currently allocated source port of the incorporated peer
        Common_NTS::Write<uint16_t>(msg_str, extra_listen_port);
      }
    }
    // Hopefully one of these packets arrives
    this->EnqueueMessage(3, std::make_pair(msg_str.str(), *member_iter));
  }

  // Send packets to peers currently being incorporated
  for (const auto& incorporating : this->incorporating_peers_) {
    const std::string& inc_peer_id = incorporating.first;
    if (inc_peer_id == peer_id) {
      // Do not send the peer endpoint to the peer itself
      continue;
    }
    INFO("Sending peer " << new_peer << " to " << inc_peer_id);
    const ip::udp::endpoint& receiver = incorporating.second.peer_;
    const ip::address sent_address = announced_address(receiver);

    std::ostringstream msg_str;
    msg_str << peer_id;
    Common_NTS::Write<uint32_t>(msg_str,
        static_cast<uint32_t>(sent_address.to_v4().to_ulong()));
    Common_NTS::Write<uint16_t>(msg_str, min_known_source_port);
    Common_NTS::Write<uint16_t>(msg_str, port_step);
    // Send the length of the peer_list as peer_number
    Common_NTS::Write<uint16_t>(msg_str, this->peer_list_.size()+1);
    // Hopefully one of these packets arrives
    this->EnqueueMessage(3, std::make_pair(msg_str.str(), receiver));
  }
}
}