Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update HighSpeedNetIO to take in two ports instead of one #119

Merged
merged 3 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
167 changes: 80 additions & 87 deletions emp-tool/io/highspeed_net_io_channel.h
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
#ifndef EMP_HIGHSPEED_NETWORK_IO_CHANNEL_H__
#define EMP_HIGHSPEED_NETWORK_IO_CHANNEL_H__

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <string>

#include "emp-tool/io/io_channel.h"
using std::string;

#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

namespace emp {

class SubChannel {
public:
class SubChannel { public:
int sock;
FILE * stream = nullptr;
char * buf = nullptr;
FILE *stream = nullptr;
char *buf = nullptr;
int ptr;
char * stream_buf = nullptr;
char *stream_buf = nullptr;
uint64_t counter = 0;
uint64_t flushes = 0;
SubChannel (int sock): sock(sock) {
SubChannel(int sock) : sock(sock) {
stream_buf = new char[NETWORK_BUFFER_SIZE];
buf = new char[NETWORK_BUFFER_SIZE2];
stream = fdopen(sock, "wb+");
Expand All @@ -41,65 +42,61 @@ class SubChannel {
}
};

class SenderSubChannel: public SubChannel {public:
SenderSubChannel(int sock): SubChannel(sock) {
ptr = 0;
}

class SenderSubChannel : public SubChannel { public:
SenderSubChannel(int sock) : SubChannel(sock) { ptr = 0; }

void flush() {
flushes++;
send_data_raw(buf, ptr);
if (counter % NETWORK_BUFFER_SIZE2 != 0)
send_data_raw(buf+ptr, NETWORK_BUFFER_SIZE2 - counter % NETWORK_BUFFER_SIZE2);
send_data_raw(buf + ptr, NETWORK_BUFFER_SIZE2 - counter % NETWORK_BUFFER_SIZE2);
fflush(stream);
ptr = 0;
}

void send_data(const void * data, int len) {
void send_data(const void *data, int len) {
if (len <= NETWORK_BUFFER_SIZE2 - ptr) {
memcpy(buf + ptr, data, len);
ptr +=len;
ptr += len;
} else {
send_data_raw(buf, ptr);
send_data_raw(data, len);
ptr = 0;
}
}

void send_data_raw(const void * data, int len) {
void send_data_raw(const void *data, int len) {
counter += len;
int sent = 0;
while(sent < len) {
int res = fwrite(sent + (char*)data, 1, len - sent, stream);
while (sent < len) {
int res = fwrite(sent + (char *)data, 1, len - sent, stream);
if (res >= 0)
sent+=res;
sent += res;
else
fprintf(stderr,"error: net_send_data %d\n", res);
fprintf(stderr, "error: net_send_data %d\n", res);
}
}
};

class RecverSubChannel: public SubChannel {public:
RecverSubChannel(int sock): SubChannel(sock) {
ptr = NETWORK_BUFFER_SIZE2;
}
class RecverSubChannel : public SubChannel { public:
RecverSubChannel(int sock) : SubChannel(sock) { ptr = NETWORK_BUFFER_SIZE2; }
void flush() {
flushes++;
ptr = NETWORK_BUFFER_SIZE2;
}

void recv_data(void * data, int len) {
if(len <= NETWORK_BUFFER_SIZE2 - ptr) {
void recv_data(void *data, int len) {
if (len <= NETWORK_BUFFER_SIZE2 - ptr) {
memcpy(data, buf + ptr, len);
ptr += len;
} else {
int remain = len;
memcpy(data, buf + ptr, NETWORK_BUFFER_SIZE2 - ptr);
remain -=NETWORK_BUFFER_SIZE2 - ptr;
remain -= NETWORK_BUFFER_SIZE2 - ptr;

while(true) {
while (true) {
recv_data_raw(buf, NETWORK_BUFFER_SIZE2);
if(remain <= NETWORK_BUFFER_SIZE2) {
if (remain <= NETWORK_BUFFER_SIZE2) {
memcpy(len - remain + (char *)data, buf, remain);
ptr = remain;
break;
Expand All @@ -111,27 +108,45 @@ class RecverSubChannel: public SubChannel {public:
}
}

void recv_data_raw(void * data, int len) {
counter+=len;
void recv_data_raw(void *data, int len) {
counter += len;
int sent = 0;
while(sent < len) {
int res = fread(sent + (char*)data, 1, len - sent, stream);
while (sent < len) {
int res = fread(sent + (char *)data, 1, len - sent, stream);
if (res >= 0)
sent += res;
else
fprintf(stderr,"error: net_send_data %d\n", res);
else
fprintf(stderr, "error: net_send_data %d\n", res);
}
}
};
class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:

class HighSpeedNetIO : public IOChannel<HighSpeedNetIO> { public:
bool is_server, quiet;
int send_sock = 0;
int recv_sock = 0;
int FSM = 0;
SenderSubChannel * schannel;
RecverSubChannel * rchannel;
string addr;
int port;
SenderSubChannel *schannel;
RecverSubChannel *rchannel;

HighSpeedNetIO(const char *address, int send_port, int recv_port, bool quiet = true) : quiet(quiet) {
is_server = (address == nullptr);
if (is_server) {
recv_sock = server_listen(send_port);
usleep(2000);
send_sock = server_listen(recv_port & 0xFFFF);
} else {
send_sock = client_connect(address, send_port);
recv_sock = client_connect(address, recv_port & 0xFFFF);
}
FSM = 0;
set_delay_opt(send_sock, true);
set_delay_opt(recv_sock, true);
schannel = new SenderSubChannel(send_sock);
rchannel = new RecverSubChannel(recv_sock);
if (not quiet) std::cout << "connected\n";
}

int server_listen(int port) {
int mysocket;
struct sockaddr_in dest;
Expand All @@ -140,81 +155,58 @@ class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:
memset(&serv, 0, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_addr.s_addr = htonl(INADDR_ANY); /* set our address to any interface */
serv.sin_port = htons(port); /* set the server port number */
serv.sin_port = htons(port); /* set the server port number */
mysocket = socket(AF_INET, SOCK_STREAM, 0);
int reuse = 1;
setsockopt(mysocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse));
if(bind(mysocket, (struct sockaddr *)&serv, sizeof(struct sockaddr)) < 0) {
setsockopt(mysocket, SOL_SOCKET, SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
if (bind(mysocket, (struct sockaddr *)&serv, sizeof(struct sockaddr)) < 0) {
perror("error: bind");
exit(1);
}
if(listen(mysocket, 1) < 0) {
if (listen(mysocket, 1) < 0) {
perror("error: listen");
exit(1);
}
int sock = accept(mysocket, (struct sockaddr *)&dest, &socksize);
close(mysocket);
return sock;
}
int client_connect(const char * address, int port) {
int client_connect(const char *address, int port) {
int sock;
struct sockaddr_in dest;
memset(&dest, 0, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_addr.s_addr = inet_addr(address);
dest.sin_port = htons(port);

while(1) {
while (1) {
sock = socket(AF_INET, SOCK_STREAM, 0);
if (connect(sock, (struct sockaddr *)&dest, sizeof(struct sockaddr)) == 0)
break;

if (connect(sock, (struct sockaddr *)&dest, sizeof(struct sockaddr)) == 0) break;

close(sock);
usleep(1000);
}
return sock;

}
HighSpeedNetIO(const char * address, int port, bool quiet = true): quiet(quiet), port(port & 0xFFFF) {
is_server = (address == nullptr);
if (is_server) {
recv_sock = server_listen(port);
usleep(2000);
send_sock = server_listen((port+1)&0xFFFF);
}
else {
addr = string(address);
send_sock = client_connect(address, port);
recv_sock = client_connect(address, (port+1)&0xFFFF);
}
FSM = 0;
set_delay_opt(send_sock, true);
set_delay_opt(recv_sock, true);
schannel = new SenderSubChannel(send_sock);
rchannel = new RecverSubChannel(recv_sock);
if(not quiet)
std::cout << "connected\n";
}

~HighSpeedNetIO() {
flush();
if(not quiet) {
std::cout <<"Data Sent: \t"<<schannel->counter<<"\n";
std::cout <<"Data Received: \t"<<rchannel->counter<<"\n";
std::cout <<"Flushes:\t"<<schannel->flushes<<"\t"<<rchannel->flushes<<"\n";
if (not quiet) {
std::cout << "Data Sent: \t" << schannel->counter << "\n";
std::cout << "Data Received: \t" << rchannel->counter << "\n";
std::cout << "Flushes:\t" << schannel->flushes << "\t" << rchannel->flushes << "\n";
}
delete schannel;
delete rchannel;
close(send_sock);
close(recv_sock);
}

void sync() {
}
void sync() {}

void set_delay_opt(int sock, bool enable_nodelay) {
if (enable_nodelay) {
const int one=1;
const int one = 1;
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
} else {
const int zero = 0;
Expand All @@ -223,7 +215,7 @@ class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:
}

void flush() {
if(is_server) {
if (is_server) {
schannel->flush();
rchannel->flush();
} else {
Expand All @@ -232,22 +224,23 @@ class HighSpeedNetIO: public IOChannel<HighSpeedNetIO> { public:
}
FSM = 0;
}
void send_data_internal(const void * data, int len) {
if(FSM == 1) {

void send_data_internal(const void *data, int len) {
if (FSM == 1) {
rchannel->flush();
}
schannel->send_data(data, len);
FSM = 2;
}
void recv_data_internal(void * data, int len) {
if(FSM == 2) {

void recv_data_internal(void *data, int len) {
if (FSM == 2) {
schannel->flush();
}
rchannel->recv_data(data, len);
FSM = 1;
}
};

}
} // namespace emp
#endif