Skip to content

Commit

Permalink
libknet: make tap to links and links to tap full duplex
Browse files Browse the repository at this point in the history
this cuts down by 40% latency on ping test

Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 20, 2012
1 parent 4359cb7 commit fe9e3ee
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 57 deletions.
142 changes: 93 additions & 49 deletions libknet/handle.c
Expand Up @@ -14,7 +14,8 @@
#define KNET_DATABUFSIZE 131072 /* 128k */
#define KNET_PINGBUFSIZE sizeof(struct knet_frame)

static void *_handle_control_thread(void *data);
static void *_handle_tap_to_links_thread(void *data);
static void *_handle_recv_from_links_thread(void *data);
static void *_handle_heartbt_thread(void *data);

knet_handle_t knet_handle_new(int fd, uint16_t node_id)
Expand All @@ -27,62 +28,82 @@ knet_handle_t knet_handle_new(int fd, uint16_t node_id)

memset(knet_h, 0, sizeof(struct knet_handle));

if ((knet_h->databuf = malloc(KNET_DATABUFSIZE))== NULL)
if ((knet_h->tap_to_links_buf = malloc(KNET_DATABUFSIZE))== NULL)
goto exit_fail1;

memset(knet_h->databuf, 0, KNET_DATABUFSIZE);
memset(knet_h->tap_to_links_buf, 0, KNET_DATABUFSIZE);

if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL)
if ((knet_h->recv_from_links_buf = malloc(KNET_DATABUFSIZE))== NULL)
goto exit_fail2;

memset(knet_h->recv_from_links_buf, 0, KNET_DATABUFSIZE);

if ((knet_h->pingbuf = malloc(KNET_PINGBUFSIZE))== NULL)
goto exit_fail3;

memset(knet_h->pingbuf, 0, KNET_PINGBUFSIZE);

if (pthread_rwlock_init(&knet_h->list_rwlock, NULL) != 0)
goto exit_fail3;
goto exit_fail4;

knet_h->sockfd = fd;
knet_h->epollfd = epoll_create(KNET_MAX_EVENTS);
knet_h->tap_to_links_epollfd = epoll_create(KNET_MAX_EVENTS);
knet_h->recv_from_links_epollfd = epoll_create(KNET_MAX_EVENTS);
knet_h->node_id = node_id;

if (knet_h->epollfd < 0)
goto exit_fail4;
if ((knet_h->tap_to_links_epollfd < 0) ||
(knet_h->recv_from_links_epollfd < 0))
goto exit_fail5;

if (_fdset_cloexec(knet_h->epollfd) != 0)
if ((_fdset_cloexec(knet_h->tap_to_links_epollfd) != 0) ||
(_fdset_cloexec(knet_h->recv_from_links_epollfd != 0)))
goto exit_fail5;

memset(&ev, 0, sizeof(struct epoll_event));

ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd;

if (epoll_ctl(knet_h->epollfd,
if (epoll_ctl(knet_h->tap_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->sockfd, &ev) != 0)
goto exit_fail5;

if (pthread_create(&knet_h->control_thread, 0,
_handle_control_thread, (void *) knet_h) != 0)
if (pthread_create(&knet_h->tap_to_links_thread, 0,
_handle_tap_to_links_thread, (void *) knet_h) != 0)
goto exit_fail5;

if (pthread_create(&knet_h->recv_from_links_thread, 0,
_handle_recv_from_links_thread, (void *) knet_h) != 0)
goto exit_fail6;

if (pthread_create(&knet_h->heartbt_thread, 0,
_handle_heartbt_thread, (void *) knet_h) != 0)
goto exit_fail6;
goto exit_fail7;

return knet_h;

exit_fail7:
pthread_cancel(knet_h->recv_from_links_thread);

exit_fail6:
pthread_cancel(knet_h->control_thread);
pthread_cancel(knet_h->tap_to_links_thread);

exit_fail5:
close(knet_h->epollfd);
if (knet_h->tap_to_links_epollfd >= 0)
close(knet_h->tap_to_links_epollfd);
if (knet_h->recv_from_links_epollfd >= 0)
close(knet_h->recv_from_links_epollfd);

exit_fail4:
pthread_rwlock_destroy(&knet_h->list_rwlock);

exit_fail4:
free(knet_h->pingbuf);

exit_fail3:
free(knet_h->databuf);
free(knet_h->recv_from_links_buf);

exit_fail2:
free(knet_h->pingbuf);
free(knet_h->tap_to_links_buf);

exit_fail1:
free(knet_h);
Expand All @@ -102,17 +123,24 @@ int knet_handle_free(knet_handle_t knet_h)
if (retval != PTHREAD_CANCELED)
goto exit_busy;

pthread_cancel(knet_h->control_thread);
pthread_join(knet_h->control_thread, &retval);
pthread_cancel(knet_h->tap_to_links_thread);
pthread_join(knet_h->tap_to_links_thread, &retval);
if (retval != PTHREAD_CANCELED)
goto exit_busy;

pthread_cancel(knet_h->recv_from_links_thread);
pthread_join(knet_h->recv_from_links_thread, &retval);

if (retval != PTHREAD_CANCELED)
goto exit_busy;

close(knet_h->epollfd);
close(knet_h->tap_to_links_epollfd);
close(knet_h->recv_from_links_epollfd);

pthread_rwlock_destroy(&knet_h->list_rwlock);

free(knet_h->databuf);
free(knet_h->tap_to_links_buf);
free(knet_h->recv_from_links_buf);
free(knet_h->pingbuf);

free(knet_h);
Expand All @@ -139,13 +167,13 @@ void knet_link_timeout(struct knet_link *lnk,
((lnk->ping_interval * precision) / 8000000);
}

static void _handle_data_send(knet_handle_t knet_h)
static void _handle_tap_to_links(knet_handle_t knet_h)
{
int j;
ssize_t len, snt;
struct knet_host *i;

len = read(knet_h->sockfd, knet_h->databuf->kf_data,
len = read(knet_h->sockfd, knet_h->tap_to_links_buf->kf_data,
KNET_DATABUFSIZE - KNET_FRAME_SIZE);

if (len == 0) {
Expand All @@ -160,7 +188,7 @@ static void _handle_data_send(knet_handle_t knet_h)

/* TODO: packet inspection */

knet_h->databuf->kf_type = KNET_FRAME_DATA;
knet_h->tap_to_links_buf->kf_type = KNET_FRAME_DATA;

if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0)
return;
Expand All @@ -171,7 +199,7 @@ static void _handle_data_send(knet_handle_t knet_h)
continue;

snt = sendto(i->link[j].sock,
knet_h->databuf, len, MSG_DONTWAIT,
knet_h->tap_to_links_buf, len, MSG_DONTWAIT,
(struct sockaddr *) &i->link[j].address,
sizeof(struct sockaddr_storage));

Expand All @@ -183,7 +211,7 @@ static void _handle_data_send(knet_handle_t knet_h)
pthread_rwlock_unlock(&knet_h->list_rwlock);
}

static void _handle_recv_frame(knet_handle_t knet_h, int sockfd)
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd)
{
ssize_t len;
struct sockaddr_storage address;
Expand All @@ -196,54 +224,54 @@ static void _handle_recv_frame(knet_handle_t knet_h, int sockfd)
return;

addrlen = sizeof(struct sockaddr_storage);
len = recvfrom(sockfd, knet_h->databuf, KNET_DATABUFSIZE,
len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE,
MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);

if (len < (KNET_FRAME_SIZE + 1))
goto exit_unlock;

if (ntohl(knet_h->databuf->kf_magic) != KNET_FRAME_MAGIC)
if (ntohl(knet_h->recv_from_links_buf->kf_magic) != KNET_FRAME_MAGIC)
goto exit_unlock;

if (knet_h->databuf->kf_version != KNET_FRAME_VERSION)
if (knet_h->recv_from_links_buf->kf_version != KNET_FRAME_VERSION)
goto exit_unlock;

src_host = NULL;
src_link = NULL;

if ((knet_h->databuf->kf_type & KNET_FRAME_PMSK) != 0) {
knet_h->databuf->kf_node = ntohs(knet_h->databuf->kf_node);
src_host = knet_h->host_index[knet_h->databuf->kf_node];
if ((knet_h->recv_from_links_buf->kf_type & KNET_FRAME_PMSK) != 0) {
knet_h->recv_from_links_buf->kf_node = ntohs(knet_h->recv_from_links_buf->kf_node);
src_host = knet_h->host_index[knet_h->recv_from_links_buf->kf_node];

if (src_host == NULL) /* host not found */
goto exit_unlock;

src_link = src_host->link +
(knet_h->databuf->kf_link % KNET_MAX_LINK);
(knet_h->recv_from_links_buf->kf_link % KNET_MAX_LINK);
}

switch (knet_h->databuf->kf_type) {
switch (knet_h->recv_from_links_buf->kf_type) {
case KNET_FRAME_DATA:
if (knet_h->enabled != 1) /* data forward is disabled */
break;

write(knet_h->sockfd,
knet_h->databuf->kf_data, len - KNET_FRAME_SIZE);
knet_h->recv_from_links_buf->kf_data, len - KNET_FRAME_SIZE);

break;
case KNET_FRAME_PING:
knet_h->databuf->kf_type = KNET_FRAME_PONG;
knet_h->databuf->kf_node = htons(knet_h->node_id);
knet_h->recv_from_links_buf->kf_type = KNET_FRAME_PONG;
knet_h->recv_from_links_buf->kf_node = htons(knet_h->node_id);

sendto(src_link->sock, knet_h->databuf, len, MSG_DONTWAIT,
sendto(src_link->sock, knet_h->recv_from_links_buf, len, MSG_DONTWAIT,
(struct sockaddr *) &src_link->address,
sizeof(struct sockaddr_storage));

break;
case KNET_FRAME_PONG:
clock_gettime(CLOCK_MONOTONIC, &src_link->pong_last);

timespec_diff(knet_h->databuf->kf_time,
timespec_diff(knet_h->recv_from_links_buf->kf_time,
src_link->pong_last, &latency_last);

src_link->latency =
Expand Down Expand Up @@ -331,7 +359,27 @@ static void *_handle_heartbt_thread(void *data)
return NULL;
}

static void *_handle_control_thread(void *data)
static void *_handle_tap_to_links_thread(void *data)
{
knet_handle_t knet_h;
struct epoll_event events[KNET_MAX_EVENTS];

knet_h = (knet_handle_t) data;

/* preparing data buffer */
knet_h->tap_to_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC);
knet_h->tap_to_links_buf->kf_version = KNET_FRAME_VERSION;

while (1) {
if (epoll_wait(knet_h->tap_to_links_epollfd, events, KNET_MAX_EVENTS, -1) >= 1)
_handle_tap_to_links(knet_h);
}

return NULL;

}

static void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h;
Expand All @@ -340,18 +388,14 @@ static void *_handle_control_thread(void *data)
knet_h = (knet_handle_t) data;

/* preparing data buffer */
knet_h->databuf->kf_magic = htonl(KNET_FRAME_MAGIC);
knet_h->databuf->kf_version = KNET_FRAME_VERSION;
knet_h->recv_from_links_buf->kf_magic = htonl(KNET_FRAME_MAGIC);
knet_h->recv_from_links_buf->kf_version = KNET_FRAME_VERSION;

while (1) {
nev = epoll_wait(knet_h->epollfd, events, KNET_MAX_EVENTS, -1);
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_MAX_EVENTS, -1);

for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->sockfd) {
_handle_data_send(knet_h);
} else {
_handle_recv_frame(knet_h, events[i].data.fd);
}
_handle_recv_from_links(knet_h, events[i].data.fd);
}
}

Expand Down
9 changes: 6 additions & 3 deletions libknet/libknet-private.h
Expand Up @@ -18,15 +18,18 @@ do { \

struct knet_handle {
int sockfd;
int epollfd;
int tap_to_links_epollfd;
int recv_from_links_epollfd;
uint16_t node_id;
unsigned int enabled:1;
struct knet_host *host_head;
struct knet_host *host_index[KNET_MAX_HOST];
struct knet_listener *listener_head;
struct knet_frame *databuf;
struct knet_frame *tap_to_links_buf;
struct knet_frame *recv_from_links_buf;
struct knet_frame *pingbuf;
pthread_t control_thread;
pthread_t tap_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_rwlock_t list_rwlock;
};
Expand Down
6 changes: 3 additions & 3 deletions libknet/listener.c
Expand Up @@ -53,7 +53,7 @@ int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener)
ev.events = EPOLLIN;
ev.data.fd = listener->sock;

if (epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, listener->sock, &ev) != 0)
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, listener->sock, &ev) != 0)
goto exit_fail1;

if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0)
Expand All @@ -68,7 +68,7 @@ int knet_listener_add(knet_handle_t knet_h, struct knet_listener *listener)
return 0;

exit_fail2:
epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev);

exit_fail1:
close(listener->sock);
Expand Down Expand Up @@ -111,7 +111,7 @@ int knet_listener_remove(knet_handle_t knet_h, struct knet_listener *listener)
}
}

epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
close(listener->sock);

exit_fail1:
Expand Down
4 changes: 2 additions & 2 deletions tests/listener_test.c
Expand Up @@ -78,7 +78,7 @@ int main(int argc, char *argv[])
memset(&ev, 0, sizeof(struct epoll_event));

/* don't try this at home :) */
err = epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, listener->sock, &ev);
err = epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, listener->sock, &ev);

if (err != -1) {
printf("Listener file descriptor not found in epollfd\n");
Expand Down Expand Up @@ -116,7 +116,7 @@ int main(int argc, char *argv[])
}

/* don't try this at home :) */
err = epoll_ctl(knet_h->epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
err = epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev);

if (err != -1) {
printf("Listener file was present in epollfd\n");
Expand Down

0 comments on commit fe9e3ee

Please sign in to comment.