Skip to content

Commit

Permalink
Update HighSpeedNetIO to take in two ports instead of one (#119)
Browse files Browse the repository at this point in the history
* Update HighSpeedNetIO to take in two ports instead of one

* Remove unused variables

* revert some formatting change

Co-authored-by: Xiao Wang <wangxiao1254@gmail.com>
  • Loading branch information
ajaybhargavb and wangxiao1254 committed Apr 16, 2021
1 parent ef6b235 commit fa237b7
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 110 deletions.
167 changes: 80 additions & 87 deletions emp-tool/io/highspeed_net_io_channel.h
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
#ifndef EMP_HIGHSPEED_NETWORK_IO_CHANNEL_H__
#define EMP_HIGHSPEED_NETWORK_IO_CHANNEL_H__

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <string>

#include "emp-tool/io/io_channel.h"
using std::string;

#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

namespace emp {

class SubChannel {
public:
class SubChannel { public:
int sock;
FILE * stream = nullptr;
char * buf = nullptr;
FILE *stream = nullptr;
char *buf = nullptr;
int ptr;
char * stream_buf = nullptr;
char *stream_buf = nullptr;
uint64_t counter = 0;
uint64_t flushes = 0;
SubChannel (int sock): sock(sock) {
SubChannel(int sock) : sock(sock) {
stream_buf = new char[NETWORK_BUFFER_SIZE];
buf = new char[NETWORK_BUFFER_SIZE2];
stream = fdopen(sock, "wb+");
Expand All @@ -41,65 +42,61 @@ class SubChannel {
}
};

class SenderSubChannel: public SubChannel {public:
SenderSubChannel(int sock): SubChannel(sock) {
ptr = 0;
}

class SenderSubChannel : public SubChannel { public:
SenderSubChannel(int sock) : SubChannel(sock) { ptr = 0; }

void flush() {
flushes++;
send_data_raw(buf, ptr);
if (counter % NETWORK_BUFFER_SIZE2 != 0)
send_data_raw(buf+ptr, NETWORK_BUFFER_SIZE2 - counter % NETWORK_BUFFER_SIZE2);
send_data_raw(buf + ptr, NETWORK_BUFFER_SIZE2 - counter % NETWORK_BUFFER_SIZE2);
fflush(stream);
ptr = 0;
}

void send_data(const void * data, int len) {
void send_data(const void *data, int len) {
if (len <= NETWORK_BUFFER_SIZE2 - ptr) {
memcpy(buf + ptr, data, len);
ptr +=len;
ptr += len;
} else {
send_data_raw(buf, ptr);
send_data_raw(data, len);
ptr = 0;
}
}

void send_data_raw(const void * data, int len) {
void send_data_raw(const void *data, int len) {
counter += len;
int sent = 0;
while(sent < len) {
int res = fwrite(sent + (char*)data, 1, len - sent, stream);
while (sent < len) {
int res = fwrite(sent + (char *)data, 1, len - sent, stream);
if (res >= 0)
sent+=res;
sent += res;
else
fprintf(stderr,"error: net_send_data %d\n", res);
fprintf(stderr, "error: net_send_data %d\n", res);
}
}
};

class RecverSubChannel: public SubChannel {public:
RecverSubChannel(int sock): SubChannel(sock) {
ptr = NETWORK_BUFFER_SIZE2;
}
class RecverSubChannel : public SubChannel { public:
RecverSubChannel(int sock) : SubChannel(sock) { ptr = NETWORK_BUFFER_SIZE2; }
void flush() {
flushes++;
ptr = NETWORK_BUFFER_SIZE2;
}

void recv_data(void * data, int len) {
if(len <= NETWORK_BUFFER_SIZE2 - ptr) {
void recv_data(void *data, int len) {
if (len <= NETWORK_BUFFER_SIZE2 - ptr) {
memcpy(data, buf + ptr, len);
ptr += len;
} else {
int remain = len;
memcpy(data, buf + ptr, NETWORK_BUFFER_SIZE2 - ptr);
remain -=NETWORK_BUFFER_SIZE2 - ptr;
remain -= NETWORK_BUFFER_SIZE2 - ptr;

while(true) {
while (true) {
recv_data_raw(buf, NETWORK_BUFFER_SIZE2);
if(remain <= NETWORK_BUFFER_SIZE2) {
if (remain <= NETWORK_BUFFER_SIZE2) {
memcpy(len - remain + (char *)data, buf, remain);
ptr = remain;
break;
Expand All @@ -111,27 +108,45 @@ class RecverSubChannel: public SubChannel {public:
}
}

void recv_data_raw(void * data, int len) {
counter+=len;
void recv_data_raw(void *data, int len) {
counter += len;
int sent = 0;
while(sent < len) {
int res = fread(sent + (char*)data, 1, len - sent, stream);
while (sent < len) {
int res = fread(sent + (char *)data, 1, len - sent, stream);
if (res >= 0)
sent += res;
else
fprintf(stderr,"error: net_send_data %d\n", res);
else
fprintf(stderr, "error: net_send_data %d\n", res);
}
}
};
class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:

class HighSpeedNetIO : public IOChannel<HighSpeedNetIO> { public:
bool is_server, quiet;
int send_sock = 0;
int recv_sock = 0;
int FSM = 0;
SenderSubChannel * schannel;
RecverSubChannel * rchannel;
string addr;
int port;
SenderSubChannel *schannel;
RecverSubChannel *rchannel;

HighSpeedNetIO(const char *address, int send_port, int recv_port, bool quiet = true) : quiet(quiet) {
is_server = (address == nullptr);
if (is_server) {
recv_sock = server_listen(send_port);
usleep(2000);
send_sock = server_listen(recv_port & 0xFFFF);
} else {
send_sock = client_connect(address, send_port);
recv_sock = client_connect(address, recv_port & 0xFFFF);
}
FSM = 0;
set_delay_opt(send_sock, true);
set_delay_opt(recv_sock, true);
schannel = new SenderSubChannel(send_sock);
rchannel = new RecverSubChannel(recv_sock);
if (not quiet) std::cout << "connected\n";
}

int server_listen(int port) {
int mysocket;
struct sockaddr_in dest;
Expand All @@ -140,81 +155,58 @@ class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:
memset(&serv, 0, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_addr.s_addr = htonl(INADDR_ANY); /* set our address to any interface */
serv.sin_port = htons(port); /* set the server port number */
serv.sin_port = htons(port); /* set the server port number */
mysocket = socket(AF_INET, SOCK_STREAM, 0);
int reuse = 1;
setsockopt(mysocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse));
if(bind(mysocket, (struct sockaddr *)&serv, sizeof(struct sockaddr)) < 0) {
setsockopt(mysocket, SOL_SOCKET, SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
if (bind(mysocket, (struct sockaddr *)&serv, sizeof(struct sockaddr)) < 0) {
perror("error: bind");
exit(1);
}
if(listen(mysocket, 1) < 0) {
if (listen(mysocket, 1) < 0) {
perror("error: listen");
exit(1);
}
int sock = accept(mysocket, (struct sockaddr *)&dest, &socksize);
close(mysocket);
return sock;
}
int client_connect(const char * address, int port) {
int client_connect(const char *address, int port) {
int sock;
struct sockaddr_in dest;
memset(&dest, 0, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_addr.s_addr = inet_addr(address);
dest.sin_port = htons(port);

while(1) {
while (1) {
sock = socket(AF_INET, SOCK_STREAM, 0);
if (connect(sock, (struct sockaddr *)&dest, sizeof(struct sockaddr)) == 0)
break;

if (connect(sock, (struct sockaddr *)&dest, sizeof(struct sockaddr)) == 0) break;

close(sock);
usleep(1000);
}
return sock;

}
HighSpeedNetIO(const char * address, int port, bool quiet = true): quiet(quiet), port(port & 0xFFFF) {
is_server = (address == nullptr);
if (is_server) {
recv_sock = server_listen(port);
usleep(2000);
send_sock = server_listen((port+1)&0xFFFF);
}
else {
addr = string(address);
send_sock = client_connect(address, port);
recv_sock = client_connect(address, (port+1)&0xFFFF);
}
FSM = 0;
set_delay_opt(send_sock, true);
set_delay_opt(recv_sock, true);
schannel = new SenderSubChannel(send_sock);
rchannel = new RecverSubChannel(recv_sock);
if(not quiet)
std::cout << "connected\n";
}

~HighSpeedNetIO() {
flush();
if(not quiet) {
std::cout <<"Data Sent: \t"<<schannel->counter<<"\n";
std::cout <<"Data Received: \t"<<rchannel->counter<<"\n";
std::cout <<"Flushes:\t"<<schannel->flushes<<"\t"<<rchannel->flushes<<"\n";
if (not quiet) {
std::cout << "Data Sent: \t" << schannel->counter << "\n";
std::cout << "Data Received: \t" << rchannel->counter << "\n";
std::cout << "Flushes:\t" << schannel->flushes << "\t" << rchannel->flushes << "\n";
}
delete schannel;
delete rchannel;
close(send_sock);
close(recv_sock);
}

void sync() {
}
void sync() {}

void set_delay_opt(int sock, bool enable_nodelay) {
if (enable_nodelay) {
const int one=1;
const int one = 1;
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
} else {
const int zero = 0;
Expand All @@ -223,7 +215,7 @@ class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:
}

void flush() {
if(is_server) {
if (is_server) {
schannel->flush();
rchannel->flush();
} else {
Expand All @@ -232,22 +224,23 @@ class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:
}
FSM = 0;
}
void send_data_internal(const void * data, int len) {
if(FSM == 1) {

void send_data_internal(const void *data, int len) {
if (FSM == 1) {
rchannel->flush();
}
schannel->send_data(data, len);
FSM = 2;
}
void recv_data_internal(void * data, int len) {
if(FSM == 2) {

void recv_data_internal(void *data, int len) {
if (FSM == 2) {
schannel->flush();
}
rchannel->recv_data(data, len);
FSM = 1;
}
};

}
} // namespace emp
#endif

0 comments on commit fa237b7

Please sign in to comment.