From 171c235451dd16fe6bb29329c3562b2c741f4b1d Mon Sep 17 00:00:00 2001 From: Levente Uzonyi Date: Mon, 19 Oct 2020 01:44:37 +0200 Subject: [PATCH] Added epoll support for aio.c on unix When HAVE_CONFIG_H and HAVE_EPOLL are true (the former is pretty much always true), the select-based aio implementation is replaced with an epoll-based one. The epoll variant has a few advantages: - it is free of the lost readSemaphore signals bug (see the comments of Socket's various #waitFor* methods which wait on readSemaphore). - idle CPU usage is down from ~10% to ~2% - faster Socket operations Known issue: - when the VM quits, an error message appears from aioDisable() because the file descriptor about to be removed from the epoll set has already been closed, therefore it's not in the set. It can be worked around by suppressing error messages when aioDisable() is called by aioFini() --- platforms/unix/vm/aio.c | 358 +++++++++++++++++++++++++++++++++------- 1 file changed, 299 insertions(+), 59 deletions(-) diff --git a/platforms/unix/vm/aio.c b/platforms/unix/vm/aio.c index f9d0c9409e..285ee1290f 100644 --- a/platforms/unix/vm/aio.c +++ b/platforms/unix/vm/aio.c @@ -33,6 +33,7 @@ #include "interp.h" /* For COGVM define */ #include "sqaio.h" #include "sqAssert.h" +#include #ifdef HAVE_CONFIG_H @@ -92,7 +93,6 @@ # include # include # include -# include # include # include # include @@ -112,7 +112,34 @@ long aioThisTick = 0; long aioDebugLogging = 0; #endif -#define _DO_FLAG_TYPE() do { _DO(AIO_R, rd) _DO(AIO_W, wr) _DO(AIO_X, ex) } while (0) +#if HAVE_CONFIG_H && HAVE_EPOLL +static int epollFd; +static struct epoll_event** epollEventsByFileDescriptor; +static size_t epollEventsByFileDescriptorSize; + +struct epollEventData { + int fd; + int aioMask; + aioHandler readHandler; + aioHandler writeHandler; + aioHandler exceptionHandler; + void* clientData; +}; + +const static int epollFlagsForAIOFlags[] = { + 0, + EPOLLPRI, // AIO_X + EPOLLIN, // AIO_R + EPOLLPRI | EPOLLIN, // AIO_RX +
EPOLLOUT, // AIO_W + EPOLLPRI | EPOLLOUT, // AIO_WX + EPOLLIN | EPOLLOUT, // AIO_RW + EPOLLIN | EPOLLOUT | EPOLLPRI // AIO_RWX +}; + +#else + +# define _DO_FLAG_TYPE() do { _DO(AIO_R, rd) _DO(AIO_W, wr) _DO(AIO_X, ex) } while (0) static aioHandler rdHandler[FD_SETSIZE]; static aioHandler wrHandler[FD_SETSIZE]; @@ -126,7 +153,7 @@ static fd_set rdMask; /* handle read */ static fd_set wrMask; /* handle write */ static fd_set exMask; /* handle exception */ static fd_set xdMask; /* external descriptor */ - +#endif static void undefinedHandler(int fd, void *clientData, int flags) @@ -189,17 +216,42 @@ static stack_t signal_stack; static int stderrIsAFile = 0; // for pollpip to avoid cluttering logs +#if HAVE_CONFIG_H && HAVE_EPOLL +static void +growEpollEventsByFileDescriptorTo(size_t newSize) { + FPRINTF((stderr, "growEpollEventsByFileDescriptorTo(%lu)\n", newSize)); + if (newSize <= epollEventsByFileDescriptorSize) return; + struct epoll_event** newEpollEventsByFileDescriptor = (struct epoll_event**)realloc(epollEventsByFileDescriptor, newSize * sizeof(*epollEventsByFileDescriptor)); + if (!newEpollEventsByFileDescriptor) { + perror("growEpollEventsByFileDescriptorTo realloc"); + exit(1); // Cannot recover from here + } + epollEventsByFileDescriptor = newEpollEventsByFileDescriptor; + memset(epollEventsByFileDescriptor + epollEventsByFileDescriptorSize, 0, (newSize - epollEventsByFileDescriptorSize) * sizeof(*epollEventsByFileDescriptor)); + epollEventsByFileDescriptorSize = newSize; +} +#endif + void aioInit(void) { extern void forceInterruptCheck(int); /* not really, but hey */ - +#if HAVE_CONFIG_H && HAVE_EPOLL + epollFd = epoll_create1(EPOLL_CLOEXEC); + if (epollFd == -1) { + perror("epoll_create1 failed"); + exit(1); + } + epollEventsByFileDescriptorSize = 0; + epollEventsByFileDescriptor = NULL; +#else FD_ZERO(&fdMask); FD_ZERO(&rdMask); FD_ZERO(&wrMask); FD_ZERO(&exMask); FD_ZERO(&xdMask); maxFd = 0; +#endif stderrIsAFile = !isatty(fileno(stderr)); @@ -239,6 
+291,18 @@ aioInit(void) void aioFini(void) { +#if HAVE_CONFIG_H && HAVE_EPOLL + for (int index = 0; index < epollEventsByFileDescriptorSize; ++index) { + struct epoll_event* event = epollEventsByFileDescriptor[index]; + if (event) { + struct epollEventData* data = event->data.ptr; + if (!(data->aioMask & AIO_EXT)) { + aioDisable(index); + close(index); + } + } + } +#else int fd; for (fd = 0; fd < maxFd; fd++) @@ -252,6 +316,7 @@ aioFini(void) } while (maxFd && !FD_ISSET(maxFd - 1, &fdMask)) --maxFd; +#endif signal(SIGPIPE, SIG_DFL); } @@ -292,37 +357,100 @@ do if ((bool) && !(++tickCount % TICKS_PER_CHAR)) { \ long aioPoll(long microSeconds) { +#if HAVE_CONFIG_H && HAVE_EPOLL + +# if AIO_DEBUG + struct sigaction current_sigio_action; + extern void forceInterruptCheck(int); /* not really, but hey */ +# endif + + DO_TICK(SHOULD_TICK()); + +# if defined(AIO_DEBUG) +# if AIO_DEBUG >= 2 + FPRINTF((stderr, "aioPoll(%ld)\n", microSeconds)); +# endif + // check that our signal handler is in place. + // If it isn't, things aren't right. 
+ sigaction(SIGIO, NULL, &current_sigio_action); + assert(current_sigio_action.sa_handler == forceInterruptCheck); +# endif + /* + * get out early if there is no pending i/o and no need to relinquish + * cpu + */ + +# ifdef TARGET_OS_IS_IPHONE + if (epollEventsByFileDescriptorSize == 0) + return 0; +# else + if (epollEventsByFileDescriptorSize == 0 && microSeconds == 0) + return 0; +# endif + + do { + const unsigned long long start = ioUTCMicroseconds(); + const struct epoll_event events[128]; + const int eventsTriggered = epoll_wait(epollFd, (struct epoll_event*)events, 128, (int)microSeconds); + if (eventsTriggered == -1) { + if (errno != EINTR) { + perror("epoll_wait"); + return 0; + } + } else if (eventsTriggered > 0) { + for (int i = 0; i < eventsTriggered; ++i) { + const struct epoll_event event = events[i]; + struct epollEventData* data = (struct epollEventData*)(event.data.ptr); + if (event.events & EPOLLIN) { + (data->readHandler)(data->fd, data->clientData, AIO_R); + } + if (event.events & EPOLLOUT) { + (data->writeHandler)(data->fd, data->clientData, AIO_W); + } + if (event.events & (EPOLLPRI | EPOLLERR | EPOLLHUP)) { + (data->exceptionHandler)(data->fd, data->clientData, AIO_X); + } + } + return 1; + } else { // eventsTriggered == 0 + if (microSeconds > 0) addIdleUsecs(microSeconds); + return 0; + } + microSeconds -= max(ioUTCMicroseconds() - start, 1); + } while(microSeconds > 0); + return 0; +#else int fd; fd_set rd, wr, ex; unsigned long long us; -#if AIO_DEBUG +# if AIO_DEBUG struct sigaction current_sigio_action; extern void forceInterruptCheck(int); /* not really, but hey */ -#endif +# endif DO_TICK(SHOULD_TICK()); -#if defined(AIO_DEBUG) -# if AIO_DEBUG >= 2 +# if defined(AIO_DEBUG) +# if AIO_DEBUG >= 2 FPRINTF((stderr, "aioPoll(%ld)\n", microSeconds)); -# endif +# endif // check that our signal handler is in place. // If it isn't, things aren't right.
sigaction(SIGIO, NULL, &current_sigio_action); assert(current_sigio_action.sa_handler == forceInterruptCheck); -#endif +# endif /* * get out early if there is no pending i/o and no need to relinquish * cpu */ -#ifdef TARGET_OS_IS_IPHONE +# ifdef TARGET_OS_IS_IPHONE if (maxFd == 0) return 0; -#else +# else if (maxFd == 0 && microSeconds == 0) return 0; -#endif +# endif rd = rdMask; wr = wrMask; @@ -357,8 +485,8 @@ aioPoll(long microSeconds) } for (fd = 0; fd < maxFd; ++fd) { -#undef _DO -#define _DO(FLAG, TYPE) \ +# undef _DO +# define _DO(FLAG, TYPE) \ if (FD_ISSET(fd, &TYPE)) { \ aioHandler handler= TYPE##Handler[fd]; \ FD_CLR(fd, &TYPE##Mask); \ @@ -368,6 +496,7 @@ aioPoll(long microSeconds) _DO_FLAG_TYPE(); } return 1; +#endif } @@ -402,6 +531,48 @@ aioSleepForUsecs(long microSeconds) return aioPoll(microSeconds); } +static void +makeFileDescriptorNonBlockingAndSetupSigio(int fd) { + int arg; + +#if defined(O_ASYNC) + if (fcntl(fd, F_SETOWN, getpid()) < 0) + perror("fcntl(F_SETOWN, getpid())"); + if ((arg = fcntl(fd, F_GETFL, 0)) < 0) + perror("fcntl(F_GETFL)"); + if (fcntl(fd, F_SETFL, arg | O_NONBLOCK | O_ASYNC) < 0) + perror("fcntl(F_SETFL, O_ASYNC)"); +# if defined(F_SETNOSIGPIPE) + if ((arg = fcntl(fd, F_SETNOSIGPIPE, 1)) < 0) + perror("fcntl(F_GETFL)"); +# endif + FPRINTF((stderr, "makeFileDescriptorNonBlockingAndSetupSigio(%d): Elicit SIGIO via O_ASYNC/fcntl\n", fd)); + +#elif defined(FASYNC) + if (fcntl(fd, F_SETOWN, getpid()) < 0) + perror("fcntl(F_SETOWN, getpid())"); + if ((arg = fcntl(fd, F_GETFL, 0)) < 0) + perror("fcntl(F_GETFL)"); + if (fcntl(fd, F_SETFL, arg | O_NONBLOCK | FASYNC) < 0) + perror("fcntl(F_SETFL, FASYNC)"); +# if defined(F_SETNOSIGPIPE) + if ((arg = fcntl(fd, F_SETNOSIGPIPE, 1)) < 0) + perror("fcntl(F_SETNOSIGPIPE)"); +# endif + FPRINTF((stderr, "makeFileDescriptorNonBlockingAndSetupSigio(%d): Elicit SIGIO via FASYNC/fcntl\n", fd)); + +#elif defined(FIOASYNC) + arg = getpid(); + if (ioctl(fd, SIOCSPGRP, &arg) < 0) +
perror("ioctl(SIOCSPGRP, getpid())"); + arg = 1; + if (ioctl(fd, FIOASYNC, &arg) < 0) + perror("ioctl(FIOASYNC, 1)"); + FPRINTF((stderr, "makeFileDescriptorNonBlockingAndSetupSigio(%d): Elicit SIGIO via FIOASYNC/fcntl\n", fd)); +#else + FPRINTF((stderr, "makeFileDescriptorNonBlockingAndSetupSigio(%d): UNABLE TO ELICIT SIGIO!!\n", fd)); +#endif +} /* enable asynchronous notification for a descriptor */ @@ -409,11 +580,57 @@ void aioEnable(int fd, void *data, int flags) { if (fd < 0) { - FPRINTF((stderr, "aioEnable(%d): IGNORED\n", fd)); + FPRINTF((stderr, "aioEnable(%d, %p, %d): IGNORED\n", fd, data, flags)); + return; + } +#if HAVE_CONFIG_H && HAVE_EPOLL + if (fd >= epollEventsByFileDescriptorSize) { + FPRINTF((stderr, "aioEnable(%d): fd too large\n", fd)); + growEpollEventsByFileDescriptorTo(max(epollEventsByFileDescriptorSize * 2, (size_t)fd + 1)); + } else if (epollEventsByFileDescriptor[fd]) { + FPRINTF((stderr, "aioEnable: descriptor %d already enabled\n", fd)); + return; + } + FPRINTF((stderr, "aioEnable(%d, %p, %d): allocating new epoll_event\n", fd, data, flags)); + struct epoll_event* event = (struct epoll_event*)malloc(sizeof(struct epoll_event)); + if (!event) { + perror("aioEnable: malloc event"); + return; + } + FPRINTF((stderr, "aioEnable(%d, %p, %d): allocating new epollEventData\n", fd, data, flags)); + struct epollEventData* epollData = (struct epollEventData*)malloc(sizeof(struct epollEventData)); + if (!epollData) { + perror("aioEnable: malloc epollData"); + free(event); + return; + } + epollData->fd = fd; + epollData->aioMask = flags & AIO_EXT; + epollData->readHandler = undefinedHandler; + epollData->writeHandler = undefinedHandler; + epollData->exceptionHandler = undefinedHandler; + epollData->clientData = data; + event->events = 0; + event->data.ptr = epollData; + epollEventsByFileDescriptor[fd] = event; + if (flags & AIO_EXT) { + /* we should not set NBIO ourselves on external descriptors! 
*/ + FPRINTF((stderr, "aioEnable(%d, %p, %d): external\n", fd, data, flags)); + } else { + FPRINTF((stderr, "aioEnable(%d, %p, %d): enable non-blocking io and sigio\n", fd, data, flags)); + /* + * enable non-blocking asynchronous i/o and delivery of SIGIO + * to the active process + */ + makeFileDescriptorNonBlockingAndSetupSigio(fd); + } +#else + if (fd >= FD_SETSIZE) { + FPRINTF((stderr, "aioEnable(%d): fd too large\n", fd)); return; } if (FD_ISSET(fd, &fdMask)) { - fprintf(stderr, "aioEnable: descriptor %d already enabled\n", fd); + FPRINTF((stderr, "aioEnable: descriptor %d already enabled\n", fd)); return; } clientData[fd] = data; @@ -434,48 +651,11 @@ aioEnable(int fd, void *data, int flags) * enable non-blocking asynchronous i/o and delivery of SIGIO * to the active process */ - int arg; FD_CLR(fd, &xdMask); - -#if defined(O_ASYNC) - if (fcntl(fd, F_SETOWN, getpid()) < 0) - perror("fcntl(F_SETOWN, getpid())"); - if ((arg = fcntl(fd, F_GETFL, 0)) < 0) - perror("fcntl(F_GETFL)"); - if (fcntl(fd, F_SETFL, arg | O_NONBLOCK | O_ASYNC) < 0) - perror("fcntl(F_SETFL, O_ASYNC)"); -# if defined(F_SETNOSIGPIPE) - if ((arg = fcntl(fd, F_SETNOSIGPIPE, 1)) < 0) - perror("fcntl(F_GETFL)"); -# endif - FPRINTF((stderr, "aioEnable(%d): Elicit SIGIO via O_ASYNC/fcntl\n", fd)); - -#elif defined(FASYNC) - if (fcntl(fd, F_SETOWN, getpid()) < 0) - perror("fcntl(F_SETOWN, getpid())"); - if ((arg = fcntl(fd, F_GETFL, 0)) < 0) - perror("fcntl(F_GETFL)"); - if (fcntl(fd, F_SETFL, arg | O_NONBLOCK | FASYNC) < 0) - perror("fcntl(F_SETFL, FASYNC)"); -# if defined(F_SETNOSIGPIPE) - if ((arg = fcntl(fd, F_SETNOSIGPIPE, 1)) < 0) - perror("fcntl(F_GETFL)"); -# endif - FPRINTF((stderr, "aioEnable(%d): Elicit SIGIO via FASYNC/fcntl\n", fd)); - -#elif defined(FIOASYNC) - arg = getpid(); - if (ioctl(fd, TIOCSPGRP, &arg) < 0) - perror("ioctl(TIOCSPGRP, getpid())"); - arg = 1; - if (ioctl(fd, FIOASYNC, &arg) < 0) - perror("ioctl(FIOASYNC, 1)"); - FPRINTF((stderr, "aioEnable(%d): Elicit SIGIO via 
FIOASYNC/ioctl\n", fd)); -#else - FPRINTF((stderr, "aioEnable(%d): UNABLE TO ELICIT SIGIO!!\n", fd)); -#endif + makeFileDescriptorNonBlockingAndSetupSigio(fd); } +#endif } #if defined(AIO_DEBUG) @@ -523,16 +703,40 @@ aioHandle(int fd, aioHandler handlerFn, int mask) FPRINTF((stderr, "aioHandle(%d): IGNORED\n", fd)); return; } -#undef _DO -#define _DO(FLAG, TYPE) \ +#if HAVE_CONFIG_H && HAVE_EPOLL + if (fd >= epollEventsByFileDescriptorSize || !epollEventsByFileDescriptor[fd]) { + FPRINTF((stderr, "aioHandle(%d): NOT ENABLED\n", fd)); + return; + } + assert((mask & (AIO_RWX | AIO_EXT)) == mask); + struct epoll_event* event = epollEventsByFileDescriptor[fd]; + struct epollEventData* data = event->data.ptr; + assert(data->fd == fd); + if (mask & AIO_R) data->readHandler = handlerFn; + if (mask & AIO_W) data->writeHandler = handlerFn; + if (mask & AIO_X) data->exceptionHandler = handlerFn; + event->events = epollFlagsForAIOFlags[mask & AIO_RWX] | (mask & AIO_EXT ? EPOLLET : 0); + int epoll_operation = data->aioMask & AIO_RWX + ? 
EPOLL_CTL_MOD + : EPOLL_CTL_ADD; + data->aioMask |= mask; + if (epoll_ctl(epollFd, epoll_operation, fd, event) == -1) { + perror("epoll_ctl"); + FPRINTF((stderr, "aioHandle(%d, %p, %d): epoll_ctl(%d, %d, %d, %p) failed\n", fd, handlerFn, mask, epollFd, epoll_operation, fd, event)); + } else { + FPRINTF((stderr, "aioHandle(%d, %p, %d): epoll_ctl(%d, %d, %d, %p) succeeded\n", fd, handlerFn, mask, epollFd, epoll_operation, fd, event)); + } +#else +# undef _DO +# define _DO(FLAG, TYPE) \ if (mask & FLAG) { \ FD_SET(fd, &TYPE##Mask); \ TYPE##Handler[fd]= handlerFn; \ } _DO_FLAG_TYPE(); +#endif } - /* temporarily suspend asynchronous notification for a descriptor */ void @@ -543,6 +747,24 @@ aioSuspend(int fd, int mask) return; } FPRINTF((stderr, "aioSuspend(%d)\n", fd)); +#if HAVE_CONFIG_H && HAVE_EPOLL + if (fd >= epollEventsByFileDescriptorSize || !epollEventsByFileDescriptor[fd]) { + FPRINTF((stderr, "aioSuspend(%d): NOT ENABLED\n", fd)); + return; + } + struct epoll_event* event = epollEventsByFileDescriptor[fd]; + struct epollEventData* data = event->data.ptr; + if (data->aioMask & mask) { + data->aioMask &= ~mask; + event->events = epollFlagsForAIOFlags[data->aioMask & AIO_RWX]; + if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, epollEventsByFileDescriptor[fd]) == -1) { + perror("epoll_ctl"); + } + FPRINTF((stderr, "aioSuspend(%d, %d): SUSPENDED\n", fd, mask)); + } else { + FPRINTF((stderr, "aioSuspend(%d, %d): NOTHING TO SUSPEND\n", fd, mask)); + } +#else #undef _DO #define _DO(FLAG, TYPE) \ if (mask & FLAG) { \ @@ -550,6 +772,7 @@ aioSuspend(int fd, int mask) TYPE##Handler[fd]= undefinedHandler; \ } _DO_FLAG_TYPE(); +#endif } @@ -563,6 +786,22 @@ aioDisable(int fd) return; } FPRINTF((stderr, "aioDisable(%d)\n", fd)); +#if HAVE_CONFIG_H && HAVE_EPOLL + if (fd >= epollEventsByFileDescriptorSize || !epollEventsByFileDescriptor[fd]) { + FPRINTF((stderr, "aioDisable(%d): NOT ENABLED\n", fd)); + return; + } + if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, NULL) == -1) { + if 
(errno != ENOENT) { // When a socket is closed, it is automatically removed from the epoll set. Ignore those errors. + perror("aioDisable: epoll_ctl"); + } + } + struct epoll_event* event = epollEventsByFileDescriptor[fd]; + assert(event->data.ptr != NULL); + free(event->data.ptr); + free(event); + epollEventsByFileDescriptor[fd] = NULL; +#else aioSuspend(fd, AIO_RWX); FD_CLR(fd, &xdMask); FD_CLR(fd, &fdMask); @@ -571,4 +810,5 @@ aioDisable(int fd) /* keep maxFd accurate (drops to zero if no more sockets) */ while (maxFd && !FD_ISSET(maxFd - 1, &fdMask)) --maxFd; +#endif }