Skip to content
This repository

Support for Illumos event ports #510

Merged
merged 3 commits into from almost 2 years ago

3 participants

Pieter Noordhuis Salvatore Sanfilippo David Pacheco
Pieter Noordhuis
Collaborator
pietern commented May 14, 2012

This drastically improves performance under high concurrency.

Many thanks to @davepacheco and @joyent for contributing.

Salvatore Sanfilippo antirez merged commit 4042235 into from May 15, 2012
Salvatore Sanfilippo antirez closed this May 15, 2012
Salvatore Sanfilippo
Owner
antirez commented May 15, 2012

Thanks! Merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
14  src/ae.c
@@ -44,13 +44,17 @@
44 44
 
45 45
 /* Include the best multiplexing layer supported by this system.
46 46
  * The following should be ordered by performances, descending. */
47  
-#ifdef HAVE_EPOLL
48  
-#include "ae_epoll.c"
  47
+#ifdef HAVE_EVPORT
  48
+#include "ae_evport.c"
49 49
 #else
50  
-    #ifdef HAVE_KQUEUE
51  
-    #include "ae_kqueue.c"
  50
+    #ifdef HAVE_EPOLL
  51
+    #include "ae_epoll.c"
52 52
     #else
53  
-    #include "ae_select.c"
  53
+        #ifdef HAVE_KQUEUE
  54
+        #include "ae_kqueue.c"
  55
+        #else
  56
+        #include "ae_select.c"
  57
+        #endif
54 58
     #endif
55 59
 #endif
56 60
 
289  src/ae_evport.c
... ...
@@ -0,0 +1,289 @@
  1
+/* ae.c module for illumos event ports.
  2
+ * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  3
+ * Released under the BSD license. See the COPYING file for more info. */
  4
+
  5
+#include <assert.h>
  6
+#include <errno.h>
  7
+#include <port.h>
  8
+#include <poll.h>
  9
+
  10
+#include <sys/types.h>
  11
+#include <sys/time.h>
  12
+
  13
+#include <stdio.h>
  14
+
  15
+static int evport_debug = 0;
  16
+
  17
+/*
  18
+ * This file implements the ae API using event ports, present on Solaris-based
  19
+ * systems since Solaris 10.  Using the event port interface, we associate file
  20
+ * descriptors with the port.  Each association also includes the set of poll(2)
  21
+ * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
  22
+ *
  23
+ * There's one tricky piece to this implementation: when we return events via
  24
+ * aeApiPoll, the corresponding file descriptors become dissociated from the
  25
+ * port.  This is necessary because poll events are level-triggered, so if the
  26
+ * fd didn't become dissociated, it would immediately fire another event since
  27
+ * the underlying state hasn't changed yet.  We must reassociate the file
  28
+ * descriptor, but only after we know that our caller has actually read from it.
  29
+ * The ae API does not tell us exactly when that happens, but we do know that
  30
+ * it must happen by the time aeApiPoll is called again.  Our solution is to
  31
+ * keep track of the last fds returned by aeApiPoll and reassociate them next
  32
+ * time aeApiPoll is invoked.
  33
+ *
  34
+ * To summarize, in this module, each fd association is EITHER (a) represented
  35
+ * only via the in-kernel association OR (b) represented by pending_fds and
  36
+ * pending_masks.  (b) is only true for the last fds we returned from aeApiPoll,
  37
+ * and only until we enter aeApiPoll again (at which point we restore the
  38
+ * in-kernel association).
  39
+ */
  40
+#define MAX_EVENT_BATCHSZ 512
  41
+
  42
+typedef struct aeApiState {
  43
+    int     portfd;                             /* event port */
  44
+    int     npending;                           /* # of pending fds */
  45
+    int     pending_fds[MAX_EVENT_BATCHSZ];     /* pending fds */
  46
+    int     pending_masks[MAX_EVENT_BATCHSZ];   /* pending fds' masks */
  47
+} aeApiState;
  48
+
  49
+static int aeApiCreate(aeEventLoop *eventLoop) {
  50
+    int i;
  51
+    aeApiState *state = zmalloc(sizeof(aeApiState));
  52
+    if (!state) return -1;
  53
+
  54
+    state->portfd = port_create();
  55
+    if (state->portfd == -1) {
  56
+        zfree(state);
  57
+        return -1;
  58
+    }
  59
+
  60
+    state->npending = 0;
  61
+
  62
+    for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
  63
+        state->pending_fds[i] = -1;
  64
+        state->pending_masks[i] = AE_NONE;
  65
+    }
  66
+
  67
+    eventLoop->apidata = state;
  68
+    return 0;
  69
+}
  70
+
  71
+static void aeApiFree(aeEventLoop *eventLoop) {
  72
+    aeApiState *state = eventLoop->apidata;
  73
+
  74
+    close(state->portfd);
  75
+    zfree(state);
  76
+}
  77
+
  78
+static int aeApiLookupPending(aeApiState *state, int fd) {
  79
+    int i;
  80
+
  81
+    for (i = 0; i < state->npending; i++) {
  82
+        if (state->pending_fds[i] == fd)
  83
+            return (i);
  84
+    }
  85
+
  86
+    return (-1);
  87
+}
  88
+
  89
+/*
  90
+ * Helper function to invoke port_associate for the given fd and mask.
  91
+ */
  92
+static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
  93
+    int events = 0;
  94
+    int rv, err;
  95
+
  96
+    if (mask & AE_READABLE)
  97
+        events |= POLLIN;
  98
+    if (mask & AE_WRITABLE)
  99
+        events |= POLLOUT;
  100
+
  101
+    if (evport_debug)
  102
+        fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
  103
+
  104
+    rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
  105
+        (void *)(uintptr_t)mask);
  106
+    err = errno;
  107
+
  108
+    if (evport_debug)
  109
+        fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
  110
+
  111
+    if (rv == -1) {
  112
+        fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
  113
+
  114
+        if (err == EAGAIN)
  115
+            fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
  116
+    }
  117
+
  118
+    return rv;
  119
+}
  120
+
  121
+static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  122
+    aeApiState *state = eventLoop->apidata;
  123
+    int fullmask, pfd;
  124
+
  125
+    if (evport_debug)
  126
+        fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
  127
+
  128
+    /*
  129
+     * Since port_associate's "events" argument replaces any existing events, we
  130
+     * must be sure to include whatever events are already associated when
  131
+     * we call port_associate() again.
  132
+     */
  133
+    fullmask = mask | eventLoop->events[fd].mask;
  134
+    pfd = aeApiLookupPending(state, fd);
  135
+
  136
+    if (pfd != -1) {
  137
+        /*
  138
+         * This fd was recently returned from aeApiPoll.  It should be safe to
  139
+         * assume that the consumer has processed that poll event, but we play
  140
+         * it safer by simply updating pending_masks.  The fd will be
  141
+         * reassociated as usual when aeApiPoll is called again.
  142
+         */
  143
+        if (evport_debug)
  144
+            fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
  145
+        state->pending_masks[pfd] |= fullmask;
  146
+        return 0;
  147
+    }
  148
+
  149
+    return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
  150
+}
  151
+
  152
+static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
  153
+    aeApiState *state = eventLoop->apidata;
  154
+    int fullmask, pfd;
  155
+
  156
+    if (evport_debug)
  157
+        fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
  158
+
  159
+    pfd = aeApiLookupPending(state, fd);
  160
+
  161
+    if (pfd != -1) {
  162
+        if (evport_debug)
  163
+            fprintf(stderr, "deleting event from pending fd %d\n", fd);
  164
+
  165
+        /*
  166
+         * This fd was just returned from aeApiPoll, so it's not currently
  167
+         * associated with the port.  All we need to do is update
  168
+         * pending_masks appropriately.
  169
+         */
  170
+        state->pending_masks[pfd] &= ~mask;
  171
+
  172
+        if (state->pending_masks[pfd] == AE_NONE)
  173
+            state->pending_fds[pfd] = -1;
  174
+
  175
+        return;
  176
+    }
  177
+
  178
+    /*
  179
+     * The fd is currently associated with the port.  Like with the add case
  180
+     * above, we must look at the full mask for the file descriptor before
  181
+     * updating that association.  We don't have a good way of knowing what the
  182
+     * events are without looking into the eventLoop state directly.  We rely on
  183
+     * the fact that our caller has already updated the mask in the eventLoop.
  184
+     */
  185
+
  186
+    fullmask = eventLoop->events[fd].mask;
  187
+    if (fullmask == AE_NONE) {
  188
+        /*
  189
+         * We're removing *all* events, so use port_dissociate to remove the
  190
+         * association completely.  Failure here indicates a bug.
  191
+         */
  192
+        if (evport_debug)
  193
+            fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
  194
+
  195
+        if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
  196
+            perror("aeApiDelEvent: port_dissociate");
  197
+            abort(); /* will not return */
  198
+        }
  199
+    } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
  200
+        fullmask) != 0) {
  201
+        /*
  202
+         * ENOMEM is a potentially transient condition, but the kernel won't
  203
+         * generally return it unless things are really bad.  EAGAIN indicates
  204
+         * we've reached a resource limit, for which it doesn't make sense to
  205
+         * retry (counterintuitively).  All other errors indicate a bug.  In any
  206
+         * of these cases, the best we can do is to abort.
  207
+         */
  208
+        abort(); /* will not return */
  209
+    }
  210
+}
  211
+
  212
+static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  213
+    aeApiState *state = eventLoop->apidata;
  214
+    struct timespec timeout, *tsp;
  215
+    int mask, i;
  216
+    uint_t nevents;
  217
+    port_event_t event[MAX_EVENT_BATCHSZ];
  218
+
  219
+    /*
  220
+     * If we've returned fd events before, we must reassociate them with the
  221
+     * port now, before calling port_getn().  See the block comment at the top of
  222
+     * this file for an explanation of why.
  223
+     */
  224
+    for (i = 0; i < state->npending; i++) {
  225
+        if (state->pending_fds[i] == -1)
  226
+            /* This fd has since been deleted. */
  227
+            continue;
  228
+
  229
+        if (aeApiAssociate("aeApiPoll", state->portfd,
  230
+            state->pending_fds[i], state->pending_masks[i]) != 0) {
  231
+            /* See aeApiDelEvent for why this case is fatal. */
  232
+            abort();
  233
+        }
  234
+
  235
+        state->pending_masks[i] = AE_NONE;
  236
+        state->pending_fds[i] = -1;
  237
+    }
  238
+
  239
+    state->npending = 0;
  240
+
  241
+    if (tvp != NULL) {
  242
+        timeout.tv_sec = tvp->tv_sec;
  243
+        timeout.tv_nsec = tvp->tv_usec * 1000;
  244
+        tsp = &timeout;
  245
+    } else {
  246
+        tsp = NULL;
  247
+    }
  248
+
  249
+    /*
  250
+     * port_getn can return with errno == ETIME having returned some events (!).
  251
+     * So if we get ETIME, we check nevents, too.
  252
+     */
  253
+    nevents = 1;
  254
+    if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
  255
+        tsp) == -1 && (errno != ETIME || nevents == 0)) {
  256
+        if (errno == ETIME || errno == EINTR)
  257
+            return 0;
  258
+
  259
+        /* Any other error indicates a bug. */
  260
+        perror("aeApiPoll: port_get");
  261
+        abort();
  262
+    }
  263
+
  264
+    state->npending = nevents;
  265
+
  266
+    for (i = 0; i < nevents; i++) {
  267
+            mask = 0;
  268
+            if (event[i].portev_events & POLLIN)
  269
+                mask |= AE_READABLE;
  270
+            if (event[i].portev_events & POLLOUT)
  271
+                mask |= AE_WRITABLE;
  272
+
  273
+            eventLoop->fired[i].fd = event[i].portev_object;
  274
+            eventLoop->fired[i].mask = mask;
  275
+
  276
+            if (evport_debug)
  277
+                fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
  278
+                    (int)event[i].portev_object, mask);
  279
+
  280
+            state->pending_fds[i] = event[i].portev_object;
  281
+            state->pending_masks[i] = (uintptr_t)event[i].portev_user;
  282
+    }
  283
+
  284
+    return nevents;
  285
+}
  286
+
  287
+static char *aeApiName(void) {
  288
+    return "evport";
  289
+}
7  src/config.h
@@ -38,6 +38,13 @@
38 38
 #define HAVE_KQUEUE 1
39 39
 #endif
40 40
 
  41
+#ifdef __sun
  42
+#include <sys/feature_tests.h>
  43
+#ifdef _DTRACE_VERSION
  44
+#define HAVE_EVPORT 1
  45
+#endif
  46
+#endif
  47
+
41 48
 /* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
42 49
 #ifdef __linux__
43 50
 #define aof_fsync fdatasync
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.