Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first commit

  • Loading branch information...
commit d5e8370fbb4e30231f31fa9c1e60763d54c39a92 0 parents
@cloudwu authored
2  Makefile
@@ -0,0 +1,2 @@
+all:
+ gcc -g -o mread -Wall mread.c ringbuffer.c map.c main.c
38 main.c
@@ -0,0 +1,38 @@
+#include "mread.h"
+
+#include <stdio.h>
+#include <unistd.h>
+
+static void
+test(struct mread_pool *m) {
+ int id = mread_poll(m,0);
+ if (id >= 0) {
+ for (;;) {
+ char * buffer = mread_pull(m, 4);
+ if (buffer == NULL) {
+ if (mread_closed(m)) {
+ printf("%d: CLOSED\n",id);
+ }
+ break;
+ } else {
+ printf("%d : %d %d %d %d\n",id, buffer[0],buffer[1],buffer[2],buffer[3]);
+ mread_yield(m);
+ }
+ }
+ }
+}
+
+int
+main() {
+ struct mread_pool * m = mread_create(2525 , 10, 0);
+ if (m == NULL) {
+ perror("error:");
+ return 1;
+ }
+ for (;;) {
+ test(m);
+ sleep(1);
+ }
+ mread_close(m);
+ return 0;
+}
92 map.c
@@ -0,0 +1,92 @@
+#include "map.h"
+
+#include <stdlib.h>
+#include <assert.h>
+
+struct node {
+ int fd;
+ int id;
+ int next;
+};
+
+struct map {
+ int size;
+ struct node * hash;
+};
+
+struct map *
+map_new(int max) {
+ int sz = 1;
+ while (sz <= max) {
+ sz *= 2;
+ }
+ struct map * m = malloc(sizeof(*m));
+ m->size = sz;
+ m->hash = malloc(sizeof(struct node) * sz);
+ int i;
+ for (i=0;i<sz;i++) {
+ m->hash[i].fd = -1;
+ m->hash[i].id = 0;
+ m->hash[i].next = -1;
+ }
+ return m;
+}
+
+void
+map_delete(struct map * m) {
+ free(m->hash);
+ free(m);
+}
+
+int
+map_search(struct map * m, int fd) {
+ int hash = fd & (m->size-1);
+ struct node * n = &m->hash[hash];
+ for(;;) {
+ if (n->fd == fd)
+ return n->id;
+ if (n->next < 0)
+ return -1;
+ n = &m->hash[n->next];
+ }
+}
+
+void
+map_insert(struct map * m, int fd, int id) {
+ int hash = fd & (m->size-1);
+ struct node * n = &m->hash[hash];
+ if (n->fd < 0) {
+ n->fd = fd;
+ n->id = id;
+ return;
+ }
+ while (n->next >=0 ) {
+ n = &m->hash[n->next];
+ }
+ int i;
+ for (i=0;i<m->size;i++) {
+ struct node * temp = &m->hash[i];
+ if (temp->fd < 0) {
+ temp->fd = fd;
+ temp->id = id;
+ n->next = i;
+ return;
+ }
+ }
+ assert(0);
+}
+
+void
+map_erase(struct map *m , int fd) {
+ int hash = fd & (m->size-1);
+ struct node * n = &m->hash[hash];
+ for(;;) {
+ if (n->fd == fd) {
+ n->fd = -1;
+ return;
+ }
+ if (n->next < 0)
+ return;
+ n = &m->hash[n->next];
+ }
+}
12 map.h
@@ -0,0 +1,12 @@
+#ifndef MREAD_MAP_H
+#define MREAD_MAP_H
+
+struct map;
+
+struct map * map_new(int max);
+void map_delete(struct map *);
+int map_search(struct map * , int fd);
+void map_insert(struct map * , int fd, int id);
+void map_erase(struct map *, int fd);
+
+#endif
464 mread.c
@@ -0,0 +1,464 @@
+#include "mread.h"
+#include "ringbuffer.h"
+#include "map.h"
+
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <stdio.h>
+
+#define BACKLOG 32
+#define READQUEUE 32
+#define READBLOCKSIZE 2048
+#define RINGBUFFER_DEFAULT 1024 * 1024
+
+#define SOCKET_INVALID 0
+#define SOCKET_CLOSED 1
+#define SOCKET_SUSPEND 2
+#define SOCKET_READ 3
+#define SOCKET_POLLIN 4
+
+struct socket {
+ int fd;
+ struct ringbuffer_block * node;
+ struct ringbuffer_block * temp;
+ int status;
+};
+
+struct mread_pool {
+ int listen_fd;
+ int epoll_fd;
+ int max_connection;
+ int closed;
+ int active;
+ int skip;
+ struct socket * sockets;
+ struct socket * free_socket;
+ struct map * socket_hash;
+ int queue_len;
+ int queue_head;
+ struct epoll_event ev[READQUEUE];
+ struct ringbuffer * rb;
+};
+
+static struct socket *
+_create_sockets(int max) {
+ int i;
+ struct socket * s = malloc(max * sizeof(struct socket));
+ for (i=0;i<max;i++) {
+ s[i].fd = i+1;
+ s[i].node = NULL;
+ s[i].temp = NULL;
+ s[i].status = SOCKET_INVALID;
+ }
+ s[max-1].fd = -1;
+ return s;
+}
+
+static struct ringbuffer *
+_create_rb(int size) {
+ size = (size + 3) & ~3;
+ if (size < READBLOCKSIZE * 2) {
+ size = READBLOCKSIZE * 2;
+ }
+ struct ringbuffer * rb = ringbuffer_new(size);
+
+ return rb;
+}
+
+static void
+_release_rb(struct ringbuffer * rb) {
+ ringbuffer_delete(rb);
+}
+
+struct mread_pool *
+mread_create(int port , int max , int buffer_size) {
+ int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
+ if (listen_fd == -1) {
+ return NULL;
+ }
+ int reuse = 1;
+ setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int));
+
+ struct sockaddr_in my_addr;
+ memset(&my_addr, 0, sizeof(struct sockaddr_in));
+ my_addr.sin_family = AF_INET;
+ my_addr.sin_port = htons(port);
+ my_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); // INADDR_ANY;
+ printf("MREAD bind %s:%u\n",inet_ntoa(my_addr.sin_addr),ntohs(my_addr.sin_port));
+ if (bind(listen_fd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) {
+ close(listen_fd);
+ return NULL;
+ }
+ if (listen(listen_fd, BACKLOG) == -1) {
+ close(listen_fd);
+ return NULL;
+ }
+
+ int epoll_fd = epoll_create(max + 1);
+ if (epoll_fd == -1) {
+ close(listen_fd);
+ return NULL;
+ }
+ struct epoll_event ev;
+ ev.events = EPOLLIN;
+ ev.data.fd = listen_fd;
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
+ close(listen_fd);
+ close(epoll_fd);
+ return NULL;
+ }
+
+ struct mread_pool * self = malloc(sizeof(*self));
+ self->listen_fd = listen_fd;
+ self->epoll_fd = epoll_fd;
+ self->max_connection = max;
+ self->closed = 0;
+ self->active = -1;
+ self->skip = 0;
+ self->sockets = _create_sockets(max);
+ self->free_socket = &self->sockets[0];
+ self->socket_hash = map_new(max);
+ self->queue_len = 0;
+ self->queue_head = 0;
+ if (buffer_size == 0) {
+ self->rb = _create_rb(RINGBUFFER_DEFAULT);
+ } else {
+ self->rb = _create_rb(buffer_size);
+ }
+
+ return self;
+}
+
+void
+mread_close(struct mread_pool *self) {
+ if (self == NULL)
+ return;
+ int i;
+ struct socket * s = self->sockets;
+ for (i=0;i<self->max_connection;i++) {
+ if (s[i].status != SOCKET_INVALID) {
+ close(s[i].fd);
+ }
+ }
+ free(s);
+ if (self->listen_fd >= 0) {
+ close(self->listen_fd);
+ }
+ close(self->epoll_fd);
+ _release_rb(self->rb);
+ map_delete(self->socket_hash);
+ free(self);
+}
+
+static int
+_read_queue(struct mread_pool * self, int timeout) {
+ self->queue_head = 0;
+ int n = epoll_wait(self->epoll_fd , self->ev, READQUEUE, timeout);
+ if (n == -1) {
+ self->queue_len = 0;
+ return -1;
+ }
+ self->queue_len = n;
+ return n;
+}
+
+inline static int
+_read_one(struct mread_pool * self) {
+ if (self->queue_head >= self->queue_len) {
+ return -1;
+ }
+ return self->ev[self->queue_head ++].data.fd;
+}
+
+static struct socket *
+_alloc_socket(struct mread_pool * self) {
+ if (self->free_socket == NULL) {
+ return NULL;
+ }
+ struct socket * s = self->free_socket;
+ int next_free = s->fd;
+ if (next_free < 0 ) {
+ self->free_socket = NULL;
+ } else {
+ self->free_socket = &self->sockets[next_free];
+ }
+ return s;
+}
+
+static void
+_add_client(struct mread_pool * self, int fd) {
+ struct socket * s = _alloc_socket(self);
+ if (s == NULL) {
+ close(fd);
+ return;
+ }
+ struct epoll_event ev;
+ ev.events = EPOLLIN;
+ ev.data.fd = fd;
+ if (epoll_ctl(self->epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1) {
+ close(fd);
+ return;
+ }
+
+ s->fd = fd;
+ s->node = NULL;
+ s->status = SOCKET_SUSPEND;
+ int id = s - self->sockets;
+ map_insert(self->socket_hash , fd , id);
+}
+
+static int
+_report_closed(struct mread_pool * self) {
+ int i;
+ for (i=0;i<self->max_connection;i++) {
+ if (self->sockets[i].status == SOCKET_CLOSED) {
+ self->active = i;
+ return i;
+ }
+ }
+ assert(0);
+ return -1;
+}
+
+int
+mread_poll(struct mread_pool * self , int timeout) {
+ if (self->active >= 0) {
+ struct socket * s = &self->sockets[self->active];
+ if (s->status == SOCKET_READ) {
+ return self->active;
+ }
+ }
+ if (self->closed > 0 ) {
+ return _report_closed(self);
+ }
+ if (self->queue_head >= self->queue_len) {
+ if (_read_queue(self, timeout) == -1) {
+ self->active = -1;
+ return -1;
+ }
+ }
+ for (;;) {
+ int fd = _read_one(self);
+ if (fd == -1) {
+ self->active = -1;
+ return -1;
+ }
+ if (fd == self->listen_fd) {
+ struct sockaddr_in remote_addr;
+ socklen_t len = sizeof(struct sockaddr_in);
+ int client_fd = accept(self->listen_fd , (struct sockaddr *)&remote_addr , &len);
+ if (client_fd >= 0) {
+ printf("MREAD connect %s:%u (fd=%d)\n",inet_ntoa(remote_addr.sin_addr),ntohs(remote_addr.sin_port), client_fd);
+ _add_client(self, client_fd);
+ }
+ } else {
+ int index = map_search(self->socket_hash , fd);
+ if (index >= 0) {
+ self->active = index;
+ struct socket * s = &self->sockets[index];
+ s->status = SOCKET_POLLIN;
+ return index;
+ }
+ }
+ }
+}
+
+int
+mread_socket(struct mread_pool * self, int index) {
+ return self->sockets[index].fd;
+}
+
+static void
+_link_node(struct ringbuffer * rb, int id, struct socket * s , struct ringbuffer_block * blk) {
+ if (s->node) {
+ ringbuffer_link(rb, s->node , blk);
+ } else {
+ blk->id = id;
+ s->node = blk;
+ }
+}
+
+static void
+_close_client(struct mread_pool * self, int id) {
+ struct socket * s = &self->sockets[id];
+ s->status = SOCKET_CLOSED;
+ s->node = NULL;
+ s->temp = NULL;
+ close(s->fd);
+ printf("MREAD close %d (fd=%d)\n",id,s->fd);
+ epoll_ctl(self->epoll_fd, EPOLL_CTL_DEL, s->fd , NULL);
+
+ ++self->closed;
+}
+
+static void
+_close_active(struct mread_pool * self) {
+ int id = self->active;
+ struct socket * s = &self->sockets[id];
+ ringbuffer_free(self->rb, s->temp);
+ ringbuffer_free(self->rb, s->node);
+ _close_client(self, id);
+}
+
+static char *
+_ringbuffer_read(struct mread_pool * self, int *size) {
+ struct socket * s = &self->sockets[self->active];
+ if (s->node == NULL) {
+ *size = 0;
+ return NULL;
+ }
+ int sz = *size;
+ void * ret;
+ *size = ringbuffer_data(self->rb, s->node, sz , self->skip, &ret);
+ return ret;
+}
+
+void *
+mread_pull(struct mread_pool * self , int size) {
+ if (self->active == -1) {
+ return NULL;
+ }
+ struct socket *s = &self->sockets[self->active];
+ int rd_size = size;
+ char * buffer = _ringbuffer_read(self, &rd_size);
+ if (buffer) {
+ self->skip += size;
+ return buffer;
+ }
+ if (s->status == SOCKET_CLOSED) {
+ ringbuffer_free(self->rb , s->node);
+ s->node = NULL;
+ return NULL;
+ }
+
+ if (s->status == SOCKET_READ) {
+ s->status = SOCKET_SUSPEND;
+ return NULL;
+ }
+ assert(s->status == SOCKET_POLLIN);
+
+ int sz = size - rd_size;
+ int rd = READBLOCKSIZE;
+ if (rd < sz) {
+ rd = sz;
+ }
+
+ int id = self->active;
+ struct ringbuffer * rb = self->rb;
+
+ struct ringbuffer_block * blk = ringbuffer_alloc(rb , rd);
+ while (blk == NULL) {
+ int collect_id = ringbuffer_collect(rb);
+ _close_client(self , collect_id);
+ if (id == collect_id) {
+ return NULL;
+ }
+ blk = ringbuffer_alloc(rb , rd);
+ }
+
+ buffer = (char *)(blk + 1);
+
+ for (;;) {
+ int bytes = recv(s->fd, buffer, rd, MSG_DONTWAIT);
+ if (bytes > 0) {
+ ringbuffer_resize(rb, blk , bytes);
+ if (bytes < sz) {
+ _link_node(rb, self->active, s , blk);
+ s->status = SOCKET_SUSPEND;
+ return NULL;
+ }
+ s->status = SOCKET_READ;
+ break;
+ }
+ if (bytes == 0) {
+ _close_active(self);
+ return NULL;
+ }
+ if (bytes == -1) {
+ switch(errno) {
+ case EWOULDBLOCK:
+ return NULL;
+ default:
+ if (errno == EAGAIN) {
+ continue;
+ }
+ _close_active(self);
+ return NULL;
+ }
+ }
+ }
+ _link_node(rb, self->active , s , blk);
+ void * ret;
+ int real_rd = ringbuffer_data(rb, s->node , size , self->skip, &ret);
+ if (ret) {
+ self->skip += size;
+ return ret;
+ }
+ assert(real_rd == size);
+ struct ringbuffer_block * temp = ringbuffer_alloc(rb, size);
+ while (temp == NULL) {
+ int collect_id = ringbuffer_collect(rb);
+ if (id == collect_id) {
+ return NULL;
+ }
+ temp = ringbuffer_alloc(rb , size);
+ }
+ temp->id = id;
+ if (s->temp) {
+ ringbuffer_link(rb, temp, s->temp);
+ }
+ s->temp = temp;
+ ret = ringbuffer_copy(rb, s->node, self->skip, temp);
+ assert(ret);
+ self->skip += size;
+
+ return ret;
+}
+
+void
+mread_yield(struct mread_pool * self) {
+ if (self->active == -1) {
+ return;
+ }
+ struct socket *s = &self->sockets[self->active];
+ ringbuffer_free(self->rb , s->temp);
+ s->temp = NULL;
+ if (s->status == SOCKET_CLOSED && s->node == NULL) {
+ --self->closed;
+ s->status = SOCKET_INVALID;
+ map_erase(self->socket_hash , s->fd);
+ s->fd = self->free_socket - self->sockets;
+ self->free_socket = s;
+ self->skip = 0;
+ self->active = -1;
+ } else {
+ if (s->node) {
+ s->node = ringbuffer_yield(self->rb, s->node, self->skip);
+ }
+ self->skip = 0;
+ if (s->node == NULL) {
+ self->active = -1;
+ }
+ }
+}
+
+int
+mread_closed(struct mread_pool * self) {
+ if (self->active == -1) {
+ return 0;
+ }
+ struct socket * s = &self->sockets[self->active];
+ if (s->status == SOCKET_CLOSED && s->node == NULL) {
+ mread_yield(self);
+ return 1;
+ }
+ return 0;
+}
15 mread.h
@@ -0,0 +1,15 @@
+#ifndef MREAD_H
+#define MREAD_H
+
+struct mread_pool;
+
+struct mread_pool * mread_create(int port , int max , int buffer);
+void mread_close(struct mread_pool *m);
+
+int mread_poll(struct mread_pool *m , int timeout);
+void * mread_pull(struct mread_pool *m , int size);
+void mread_yield(struct mread_pool *m);
+int mread_closed(struct mread_pool *m);
+int mread_socket(struct mread_pool *m , int index);
+
+#endif
266 ringbuffer.c
@@ -0,0 +1,266 @@
+#include "ringbuffer.h"
+
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+
+struct ringbuffer {
+ int size;
+ int head;
+};
+
+static inline int
+block_offset(struct ringbuffer * rb, struct ringbuffer_block * blk) {
+ char * start = (char *)(rb + 1);
+ return (char *)blk - start;
+}
+
+static inline struct ringbuffer_block *
+block_ptr(struct ringbuffer * rb, int offset) {
+ char * start = (char *)(rb + 1);
+ return (struct ringbuffer_block *)(start + offset);
+}
+
+static inline struct ringbuffer_block *
+block_next(struct ringbuffer * rb, struct ringbuffer_block * blk) {
+ int align_length = (blk->length + 3) & ~3;
+ int head = block_offset(rb, blk);
+ if (align_length + head == rb->size) {
+ return NULL;
+ }
+ assert(align_length + head < rb->size);
+ return block_ptr(rb, head + align_length);
+}
+
+struct ringbuffer *
+ringbuffer_new(int size) {
+ struct ringbuffer * rb = malloc(sizeof(*rb) + size);
+ rb->size = size;
+ rb->head = 0;
+ struct ringbuffer_block * blk = block_ptr(rb, 0);
+ blk->length = size;
+ blk->id = -1;
+ return rb;
+}
+
+void
+ringbuffer_delete(struct ringbuffer * rb) {
+ free(rb);
+}
+
+void
+ringbuffer_link(struct ringbuffer *rb , struct ringbuffer_block * head, struct ringbuffer_block * next) {
+ while (head->next >=0) {
+ head = block_ptr(rb, head->next);
+ }
+ next->id = head->id;
+ head->next = block_offset(rb, next);
+}
+
+static struct ringbuffer_block *
+_alloc(struct ringbuffer * rb, int total_size , int size) {
+ struct ringbuffer_block * blk = block_ptr(rb, rb->head);
+ int align_length = sizeof(struct ringbuffer_block) + ((size + 3) & ~3);
+ blk->length = sizeof(struct ringbuffer_block) + size;
+ blk->offset = 0;
+ blk->next = -1;
+ struct ringbuffer_block * next = block_next(rb, blk);
+ rb->head = block_offset(rb, next);
+ if (align_length < total_size) {
+ next->length = total_size - align_length;
+ if (next->length >= sizeof(struct ringbuffer_block)) {
+ next->id = -1;
+ }
+ }
+ return blk;
+}
+
+struct ringbuffer_block *
+ringbuffer_alloc(struct ringbuffer * rb, int size) {
+ int align_length = sizeof(struct ringbuffer_block) + ((size + 3) & ~3);
+ int i;
+ for (i=0;i<2;i++) {
+ int free_size = 0;
+ struct ringbuffer_block * blk = block_ptr(rb, rb->head);
+ do {
+ if (blk->length >= sizeof(struct ringbuffer_block) && blk->id >= 0)
+ return NULL;
+ free_size += blk->length;
+ if (free_size >= align_length) {
+ return _alloc(rb, free_size , size);
+ }
+ blk = block_next(rb, blk);
+ } while(blk);
+ rb->head = 0;
+ }
+ return NULL;
+}
+
+static int
+_last_id(struct ringbuffer * rb) {
+ int i;
+ for (i=0;i<2;i++) {
+ struct ringbuffer_block * blk = block_ptr(rb, rb->head);
+ do {
+ if (blk->length >= sizeof(struct ringbuffer_block) && blk->id >= 0)
+ return blk->id;
+ blk = block_next(rb, blk);
+ } while(blk);
+ rb->head = 0;
+ }
+ return -1;
+}
+
+int
+ringbuffer_collect(struct ringbuffer * rb) {
+ int id = _last_id(rb);
+ struct ringbuffer_block *blk = block_ptr(rb, 0);
+ do {
+ if (blk->length >= sizeof(struct ringbuffer_block) && blk->id == id) {
+ blk->id = -1;
+ }
+ blk = block_next(rb, blk);
+ } while(blk);
+ return id;
+}
+
+void
+ringbuffer_resize(struct ringbuffer * rb, struct ringbuffer_block * blk, int size) {
+ int align_length = sizeof(struct ringbuffer_block) + ((size + 3) & ~3);
+ int old_length = (blk->length + 3) & ~3;
+ assert(align_length < old_length);
+ blk->length = size + sizeof(struct ringbuffer_block);
+ if (align_length == old_length) {
+ return;
+ }
+ blk = block_next(rb, blk);
+ blk->length = old_length - align_length;
+ if (blk->length >= sizeof(struct ringbuffer_block)) {
+ blk->id = -1;
+ }
+ rb->head = block_offset(rb, blk);
+}
+
+static int
+_block_id(struct ringbuffer_block * blk) {
+ assert(blk->length >= sizeof(struct ringbuffer_block));
+ int id = blk->id;
+ assert(id>=0);
+ return id;
+}
+
+void
+ringbuffer_free(struct ringbuffer * rb, struct ringbuffer_block * blk) {
+ if (blk == NULL)
+ return;
+ int id = _block_id(blk);
+ blk->id = -1;
+ while (blk->next >= 0) {
+ blk = block_ptr(rb, blk->next);
+ assert(_block_id(blk) == id);
+ blk->id = -1;
+ }
+}
+
+int
+ringbuffer_data(struct ringbuffer * rb, struct ringbuffer_block * blk, int size, int skip, void **ptr) {
+ int length = blk->length - sizeof(struct ringbuffer_block) - blk->offset;
+ for (;;) {
+ if (length > skip) {
+ if (length - skip >= size) {
+ char * start = (char *)(blk + 1);
+ *ptr = (start + blk->offset + skip);
+ return size;
+ }
+ *ptr = NULL;
+ int ret = length - skip;
+ while (blk->next >= 0) {
+ blk = block_ptr(rb, blk->next);
+ ret += blk->length - sizeof(struct ringbuffer_block);
+ if (ret >= size)
+ return size;
+ }
+ return ret;
+ }
+ assert(blk->next >= 0);
+ blk = block_ptr(rb, blk->next);
+ assert(blk->offset == 0);
+ skip -= length;
+ length = blk->length - sizeof(struct ringbuffer_block);
+ }
+}
+
+void *
+ringbuffer_copy(struct ringbuffer * rb, struct ringbuffer_block * from, int skip, struct ringbuffer_block * to) {
+ int size = to->length - sizeof(struct ringbuffer_block);
+ int length = from->length - sizeof(struct ringbuffer_block) - from->offset;
+ char * ptr = (char *)(to+1);
+ for (;;) {
+ if (length > skip) {
+ char * src = (char *)(from + 1);
+ src += from->offset + skip;
+ length -= skip;
+ while (length < size) {
+ memcpy(ptr, src, length);
+ assert(from->next >= 0);
+ from = block_ptr(rb , from->next);
+ assert(from->offset == 0);
+ ptr += length;
+ size -= length;
+ length = from->length;
+ src = (char *)(from + 1);
+ }
+ memcpy(ptr, src , size);
+ to->id = from->id;
+ return (char *)(to + 1);
+ }
+ assert(from->next >= 0);
+ from = block_ptr(rb, from->next);
+ assert(from->offset == 0);
+ skip -= length;
+ length = from->length - sizeof(struct ringbuffer_block);
+ }
+}
+
+struct ringbuffer_block *
+ringbuffer_yield(struct ringbuffer * rb, struct ringbuffer_block *blk, int skip) {
+ int length = blk->length - sizeof(struct ringbuffer_block) - blk->offset;
+ for (;;) {
+ if (length > skip) {
+ blk->offset += skip;
+ return blk;
+ }
+ blk->id = -1;
+ if (blk->next < 0) {
+ return NULL;
+ }
+ blk = block_ptr(rb, blk->next);
+ assert(blk->offset == 0);
+ skip -= length;
+ length = blk->length - sizeof(struct ringbuffer_block);
+ }
+}
+
+void
+ringbuffer_dump(struct ringbuffer * rb) {
+ struct ringbuffer_block *blk = block_ptr(rb,0);
+ int i=0;
+ printf("total size= %d\n",rb->size);
+ while (blk) {
+ ++i;
+ if (i>10)
+ break;
+ if (blk->length >= sizeof(*blk)) {
+ printf("[%lu : %d]", blk->length - sizeof(*blk), block_offset(rb,blk));
+ printf(" id=%d",blk->id);
+ if (blk->id >=0) {
+ printf(" offset=%d next=%d",blk->offset, blk->next);
+ }
+ } else {
+ printf("<%u : %d>", blk->length, block_offset(rb,blk));
+ }
+ printf("\n");
+ blk = block_next(rb, blk);
+ }
+}
27 ringbuffer.h
@@ -0,0 +1,27 @@
+#ifndef MREAD_RINGBUFFER_H
+#define MREAD_RINGBUFFER_H
+
+struct ringbuffer;
+
+struct ringbuffer_block {
+ int length;
+ int offset;
+ int id;
+ int next;
+};
+
+struct ringbuffer * ringbuffer_new(int size);
+void ringbuffer_delete(struct ringbuffer * rb);
+void ringbuffer_link(struct ringbuffer *rb , struct ringbuffer_block * prev, struct ringbuffer_block * next);
+struct ringbuffer_block * ringbuffer_alloc(struct ringbuffer * rb, int size);
+int ringbuffer_collect(struct ringbuffer * rb);
+void ringbuffer_resize(struct ringbuffer * rb, struct ringbuffer_block * blk, int size);
+void ringbuffer_free(struct ringbuffer * rb, struct ringbuffer_block * blk);
+int ringbuffer_data(struct ringbuffer * rb, struct ringbuffer_block * blk, int size, int skip, void **ptr);
+void * ringbuffer_copy(struct ringbuffer * rb, struct ringbuffer_block * from, int skip, struct ringbuffer_block * to);
+struct ringbuffer_block * ringbuffer_yield(struct ringbuffer * rb, struct ringbuffer_block *blk, int skip);
+
+void ringbuffer_dump(struct ringbuffer * rb);
+
+#endif
+
Please sign in to comment.
Something went wrong with that request. Please try again.