Skip to content
Browse files

Add evloop, btpd's new event loop. This will replace libevent.

  • Loading branch information...
1 parent 4457c12 commit 59905999ce145a81e0003766d468945c2444a90e @rmn64k rmn64k committed Jan 9, 2009
Showing with 722 additions and 0 deletions.
  1. +8 −0 evloop/Makefile.am
  2. +118 −0 evloop/epoll.c
  3. +57 −0 evloop/evloop.h
  4. +122 −0 evloop/kqueue.c
  5. +142 −0 evloop/poll.c
  6. +152 −0 evloop/timeheap.c
  7. +19 −0 evloop/timeheap.h
  8. +104 −0 evloop/timer.c
View
8 evloop/Makefile.am
@@ -0,0 +1,8 @@
+noinst_LIBRARIES=libevloop.a
+EXTRA_libevloop_a_SOURCES=epoll.c poll.c
+libevloop_a_SOURCES=\
+ evloop.h\
+ timeheap.c timeheap.h timer.c
+CFLAGS=@CFLAGS@ -D@EVLOOP_METHOD@ -I$(top_srcdir)/misc
+libevloop_a_LIBADD=@EVLOOP_IMPL@
+libevloop_a_DEPENDENCIES=@EVLOOP_IMPL@
View
118 evloop/epoll.c
@@ -0,0 +1,118 @@
+#include <sys/epoll.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "evloop.h"
+
+static int m_epfd;
+
+static struct epoll_event m_evs[100];
+static uint8_t m_valid[100];
+
+int
+evloop_init(void)
+{
+ if (timeheap_init() != 0)
+ return -1;
+ m_epfd = epoll_create(getdtablesize());
+ return m_epfd >= 0 ? 0 : -1;
+}
+
+int
+fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, void *arg)
+{
+ ev->fd = fd;
+ ev->cb = cb;
+ ev->arg = arg;
+ ev->flags = 0;
+ ev->index = -1;
+ return fdev_enable(ev, flags);
+}
+
+int
+fdev_enable(struct fdev *ev, uint16_t flags)
+{
+ struct epoll_event epev;
+ int err = 0;
+ uint16_t sf = ev->flags;
+ ev->flags |= flags;
+ if (sf != ev->flags) {
+ epev.data.ptr = ev;
+ epev.events =
+ ((ev->flags & EV_READ) ? EPOLLIN : 0) |
+ ((ev->flags & EV_WRITE) ? EPOLLOUT : 0);
+ if (sf == 0)
+ err = epoll_ctl(m_epfd, EPOLL_CTL_ADD, ev->fd, &epev);
+ else
+ err = epoll_ctl(m_epfd, EPOLL_CTL_MOD, ev->fd, &epev);
+ }
+ return err;
+}
+
+int
+fdev_disable(struct fdev *ev, uint16_t flags)
+{
+ struct epoll_event epev;
+ int err = 0;
+ uint16_t sf = ev->flags;
+ ev->flags &= ~flags;
+ if (sf != ev->flags) {
+ epev.data.ptr = ev;
+ epev.events =
+ ((ev->flags & EV_READ) ? EPOLLIN : 0) |
+ ((ev->flags & EV_WRITE) ? EPOLLOUT : 0);
+ if (ev->flags == 0)
+ err = epoll_ctl(m_epfd, EPOLL_CTL_DEL, ev->fd, &epev);
+ else
+ err = epoll_ctl(m_epfd, EPOLL_CTL_MOD, ev->fd, &epev);
+ }
+ return err;
+}
+
+int
+fdev_del(struct fdev *ev)
+{
+ if (ev->index >= 0)
+ m_valid[ev->index] = 0;
+ return fdev_disable(ev, EV_READ|EV_WRITE);
+}
+
+int
+evloop(void)
+{
+ int nev, i, millisecs;
+ struct timespec delay;
+ while (1) {
+ timers_run();
+ delay = timer_delay();
+ if (delay.tv_sec >= 0)
+ millisecs = delay.tv_sec * 1000 + delay.tv_nsec / 1000000;
+ else
+ millisecs = -1;
+
+ if ((nev = epoll_wait(m_epfd, m_evs, 100, millisecs)) < 0) {
+ if (errno == EINTR)
+ continue;
+ else
+ return -1;
+ }
+ memset(m_valid, 1, nev);
+ for (i = 0; i < nev; i++) {
+ struct fdev *ev = m_evs[i].data.ptr;
+ ev->index = i;
+ }
+ for (i = 0; i < nev; i++) {
+ struct fdev *ev = m_evs[i].data.ptr;
+ if ((m_valid[i] &&
+ ev->flags & EV_READ &&
+ m_evs[i].events & (EPOLLIN|EPOLLERR|EPOLLHUP)))
+ ev->cb(ev->fd, EV_READ, ev->arg);
+ if ((m_valid[i] && ev->flags & EV_WRITE &&
+ m_evs[i].events & (EPOLLOUT|EPOLLERR|EPOLLHUP)))
+ ev->cb(ev->fd, EV_WRITE, ev->arg);
+ if (m_valid[i])
+ ev->index = -1;
+ }
+ }
+}
View
57 evloop/evloop.h
@@ -0,0 +1,57 @@
+#ifndef BTPD_EVLOOP_H
+#define BTPD_EVLOOP_H
+
+#include <sys/time.h>
+#include <stdint.h>
+
+#include "timeheap.h"
+
+#define EV_READ 1
+#define EV_WRITE 2
+#define EV_TIMEOUT 3
+
+typedef void (*evloop_cb_t)(int fd, short type, void *arg);
+
+#if defined(EVLOOP_EPOLL) || defined(EVLOOP_KQUEUE)
+
+struct fdev {
+ evloop_cb_t cb;
+ void *arg;
+ int fd;
+ uint16_t flags;
+ int16_t index;
+};
+
+#elif defined(EVLOOP_POLL)
+
+struct fdev {
+ int i;
+};
+
+#else
+#error No evloop method defined.
+#endif
+
+struct timeout {
+ evloop_cb_t cb;
+ void *arg;
+ struct th_handle th;
+};
+
+int evloop_init(void);
+int evloop(void);
+
+int fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb,
+ void *arg);
+int fdev_del(struct fdev *ev);
+int fdev_enable(struct fdev *ev, uint16_t flags);
+int fdev_disable(struct fdev *ev, uint16_t flags);
+
+void timer_init(struct timeout *, evloop_cb_t, void *);
+int timer_add(struct timeout *, struct timespec *);
+void timer_del(struct timeout *);
+
+void timers_run(void);
+struct timespec timer_delay(void);
+
+#endif
View
122 evloop/kqueue.c
@@ -0,0 +1,122 @@
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "evloop.h"
+
+static int m_kq;
+
+static struct kevent m_evs[100];
+static uint8_t m_valid[100];
+
+int
+evloop_init(void)
+{
+ if (timeheap_init() != 0)
+ return -1;
+ m_kq = kqueue();
+ return m_kq >= 0 ? 0 : -1;
+}
+
+int
+fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, void *arg)
+{
+ ev->fd = fd;
+ ev->cb = cb;
+ ev->arg = arg;
+ ev->flags = 0;
+ ev->index = -1;
+ return fdev_enable(ev, flags);
+}
+
+int
+fdev_enable(struct fdev *ev, uint16_t flags)
+{
+ struct kevent kev[2], *kp = NULL;
+ int count = 0;
+ uint16_t sf = ev->flags;
+ ev->flags |= flags;
+ if ((sf & EV_READ) == 0 && (flags & EV_READ) != 0) {
+ EV_SET(&kev[0], ev->fd, EVFILT_READ, EV_ADD, 0, 0, ev);
+ kp = kev;
+ count = 1;
+ }
+ if ((sf & EV_WRITE) == 0 && (flags & EV_WRITE) != 0) {
+ EV_SET(&kev[1], ev->fd, EVFILT_WRITE, EV_ADD, 0, 0, ev);
+ if (count == 0)
+ kp = &kev[1];
+ count++;
+ }
+ return count > 0 ? kevent(m_kq, kp, count, NULL, 0, NULL) : 0;
+}
+
+int
+fdev_disable(struct fdev *ev, uint16_t flags)
+{
+ struct kevent kev[2], *kp = NULL;
+ int count = 0;
+ uint16_t sf = ev->flags;
+ ev->flags &= ~flags;
+ if ((sf & EV_READ) != 0 && (flags & EV_READ) != 0) {
+ EV_SET(&kev[0], ev->fd, EVFILT_READ, EV_DELETE, 0, 0, ev);
+ kp = kev;
+ count = 1;
+ }
+ if ((sf & EV_WRITE) != 0 && (flags & EV_WRITE) != 0) {
+ EV_SET(&kev[1], ev->fd, EVFILT_WRITE, EV_DELETE, 0, 0, ev);
+ if (count == 0)
+ kp = &kev[1];
+ count++;
+ }
+ return count > 0 ? kevent(m_kq, kp, count, NULL, 0, NULL) : 0;
+}
+
+int
+fdev_del(struct fdev *ev)
+{
+ if (ev->index >= 0)
+ m_valid[ev->index] = 0;
+ return fdev_disable(ev, EV_READ|EV_WRITE);
+}
+
+int
+evloop(void)
+{
+ int nev, i;
+ struct timespec delay;
+ while (1) {
+ timers_run();
+ delay = timer_delay();
+
+ if ((nev = kevent(m_kq, NULL, 0, m_evs, 100, &delay)) < 0) {
+ if (errno == EINTR)
+ continue;
+ else
+ return -1;
+ }
+ memset(m_valid, 1, nev);
+ for (i = 0; i < nev; i++) {
+ struct fdev *ev = (struct fdev *)m_evs[i].udata;
+ ev->index = i;
+ }
+ for (i = 0; i < nev; i++) {
+ if (m_evs[i].flags & EV_ERROR) {
+ errno = m_evs[i].data;
+ return -1;
+ }
+ struct fdev *ev = (struct fdev *)m_evs[i].udata;
+ if (m_valid[i] && ev->flags & EV_READ &&
+ m_evs[i].filter == EVFILT_READ)
+ ev->cb(ev->fd, EV_READ, ev->arg);
+ if (m_valid[i] && ev->flags & EV_WRITE &&
+ m_evs[i].filter == EVFILT_WRITE)
+ ev->cb(ev->fd, EV_WRITE, ev->arg);
+ if (m_valid[i])
+ ev->index = -1;
+ }
+ }
+}
View
142 evloop/poll.c
@@ -0,0 +1,142 @@
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "evloop.h"
+
+#define POLL_INIT_SIZE 64
+
+struct poll_ev {
+ struct fdev *ev;
+ evloop_cb_t cb;
+ void *arg;
+};
+
+static struct pollfd *m_pfds;
+static struct poll_ev *m_pevs;
+
+static int m_cap, m_size;
+static int m_cur = -1, m_curdel;
+
+static int
+poll_grow(void)
+{
+ int ncap = m_cap * 2;
+ struct pollfd *nm_pfds = realloc(m_pfds, ncap * sizeof(*m_pfds));
+ struct poll_ev *nm_pevs = realloc(m_pevs, ncap * sizeof(*m_pevs));
+ if (nm_pfds != NULL)
+ m_pfds = nm_pfds;
+ if (nm_pevs != NULL)
+ m_pevs = nm_pevs;
+ if (nm_pfds == NULL || nm_pevs == NULL)
+ return errno;
+ m_cap = ncap;
+ return 0;
+}
+
+int
+evloop_init(void)
+{
+ if (timeheap_init() != 0)
+ return -1;
+ m_cap = POLL_INIT_SIZE;
+ m_size = 0;
+ if ((m_pfds = calloc(m_cap, sizeof(*m_pfds))) == NULL)
+ return -1;
+ if ((m_pevs = calloc(m_cap, sizeof(*m_pevs))) == NULL) {
+ free(m_pfds);
+ return -1;
+ }
+ return 0;
+}
+
+int
+fdev_new(struct fdev *ev, int fd, uint16_t flags, evloop_cb_t cb, void *arg)
+{
+ if (m_size == m_cap && poll_grow() != 0)
+ return errno;
+ ev->i = m_size;
+ m_size++;
+ m_pfds[ev->i].fd = fd;
+ m_pfds[ev->i].events =
+ ((flags & EV_READ) ? POLLIN : 0) |
+ ((flags & EV_WRITE) ? POLLOUT : 0);
+ m_pevs[ev->i].ev = ev;
+ m_pevs[ev->i].cb = cb;
+ m_pevs[ev->i].arg = arg;
+ return 0;
+}
+
+int
+fdev_enable(struct fdev *ev, uint16_t flags)
+{
+ m_pfds[ev->i].events |=
+ ((flags & EV_READ) ? POLLIN : 0) |
+ ((flags & EV_WRITE) ? POLLOUT : 0);
+ return 0;
+}
+
+int
+fdev_disable(struct fdev *ev, uint16_t flags)
+{
+ short pflags =
+ ((flags & EV_READ) ? POLLIN : 0) |
+ ((flags & EV_WRITE) ? POLLOUT : 0);
+ m_pfds[ev->i].events &= ~pflags;
+ return 0;
+}
+
+int
+fdev_del(struct fdev *ev)
+{
+ assert(ev->i < m_size);
+ m_size--;
+ m_pfds[ev->i] = m_pfds[m_size];
+ m_pevs[ev->i] = m_pevs[m_size];
+ m_pevs[ev->i].ev->i = ev->i;
+ if (ev->i == m_cur)
+ m_curdel = 1;
+ return 0;
+}
+
+int
+evloop(void)
+{
+ int millisecs;
+ struct timespec delay;
+ while (1) {
+ timers_run();
+
+ delay = timer_delay();
+ if (delay.tv_sec >= 0)
+ millisecs = delay.tv_sec * 1000 + delay.tv_nsec / 1000000;
+ else
+ millisecs = -1;
+
+ if (poll(m_pfds, m_size, millisecs) < 0) {
+ if (errno == EINTR)
+ continue;
+ else
+ return -1;
+ }
+
+ m_cur = 0;
+ while (m_cur < m_size) {
+ struct pollfd *pfd = &m_pfds[m_cur];
+ struct poll_ev *pev = &m_pevs[m_cur];
+ if ((pfd->events & POLLIN &&
+ pfd->revents & (POLLIN|POLLERR|POLLHUP)))
+ pev->cb(pfd->fd, EV_READ, pev->arg);
+ if ((!m_curdel && pfd->events & POLLOUT &&
+ pfd->revents & (POLLOUT|POLLERR|POLLHUP)))
+ pev->cb(pfd->fd, EV_WRITE, pev->arg);
+ if (!m_curdel)
+ m_cur++;
+ else
+ m_curdel = 0;
+ }
+ m_cur = -1;
+ }
+}
View
152 evloop/timeheap.c
@@ -0,0 +1,152 @@
+#include <sys/time.h>
+#include <assert.h>
+#include <stdlib.h>
+
+#include "timeheap.h"
+
+struct th_entry {
+ struct timespec t;
+ struct th_handle *h;
+};
+
+static struct th_entry *heap;
+static int heap_cap;
+static int heap_use;
+
+static int
+cmptime_lt(struct timespec a, struct timespec b)
+{
+ if (a.tv_sec == b.tv_sec)
+ return a.tv_nsec < b.tv_nsec;
+ else
+ return a.tv_sec < b.tv_sec;
+}
+
+static int
+cmpentry_lt(int a, int b)
+{
+ return cmptime_lt(heap[a].t, heap[b].t);
+}
+
+static void
+swap(int i, int j)
+{
+ struct th_entry tmp = heap[i];
+ heap[i] = heap[j];
+ heap[i].h->i = i;
+ heap[j] = tmp;
+ heap[j].h->i = j;
+}
+
+static void
+bubble_up(int i)
+{
+ while (i != 0) {
+ int p = (i-1)/2;
+ if (cmpentry_lt(i, p)) {
+ swap(i, p);
+ i = p;
+ } else
+ return;
+ }
+}
+
+static void
+bubble_down(int i)
+{
+ int li, ri, ci;
+loop:
+ li = 2*i+1;
+ ri = 2*i+2;
+ if (ri < heap_use)
+ ci = cmpentry_lt(li, ri) ? li : ri;
+ else if (li < heap_use)
+ ci = li;
+ else
+ return;
+ if (cmpentry_lt(ci, i)) {
+ swap(i, ci);
+ i = ci;
+ goto loop;
+ }
+}
+
+int
+timeheap_init(void)
+{
+ heap_cap = 10;
+ heap_use = 0;
+ if ((heap = malloc(sizeof(struct th_entry) * heap_cap)) == NULL)
+ return -1;
+ else
+ return 0;
+}
+
+int
+timeheap_size(void)
+{
+ return heap_use;
+}
+
+int
+timeheap_insert(struct th_handle *h, struct timespec *t)
+{
+ if (heap_use == heap_cap) {
+ int ncap = heap_cap * 2;
+ struct th_entry *nheap = realloc(heap, ncap * sizeof(struct th_entry));
+ if (nheap == NULL)
+ return -1;
+ heap_cap = ncap;
+ heap = nheap;
+ }
+ heap[heap_use].t = *t;
+ heap[heap_use].h = h;
+ h->i = heap_use;
+ heap_use++;
+ bubble_up(h->i);
+ return 0;
+}
+
+void
+timeheap_remove(struct th_handle *h)
+{
+ assert(h->i >= 0 && h->i < heap_use);
+ heap_use--;
+ if (heap_use > 0) {
+ int i = h->i;
+ int earlier = cmpentry_lt(heap_use, i);
+ heap[i] = heap[heap_use];
+ heap[i].h->i = i;
+ if (earlier)
+ bubble_up(i);
+ else
+ bubble_down(i);
+ }
+}
+
+void
+timeheap_change(struct th_handle *h, struct timespec *t)
+{
+ assert(h->i >= 0 && h->i < heap_use);
+ int earlier = cmptime_lt(*t, heap[h->i].t);
+ heap[h->i].t = *t;
+ if (earlier)
+ bubble_up(h->i);
+ else
+ bubble_down(h->i);
+}
+
+struct timespec
+timeheap_top(void)
+{
+ return heap[0].t;
+}
+
+void *
+timeheap_remove_top(void)
+{
+ void *ret = heap[0].h->data;
+ struct th_handle h = { 0, NULL };
+ timeheap_remove(&h);
+ return ret;
+}
View
19 evloop/timeheap.h
@@ -0,0 +1,19 @@
+#ifndef BTPD_TIMEHEAP_H
+#define BTPD_TIMEHEAP_H
+
+struct th_handle {
+ int i;
+ void *data;
+};
+
+int timeheap_init(void);
+int timeheap_size(void);
+
+int timeheap_insert(struct th_handle *h, struct timespec *t);
+void timeheap_remove(struct th_handle *h);
+void timeheap_change(struct th_handle *h, struct timespec *t);
+
+void *timeheap_remove_top(void);
+struct timespec timeheap_top(void);
+
+#endif
View
104 evloop/timer.c
@@ -0,0 +1,104 @@
+#include <time.h>
+
+#include "evloop.h"
+#include "timeheap.h"
+
+#if defined(CLOCK_MONOTONIC_FAST)
+#define TIMER_CLOCK CLOCK_MONOTONIC_FAST
+#elif defined(CLOCK_MONOTONIC)
+#define TIMER_CLOCK CLOCK_MONOTONIC
+#else
+#error CLOCK_MONOTONIC needed!
+#endif
+
+static struct timespec
+addtime(struct timespec a, struct timespec b)
+{
+ struct timespec ret;
+ ret.tv_sec = a.tv_sec + b.tv_sec;
+ ret.tv_nsec = a.tv_nsec + b.tv_nsec;
+ if (ret.tv_nsec >= 1000000000) {
+ ret.tv_sec += 1;
+ ret.tv_nsec -= 1000000000;
+ }
+ return ret;
+}
+
+static struct timespec
+subtime(struct timespec a, struct timespec b)
+{
+ struct timespec ret;
+ ret.tv_sec = a.tv_sec - b.tv_sec;
+ ret.tv_nsec = a.tv_nsec - b.tv_nsec;
+ if (ret.tv_nsec < 0) {
+ ret.tv_sec -= 1;
+ ret.tv_nsec += 1000000000;
+ }
+ return ret;
+}
+
+void
+timer_init(struct timeout *h, evloop_cb_t cb, void *arg)
+{
+ h->cb = cb;
+ h->arg = arg;
+ h->th.i = -1;
+ h->th.data = h;
+}
+
+int
+timer_add(struct timeout *h, struct timespec *t)
+{
+ struct timespec now, sum;
+ clock_gettime(TIMER_CLOCK, &now);
+ sum = addtime(now, *t);
+ if (h->th.i == -1)
+ return timeheap_insert(&h->th, &sum);
+ else {
+ timeheap_change(&h->th, &sum);
+ return 0;
+ }
+}
+
+void
+timer_del(struct timeout *h)
+{
+ if (h->th.i >= 0) {
+ timeheap_remove(&h->th);
+ h->th.i = -1;
+ }
+}
+
+void
+timers_run(void)
+{
+ struct timespec now;
+ clock_gettime(TIMER_CLOCK, &now);
+ while (timeheap_size() > 0) {
+ struct timespec diff = subtime(timeheap_top(), now);
+ if (diff.tv_sec < 0) {
+ struct timeout *t = timeheap_remove_top();
+ t->th.i = -1;
+ t->cb(-1, EV_TIMEOUT, t->arg);
+ } else
+ break;
+ }
+}
+
+struct timespec
+timer_delay(void)
+{
+ struct timespec now, diff;
+ if (timeheap_size() == 0) {
+ diff.tv_sec = -1;
+ diff.tv_nsec = 0;
+ } else {
+ clock_gettime(TIMER_CLOCK, &now);
+ diff = subtime(timeheap_top(), now);
+ if (diff.tv_sec < 0) {
+ diff.tv_sec = 0;
+ diff.tv_nsec = 0;
+ }
+ }
+ return diff;
+}

0 comments on commit 5990599

Please sign in to comment.
Something went wrong with that request. Please try again.