Skip to content

Commit

Permalink
knet: add acquire/release functions to lock the host structure list
Browse files Browse the repository at this point in the history
  • Loading branch information
simon3z committed Oct 20, 2010
1 parent b4c0fdf commit f9ed864
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 91 deletions.
146 changes: 64 additions & 82 deletions ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ struct __knet_handle {
};

static void *knet_control_thread(void *data);
static void knet_send_data(knet_handle_t knet_h);
static void knet_recv_frame(knet_handle_t knet_h, struct knet_link *link);

knet_handle_t knet_handle_new(void)
{
Expand Down Expand Up @@ -59,8 +57,8 @@ knet_handle_t knet_handle_new(void)

memset(&ev, 0, sizeof(struct epoll_event));

ev.events = EPOLLIN;
ev.data.ptr = knet_h;
ev.events = EPOLLIN;
ev.data.fd = knet_h->sock[0];

if (epoll_ctl(knet_h->epollfd,
EPOLL_CTL_ADD, knet_h->sock[0], &ev) != 0)
Expand Down Expand Up @@ -90,28 +88,35 @@ knet_handle_t knet_handle_new(void)
return NULL;
}

int knet_host_acquire(knet_handle_t knet_h, struct knet_host **head, int writelock)
{
int ret;

if (writelock != 0)
ret = pthread_rwlock_wrlock(&knet_h->host_rwlock);
else
ret = pthread_rwlock_rdlock(&knet_h->host_rwlock);

*head = (ret == 0) ? knet_h->host_head : NULL;

return ret;
}

int knet_host_release(knet_handle_t knet_h)
{
return pthread_rwlock_unlock(&knet_h->host_rwlock);
}

int knet_handle_getfd(knet_handle_t knet_h)
{
return knet_h->sock[1];
}

int knet_host_add(knet_handle_t knet_h, struct knet_host *host)
{
struct knet_link *lp;
struct epoll_event ev;

memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;

if (pthread_rwlock_wrlock(&knet_h->host_rwlock) != 0)
return -1;

for (lp = host->link; lp != NULL; lp = lp->next) {
ev.data.ptr = lp;
/* TODO: check for errors? */
epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, lp->sock, &ev);
}

/* pushing new host to the front */
host->next = knet_h->host_head;
knet_h->host_head = host;
Expand Down Expand Up @@ -149,86 +154,40 @@ int knet_host_remove(knet_handle_t knet_h, struct knet_host *host)
return 0;
}

int knet_host_foreach(knet_handle_t knet_h,
int (*action)(struct knet_host *, void *), void *data)
{
struct knet_host *i;

if (pthread_rwlock_rdlock(&knet_h->host_rwlock) != 0)
return -1;

for (i = knet_h->host_head; i != NULL; i = i->next) {
if (action && action(i, data) != 0)
break;
}

pthread_rwlock_unlock(&knet_h->host_rwlock);
return 0;
}

int knet_bind(struct sockaddr *address, socklen_t addrlen)
int knet_handle_bind(knet_handle_t knet_h, struct sockaddr *address, socklen_t addrlen)
{
int sockfd, err, value;
int sockfd, value;
struct epoll_event ev;

sockfd = socket(address->sa_family, SOCK_DGRAM, 0);

if (sockfd < 0) {
log_error("Unable to open netsocket error");
if (sockfd < 0)
return sockfd;
}

value = KNET_RING_RCVBUFF;
err = setsockopt(sockfd,
SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value));
setsockopt(sockfd, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value));

if (err != 0)
log_error("Unable to set receive buffer");
if (knet_fdset_cloexec(sockfd) != 0)
goto exit_fail1;

err = knet_fdset_cloexec(sockfd);
if (bind(sockfd, address, addrlen) != 0)
goto exit_fail1;

if (err != 0) {
log_error("Unable to get close-on-exec flag");
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));

err = bind(sockfd, address, addrlen);
ev.events = EPOLLIN;
ev.data.fd = sockfd;

if (err < 0) {
log_error("Unable to bind to ring socket");
goto exit_fail;
}
if (epoll_ctl(knet_h->epollfd, EPOLL_CTL_ADD, sockfd, &ev) != 0)
goto exit_fail1;

return sockfd;

exit_fail:
exit_fail1:
close(sockfd);
return -1;
}

static void *knet_control_thread(void *data)
{
int i, nev;
knet_handle_t knet_h;
struct epoll_event events[KNET_MAX_EVENTS];

knet_h = (knet_handle_t) data;

while (1) {
nev = epoll_wait(knet_h->epollfd,
events, KNET_MAX_EVENTS, KNET_PING_TIMERES);

for (i = 0; i < nev; i++) {
if (events[i].data.ptr == knet_h) {
knet_send_data(knet_h);
} else {
knet_recv_frame(knet_h, events[i].data.ptr);
}
}
}

return NULL;
}

static void knet_send_data(knet_handle_t knet_h)
{
ssize_t len, snt;
Expand All @@ -246,9 +205,9 @@ static void knet_send_data(knet_handle_t knet_h)

len += sizeof(struct knet_frame);

knet_h->buff->magic = htonl(KNET_FRAME_MAGIC);
knet_h->buff->version = KNET_FRAME_VERSION;
knet_h->buff->type = KNET_FRAME_DATA;
knet_h->buff->magic = htonl(KNET_FRAME_MAGIC);
knet_h->buff->version = KNET_FRAME_VERSION;
knet_h->buff->type = KNET_FRAME_DATA;

/* TODO: packet inspection */

Expand All @@ -268,13 +227,13 @@ static void knet_send_data(knet_handle_t knet_h)
pthread_rwlock_unlock(&knet_h->host_rwlock);
}

static void knet_recv_frame(knet_handle_t knet_h, struct knet_link *hlnk)
static void knet_recv_frame(knet_handle_t knet_h, int sockfd)
{
ssize_t len;
struct sockaddr_storage address;
socklen_t addrlen;

len = recvfrom(hlnk->sock, knet_h->buff, KNET_BUFSIZE,
len = recvfrom(sockfd, knet_h->buff, KNET_BUFSIZE,
MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);

if (len < sizeof(struct knet_frame))
Expand All @@ -300,3 +259,26 @@ static void knet_recv_frame(knet_handle_t knet_h, struct knet_link *hlnk)
}
}

static void *knet_control_thread(void *data)
{
int i, nev;
knet_handle_t knet_h;
struct epoll_event events[KNET_MAX_EVENTS];

knet_h = (knet_handle_t) data;

while (1) {
nev = epoll_wait(knet_h->epollfd,
events, KNET_MAX_EVENTS, KNET_PING_TIMERES);

for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->sock[0]) {
knet_send_data(knet_h);
} else {
knet_recv_frame(knet_h, events[i].data.fd);
}
}
}

return NULL;
}
9 changes: 6 additions & 3 deletions ring.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ struct knet_frame {
} __attribute__((packed));

knet_handle_t knet_handle_new(void);

int knet_handle_bind(knet_handle_t knet_h, struct sockaddr *address, socklen_t addrlen);

int knet_handle_getfd(knet_handle_t knet_h);

int knet_host_acquire(knet_handle_t knet_h, struct knet_host **head, int writelock);
int knet_host_release(knet_handle_t knet_h);

int knet_host_add(knet_handle_t khandle, struct knet_host *host);
int knet_host_remove(knet_handle_t khandle, struct knet_host *host);
int knet_host_foreach(knet_handle_t khandle, int (*action)(struct knet_host *, void *), void *data);

int knet_bind(struct sockaddr *address, socklen_t addrlen);

#endif
22 changes: 17 additions & 5 deletions tests/khandle_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@

struct knet_host *host_list[HOST_LIST_SIZE];

static int my_action(struct knet_host *host, void *data)
static int host_loop(knet_handle_t knet_h, size_t *loopnum)
{
host->active = 1;
*(size_t *) data += 1;
struct knet_host *kh;

knet_host_acquire(knet_h, &kh, 1);

while (kh != NULL) {
kh->active = 1;
kh = kh->next;
(*loopnum)++;
}

knet_host_release(knet_h);

return 0;
}

Expand All @@ -25,6 +35,7 @@ int main(int argc, char *argv[])
int i, j;
size_t loopnum;
knet_handle_t knet_h;


knet_h = knet_handle_new();

Expand All @@ -36,8 +47,9 @@ int main(int argc, char *argv[])
loopnum = 0;

for (i = 0; i < HOST_LIST_SIZE; i++) {
for (j = 0; j < HOST_LIST_LOOP; j++)
knet_host_foreach(knet_h, my_action, &loopnum);
for (j = 0; j < HOST_LIST_LOOP; j++) {
host_loop(knet_h, &loopnum);
}
knet_host_remove(knet_h, host_list[i]);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/ping_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static void argv_to_hosts(int argc, char *argv[])
exit(EXIT_FAILURE);
}

sockfd = knet_bind((struct sockaddr *) &address, sizeof(struct sockaddr_in));
sockfd = knet_handle_bind(knet_h, (struct sockaddr *) &address, sizeof(struct sockaddr_in));

if (sockfd < 0) {
log_error("Unable to bind knet");
Expand Down

0 comments on commit f9ed864

Please sign in to comment.