Skip to content

Commit

Permalink
Merge branch 'master' of github.com:cloudwu/mread
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwu committed May 17, 2012
2 parents 2ced0dc + 71a1b9a commit e5f23a1
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
2 changes: 1 addition & 1 deletion map.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,6 @@ map_dump(struct map *m) {
int i;
for (i=0;i<m->size;i++) {
struct node * n = &(m->hash[i]);
printf("[%d] fd = %d , id = %d , next = %d\n",i,n->fd,n->id,(n->next - m->hash));
printf("[%d] fd = %d , id = %d , next = %ld\n",i,n->fd,n->id,(n->next - m->hash));
}
}
42 changes: 32 additions & 10 deletions mread.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <fcntl.h>

#define BACKLOG 32
#define READQUEUE 32
Expand All @@ -25,6 +26,8 @@
#define SOCKET_READ 3
#define SOCKET_POLLIN 4

#define SOCKET_ALIVE SOCKET_SUSPEND

struct socket {
int fd;
struct ringbuffer_block * node;
Expand Down Expand Up @@ -78,12 +81,27 @@ _release_rb(struct ringbuffer * rb) {
ringbuffer_delete(rb);
}

static int
_set_nonblocking(int fd)
{
int flag = fcntl(fd, F_GETFL, 0);
if ( -1 == flag ) {
return -1;
}

return fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

struct mread_pool *
mread_create(int port , int max , int buffer_size) {
int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
return NULL;
}
if ( -1 == _set_nonblocking(listen_fd) ) {
return NULL;
}

int reuse = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int));

Expand Down Expand Up @@ -144,7 +162,7 @@ mread_close(struct mread_pool *self) {
int i;
struct socket * s = self->sockets;
for (i=0;i<self->max_connection;i++) {
if (s[i].status != SOCKET_INVALID) {
if (s[i].status >= SOCKET_ALIVE) {
close(s[i].fd);
}
}
Expand Down Expand Up @@ -334,17 +352,16 @@ mread_pull(struct mread_pool * self , int size) {
self->skip += size;
return buffer;
}
if (s->status == SOCKET_CLOSED) {
ringbuffer_free(self->rb , s->node);
s->node = NULL;
return NULL;
}

if (s->status == SOCKET_READ) {
switch (s->status) {
case SOCKET_READ:
s->status = SOCKET_SUSPEND;
case SOCKET_CLOSED:
case SOCKET_SUSPEND:
return NULL;
default:
assert(s->status == SOCKET_POLLIN);
break;
}
assert(s->status == SOCKET_POLLIN);

int sz = size - rd_size;
int rd = READBLOCKSIZE;
Expand Down Expand Up @@ -380,16 +397,20 @@ mread_pull(struct mread_pool * self , int size) {
break;
}
if (bytes == 0) {
ringbuffer_resize(rb, blk, 0);
_close_active(self);
return NULL;
}
if (bytes == -1) {
switch(errno) {
case EWOULDBLOCK:
ringbuffer_resize(rb, blk, 0);
s->status = SOCKET_SUSPEND;
return NULL;
case EINTR:
continue;
default:
ringbuffer_resize(rb, blk, 0);
_close_active(self);
return NULL;
}
Expand All @@ -406,6 +427,7 @@ mread_pull(struct mread_pool * self , int size) {
struct ringbuffer_block * temp = ringbuffer_alloc(rb, size);
while (temp == NULL) {
int collect_id = ringbuffer_collect(rb);
_close_client(self , collect_id);
if (id == collect_id) {
return NULL;
}
Expand Down
5 changes: 5 additions & 0 deletions ringbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ _alloc(struct ringbuffer * rb, int total_size , int size) {
blk->length = sizeof(struct ringbuffer_block) + size;
blk->offset = 0;
blk->next = -1;
blk->id = -1;
struct ringbuffer_block * next = block_next(rb, blk);
rb->head = block_offset(rb, next);
if (align_length < total_size) {
Expand Down Expand Up @@ -129,6 +130,10 @@ ringbuffer_collect(struct ringbuffer * rb) {

void
ringbuffer_resize(struct ringbuffer * rb, struct ringbuffer_block * blk, int size) {
if (size == 0) {
rb->head = block_offset(rb, blk);
return;
}
int align_length = ALIGN(sizeof(struct ringbuffer_block) + size);
int old_length = ALIGN(blk->length);
assert(align_length < old_length);
Expand Down

0 comments on commit e5f23a1

Please sign in to comment.