/
DefaultInterfaceController.c
599 lines (499 loc) · 22.8 KB
/
DefaultInterfaceController.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "crypto/AddressCalc.h"
#include "crypto/CryptoAuth_pvt.h"
#include "net/DefaultInterfaceController.h"
#include "memory/Allocator.h"
#include "net/SwitchPinger.h"
#include "util/Base32.h"
#include "util/Bits.h"
#include "util/events/Time.h"
#include "util/events/Timeout.h"
#include "util/Identity.h"
#include "util/version/Version.h"
#include "wire/Error.h"
#include "wire/Message.h"
#include <stddef.h> // offsetof
/** After this number of milliseconds, a node will be regarded as unresponsive. */
#define UNRESPONSIVE_AFTER_MILLISECONDS (20*1024)
/**
* After this number of milliseconds without a valid incoming message,
* a peer is "lazy" and should be pinged.
*/
#define PING_AFTER_MILLISECONDS (3*1024)
/** How often to ping "lazy" peers, "unresponsive" peers are only pinged 20% of the time. */
#define PING_INTERVAL_MILLISECONDS 1024
/** The number of milliseconds to wait for a ping response. */
#define TIMEOUT_MILLISECONDS (2*1024)
/**
* The number of seconds to wait before an unresponsive peer
* making an incoming connection is forgotten.
*/
#define FORGET_AFTER_MILLISECONDS (256*1024)
/*--------------------Structs--------------------*/
/**
* Per-peer state kept by this interface controller.
* One is allocated by registerPeer() for each registered external interface, using the
* external interface's allocator.
*/
struct IFCPeer
{
/**
* The interface which is registered with the switch.
* This must remain the first member: sendFromSwitch() casts the struct Interface* it is
* given directly to a struct IFCPeer*.
*/
struct Interface switchIf;
/** The internal (wrapped by CryptoAuth) interface. */
struct Interface* cryptoAuthIf;
/** The external (network side) interface. */
struct Interface* external;
/** The label for this endpoint, needed to ping the endpoint. */
uint64_t switchLabel;
/** Milliseconds since the epoch when the last *valid* message was received. */
uint64_t timeOfLastMessage;
/** The handle which can be used to look up this endpoint in the peer map. */
uint32_t handle;
/**
* True if we should forget about the peer if they do not respond,
* pingCallback() drops such a peer after forgetAfterMilliseconds of silence.
*/
bool isIncomingConnection : 1;
/**
* If InterfaceController_PeerState_UNAUTHENTICATED, no permanent state will be kept.
* During transition from HANDSHAKE to ESTABLISHED, a check is done for a registration of a
* node which is already registered in a different switch slot, if there is one and the
* handshake completes, it will be moved (see moveEndpointIfNeeded()).
*/
int state : 31;
// traffic counters, bytesOut is incremented in sendFromSwitch()
// and bytesIn is incremented in receivedAfterCryptoAuth().
uint64_t bytesOut;
uint64_t bytesIn;
// Identity tag, set by Identity_set() in registerPeer() and used by Identity_cast().
Identity
};
/*
* Instantiate util/Map.h as a map from the external interface pointer to its IFCPeer.
* Handles are enabled so that a peer can also be located by its handle (see closeInterface()).
* The generated names carry the Map_OfIFCPeerByExernalIf prefix used in this file,
* "Exernal" is a misspelling which is part of the identifier and so it is left alone.
*/
#define Map_NAME OfIFCPeerByExernalIf
#define Map_ENABLE_HANDLES
#define Map_KEY_TYPE struct Interface*
#define Map_VALUE_TYPE struct IFCPeer*
#include "util/Map.h"
/**
* The private state of the interface controller.
* The public struct InterfaceController is the first member, the functions in this file
* cast a struct InterfaceController* back to a struct Context*.
*/
struct Context
{
/** Public functions and fields for this ifcontroller. */
struct InterfaceController pub;
/** All registered peers, keyed by external interface and also reachable by handle. */
struct Map_OfIFCPeerByExernalIf peerMap;
struct Allocator* const allocator;
struct CryptoAuth* const ca;
/** Switch for adding nodes when they are discovered. */
struct SwitchCore* const switchCore;
/** Router needed to inject newly added nodes to bootstrap the system. */
struct RouterModule* const routerModule;
struct Log* const logger;
struct EventBase* const eventBase;
/** After this number of milliseconds, a neighbor will be regarded as unresponsive. */
uint32_t unresponsiveAfterMilliseconds;
/** The number of milliseconds to wait before pinging. */
uint32_t pingAfterMilliseconds;
/** The number of milliseconds to let a ping go before timing it out. */
uint32_t timeoutMilliseconds;
/** After this number of milliseconds, an incoming connection is forgotten entirely. */
uint32_t forgetAfterMilliseconds;
/**
* Incremented on every run of pingCallback().
* It allows 7 out of every 8 pings to be skipped when a node is unresponsive and it is
* also used by receivedAfterCryptoAuth() to limit how often it kicks pingCallback().
*/
uint32_t pingCount;
/**
* The timeout event to use for pinging potentially unresponsive neighbors,
* NULL if the controller was created without a SwitchPinger.
*/
struct Timeout* const pingInterval;
/** For pinging lazy/unresponsive nodes. */
struct SwitchPinger* const switchPinger;
/** A password which is generated per-startup and sent out in beacon messages. */
uint8_t beaconPassword[Headers_Beacon_PASSWORD_LEN];
// Identity tag, set by Identity_set() in DefaultInterfaceController_new().
Identity
};
//---------------//
/**
 * Get the controller which owns a peer.
 * registerPeer() stores the controller context in the senderContext of the peer's switchIf.
 *
 * @param ep the peer.
 * @return the context which this peer was registered with.
 */
static inline struct Context* ifcontrollerForPeer(struct IFCPeer* ep)
{
    struct Context* owner = (struct Context*) ep->switchIf.senderContext;
    return Identity_cast(owner);
}
/**
 * Callback for the switch pings which are sent by pingCallback().
 * When a pong comes back, the peer (its public key and switch label) is handed to the
 * router along with the version which it reported.
 *
 * @param result the outcome of the ping, anything except SwitchPinger_Result_OK is ignored.
 * @param label the label reported with the response, only used for debug logging.
 * @param data unused.
 * @param millisecondsLag unused.
 * @param version the version reported by the peer.
 * @param onResponseContext the struct IFCPeer which was pinged.
 */
static void onPingResponse(enum SwitchPinger_Result result,
                           uint64_t label,
                           String* data,
                           uint32_t millisecondsLag,
                           uint32_t version,
                           void* onResponseContext)
{
    // The context is not touched unless the ping succeeded.
    if (SwitchPinger_Result_OK != result) {
        return;
    }
    struct IFCPeer* ep = Identity_cast((struct IFCPeer*) onResponseContext);
    struct Context* ic = ifcontrollerForPeer(ep);

    // Build the address of the direct peer: its public key plus the label of its switch slot.
    struct Address addr;
    Bits_memset(&addr, 0, sizeof(struct Address));
    Bits_memcpyConst(addr.key, CryptoAuth_getHerPublicKey(ep->cryptoAuthIf), 32);
    addr.path = ep->switchLabel;

    // version is a uint32_t so it is printed with %u, like the other unsigned values
    // which are logged in this file.
    Log_debug(ic->logger, "got switch pong from node with version [%u]", version);
    RouterModule_addNode(ic->routerModule, &addr, version);

    #ifdef Log_DEBUG
        // This will be false if it times out.
        //Assert_true(label == ep->switchLabel);
        uint8_t path[20];
        AddrTools_printPath(path, label);
        uint8_t sl[20];
        AddrTools_printPath(sl, ep->switchLabel);
        Log_debug(ic->logger, "Received [%s] from lazy endpoint [%s] [%s]",
                  SwitchPinger_resultString(result)->bytes, path, sl);
    #endif
}
// Called from the pingInteral timeout.
static void pingCallback(void* vic)
{
struct Context* ic = Identity_cast((struct Context*) vic);
uint64_t now = Time_currentTimeMilliseconds(ic->eventBase);
ic->pingCount++;
// scan for endpoints have not sent anything recently.
for (uint32_t i = 0; i < ic->peerMap.count; i++) {
struct IFCPeer* ep = ic->peerMap.values[i];
// This is here because of a pathological state where the connection is in ESTABLISHED
// state but the *direct peer* has somehow been dropped from the routing table.
// TODO: understand the cause of this issue rather than checking for it once per second.
struct Node* peerNode = RouterModule_getNode(ep->switchLabel, ic->routerModule);
if (now > ep->timeOfLastMessage + ic->pingAfterMilliseconds || !peerNode) {
#ifdef Log_DEBUG
uint8_t key[56];
Base32_encode(key, 56, CryptoAuth_getHerPublicKey(ep->cryptoAuthIf), 32);
#endif
if (ep->isIncomingConnection
&& now > ep->timeOfLastMessage + ic->forgetAfterMilliseconds)
{
Log_debug(ic->logger, "Unresponsive peer [%s.k] has not responded in [%u] "
"seconds, dropping connection",
key, ic->forgetAfterMilliseconds / 1024);
Allocator_free(ep->external->allocator);
return;
}
bool unresponsive = (now > ep->timeOfLastMessage + ic->unresponsiveAfterMilliseconds);
uint32_t lag = ~0u;
if (unresponsive) {
// flush the peer from the table...
RouterModule_brokenPath(ep->switchLabel, ic->routerModule);
// Lets skip 87% of pings when they're really down.
if (ic->pingCount % 8) {
continue;
}
ep->state = InterfaceController_PeerState_UNRESPONSIVE;
lag = ((now - ep->timeOfLastMessage) / 1024);
} else {
lag = ((now - ep->timeOfLastMessage) / 1024);
}
struct SwitchPinger_Ping* ping =
SwitchPinger_newPing(ep->switchLabel,
String_CONST(""),
ic->timeoutMilliseconds,
onPingResponse,
ic->allocator,
ic->switchPinger);
if (!ping) {
Log_debug(ic->logger,
"Failed to ping %s peer [%s.k] lag [%u], out of ping slots.",
(unresponsive ? "unresponsive" : "lazy"), key, lag);
return;
}
ping->onResponseContext = ep;
SwitchPinger_sendPing(ping);
Log_debug(ic->logger,
"Pinging %s peer [%s.k] lag [%u]",
(unresponsive ? "unresponsive" : "lazy"), key, lag);
}
}
}
/**
 * If another registered endpoint has the same public key as this one, take over its
 * switch slot and free the old endpoint. At most one old endpoint is merged per call.
 *
 * @param ep the endpoint which has just completed its handshake.
 * @param ic the interface controller context.
 */
static void moveEndpointIfNeeded(struct IFCPeer* ep, struct Context* ic)
{
    Log_debug(ic->logger, "Checking for old sessions to merge with.");

    uint8_t* newKey = CryptoAuth_getHerPublicKey(ep->cryptoAuthIf);

    for (uint32_t idx = 0; idx < ic->peerMap.count; idx++) {
        struct IFCPeer* other = ic->peerMap.values[idx];
        uint8_t* otherKey = CryptoAuth_getHerPublicKey(other->cryptoAuthIf);

        // Skip the endpoint itself and every endpoint which has a different key.
        if (other == ep || Bits_memcmp(otherKey, newKey, 32)) {
            continue;
        }

        Log_info(ic->logger, "Moving endpoint to merge new session with old.");

        // The new session inherits the label and the switch slot of the old one,
        // freeing the old external interface then runs its on-free job.
        ep->switchLabel = other->switchLabel;
        SwitchCore_swapInterfaces(&other->switchIf, &ep->switchIf);
        Allocator_free(other->external->allocator);
        return;
    }
}
/**
 * Incoming message which has passed through the cryptoauth and needs to be forwarded
 * to the switch. The peer state is advanced here, depending on the CryptoAuth state.
 *
 * @param msg the decrypted message.
 * @param cryptoAuthIf the CryptoAuth interface of the peer, its receiverContext is the peer.
 * @return Error_NONE if the message is dropped because the CryptoAuth session is not yet
 *         established, otherwise whatever the receiveMessage of the switch interface returns.
 */
static uint8_t receivedAfterCryptoAuth(struct Message* msg, struct Interface* cryptoAuthIf)
{
    struct IFCPeer* peer = Identity_cast((struct IFCPeer*) cryptoAuthIf->receiverContext);
    struct Context* ctx = ifcontrollerForPeer(peer);

    peer->bytesIn += msg->length;

    if (peer->state >= InterfaceController_PeerState_ESTABLISHED) {
        if (peer->state == InterfaceController_PeerState_UNRESPONSIVE
            && CryptoAuth_getState(cryptoAuthIf) >= CryptoAuth_HANDSHAKE3)
        {
            // The peer came back, the timestamp is refreshed by the message after this one.
            peer->state = InterfaceController_PeerState_ESTABLISHED;
        } else {
            peer->timeOfLastMessage = Time_currentTimeMilliseconds(ctx->eventBase);
        }

    } else if (CryptoAuth_getState(cryptoAuthIf) >= CryptoAuth_HANDSHAKE3) {
        // The handshake is done, merge with an older session for the same key if there is one.
        moveEndpointIfNeeded(peer, ctx);
        peer->state = InterfaceController_PeerState_ESTABLISHED;

    } else {
        peer->state = InterfaceController_PeerState_HANDSHAKE;

        // prevent some kinds of nasty things which could be done with packet replay.
        // This is checking the message switch header and will drop it unless the label
        // directs it to *this* router.
        if (msg->length < 8 || msg->bytes[7] != 1) {
            Log_info(ctx->logger, "Dropping message because CA is not established.");
            return Error_NONE;
        }

        // When a "server" gets a new connection from a "client" the router doesn't
        // know about that client so if the client sends a packet to the server, the
        // server will be unable to handle it until the client has sent inter-router
        // communication to the server. Here we will ping the client so when the
        // server gets the ping response, it will insert the client into its table
        // and know its version.

        // prevent DoS by limiting the number of times this can be called per second
        // limit it to 7, this will affect innocent packets but it doesn't matter much
        // since this is mostly just an optimization and for keeping the tests happy.
        if ((ctx->pingCount + 1) % 7) {
            pingCallback(ctx);
        }
    }

    return peer->switchIf.receiveMessage(msg, &peer->switchIf);
}
/**
 * This is directly called from SwitchCore, message is not encrypted.
 * The message is sent through the CryptoAuth interface of the peer. If sending fails or
 * the peer is unresponsive, the head of the message (at most 255 bytes) is put back the
 * way it was so that the caller is able to build an error message from it.
 *
 * @param msg the message to send.
 * @param switchIf the switchIf member of a struct IFCPeer (the first member of the peer).
 * @return Error_NONE on success, the error from the underlying send if there was one,
 *         otherwise Error_UNDELIVERABLE when the peer is unresponsive.
 */
static uint8_t sendFromSwitch(struct Message* msg, struct Interface* switchIf)
{
    struct IFCPeer* peer = Identity_cast((struct IFCPeer*) switchIf);
    peer->bytesOut += msg->length;

    // This sucks but cryptoauth trashes the content when it encrypts
    // and we need to be capable of sending back a coherent error message.
    uint8_t savedTop[255];
    uint8_t* savedBytes = msg->bytes;
    uint16_t savedPadding = msg->padding;
    uint16_t savedLength = (msg->length < 255) ? msg->length : 255;
    Bits_memcpy(savedTop, msg->bytes, savedLength);

    struct Context* ctx = ifcontrollerForPeer(peer);
    uint8_t ret;
    uint64_t now = Time_currentTimeMilliseconds(ctx->eventBase);

    if (now - peer->timeOfLastMessage > ctx->unresponsiveAfterMilliseconds) {
        // XXX: This is a hack because if the time of last message exceeds the
        //      unresponsive time, we need to send back an error and that means
        //      mangling the message which would otherwise be in the queue.
        struct Allocator* cloneAlloc = Allocator_child(ctx->allocator);
        struct Message* clone = Message_clone(msg, cloneAlloc);
        ret = Interface_sendMessage(peer->cryptoAuthIf, clone);
        Allocator_free(cloneAlloc);
    } else {
        ret = Interface_sendMessage(peer->cryptoAuthIf, msg);
    }

    // The age of the last message is deliberately checked again after the send.
    if (!ret && now - peer->timeOfLastMessage <= ctx->unresponsiveAfterMilliseconds) {
        return Error_NONE;
    }

    // The send failed or this node is unresponsive, restore the message and return an error.
    msg->bytes = savedBytes;
    msg->padding = savedPadding;
    msg->length = savedLength;
    Bits_memcpy(msg->bytes, savedTop, savedLength);
    return ret ? ret : Error_UNDELIVERABLE;
}
/**
 * On-free job which registerPeer() attaches to the allocator of the external interface.
 * The path to the peer is flushed from the router and the peer is taken out of the peer map.
 *
 * @param job the on-free job, its userData is the struct IFCPeer which is going away.
 * @return always 0.
 */
static int closeInterface(struct Allocator_OnFreeJob* job)
{
    struct IFCPeer* peer = Identity_cast((struct IFCPeer*) job->userData);
    struct Context* ctx = ifcontrollerForPeer(peer);

    // flush the peer from the table...
    RouterModule_brokenPath(peer->switchLabel, ctx->routerModule);

    // The peer must still be in the map, it is looked up by the handle stored at registration.
    int mapIndex = Map_OfIFCPeerByExernalIf_indexForHandle(peer->handle, &ctx->peerMap);
    Assert_true(mapIndex >= 0);
    Map_OfIFCPeerByExernalIf_remove(mapIndex, &ctx->peerMap);

    return 0;
}
/**
* Register an external interface as a peer: wrap it with CryptoAuth, add it to the peer map
* and add it to the switch.
*
* @param ifController the interface controller.
* @param herPublicKey the public key of the peer or NULL if it is not known.
* @param password if non-NULL, this is set as the password for the CryptoAuth session.
* @param requireAuth passed to CryptoAuth_wrapInterface(), if false the peer begins in
*                    HANDSHAKE state.
* @param isIncomingConnection if true, the peer is forgotten after it has been unresponsive
*                             for long enough (see pingCallback()).
* @param externalInterface the network side interface, the peer is allocated with the
*                          allocator of this interface and it is cleaned up when that
*                          allocator is freed.
* @return 0 on success or if the interface is already registered,
*         InterfaceController_registerPeer_BAD_KEY if the key does not yield a valid address,
*         InterfaceController_registerPeer_OUT_OF_SPACE if the switch is out of space,
*         InterfaceController_registerPeer_INTERNAL if the switch fails for another reason.
*/
static int registerPeer(struct InterfaceController* ifController,
uint8_t herPublicKey[32],
String* password,
bool requireAuth,
bool isIncomingConnection,
struct Interface* externalInterface)
{
struct Context* ic = Identity_cast((struct Context*) ifController);
Log_debug(ic->logger, "registerPeer [%p] total [%u]",
(void*)externalInterface, ic->peerMap.count);
// An interface which is already in the map is not registered a second time.
if (Map_OfIFCPeerByExernalIf_indexForKey(&externalInterface, &ic->peerMap) > -1) {
Log_debug(ic->logger, "Skipping registerPeer [%p] because peer is already registered",
(void*)externalInterface);
return 0;
}
// ip6 is only written and only read when herPublicKey is non-NULL.
uint8_t ip6[16];
if (herPublicKey) {
AddressCalc_addressForPublicKey(ip6, herPublicKey);
if (!AddressCalc_validAddress(ip6)) {
return InterfaceController_registerPeer_BAD_KEY;
}
}
struct Allocator* epAllocator = externalInterface->allocator;
struct IFCPeer* ep = Allocator_calloc(epAllocator, sizeof(struct IFCPeer), 1);
ep->bytesOut = 0;
ep->bytesIn = 0;
ep->external = externalInterface;
int setIndex = Map_OfIFCPeerByExernalIf_put(&externalInterface, &ep, &ic->peerMap);
ep->handle = ic->peerMap.handles[setIndex];
Identity_set(ep);
// When the allocator of the external interface is freed, closeInterface() removes the peer.
Allocator_onFree(epAllocator, closeInterface, ep);
// If the other end need not supply a valid password to connect
// we will set the connection state to HANDSHAKE because we don't
// want the connection to be trashed after the first invalid packet.
if (!requireAuth) {
ep->state = InterfaceController_PeerState_HANDSHAKE;
}
ep->cryptoAuthIf =
CryptoAuth_wrapInterface(externalInterface, herPublicKey, requireAuth, true, ic->ca);
// Decrypted messages from the peer are delivered to receivedAfterCryptoAuth().
ep->cryptoAuthIf->receiveMessage = receivedAfterCryptoAuth;
ep->cryptoAuthIf->receiverContext = ep;
// Always use authType 1 until something else comes along, then we'll have to refactor.
if (password) {
CryptoAuth_setAuth(password, 1, ep->cryptoAuthIf);
}
ep->isIncomingConnection = isIncomingConnection;
Bits_memcpyConst(&ep->switchIf, (&(struct Interface) {
.sendMessage = sendFromSwitch,
// ifcontrollerForPeer uses this.
// sendFromSwitch relies on the fact that the
// switchIf is the same memory location as the Peer.
.senderContext = ic,
.allocator = epAllocator
}), sizeof(struct Interface));
int ret = SwitchCore_addInterface(&ep->switchIf, 0, &ep->switchLabel, ic->switchCore);
if (ret) {
// NOTE(review): at this point the peer is already in peerMap and closeInterface() is
// registered, presumably the caller frees the interface allocator on error - confirm.
return (ret == SwitchCore_addInterface_OUT_OF_SPACE)
? InterfaceController_registerPeer_OUT_OF_SPACE
: InterfaceController_registerPeer_INTERNAL;
}
// We want the node to immediately be pinged but we don't want it to appear unresponsive because
// the pinger will only ping every (PING_INTERVAL * 8) so we set timeOfLastMessage to
// (now - pingAfterMilliseconds - 1) so it will be considered a "lazy node".
ep->timeOfLastMessage =
Time_currentTimeMilliseconds(ic->eventBase) - ic->pingAfterMilliseconds - 1;
if (herPublicKey) {
#ifdef Log_INFO
uint8_t printAddr[60];
AddrTools_printIp(printAddr, ip6);
Log_info(ic->logger, "Adding peer [%s]", printAddr);
#endif
// Kick the ping callback so that the node will be pinged ASAP.
pingCallback(ic);
}
return 0;
}
/**
 * Get the state of the peer which belongs to an interface.
 *
 * @param iface an interface for which CryptoAuth_getConnectedInterface() yields the
 *              CryptoAuth interface of a registered peer.
 * @return the current state of that peer.
 */
static enum InterfaceController_PeerState getPeerState(struct Interface* iface)
{
    struct Interface* wrapped = CryptoAuth_getConnectedInterface(iface);
    struct IFCPeer* peer = Identity_cast((struct IFCPeer*) wrapped->receiverContext);
    return peer->state;
}
/**
 * Fill in a beacon message with our public key, the per-startup beacon password and the
 * current protocol version (in big endian).
 *
 * @param ifc the interface controller.
 * @param beacon the beacon to populate.
 */
static void populateBeacon(struct InterfaceController* ifc, struct Headers_Beacon* beacon)
{
    struct Context* ctx = Identity_cast((struct Context*) ifc);

    Bits_memcpyConst(beacon->publicKey, ctx->ca->publicKey, 32);
    Bits_memcpyConst(beacon->password, ctx->beaconPassword, Headers_Beacon_PASSWORD_LEN);
    beacon->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
}
/**
 * Take a snapshot of the statistics of every registered peer.
 *
 * @param ifController the interface controller.
 * @param alloc the allocator which is used for the array of stats.
 * @param statsOut is set to point at the array, one entry per peer in peer map order.
 * @return the number of entries in the array.
 */
static int getPeerStats(struct InterfaceController* ifController,
                        struct Allocator* alloc,
                        struct InterfaceController_peerStats** statsOut)
{
    struct Context* ctx = Identity_cast((struct Context*) ifController);
    int total = ctx->peerMap.count;

    struct InterfaceController_peerStats* list =
        Allocator_malloc(alloc, sizeof(struct InterfaceController_peerStats)*total);

    for (int idx = 0; idx < total; idx++) {
        struct IFCPeer* peer = ctx->peerMap.values[idx];
        struct InterfaceController_peerStats* entry = &list[idx];

        entry->pubKey = CryptoAuth_getHerPublicKey(peer->cryptoAuthIf);
        entry->bytesOut = peer->bytesOut;
        entry->bytesIn = peer->bytesIn;
        entry->timeOfLastMessage = peer->timeOfLastMessage;
        entry->state = peer->state;
        entry->switchLabel = peer->switchLabel;
        entry->isIncomingConnection = peer->isIncomingConnection;

        // The user is only looked up for incoming connections.
        entry->user = (entry->isIncomingConnection)
            ? CryptoAuth_getUser(peer->cryptoAuthIf)
            : NULL;

        // The packet counters come from the replay protector of the CryptoAuth session.
        struct ReplayProtector* replay = CryptoAuth_getReplayProtector(peer->cryptoAuthIf);
        entry->duplicates = replay->duplicates;
        entry->lostPackets = replay->lostPackets;
        entry->receivedOutOfRange = replay->receivedOutOfRange;
    }

    *statsOut = list;
    return total;
}
/**
 * Disconnect the first registered peer which has the given public key.
 * The allocator of its external interface is freed, which runs the on-free job of the peer.
 *
 * @param ifController the interface controller.
 * @param herPublicKey the public key of the peer to disconnect.
 * @return 0 if a peer was disconnected,
 *         InterfaceController_disconnectPeer_NOTFOUND if no peer has that key.
 */
static int disconnectPeer(struct InterfaceController* ifController, uint8_t herPublicKey[32])
{
    struct Context* ctx = Identity_cast((struct Context*) ifController);

    for (uint32_t idx = 0; idx < ctx->peerMap.count; idx++) {
        struct IFCPeer* candidate = ctx->peerMap.values[idx];
        uint8_t* candidateKey = CryptoAuth_getHerPublicKey(candidate->cryptoAuthIf);

        if (Bits_memcmp(herPublicKey, candidateKey, 32)) {
            // Not the peer we are looking for.
            continue;
        }

        Allocator_free(candidate->external->allocator);
        return 0;
    }

    return InterfaceController_disconnectPeer_NOTFOUND;
}
struct InterfaceController* DefaultInterfaceController_new(struct CryptoAuth* ca,
struct SwitchCore* switchCore,
struct RouterModule* routerModule,
struct Log* logger,
struct EventBase* eventBase,
struct SwitchPinger* switchPinger,
struct Random* rand,
struct Allocator* allocator)
{
struct Context* out = Allocator_malloc(allocator, sizeof(struct Context));
Bits_memcpyConst(out, (&(struct Context) {
.pub = {
.registerPeer = registerPeer,
.disconnectPeer = disconnectPeer,
.getPeerState = getPeerState,
.populateBeacon = populateBeacon,
.getPeerStats = getPeerStats,
},
.peerMap = {
.allocator = allocator
},
.allocator = allocator,
.ca = ca,
.switchCore = switchCore,
.routerModule = routerModule,
.logger = logger,
.eventBase = eventBase,
.switchPinger = switchPinger,
.unresponsiveAfterMilliseconds = UNRESPONSIVE_AFTER_MILLISECONDS,
.pingAfterMilliseconds = PING_AFTER_MILLISECONDS,
.timeoutMilliseconds = TIMEOUT_MILLISECONDS,
.forgetAfterMilliseconds = FORGET_AFTER_MILLISECONDS,
.pingInterval = (switchPinger)
? Timeout_setInterval(pingCallback,
out,
PING_INTERVAL_MILLISECONDS,
eventBase,
allocator)
: NULL
}), sizeof(struct Context));
Identity_set(out);
// Add the beaconing password.
Random_bytes(rand, out->beaconPassword, Headers_Beacon_PASSWORD_LEN);
String strPass = { .bytes=(char*)out->beaconPassword, .len=Headers_Beacon_PASSWORD_LEN };
int ret = CryptoAuth_addUser(&strPass, 1, String_CONST("Local Peers"), ca);
if (ret) {
Log_warn(logger, "CryptoAuth_addUser() returned [%d]", ret);
}
return &out->pub;
}