From badeeefea3f7acc7c49c17c397f5a10a38885374 Mon Sep 17 00:00:00 2001 From: "Alan T. DeKok" Date: Tue, 11 Nov 2014 14:28:19 -0500 Subject: [PATCH] Enable kqueue for the main event loop Which should be faster than select() for lots of sockets --- src/lib/event.c | 179 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 170 insertions(+), 9 deletions(-) diff --git a/src/lib/event.c b/src/lib/event.c index 278d53d96ea7..81d067e3d94f 100644 --- a/src/lib/event.c +++ b/src/lib/event.c @@ -28,6 +28,15 @@ RCSID("$Id$") #include #include +#ifdef HAVE_KQUEUE +#ifndef HAVE_SYS_EVENT_H +#error kqueue requires + +#else +#include +#endif +#endif /* HAVE_KQUEUE */ + typedef struct fr_event_fd_t { int fd; fr_event_fd_handler_t handler; @@ -35,14 +44,13 @@ typedef struct fr_event_fd_t { } fr_event_fd_t; #define FR_EV_MAX_FDS (256) + #undef USEC #define USEC (1000000) struct fr_event_list_t { fr_heap_t *times; - bool changed; - int exit; fr_event_status_t status; @@ -50,8 +58,16 @@ struct fr_event_list_t { struct timeval now; bool dispatch; - int max_readers; int num_readers; +#ifndef HAVE_KQUEUE + int max_readers; + + bool changed; + +#else + int kq; + struct kevent events[FR_EV_MAX_FDS]; /* so it doesn't go on the stack every time */ +#endif fr_event_fd_t readers[FR_EV_MAX_FDS]; }; @@ -93,6 +109,10 @@ static int _event_list_free(fr_event_list_t *list) fr_heap_delete(el->times); +#ifdef HAVE_KQUEUE + close(el->kq); +#endif + return 0; } @@ -118,9 +138,19 @@ fr_event_list_t *fr_event_list_create(TALLOC_CTX *ctx, fr_event_status_t status) el->readers[i].fd = -1; } - el->status = status; +#ifndef HAVE_KQUEUE el->changed = true; /* force re-set of fds's */ +#else + el->kq = kqueue(); + if (el->kq < 0) { + talloc_free(el); + return NULL; + } +#endif + + el->status = status; + return el; } @@ -307,12 +337,52 @@ int fr_event_fd_insert(fr_event_list_t *el, int type, int fd, return 0; } - if (el->max_readers >= FR_EV_MAX_FDS) { + if (el->num_readers >= FR_EV_MAX_FDS) { fr_strerror_printf("Too many readers"); return 0; } - ef = NULL; + +#ifdef HAVE_KQUEUE + /* + * We need to store TWO fields with the event. kqueue + * only lets us store one. If we put the two fields into + * a malloc'd structure, that would help. Except that + * kqueue can silently delete the event when the socket + * is closed, and not give us the opportunity to free it. + * + * + * The solution is to put the fields into an array, and + * do a linear search on addition/deletion of the FDs. + * However, to avoid MOST linear issues, we start off the + * search at "FD" offset. Since FDs are unique, AND + * usually less than 256, we do "FD & 0xff", which is a + * good guess, and makes the lookups mostly O(1). + */ + for (i = 0; i < FR_EV_MAX_FDS; i++) { + int j; + struct kevent evset; + + j = (i + fd) & (FR_EV_MAX_FDS - 1); + + if (el->readers[j].fd >= 0) continue; + + /* + * We want to read from the FD. + */ + EV_SET(&evset, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &el->readers[j]); + if (kevent(el->kq, &evset, 1, NULL, 0, NULL) < 0) { + fprintf(stderr, "SHIT %d %s\n", __LINE__, strerror(errno)); + return 0; + } + + ef = &el->readers[j]; + el->num_readers++; + break; + } + +#else /* HAVE_KQUEUE */ + for (i = 0; i <= el->max_readers; i++) { /* * Be fail-safe on multiple inserts. @@ -338,17 +408,20 @@ int fr_event_fd_insert(fr_event_list_t *el, int type, int fd, break; } } +#endif if (!ef) { fr_strerror_printf("Failed assigning FD"); return 0; } + ef->fd = fd; ef->handler = handler; ef->ctx = ctx; - ef->fd = fd; +#ifndef HAVE_KQUEUE el->changed = true; +#endif return 1; } @@ -361,6 +434,33 @@ int fr_event_fd_delete(fr_event_list_t *el, int type, int fd) if (type != 0) return 0; +#ifdef HAVE_KQUEUE + for (i = 0; i < FR_EV_MAX_FDS; i++) { + int j; + struct kevent evset; + + j = (i + fd) & (FR_EV_MAX_FDS - 1); + + if (el->readers[j].fd != fd) continue; + + /* + * Tell the kernel to delete it from the list. + * + * The caller MAY have closed it, in which case + * the kernel has removed it from the list. So + * we ignore the return code from kevent(). + */ + EV_SET(&evset, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + (void) kevent(el->kq, &evset, 1, NULL, 0, NULL); + + el->readers[j].fd = -1; + el->num_readers--; + + return 1; + } + +#else + for (i = 0; i < el->max_readers; i++) { if (el->readers[i].fd == fd) { el->readers[i].fd = -1; @@ -371,6 +471,7 @@ int fr_event_fd_delete(fr_event_list_t *el, int type, int fd) return 1; } } +#endif /* HAVE_KQUEUE */ return 0; } @@ -390,15 +491,22 @@ bool fr_event_loop_exiting(fr_event_list_t *el) int fr_event_loop(fr_event_list_t *el) { - int i, rcode, maxfd = 0; + int i, rcode; struct timeval when, *wake; +#ifdef HAVE_KQUEUE + struct timespec ts_when, *ts_wake; +#else + int maxfd = 0; fd_set read_fds, master_fds; + el->changed = true; +#endif + el->exit = 0; el->dispatch = true; - el->changed = true; while (!el->exit) { +#ifndef HAVE_KQUEUE /* * Cache the list of FD's to watch. */ @@ -419,6 +527,7 @@ int fr_event_loop(fr_event_list_t *el) el->changed = false; } +#endif /* HAVE_KQUEUE */ /* * Find the first event. If there's none, we wait @@ -467,6 +576,7 @@ int fr_event_loop(fr_event_list_t *el) */ if (el->status) el->status(wake); +#ifndef HAVE_KQUEUE read_fds = master_fds; rcode = select(maxfd + 1, &read_fds, NULL, NULL, wake); if ((rcode < 0) && (errno != EINTR)) { @@ -475,6 +585,19 @@ int fr_event_loop(fr_event_list_t *el) return -1; } +#else /* HAVE_KQUEUE */ + + if (wake) { + ts_wake = &ts_when; + ts_when.tv_sec = when.tv_sec; + ts_when.tv_nsec = when.tv_usec * 1000; + } else { + ts_wake = NULL; + } + + rcode = kevent(el->kq, NULL, 0, el->events, FR_EV_MAX_FDS, ts_wake); +#endif /* HAVE_KQUEUE */ + if (fr_heap_num_elements(el->times) > 0) { do { gettimeofday(&el->now, NULL); @@ -484,6 +607,11 @@ int fr_event_loop(fr_event_list_t *el) if (rcode <= 0) continue; +#ifndef HAVE_KQUEUE + /* + * Loop over all of the sockets to see if there's + * an event for that socket. + */ for (i = 0; i < el->max_readers; i++) { fr_event_fd_t *ef = &el->readers[i]; @@ -495,6 +623,39 @@ int fr_event_loop(fr_event_list_t *el) if (el->changed) break; } + +#else /* HAVE_KQUEUE */ + + /* + * Loop over all of the events, servicing them. + */ + for (i = 0; i < rcode; i++) { + if (el->events[i].flags & EV_EOF) { + fr_event_fd_t *ef = el->events[i].udata; + + /* + * FIXME: delete the handler + * here, and fix process.c to not + * call fr_event_fd_delete(). + * It's cleaner. + * + * Call the handler, which SHOULD + * delete the connection. + */ + ef->handler(el, ef->fd, ef->ctx); + continue; + } + + if (el->events[i].flags & EVFILT_READ) { + fr_event_fd_t *ef = el->events[i].udata; + + ef->handler(el, ef->fd, ef->ctx); + continue; + } + + /* else it's an unhandled event, which shouldn't happen */ + } +#endif /* HAVE_KQUEUE */ } el->dispatch = false;