Skip to content

Commit

Permalink
Merge pull request #2 from kezhuw/master
Browse files Browse the repository at this point in the history
Ask a question
  • Loading branch information
cloudwu committed Apr 11, 2012
2 parents 88bde77 + 3954dd2 commit b75b6d0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
29 changes: 17 additions & 12 deletions mread.c
Expand Up @@ -25,6 +25,8 @@
#define SOCKET_READ 3
#define SOCKET_POLLIN 4

#define SOCKET_ALIVE SOCKET_SUSPEND

struct socket {
int fd;
struct ringbuffer_block * node;
Expand Down Expand Up @@ -144,7 +146,7 @@ mread_close(struct mread_pool *self) {
int i;
struct socket * s = self->sockets;
for (i=0;i<self->max_connection;i++) {
if (s[i].status != SOCKET_INVALID) {
if (s[i].status >= SOCKET_ALIVE) {
close(s[i].fd);
}
}
Expand Down Expand Up @@ -291,6 +293,8 @@ static void
_close_client(struct mread_pool * self, int id) {
struct socket * s = &self->sockets[id];
s->status = SOCKET_CLOSED;
ringbuffer_free(self->rb, s->temp);
ringbuffer_free(self->rb, s->node);
s->node = NULL;
s->temp = NULL;
close(s->fd);
Expand All @@ -303,9 +307,6 @@ _close_client(struct mread_pool * self, int id) {
static void
_close_active(struct mread_pool * self) {
int id = self->active;
struct socket * s = &self->sockets[id];
ringbuffer_free(self->rb, s->temp);
ringbuffer_free(self->rb, s->node);
_close_client(self, id);
}

Expand Down Expand Up @@ -334,17 +335,16 @@ mread_pull(struct mread_pool * self , int size) {
self->skip += size;
return buffer;
}
if (s->status == SOCKET_CLOSED) {
ringbuffer_free(self->rb , s->node);
s->node = NULL;
return NULL;
}

if (s->status == SOCKET_READ) {
switch (s->status) {
case SOCKET_READ:
s->status = SOCKET_SUSPEND;
case SOCKET_CLOSED:
case SOCKET_SUSPEND:
return NULL;
default:
assert(s->status == SOCKET_POLLIN);
break;
}
assert(s->status == SOCKET_POLLIN);

int sz = size - rd_size;
int rd = READBLOCKSIZE;
Expand Down Expand Up @@ -380,16 +380,20 @@ mread_pull(struct mread_pool * self , int size) {
break;
}
if (bytes == 0) {
ringbuffer_resize(rb, blk, 0);
_close_active(self);
return NULL;
}
if (bytes == -1) {
switch(errno) {
case EWOULDBLOCK:
ringbuffer_resize(rb, blk, 0);
s->status = SOCKET_SUSPEND;
return NULL;
case EINTR:
continue;
default:
ringbuffer_resize(rb, blk, 0);
_close_active(self);
return NULL;
}
Expand All @@ -406,6 +410,7 @@ mread_pull(struct mread_pool * self , int size) {
struct ringbuffer_block * temp = ringbuffer_alloc(rb, size);
while (temp == NULL) {
int collect_id = ringbuffer_collect(rb);
_close_client(self , collect_id);
if (id == collect_id) {
return NULL;
}
Expand Down
5 changes: 5 additions & 0 deletions ringbuffer.c
Expand Up @@ -67,6 +67,7 @@ _alloc(struct ringbuffer * rb, int total_size , int size) {
blk->length = sizeof(struct ringbuffer_block) + size;
blk->offset = 0;
blk->next = -1;
blk->id = -1;
struct ringbuffer_block * next = block_next(rb, blk);
rb->head = block_offset(rb, next);
if (align_length < total_size) {
Expand Down Expand Up @@ -129,6 +130,10 @@ ringbuffer_collect(struct ringbuffer * rb) {

void
ringbuffer_resize(struct ringbuffer * rb, struct ringbuffer_block * blk, int size) {
if (size == 0) {
rb->head = block_offset(rb, blk);
return;
}
int align_length = ALIGN(sizeof(struct ringbuffer_block) + size);
int old_length = ALIGN(blk->length);
assert(align_length < old_length);
Expand Down

0 comments on commit b75b6d0

Please sign in to comment.