Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdelisle committed Feb 3, 2015
1 parent bce821d commit 2389090
Showing 1 changed file with 163 additions and 62 deletions.
225 changes: 163 additions & 62 deletions net/ControlHandler.c
Expand Up @@ -34,6 +34,156 @@ struct ControlHandler_pvt
Identity
};

/**
* Expects [ Ctrl ][ Error ][ cause SwitchHeader ][ cause handle ][ cause etc.... ]
*/
#define handleError_MIN_SIZE (Control_HEADER_SIZE + Control_Error_MIN_SIZE + SwitchHeader_SIZE + 4)
static int handleError(struct ControlHandler_pvt* ch,
struct Message* msg,
uint64_t label,
uint8_t* labelStr)
{
if (msg->length < handleError_MIN_SIZE) {
Log_info(ch->logger, "DROP runt error packet from [%s]", labelStr);
return 0;
}
struct Control* ctrl = (struct Control*) msg->bytes;
struct Control_Error* err = ctrl->content.error;
struct SwitchHeader* causeHeader = &err->cause;
uint64_t causeLabel = Endian_bigEndianToHost64(causeHeader->label_be);
uint32_t errorType = Endian_bigEndianToHost32(err->errorType_be);

if (!NumberCompress_isOneHop(label)) {
Router_brokenLink(ch->router, label, causeLabel);
} else (errorType == Error_UNDELIVERABLE) {
// this is our own InterfaceController complaining
// because the node isn't responding to pings.
return 0;
}

Log_debug(context->logger,
"error packet from [%s] [%s]%s",
labelStr,
Error_strerror(errorType),
((err->causeHandle == 0xffffffff) ? " caused by ctrl" : ""));

return 0;
}

/**
* Expects [ SwitchHeader ][ Ctrl ][ (key)Ping ][ data etc.... ]
*/
#define handlePing_MIN_SIZE (Control_HEADER_SIZE + Control_Ping_MIN_SIZE)
static int handlePing(struct ControlHandler_pvt* ch,
struct Message* msg,
uint64_t label,
uint8_t* labelStr,
uint16_t messageType_be)
{
if (msg->length < handlePing_MIN_SIZE) {
Log_info(ch->log, "DROP runt ping");
return 0;
}

struct Control* ctrl = (struct Control*) msg->bytes;
Message_shift(msg, -Control_HEADER_SIZE, NULL);

// Ping and keyPing share version location
struct Control_Ping* ping = (struct Control_Ping*) message->bytes;
uint32_t herVersion = Endian_bigEndianToHost32(ping->version_be);
if (!Version_isCompatible(Version_CURRENT_PROTOCOL, herVersion)) {
Log_debug(ch->log, "DROP ping from incompatible version [%s]", herVersion);
return 0;
}

if (messageType_be == Control_KEYPING_be) {
if (msg->length < Control_KeyPing_HEADER_SIZE) {
// min keyPing size is longer
Log_debug(ch->log, "DROP runt keyPing");
return 0;
}
if (msg->length > Control_KeyPing_MAX_SIZE) {
Log_debug(ch->log, "DROP long keyPing");
return 0;
}
if (ping->magic != Control_KeyPing_MAGIC) {
Log_debug(ch->log, "DROP keyPing (bad magic)");
return 0;
}

struct Control_KeyPing* keyPing = (struct Control_KeyPing*) msg->bytes;
keyPing->magic = Control_KeyPong_MAGIC;
ctrl->type_be = Control_KEYPONG_be;
Bits_memcpyConst(keyPing->key, ch->myAddr.key, 32);

} else if (messageType_be == Control_PING_be) {
if (ping->magic != Control_Ping_MAGIC) {
Log_debug(ch->log, "DROP ping (bad magic)");
return 0;
}
ping->magic = Control_Pong_MAGIC;
ctrl->type_be = Control_PONG_be;

} else {
Assert_failure("2+2=5");
}

ping->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);

Message_shift(message, Control_HEADER_SIZE, NULL);

ctrl->checksum_be = 0;
ctrl->checksum_be = Checksum_engine(message->bytes, message->length);

Message_shift(message, SwitchHeader_SIZE, NULL);

Log_debug(context->logger, "got switch ping from [%s]", labelStr);
SwitchHeader_setLabelShift(switchHeader, 0);
SwitchHeader_setCongestion(switchHeader, 0);
Interface_receiveMessage(switchIf, message);
//////

Message_shift(message, -Control_HEADER_SIZE, NULL);

if (message->length < Control_KeyPing_HEADER_SIZE
|| message->length > Control_KeyPing_MAX_SIZE)
{
Log_info(context->logger, "DROP incorrect size keyping");
return Error_INVALID;
}

struct Control_KeyPing* keyPing = (struct Control_KeyPing*) message->bytes;

#ifdef Log_DEBUG
struct Address herAddr = {
.protocolVersion = Endian_bigEndianToHost32(keyPing->version_be),
.path = label
};
Bits_memcpyConst(herAddr.key, keyPing->key, 32);
String* addrStr = Address_toString(&herAddr, message->alloc);
Log_debug(context->logger, "got switch keyPing from [%s]", addrStr->bytes);
#endif

keyPing->magic = Control_KeyPong_MAGIC;
uint32_t herVersion = Endian_bigEndianToHost32(keyPing->version_be);
keyPing->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
Bits_memcpyConst(keyPing->key, context->myAddr.key, 32);
Message_shift(message, Control_HEADER_SIZE, NULL);

ctrl->type_be = Control_KEYPONG_be;
ctrl->checksum_be = 0;
ctrl->checksum_be = Checksum_engine(message->bytes, message->length);

Message_shift(message, 4, NULL);
Assert_true(((uint32_t*)message->bytes)[0] == 0xffffffff);

Message_shift(message, SwitchHeader_SIZE, NULL);
SwitchHeader_setLabelShift(switchHeader, 0);
SwitchHeader_setCongestion(switchHeader, 0);
Interface_receiveMessage(switchIf, message);

}

/**
* Handle an incoming control message from a switch.
*
Expand All @@ -48,10 +198,10 @@ static int incomingFromCore(struct Interface_Two* coreIf, struct Message* msg)
{
struct ControlHandler_pvt* ch = Identity_check((struct ControlHandler_pvt*) coreIf);

struct SwitchHeader* switchHeader = (struct SwitchHeader*) msg->bytes;
Assert_true(msg->length >= SwitchHeader_SIZE);
struct SwitchHeader switchHeader;
Message_pop(msg, &switchHeader, SwitchHeader_SIZE, NULL);
uint8_t labelStr[20];
uint64_t label = Endian_bigEndianToHost64(switchHeader->label_be);
uint64_t label = Endian_bigEndianToHost64(switchHeader.label_be);
AddrTools_printPath(labelStr, label);
Log_debug(ch->logger, "ctrl packet from [%s]", labelStr);

Expand All @@ -60,74 +210,25 @@ static int incomingFromCore(struct Interface_Two* coreIf, struct Message* msg)
return Error_NONE;
}

Message_shift(msg, -SwitchHeader_SIZE, NULL);
Assert_true(Message_pop32(msg, NULL) == 0xffffffff);
struct Control* ctrl = (struct Control*) msg->bytes;

if (Checksum_engine(msg->bytes, msg->length)) {
Log_info(ch->logger, "DROP ctrl packet from [%s] with invalid checksum", labelStr);
return Error_NONE;
}

bool pong = false;
if (ctrl->type_be == Control_ERROR_be) {
if (msg->length < Control_Error_MIN_SIZE) {
Log_info(ch->logger, "DROP runt error packet from [%s]", labelStr);
return Error_NONE;
}
struct Control* ctrl = (struct Control*) msg->bytes;

uint64_t path = Endian_bigEndianToHost64(switchHeader->label_be);
if (!NumberCompress_isOneHop(path)) {
uint64_t labelAtStop = Endian_bigEndianToHost64(ctrl->content.error.cause.label_be);
Router_brokenLink(ch->router, path, labelAtStop);
}
switch (ctrl->type_be) {
case Control_ERROR_be: return handleError(ch, msg, label, labelStr);

case Control_KEYPING_be:
case Control_PING_be: return handlePing(ch, msg, label, labelStr, ctrl->type_be);

case Control_KEYPONG_be:
case Control_PONG_be: return handlePong(ch, msg, label, labelStr, ctrl->type_be);
}

if (ctrl->content.error.causeHandle == 0xffffffff) {
// Determine whether the "cause" packet is a control message.
if (message->length < Control_Error_MIN_SIZE + Control_HEADER_SIZE) {
Log_info(context->logger,
"error packet from [%s] containing runt cause packet",
labelStr);
return Error_NONE;
}
struct Control* causeCtrl = (struct Control*) &(&ctrl->content.error.cause)[1];
if (causeCtrl->type_be != Control_PING_be && causeCtrl->type_be != Control_KEYPING_be) {
#ifdef Log_INFO
uint32_t errorType =
Endian_bigEndianToHost32(ctrl->content.error.errorType_be);
Log_info(context->logger,
"error packet from [%s] caused by [%s] packet ([%s])",
labelStr,
Control_typeString(causeCtrl->type_be),
Error_strerror(errorType));
#endif
} else {
if (LabelSplicer_isOneHop(label)
&& ctrl->content.error.errorType_be
== Endian_hostToBigEndian32(Error_UNDELIVERABLE))
{
// this is our own InterfaceController complaining
// because the node isn't responding to pings.
return Error_NONE;
}
Log_debug(context->logger,
"error packet from [%s] in response to ping, err [%u], length: [%u].",
labelStr,
Endian_bigEndianToHost32(ctrl->content.error.errorType_be),
message->length);
// errors resulting from pings are forwarded back to the pinger.
pong = true;
}
} else {
uint32_t errorType = Endian_bigEndianToHost32(ctrl->content.error.errorType_be);
if (errorType != Error_RETURN_PATH_INVALID) {
// Error_RETURN_PATH_INVALID is impossible to prevent so will appear all the time.
Log_info(context->logger,
"error packet from [%s] [%s]",
labelStr,
Error_strerror(errorType));
}
}
} else if (ctrl->type_be == Control_PONG_be) {
pong = true;
} else if (ctrl->type_be == Control_PING_be) {
Expand Down

0 comments on commit 2389090

Please sign in to comment.