Skip to content

Commit

Permalink
Enable kqueue for the main event loop
Browse files Browse the repository at this point in the history
Which should be faster than select() for lots of sockets
  • Loading branch information
alandekok committed Nov 11, 2014
1 parent 74d3c3f commit badeeef
Showing 1 changed file with 170 additions and 9 deletions.
179 changes: 170 additions & 9 deletions src/lib/event.c
Expand Up @@ -28,30 +28,46 @@ RCSID("$Id$")
#include <freeradius-devel/heap.h>
#include <freeradius-devel/event.h>

#ifdef HAVE_KQUEUE
#ifndef HAVE_SYS_EVENT_H
#error kqueue requires <sys/event.h>

#else
#include <sys/event.h>
#endif
#endif /* HAVE_KQUEUE */

typedef struct fr_event_fd_t {
int fd;
fr_event_fd_handler_t handler;
void *ctx;
} fr_event_fd_t;

#define FR_EV_MAX_FDS (256)

#undef USEC
#define USEC (1000000)

struct fr_event_list_t {
fr_heap_t *times;

bool changed;

int exit;

fr_event_status_t status;

struct timeval now;
bool dispatch;

int max_readers;
int num_readers;
#ifndef HAVE_KQUEUE
int max_readers;

bool changed;

#else
int kq;
struct kevent events[FR_EV_MAX_FDS]; /* so it doesn't go on the stack every time */
#endif
fr_event_fd_t readers[FR_EV_MAX_FDS];
};

Expand Down Expand Up @@ -93,6 +109,10 @@ static int _event_list_free(fr_event_list_t *list)

fr_heap_delete(el->times);

#ifdef HAVE_KQUEUE
close(el->kq);
#endif

return 0;
}

Expand All @@ -118,9 +138,19 @@ fr_event_list_t *fr_event_list_create(TALLOC_CTX *ctx, fr_event_status_t status)
el->readers[i].fd = -1;
}

el->status = status;
#ifndef HAVE_KQUEUE
el->changed = true; /* force re-set of fds's */

#else
el->kq = kqueue();
if (el->kq < 0) {
talloc_free(el);
return NULL;
}
#endif

el->status = status;

return el;
}

Expand Down Expand Up @@ -307,12 +337,52 @@ int fr_event_fd_insert(fr_event_list_t *el, int type, int fd,
return 0;
}

if (el->max_readers >= FR_EV_MAX_FDS) {
if (el->num_readers >= FR_EV_MAX_FDS) {
fr_strerror_printf("Too many readers");
return 0;
}

ef = NULL;

#ifdef HAVE_KQUEUE
/*
* We need to store TWO fields with the event. kqueue
* only lets us store one. If we put the two fields into
* a malloc'd structure, that would help. Except that
* kqueue can silently delete the event when the socket
* is closed, and not give us the opportunity to free it.
* <sigh>
*
* The solution is to put the fields into an array, and
* do a linear search on addition/deletion of the FDs.
* However, to avoid MOST linear issues, we start off the
* search at "FD" offset. Since FDs are unique, AND
* usually less than 256, we do "FD & 0xff", which is a
* good guess, and makes the lookups mostly O(1).
*/
for (i = 0; i < FR_EV_MAX_FDS; i++) {
int j;
struct kevent evset;

j = (i + fd) & (FR_EV_MAX_FDS - 1);

if (el->readers[j].fd >= 0) continue;

/*
* We want to read from the FD.
*/
EV_SET(&evset, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &el->readers[j]);
if (kevent(el->kq, &evset, 1, NULL, 0, NULL) < 0) {
fprintf(stderr, "SHIT %d %s\n", __LINE__, strerror(errno));
return 0;
}

ef = &el->readers[j];
el->num_readers++;
break;
}

#else /* HAVE_KQUEUE */

for (i = 0; i <= el->max_readers; i++) {
/*
* Be fail-safe on multiple inserts.
Expand All @@ -338,17 +408,20 @@ int fr_event_fd_insert(fr_event_list_t *el, int type, int fd,
break;
}
}
#endif

if (!ef) {
fr_strerror_printf("Failed assigning FD");
return 0;
}

ef->fd = fd;
ef->handler = handler;
ef->ctx = ctx;
ef->fd = fd;

#ifndef HAVE_KQUEUE
el->changed = true;
#endif

return 1;
}
Expand All @@ -361,6 +434,33 @@ int fr_event_fd_delete(fr_event_list_t *el, int type, int fd)

if (type != 0) return 0;

#ifdef HAVE_KQUEUE
for (i = 0; i < FR_EV_MAX_FDS; i++) {
int j;
struct kevent evset;

j = (i + fd) & (FR_EV_MAX_FDS - 1);

if (el->readers[j].fd != fd) continue;

/*
* Tell the kernel to delete it from the list.
*
* The caller MAY have closed it, in which case
* the kernel has removed it from the list. So
* we ignore the return code from kevent().
*/
EV_SET(&evset, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
(void) kevent(el->kq, &evset, 1, NULL, 0, NULL);

el->readers[j].fd = -1;
el->num_readers--;

return 1;
}

#else

for (i = 0; i < el->max_readers; i++) {
if (el->readers[i].fd == fd) {
el->readers[i].fd = -1;
Expand All @@ -371,6 +471,7 @@ int fr_event_fd_delete(fr_event_list_t *el, int type, int fd)
return 1;
}
}
#endif /* HAVE_KQUEUE */

return 0;
}
Expand All @@ -390,15 +491,22 @@ bool fr_event_loop_exiting(fr_event_list_t *el)

int fr_event_loop(fr_event_list_t *el)
{
int i, rcode, maxfd = 0;
int i, rcode;
struct timeval when, *wake;
#ifdef HAVE_KQUEUE
struct timespec ts_when, *ts_wake;
#else
int maxfd = 0;
fd_set read_fds, master_fds;

el->changed = true;
#endif

el->exit = 0;
el->dispatch = true;
el->changed = true;

while (!el->exit) {
#ifndef HAVE_KQUEUE
/*
* Cache the list of FD's to watch.
*/
Expand All @@ -419,6 +527,7 @@ int fr_event_loop(fr_event_list_t *el)

el->changed = false;
}
#endif /* HAVE_KQUEUE */

/*
* Find the first event. If there's none, we wait
Expand Down Expand Up @@ -467,6 +576,7 @@ int fr_event_loop(fr_event_list_t *el)
*/
if (el->status) el->status(wake);

#ifndef HAVE_KQUEUE
read_fds = master_fds;
rcode = select(maxfd + 1, &read_fds, NULL, NULL, wake);
if ((rcode < 0) && (errno != EINTR)) {
Expand All @@ -475,6 +585,19 @@ int fr_event_loop(fr_event_list_t *el)
return -1;
}

#else /* HAVE_KQUEUE */

if (wake) {
ts_wake = &ts_when;
ts_when.tv_sec = when.tv_sec;
ts_when.tv_nsec = when.tv_usec * 1000;
} else {
ts_wake = NULL;
}

rcode = kevent(el->kq, NULL, 0, el->events, FR_EV_MAX_FDS, ts_wake);
#endif /* HAVE_KQUEUE */

if (fr_heap_num_elements(el->times) > 0) {
do {
gettimeofday(&el->now, NULL);
Expand All @@ -484,6 +607,11 @@ int fr_event_loop(fr_event_list_t *el)

if (rcode <= 0) continue;

#ifndef HAVE_KQUEUE
/*
* Loop over all of the sockets to see if there's
* an event for that socket.
*/
for (i = 0; i < el->max_readers; i++) {
fr_event_fd_t *ef = &el->readers[i];

Expand All @@ -495,6 +623,39 @@ int fr_event_loop(fr_event_list_t *el)

if (el->changed) break;
}

#else /* HAVE_KQUEUE */

/*
* Loop over all of the events, servicing them.
*/
for (i = 0; i < rcode; i++) {
if (el->events[i].flags & EV_EOF) {
fr_event_fd_t *ef = el->events[i].udata;

/*
* FIXME: delete the handler
* here, and fix process.c to not
* call fr_event_fd_delete().
* It's cleaner.
*
* Call the handler, which SHOULD
* delete the connection.
*/
ef->handler(el, ef->fd, ef->ctx);
continue;
}

if (el->events[i].flags & EVFILT_READ) {
fr_event_fd_t *ef = el->events[i].udata;

ef->handler(el, ef->fd, ef->ctx);
continue;
}

/* else it's an unhandled event, which shouldn't happen */
}
#endif /* HAVE_KQUEUE */
}

el->dispatch = false;
Expand Down

0 comments on commit badeeef

Please sign in to comment.