Skip to content

Commit

Permalink
Initial handshake implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
simon3z committed Sep 30, 2010
1 parent 96ee26e commit bf44f50
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 14 deletions.
58 changes: 44 additions & 14 deletions main.c
Expand Up @@ -28,6 +28,8 @@

#define DEFAULT_NET_NAME "kronosnet%d"

#define TX_KNET_DATASIZE 131072

int daemonize = 1;
int debug = 0;
int daemon_quit = 0;
Expand All @@ -41,6 +43,7 @@ static pthread_t eth_thread;
static pthread_t hb_thread;
struct node *mainconf;
uint32_t our_nodeid;
struct knet_header *tx_knet_h;

static void print_usage(void)
{
Expand Down Expand Up @@ -242,11 +245,13 @@ static void sigpipe_handler(int sig)
return;
}

static void dispatch_buffer(struct node *next, uint32_t nodeid, char *read_buf, ssize_t read_len)
static void dispatch_buffer(struct node *next, uint32_t nodeid, struct knet_header *read_buf, ssize_t read_len)
{
while (next) {
struct conn *conn;

if ((read_buf->pckt_type == KNETD_PKCT_TYPE_DATA) && (next->status == NODE_STATUS_OFFLINE)) goto next;

if ((nodeid) && (next->nodeid != nodeid)) {
log_printf(LOGSYS_LEVEL_INFO, "Requested nodeid: %u current: %u\n", nodeid, next->nodeid);
goto next;
Expand Down Expand Up @@ -290,19 +295,17 @@ static void *eth_to_knet_thread(void *arg)
{
fd_set rfds;
int se_result;
char read_buf[131072+sizeof(struct knet_header)];
ssize_t read_len = 0;
struct timeval tv;
struct knet_header *knet_h = (struct knet_header *)read_buf;

/* we need to prepare the header only once for now */
memset(knet_h, 0, sizeof(struct knet_header));
knet_h->magic = KNETD_MAGIC;
knet_h->src_nodeid = our_nodeid;
knet_h->seq_num = 0;
knet_h->pckt_type = KNETD_PKCT_TYPE_DATA;
knet_h->compress = KNETD_COMPRESS_OFF;
knet_h->encryption = KNETD_ENCRYPTION_OFF;
memset(tx_knet_h, 0, sizeof(struct knet_header));
tx_knet_h->magic = KNETD_MAGIC;
tx_knet_h->src_nodeid = our_nodeid;
tx_knet_h->seq_num = 0;
tx_knet_h->pckt_type = KNETD_PKCT_TYPE_DATA;
tx_knet_h->compress = KNETD_COMPRESS_OFF;
tx_knet_h->encryption = KNETD_ENCRYPTION_OFF;

do {
FD_ZERO (&rfds);
Expand All @@ -321,10 +324,10 @@ static void *eth_to_knet_thread(void *arg)
continue;

if (FD_ISSET(eth_fd, &rfds)) {
read_len = read(eth_fd, read_buf + sizeof(struct knet_header), sizeof(read_buf) - sizeof(struct knet_header));
read_len = read(eth_fd, tx_knet_h + 1, TX_KNET_DATASIZE);
if (read_len > 0) {
knet_h->seq_num++;
dispatch_buffer(mainconf, 0, read_buf, read_len + sizeof(struct knet_header));
tx_knet_h->seq_num++;
dispatch_buffer(mainconf, 0, tx_knet_h, read_len + sizeof(struct knet_header));
} else if (read_len < 0) {
log_printf(LOGSYS_LEVEL_INFO, "Error reading from localnet error: %s\n", strerror(errno));
} else
Expand All @@ -335,6 +338,21 @@ static void *eth_to_knet_thread(void *arg)
return NULL;
}

static void knet_send_synack(struct node *next, uint32_t nodeid, uint32_t type)
{
struct knet_header knet_h;

memset(&knet_h, 0, sizeof(struct knet_header));
knet_h.magic = KNETD_MAGIC;
knet_h.src_nodeid = our_nodeid;
knet_h.seq_num = tx_knet_h->seq_num;
knet_h.pckt_type = (type == KNETD_PKCT_TYPE_SYN) ? KNETD_PKCT_TYPE_SYN : KNETD_PKCT_TYPE_ACK;
knet_h.compress = KNETD_COMPRESS_OFF;
knet_h.encryption = KNETD_ENCRYPTION_OFF;

dispatch_buffer(next, nodeid, &knet_h, sizeof(struct knet_header));
}

static void loop(void) {
int se_result;
fd_set rfds;
Expand Down Expand Up @@ -389,6 +407,14 @@ static void loop(void) {
peer = peer->next;
}
switch(knet_h->pckt_type) {
case KNETD_PKCT_TYPE_SYN:
knet_send_synack(mainconf, peer->nodeid, KNETD_PKCT_TYPE_ACK);
case KNETD_PKCT_TYPE_ACK:
log_printf(LOGSYS_LEVEL_DEBUG, "syn/ack request from: %u\n", knet_h->src_nodeid);
peer->seq_num = knet_h->seq_num;
peer->status = NODE_STATUS_ONLINE;
memset(peer->circular_buffer, 0, CBUFFER_SIZE);
break;
case KNETD_PKCT_TYPE_DATA:
if (should_deliver(peer, knet_h->seq_num) > 0) {
//log_printf(LOGSYS_LEVEL_DEBUG, "Act pkct from node %s[%u]: %u\n", peer->nodename, peer->nodeid, knet_h->seq_num);
Expand All @@ -407,7 +433,7 @@ static void loop(void) {
/* reply */
knet_h->pckt_type = KNETD_PKCT_TYPE_PONG;
knet_h->src_nodeid = our_nodeid;
dispatch_buffer(mainconf, peer_nodeid, read_buf, read_len);
dispatch_buffer(mainconf, peer_nodeid, knet_h, read_len);
break;
case KNETD_PKCT_TYPE_PONG:
log_printf(LOGSYS_LEVEL_DEBUG, "Got a PONG reply\n");
Expand Down Expand Up @@ -505,6 +531,7 @@ int main(int argc, char **argv)
}

log_printf(LOGSYS_LEVEL_DEBUG, "Initializing local ethernet delivery thread\n");
tx_knet_h = malloc(sizeof(struct knet_header) + TX_KNET_DATASIZE);

rv = pthread_create(&eth_thread, NULL, eth_to_knet_thread, NULL);
if (rv < 0) {
Expand Down Expand Up @@ -533,6 +560,9 @@ int main(int argc, char **argv)
goto out;
}

log_printf(LOGSYS_LEVEL_DEBUG, "Sending syn packet\n");
knet_send_synack(mainconf, 0, KNETD_PKCT_TYPE_SYN);

log_printf(LOGSYS_LEVEL_DEBUG, "Entering main loop\n");
loop();

Expand Down
2 changes: 2 additions & 0 deletions netsocket.h
Expand Up @@ -9,6 +9,8 @@
#define KNETD_PKCT_TYPE_DATA 0
#define KNETD_PKCT_TYPE_PING 1
#define KNETD_PKCT_TYPE_PONG 2
#define KNETD_PKCT_TYPE_SYN 3
#define KNETD_PKCT_TYPE_ACK 4

#define KNETD_COMPRESS_OFF 0
#define KNETD_COMPRESS_ON 1
Expand Down
4 changes: 4 additions & 0 deletions nodes.h
Expand Up @@ -9,6 +9,9 @@

#define CBUFFER_SIZE 4096

#define NODE_STATUS_OFFLINE 0
#define NODE_STATUS_ONLINE 1

struct conn {
struct conn *next;
struct conn *tail;
Expand Down Expand Up @@ -38,6 +41,7 @@ struct node {
char circular_buffer[CBUFFER_SIZE];
int start;
int end;
int status;
};

struct node *parse_nodes_config(confdb_handle_t handle);
Expand Down

0 comments on commit bf44f50

Please sign in to comment.