From bf44f50faed6b458ad35d25030f060ef3560a42c Mon Sep 17 00:00:00 2001 From: Federico Simoncelli Date: Thu, 30 Sep 2010 14:10:19 +0200 Subject: [PATCH] Initial handshake implementation --- main.c | 58 ++++++++++++++++++++++++++++++++++++++++------------- netsocket.h | 2 ++ nodes.h | 4 ++++ 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/main.c b/main.c index 1c293316a..29c8852dd 100644 --- a/main.c +++ b/main.c @@ -28,6 +28,8 @@ #define DEFAULT_NET_NAME "kronosnet%d" +#define TX_KNET_DATASIZE 131072 + int daemonize = 1; int debug = 0; int daemon_quit = 0; @@ -41,6 +43,7 @@ static pthread_t eth_thread; static pthread_t hb_thread; struct node *mainconf; uint32_t our_nodeid; +struct knet_header *tx_knet_h; static void print_usage(void) { @@ -242,11 +245,13 @@ static void sigpipe_handler(int sig) return; } -static void dispatch_buffer(struct node *next, uint32_t nodeid, char *read_buf, ssize_t read_len) +static void dispatch_buffer(struct node *next, uint32_t nodeid, struct knet_header *read_buf, ssize_t read_len) { while (next) { struct conn *conn; + if ((read_buf->pckt_type == KNETD_PKCT_TYPE_DATA) && (next->status == NODE_STATUS_OFFLINE)) goto next; + if ((nodeid) && (next->nodeid != nodeid)) { log_printf(LOGSYS_LEVEL_INFO, "Requested nodeid: %u current: %u\n", nodeid, next->nodeid); goto next; @@ -290,19 +295,17 @@ static void *eth_to_knet_thread(void *arg) { fd_set rfds; int se_result; - char read_buf[131072+sizeof(struct knet_header)]; ssize_t read_len = 0; struct timeval tv; - struct knet_header *knet_h = (struct knet_header *)read_buf; /* we need to prepare the header only once for now */ - memset(knet_h, 0, sizeof(struct knet_header)); - knet_h->magic = KNETD_MAGIC; - knet_h->src_nodeid = our_nodeid; - knet_h->seq_num = 0; - knet_h->pckt_type = KNETD_PKCT_TYPE_DATA; - knet_h->compress = KNETD_COMPRESS_OFF; - knet_h->encryption = KNETD_ENCRYPTION_OFF; + memset(tx_knet_h, 0, sizeof(struct knet_header)); + tx_knet_h->magic = KNETD_MAGIC; + tx_knet_h->src_nodeid = our_nodeid; + tx_knet_h->seq_num = 0; + tx_knet_h->pckt_type = KNETD_PKCT_TYPE_DATA; + tx_knet_h->compress = KNETD_COMPRESS_OFF; + tx_knet_h->encryption = KNETD_ENCRYPTION_OFF; do { FD_ZERO (&rfds); @@ -321,10 +324,10 @@ static void *eth_to_knet_thread(void *arg) continue; if (FD_ISSET(eth_fd, &rfds)) { - read_len = read(eth_fd, read_buf + sizeof(struct knet_header), sizeof(read_buf) - sizeof(struct knet_header)); + read_len = read(eth_fd, tx_knet_h + 1, TX_KNET_DATASIZE); if (read_len > 0) { - knet_h->seq_num++; - dispatch_buffer(mainconf, 0, read_buf, read_len + sizeof(struct knet_header)); + tx_knet_h->seq_num++; + dispatch_buffer(mainconf, 0, tx_knet_h, read_len + sizeof(struct knet_header)); } else if (read_len < 0) { log_printf(LOGSYS_LEVEL_INFO, "Error reading from localnet error: %s\n", strerror(errno)); } else @@ -335,6 +338,21 @@ static void *eth_to_knet_thread(void *arg) return NULL; } +static void knet_send_synack(struct node *next, uint32_t nodeid, uint32_t type) +{ + struct knet_header knet_h; + + memset(&knet_h, 0, sizeof(struct knet_header)); + knet_h.magic = KNETD_MAGIC; + knet_h.src_nodeid = our_nodeid; + knet_h.seq_num = tx_knet_h->seq_num; + knet_h.pckt_type = (type == KNETD_PKCT_TYPE_SYN) ? KNETD_PKCT_TYPE_SYN : KNETD_PKCT_TYPE_ACK; + knet_h.compress = KNETD_COMPRESS_OFF; + knet_h.encryption = KNETD_ENCRYPTION_OFF; + + dispatch_buffer(next, nodeid, &knet_h, sizeof(struct knet_header)); +} + static void loop(void) { int se_result; fd_set rfds; @@ -389,6 +407,14 @@ static void loop(void) { peer = peer->next; } switch(knet_h->pckt_type) { + case KNETD_PKCT_TYPE_SYN: + knet_send_synack(mainconf, peer->nodeid, KNETD_PKCT_TYPE_ACK); + case KNETD_PKCT_TYPE_ACK: + log_printf(LOGSYS_LEVEL_DEBUG, "syn/ack request from: %u\n", knet_h->src_nodeid); + peer->seq_num = knet_h->seq_num; + peer->status = NODE_STATUS_ONLINE; + memset(peer->circular_buffer, 0, CBUFFER_SIZE); + break; case KNETD_PKCT_TYPE_DATA: if (should_deliver(peer, knet_h->seq_num) > 0) { //log_printf(LOGSYS_LEVEL_DEBUG, "Act pkct from node %s[%u]: %u\n", peer->nodename, peer->nodeid, knet_h->seq_num); @@ -407,7 +433,7 @@ static void loop(void) { /* reply */ knet_h->pckt_type = KNETD_PKCT_TYPE_PONG; knet_h->src_nodeid = our_nodeid; - dispatch_buffer(mainconf, peer_nodeid, read_buf, read_len); + dispatch_buffer(mainconf, peer_nodeid, knet_h, read_len); break; case KNETD_PKCT_TYPE_PONG: log_printf(LOGSYS_LEVEL_DEBUG, "Got a PONG reply\n"); @@ -505,6 +531,7 @@ int main(int argc, char **argv) } log_printf(LOGSYS_LEVEL_DEBUG, "Initializing local ethernet delivery thread\n"); + tx_knet_h = malloc(sizeof(struct knet_header) + TX_KNET_DATASIZE); rv = pthread_create(ð_thread, NULL, eth_to_knet_thread, NULL); if (rv < 0) { @@ -533,6 +560,9 @@ int main(int argc, char **argv) goto out; } + log_printf(LOGSYS_LEVEL_DEBUG, "Sending syn packet\n"); + knet_send_synack(mainconf, 0, KNETD_PKCT_TYPE_SYN); + log_printf(LOGSYS_LEVEL_DEBUG, "Entering main loop\n"); loop(); diff --git a/netsocket.h b/netsocket.h index 7adb20ee4..cc7b396fb 100644 --- a/netsocket.h +++ b/netsocket.h @@ -9,6 +9,8 @@ #define KNETD_PKCT_TYPE_DATA 0 #define KNETD_PKCT_TYPE_PING 1 #define KNETD_PKCT_TYPE_PONG 2 +#define KNETD_PKCT_TYPE_SYN 3 +#define KNETD_PKCT_TYPE_ACK 4 #define KNETD_COMPRESS_OFF 0 #define KNETD_COMPRESS_ON 1 diff --git a/nodes.h b/nodes.h index 959dc803f..31bbd4e32 100644 --- a/nodes.h +++ b/nodes.h @@ -9,6 +9,9 @@ #define CBUFFER_SIZE 4096 +#define NODE_STATUS_OFFLINE 0 +#define NODE_STATUS_ONLINE 1 + struct conn { struct conn *next; struct conn *tail; @@ -38,6 +41,7 @@ struct node { char circular_buffer[CBUFFER_SIZE]; int start; int end; + int status; }; struct node *parse_nodes_config(confdb_handle_t handle);