Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added logic to ping over links which seem bad in order to prove that …

…the neighbor is ok.
  • Loading branch information...
commit 291eb313ae5e67ebf87ba2022a3093ef3f43a626 1 parent 259e49d
Caleb James DeLisle authored
View
29 cjdroute.c
@@ -41,6 +41,8 @@
#include "memory/BufferAllocator.h"
#include "memory/Allocator.h"
#include "net/Ducttape.h"
+#include "net/SwitchPinger.h"
+#include "net/SwitchPinger_admin.h"
#include "switch/SwitchCore.h"
#include "util/Base32.h"
#include "util/Hex.h"
@@ -535,6 +537,21 @@ int main(int argc, char** argv)
SerializationModule_register(context.registry, context.allocator);
+ struct Ducttape* dt = Ducttape_register(&config,
+ privateKey,
+ context.registry,
+ context.routerModule,
+ context.tunInterface,
+ context.switchCore,
+ context.base,
+ context.allocator,
+ context.logger,
+ context.admin);
+
+ struct SwitchPinger* sp =
+ SwitchPinger_new(&dt->switchPingerIf, context.base, context.logger, context.allocator);
+ SwitchPinger_admin_register(sp, context.admin, context.allocator);
+
// Interfaces.
struct InterfaceController* ifController =
InterfaceController_new(context.ca,
@@ -542,6 +559,7 @@ int main(int argc, char** argv)
context.routerModule,
context.logger,
context.base,
+ sp,
context.allocator);
Dict* interfaces = Dict_getDict(&config, String_CONST("interfaces"));
@@ -591,17 +609,6 @@ int main(int argc, char** argv)
&logger,
context.allocator);
- Ducttape_register(&config,
- privateKey,
- context.registry,
- context.routerModule,
- context.tunInterface,
- context.switchCore,
- context.base,
- context.allocator,
- context.logger,
- context.admin);
-
uint8_t address[53];
Base32_encode(address, 53, myAddr.key, 32);
Log_info(context.logger, "Your address is: %s.k\n", address);
View
2  interface/CMakeLists.txt
@@ -28,7 +28,7 @@ add_library(interface
SessionManager.c
)
-target_link_libraries(interface util ${LIBEVENT2_LIBRARIES})
+target_link_libraries(interface util cjdnet ${LIBEVENT2_LIBRARIES})
enable_testing()
add_subdirectory(test)
View
132 interface/InterfaceController.c
@@ -16,8 +16,10 @@
#include "interface/InterfaceController.h"
#include "interface/InterfaceMap.h"
#include "memory/Allocator.h"
+#include "net/SwitchPinger.h"
#include "util/Bits.h"
#include "util/Time.h"
+#include "util/Timeout.h"
#include "wire/Error.h"
#include "wire/Message.h"
@@ -28,6 +30,10 @@
/** After this number of milliseconds, a node will be regarded as unresponsive. */
#define UNRESPONSIVE_AFTER_MILLISECONDS 10000
+#define PING_AFTER_MILLISECONDS 3000
+
+#define PING_INTERVAL 1000
+
/*--------------------Structs--------------------*/
struct Endpoint
@@ -52,6 +58,9 @@ struct Endpoint
/** The lookup key for this endpoint (ip/mac address) */
uint8_t key[InterfaceController_KEY_SIZE];
+ /** The label for this endpoint, needed to ping the endpoint. */
+ uint64_t switchLabel;
+
/**
* True after the CryptoAuth handshake is completed.
* This is used to check for a registeration of a node which is already registered in a
@@ -59,9 +68,6 @@ struct Endpoint
*/
bool authenticated : 1;
- /** If true then the entry will be removed if it sits too long with no incoming message. */
- bool removeIfExpires : 1;
-
/**
* The time of the last incoming message, used to clear out endpoints
* if they are not responsive.
@@ -90,30 +96,105 @@ struct InterfaceController
struct event_base* const eventBase;
- /** After this number of milliseconds, a node will be regarded as unresponsive. */
+ /** After this number of milliseconds, a neoghbor will be regarded as unresponsive. */
uint32_t unresponsiveAfterMilliseconds;
+
+ /** The number of milliseconds to wait before pinging. */
+ uint32_t pingAfterMilliseconds;
+
+ /** The timeout event to use for pinging potentially unresponsive neighbors. */
+ struct Timeout* const pingInterval;
+
+ /** For pinging lazy/unresponsive nodes. */
+ struct SwitchPinger* const switchPinger;
};
+//---------------//
+
+static inline struct InterfaceController* interfaceControllerForEndpoint(struct Endpoint* ep)
+{
+ return ep->internal.senderContext;
+}
+
+static void onPingResponse(enum SwitchPinger_Result result,
+ uint64_t label,
+ String* data,
+ uint32_t millisecondsLag,
+ void* onResponseContext)
+{
+ // Do nothing, the ping response is handled since it's incoming data.
+ #ifdef Log_DEBUG
+ struct Endpoint* ep = onResponseContext;
+ struct InterfaceController* ic = interfaceControllerForEndpoint(ep);
+ Assert_true(label == ep->switchLabel);
+ uint8_t path[20];
+ AddrTools_printPath(path, label);
+ Log_debug(ic->logger, "Received pong from lazy endpoint [%s]", path);
+ #endif
+}
+
+// Called from the pingInteral timeout.
+static void pingCallback(void* vic)
+{
+ struct InterfaceController* ic = vic;
+ uint32_t now = Time_currentTimeMilliseconds(ic->eventBase);
+
+ // scan for endpoints have not sent anything recently.
+ for (int i = 0; i < CJDNS_MAX_PEERS; i++) {
+ struct Endpoint* ep = &ic->endpoints[i];
+ if (ep->external != NULL && now > ep->timeOfLastMessage + ic->pingAfterMilliseconds) {
+ uint8_t path[20];
+ AddrTools_printPath(path, ep->switchLabel);
+ if (now > ep->timeOfLastMessage + ic->unresponsiveAfterMilliseconds) {
+ // Lets skip 75% of pings when they're really down.
+ if (now % 4) {
+ continue;
+ }
+ Log_debug(ic->logger, "Pinging unresponsive neighbor [%s].", path);
+ } else {
+ Log_debug(ic->logger, "Pinging lazy neighbor [%s].", path);
+ }
+
+ struct SwitchPinger_Ping* ping =
+ SwitchPinger_ping(ep->switchLabel,
+ String_CONST(""),
+ 0,
+ onPingResponse,
+ ic->switchPinger);
+
+ ping->onResponseContext = ep;
+ }
+ }
+}
+
struct InterfaceController* InterfaceController_new(struct CryptoAuth* ca,
struct SwitchCore* switchCore,
struct RouterModule* routerModule,
struct Log* logger,
struct event_base* eventBase,
+ struct SwitchPinger* switchPinger,
struct Allocator* allocator)
{
- return allocator->clone(sizeof(struct InterfaceController),
- allocator,
- &(struct InterfaceController)
- {
- .imap = InterfaceMap_new(InterfaceController_KEY_SIZE, allocator),
- .allocator = allocator,
- .ca = ca,
- .switchCore = switchCore,
- .routerModule = routerModule,
- .logger = logger,
- .eventBase = eventBase,
- .unresponsiveAfterMilliseconds = UNRESPONSIVE_AFTER_MILLISECONDS
- });
+ struct InterfaceController* out =
+ allocator->malloc(sizeof(struct InterfaceController), allocator);
+ Bits_memcpyConst(out, (&(struct InterfaceController) {
+ .imap = InterfaceMap_new(InterfaceController_KEY_SIZE, allocator),
+ .allocator = allocator,
+ .ca = ca,
+ .switchCore = switchCore,
+ .routerModule = routerModule,
+ .logger = logger,
+ .eventBase = eventBase,
+ .switchPinger = switchPinger,
+ .unresponsiveAfterMilliseconds = UNRESPONSIVE_AFTER_MILLISECONDS,
+ .pingAfterMilliseconds = PING_AFTER_MILLISECONDS,
+
+ .pingInterval = (switchPinger)
+ ? Timeout_setInterval(pingCallback, out, PING_INTERVAL, eventBase, allocator)
+ : NULL
+
+ }), sizeof(struct InterfaceController));
+ return out;
}
static inline struct Endpoint* endpointForInternalInterface(struct Interface* iface)
@@ -121,14 +202,8 @@ static inline struct Endpoint* endpointForInternalInterface(struct Interface* if
return (struct Endpoint*) (((char*)iface) - offsetof(struct Endpoint, internal));
}
-static inline struct InterfaceController* interfaceControllerForEndpoint(struct Endpoint* ep)
-{
- return ep->internal.senderContext;
-}
-
/** If there's already an endpoint with the same public key, merge the new one with the old one. */
-static inline struct Endpoint* moveEndpointIfNeeded(struct Endpoint* ep,
- struct InterfaceController* ic)
+static void moveEndpointIfNeeded(struct Endpoint* ep, struct InterfaceController* ic)
{
Log_debug(ic->logger, "Checking for old sessions to merge with.");
@@ -147,13 +222,12 @@ static inline struct Endpoint* moveEndpointIfNeeded(struct Endpoint* ep,
if (!memcmp(thisKey, key, 32)) {
Log_info(ic->logger, "Moving endpoint to merge new session with old.");
+ ep->switchLabel = thisEp->switchLabel;
SwitchCore_swapInterfaces(&thisEp->switchIf, &ep->switchIf);
thisEp->internal.allocator->free(thisEp->internal.allocator);
-
- return ep;
+ return;
}
}
- return ep;
}
// Incoming message which has passed through the cryptoauth and needs to be forwarded to the switch.
@@ -165,7 +239,7 @@ static uint8_t receivedAfterCryptoAuth(struct Message* msg, struct Interface* cr
ep->timeOfLastMessage = Time_currentTimeSeconds(ic->eventBase);
if (!ep->authenticated) {
if (CryptoAuth_getState(cryptoAuthIf) == CryptoAuth_ESTABLISHED) {
- ep = moveEndpointIfNeeded(ep, ic);
+ moveEndpointIfNeeded(ep, ic);
ep->authenticated = true;
}
}
@@ -327,6 +401,8 @@ static struct Endpoint* insertEndpoint(uint8_t key[InterfaceController_KEY_SIZE]
return NULL;
}
+ ep->switchLabel = addr.path;
+
authedIf->receiveMessage = receivedAfterCryptoAuth;
authedIf->receiverContext = ep;
View
2  interface/InterfaceController.h
@@ -20,6 +20,7 @@
#include "interface/Interface.h"
#include "memory/Allocator.h"
#include "switch/SwitchCore.h"
+#include "net/SwitchPinger.h"
#include "util/Log.h"
#include <stdint.h>
@@ -35,6 +36,7 @@ struct InterfaceController* InterfaceController_new(struct CryptoAuth* ca,
struct RouterModule* routerModule,
struct Log* logger,
struct event_base* eventBase,
+ struct SwitchPinger* switchPinger,
struct Allocator* allocator);
/**
View
2  interface/test/InterfaceController_test.c
@@ -166,7 +166,7 @@ int main()
RouterModule_register(registry, alloc, publicKey, eventBase, logger, NULL);
struct InterfaceController* ifController =
- InterfaceController_new(ca, switchCore, rm, logger, eventBase, alloc);
+ InterfaceController_new(ca, switchCore, rm, logger, eventBase, NULL, alloc);
////////////////////////
View
59 net/Ducttape.c
@@ -29,8 +29,6 @@
#include "memory/BufferAllocator.h"
#include "net/Ducttape.h"
#include "net/Ducttape_struct.h"
-#include "net/SwitchPinger.h"
-#include "net/SwitchPinger_admin.h"
#include "switch/SwitchCore.h"
#include "util/Bits.h"
#include "util/checksum/Checksum.h"
@@ -68,7 +66,7 @@ static void outOfMemory(void* towel)
static inline uint8_t incomingDHT(struct Message* message,
struct Address* addr,
- struct Ducttape* context)
+ struct Ducttape_Private* context)
{
struct DHTMessage dht;
memset(&dht, 0, sizeof(struct DHTMessage));
@@ -97,7 +95,7 @@ static inline uint8_t incomingDHT(struct Message* message,
/** Header must not be encrypted and must be aligned on the beginning of the ipv6 header. */
static inline uint8_t sendToRouter(struct Address* sendTo,
struct Message* message,
- struct Ducttape* context)
+ struct Ducttape_Private* context)
{
// We have to copy out the switch header because it
// will probably be clobbered by the crypto headers.
@@ -119,7 +117,7 @@ static inline uint8_t sendToRouter(struct Address* sendTo,
static int handleOutgoing(struct DHTMessage* dmessage,
void* vcontext)
{
- struct Ducttape* context = (struct Ducttape*) vcontext;
+ struct Ducttape_Private* context = (struct Ducttape_Private*) vcontext;
struct Message message =
{ .length = dmessage->length, .bytes = (uint8_t*) dmessage->bytes, .padding = 512 };
@@ -182,7 +180,7 @@ static inline bool isRouterTraffic(struct Message* message, struct Headers_IP6He
* this is called from core() which calls through an interfaceMap.
*/
static inline uint8_t incomingForMe(struct Message* message,
- struct Ducttape* context,
+ struct Ducttape_Private* context,
uint8_t herPubKey[32])
{
struct Address addr;
@@ -267,9 +265,10 @@ static inline uint8_t incomingForMe(struct Message* message,
}
uint8_t Ducttape_injectIncomingForMe(struct Message* message,
- struct Ducttape* context,
+ struct Ducttape* dt,
uint8_t herPublicKey[32])
{
+ struct Ducttape_Private* context = (struct Ducttape_Private*) dt;
struct Headers_SwitchHeader sh;
Bits_memcpyConst(&sh, message->bytes, Headers_SwitchHeader_SIZE);
context->switchHeader = &sh;
@@ -289,7 +288,7 @@ uint8_t Ducttape_injectIncomingForMe(struct Message* message,
*/
static inline uint8_t sendToSwitch(struct Message* message,
struct Headers_SwitchHeader* destinationSwitchHeader,
- struct Ducttape* context)
+ struct Ducttape_Private* context)
{
Message_shift(message, Headers_SwitchHeader_SIZE);
struct Headers_SwitchHeader* switchHeaderLocation =
@@ -311,7 +310,7 @@ static inline bool validEncryptedIP6(struct Message* message)
&& header->destinationAddr[0] == 0xFC;
}
-static inline bool isForMe(struct Message* message, struct Ducttape* context)
+static inline bool isForMe(struct Message* message, struct Ducttape_Private* context)
{
struct Headers_IP6Header* header = (struct Headers_IP6Header*) message->bytes;
return (memcmp(header->destinationAddr, context->myAddr.ip6.bytes, 16) == 0);
@@ -321,7 +320,7 @@ static inline bool isForMe(struct Message* message, struct Ducttape* context)
static inline uint8_t ip6FromTun(struct Message* message,
struct Interface* interface)
{
- struct Ducttape* context = (struct Ducttape*) interface->receiverContext;
+ struct Ducttape_Private* context = (struct Ducttape_Private*) interface->receiverContext;
struct Headers_IP6Header* header = (struct Headers_IP6Header*) message->bytes;
@@ -373,7 +372,7 @@ static inline uint8_t ip6FromTun(struct Message* message,
* they may come from us, or from another node and may be to us or to any other node.
* Message is aligned on the beginning of the ipv6 header.
*/
-static inline int core(struct Message* message, struct Ducttape* context)
+static inline int core(struct Message* message, struct Ducttape_Private* context)
{
context->ip6Header = (struct Headers_IP6Header*) message->bytes;
@@ -445,7 +444,7 @@ static inline int core(struct Message* message, struct Ducttape* context)
* for the content level crypto then it goes to outgoingFromCryptoAuth then comes here.
* Message is aligned on the beginning of the crypto header, ip6 header must be reapplied.
*/
-static inline uint8_t outgoingFromMe(struct Message* message, struct Ducttape* context)
+static inline uint8_t outgoingFromMe(struct Message* message, struct Ducttape_Private* context)
{
// Need to set the length field to take into account
// the crypto headers which are hidden under the ipv6 packet.
@@ -472,7 +471,7 @@ static inline uint8_t outgoingFromMe(struct Message* message, struct Ducttape* c
return core(message, context);
}
-static inline int incomingFromRouter(struct Message* message, struct Ducttape* context)
+static inline int incomingFromRouter(struct Message* message, struct Ducttape_Private* context)
{
if (!validEncryptedIP6(message)) {
Log_debug(context->logger, "Dropping message because of invalid ipv6 header.\n");
@@ -485,7 +484,7 @@ static inline int incomingFromRouter(struct Message* message, struct Ducttape* c
static uint8_t incomingFromCryptoAuth(struct Message* message, struct Interface* iface)
{
- struct Ducttape* context = iface->receiverContext;
+ struct Ducttape_Private* context = iface->receiverContext;
int layer = context->layer;
context->layer = INVALID_LAYER;
if (layer == INNER_LAYER) {
@@ -499,7 +498,7 @@ static uint8_t incomingFromCryptoAuth(struct Message* message, struct Interface*
static uint8_t outgoingFromCryptoAuth(struct Message* message, struct Interface* iface)
{
- struct Ducttape* context = (struct Ducttape*) iface->senderContext;
+ struct Ducttape_Private* context = (struct Ducttape_Private*) iface->senderContext;
int layer = context->layer;
context->layer = INVALID_LAYER;
if (layer == INNER_LAYER) {
@@ -537,7 +536,7 @@ static inline uint8_t* extractPublicKey(struct Message* message,
*/
static uint8_t incomingFromSwitch(struct Message* message, struct Interface* switchIf)
{
- struct Ducttape* context = switchIf->senderContext;
+ struct Ducttape_Private* context = switchIf->senderContext;
struct Headers_SwitchHeader* switchHeader = (struct Headers_SwitchHeader*) message->bytes;
Message_shift(message, -Headers_SwitchHeader_SIZE);
@@ -620,10 +619,11 @@ static uint8_t incomingFromSwitch(struct Message* message, struct Interface* swi
labelStr, Endian_bigEndianToHost16(ctrl->type_be));
}
- if (pong) {
+ if (pong && context->public.switchPingerIf.receiveMessage) {
// Shift back over the header
Message_shift(message, Headers_SwitchHeader_SIZE);
- context->switchPingerIf->receiveMessage(message, context->switchPingerIf);
+ context->public.switchPingerIf.receiveMessage(
+ message, &context->public.switchPingerIf);
}
return Error_NONE;
}
@@ -681,7 +681,7 @@ static uint8_t incomingFromSwitch(struct Message* message, struct Interface* swi
static uint8_t incomingFromPinger(struct Message* message, struct Interface* iface)
{
- struct Ducttape* context = iface->senderContext;
+ struct Ducttape_Private* context = iface->senderContext;
return context->switchInterface.receiveMessage(message, &context->switchInterface);
}
@@ -696,7 +696,8 @@ struct Ducttape* Ducttape_register(Dict* config,
struct Log* logger,
struct Admin* admin)
{
- struct Ducttape* context = allocator->calloc(sizeof(struct Ducttape), 1, allocator);
+ struct Ducttape_Private* context =
+ allocator->calloc(sizeof(struct Ducttape_Private), 1, allocator);
context->registry = registry;
context->routerModule = routerModule;
context->logger = logger;
@@ -740,15 +741,11 @@ struct Ducttape* Ducttape_register(Dict* config,
return NULL;
}
- // setup the switch pinger.
- context->switchPingerIf =
- allocator->clone(sizeof(struct Interface), allocator, &(struct Interface) {
- .sendMessage = incomingFromPinger,
- .senderContext = context
- });
- struct SwitchPinger* sp =
- SwitchPinger_new(context->switchPingerIf, eventBase, logger, allocator);
- SwitchPinger_admin_register(sp, admin, allocator);
-
- return context;
+ // setup the switch pinger interface.
+ Bits_memcpyConst(&context->public.switchPingerIf, (&(struct Interface) {
+ .sendMessage = incomingFromPinger,
+ .senderContext = context
+ }), sizeof(struct Interface));
+
+ return &context->public;
}
View
5 net/Ducttape.h
@@ -23,7 +23,10 @@
#include <event2/event.h>
-struct Ducttape;
+struct Ducttape
+{
+ struct Interface switchPingerIf;
+};
struct Ducttape* Ducttape_register(Dict* config,
uint8_t privateKey[32],
View
5 net/Ducttape_struct.h
@@ -35,8 +35,11 @@
* and send the message toward the DHT core.
*/
-struct Ducttape
+struct Ducttape_Private
{
+ /** the public fields. */
+ struct Ducttape public;
+
/** The network module for the DHT. */
struct DHTModule module;
View
8 test/cjdroute_routerPing_test.c
@@ -33,7 +33,7 @@ int main()
{
char* pingBenc = "d1:q4:ping4:txid4:abcde";
- struct Ducttape* dt = TestFramework_setUp();
+ struct Ducttape_Private* dt = (struct Ducttape_Private*) TestFramework_setUp();
struct Allocator* allocator = MallocAllocator_new(85000);
uint16_t buffLen = sizeof(struct Ducttape_IncomingForMe) + 8 + strlen(pingBenc);
@@ -61,19 +61,19 @@ int main()
// bad checksum
udp->checksum_be = 1;
struct Message m = { .bytes = buff, .length = buffLen, .padding = 0 };
- Ducttape_injectIncomingForMe(&m, dt, herPublicKey);
+ Ducttape_injectIncomingForMe(&m, &dt->public, herPublicKey);
Assert_always(!dt->switchInterface.receiverContext);
// zero checksum
udp->checksum_be = 0;
struct Message m2 = { .bytes = buff, .length = buffLen, .padding = 0 };
- Ducttape_injectIncomingForMe(&m2, dt, herPublicKey);
+ Ducttape_injectIncomingForMe(&m2, &dt->public, herPublicKey);
Assert_always(dt->switchInterface.receiverContext);
// good checksum
udp->checksum_be =
Checksum_udpIp6(ip6->sourceAddr, (uint8_t*) udp, strlen(pingBenc) + Headers_UDPHeader_SIZE);
struct Message m3 = { .bytes = buff, .length = buffLen, .padding = 0 };
- Ducttape_injectIncomingForMe(&m3, dt, herPublicKey);
+ Ducttape_injectIncomingForMe(&m3, &dt->public, herPublicKey);
Assert_always(dt->switchInterface.receiverContext);
}
Please sign in to comment.
Something went wrong with that request. Please try again.