Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

support for thread-hopping apr_pollset_* using z/OS async i/o.

no support for apr_pollcb yet.


git-svn-id: https://svn.apache.org/repos/asf/apr/apr/trunk@1308910 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit b83a2aacc9fb541c8a94b476e6fb84614bd4f95d 1 parent 5e28471
Greg Ames authored
2  configure.in
@@ -1271,6 +1271,7 @@ dnl ----------------------------- Checks for Any required Headers
1271 1271 AC_HEADER_STDC
1272 1272
1273 1273 APR_FLAG_HEADERS(
  1274 + aio.h \
1274 1275 ByteOrder.h \
1275 1276 conio.h \
1276 1277 crypt.h \
@@ -1368,6 +1369,7 @@ else
1368 1369 netinet_tcph=0
1369 1370 fi
1370 1371
  1372 +AC_SUBST(aioh)
1371 1373 AC_SUBST(arpa_ineth)
1372 1374 AC_SUBST(conioh)
1373 1375 AC_SUBST(ctypeh)
1  include/apr_poll.h
@@ -77,6 +77,7 @@ typedef enum {
77 77 APR_POLLSET_KQUEUE, /**< Poll uses kqueue method */
78 78 APR_POLLSET_PORT, /**< Poll uses Solaris event port method */
79 79 APR_POLLSET_EPOLL, /**< Poll uses epoll method */
  80 + APR_POLLSET_ASIO, /**< Poll uses z/OS async i/o method */
80 81 APR_POLLSET_POLL /**< Poll uses poll method */
81 82 } apr_pollset_method_e;
82 83
10 include/arch/unix/apr_arch_poll_private.h
@@ -45,6 +45,11 @@
45 45 #define HAS_PIPES(dt) (dt == APR_POLL_FILE) ? 1 : 0
46 46 #endif
47 47
  48 +#ifdef HAVE_AIO_H
  49 +#define _AIO_OS390 /* enable a bunch of z/OS aio.h definitions */
  50 +#include <aio.h> /* aiocb */
  51 +#endif
  52 +
48 53 /* Choose the best method platform specific to use in apr_pollset */
49 54 #ifdef HAVE_KQUEUE
50 55 #define POLLSET_USES_KQUEUE
@@ -55,6 +60,9 @@
55 60 #elif defined(HAVE_EPOLL)
56 61 #define POLLSET_USES_EPOLL
57 62 #define POLLSET_DEFAULT_METHOD APR_POLLSET_EPOLL
  63 +#elif defined(HAVE_AIO_H)
  64 +#define POLLSET_USES_ASIO
  65 +#define POLLSET_DEFAULT_METHOD APR_POLLSET_ASIO
58 66 #elif defined(HAVE_POLL)
59 67 #define POLLSET_USES_POLL
60 68 #define POLLSET_DEFAULT_METHOD APR_POLLSET_POLL
@@ -75,7 +83,7 @@
75 83 #endif
76 84 #endif
77 85
78   -#if defined(POLLSET_USES_KQUEUE) || defined(POLLSET_USES_EPOLL) || defined(POLLSET_USES_PORT)
  86 +#if defined(POLLSET_USES_KQUEUE) || defined(POLLSET_USES_EPOLL) || defined(POLLSET_USES_PORT) || defined(POLLSET_USES_ASIO)
79 87
80 88 #include "apr_ring.h"
81 89
8 poll/unix/pollset.c
@@ -52,6 +52,9 @@ extern apr_pollset_provider_t *apr_pollset_provider_port;
52 52 #if defined(HAVE_EPOLL)
53 53 extern apr_pollset_provider_t *apr_pollset_provider_epoll;
54 54 #endif
  55 +#if defined(HAVE_AIO_H)
  56 +extern apr_pollset_provider_t *apr_pollset_provider_asio;
  57 +#endif
55 58 #if defined(HAVE_POLL)
56 59 extern apr_pollset_provider_t *apr_pollset_provider_poll;
57 60 #endif
@@ -76,6 +79,11 @@ static apr_pollset_provider_t *pollset_provider(apr_pollset_method_e method)
76 79 provider = apr_pollset_provider_epoll;
77 80 #endif
78 81 break;
  82 + case APR_POLLSET_ASIO:
  83 +#if defined(HAVE_AIO_H)
  84 + provider = apr_pollset_provider_asio;
  85 +#endif
  86 + break;
79 87 case APR_POLLSET_POLL:
80 88 #if defined(HAVE_POLL)
81 89 provider = apr_pollset_provider_poll;
741 poll/unix/z_asio.c
... ... @@ -0,0 +1,741 @@
  1 +/* Licensed to the Apache Software Foundation (ASF) under one or more
  2 + * contributor license agreements. See the NOTICE file distributed with
  3 + * this work for additional information regarding copyright ownership.
  4 + * The ASF licenses this file to You under the Apache License, Version 2.0
  5 + * (the "License"); you may not use this file except in compliance with
  6 + * the License. You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + *
  16 + *
  17 + ******************************************************************************
  18 + *
  19 + * This implementation is based on the z/OS sockets async i/o facility. When a
  20 + * socket is added to the pollset, an async poll is issued for that individual
  21 + * socket. It specifies that the kernel should send an IPC message when the
  22 + * socket becomes ready. The IPC messages are sent to a single message queue
  23 + * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC
  24 + * messages or the specified timeout.
  25 + *
  26 + * Since z/OS does not support async i/o for pipes or files at present, this
  27 + * implementation falls back to using ordinary poll() when
  28 + * APR_POLLSET_THREADSAFE is unset.
  29 + *
  30 + * Greg Ames
  31 + * April 2012
  32 + */
  33 +
  34 +#include "apr.h"
  35 +#include "apr_hash.h"
  36 +#include "apr_poll.h"
  37 +#include "apr_time.h"
  38 +#include "apr_portable.h"
  39 +#include "apr_arch_inherit.h"
  40 +#include "apr_arch_file_io.h"
  41 +#include "apr_arch_networkio.h"
  42 +#include "apr_arch_poll_private.h"
  43 +
  44 +#ifdef HAVE_AIO_H
  45 +
  46 +#include <sys/msg.h> /* msgget etc */
  47 +#include <time.h> /* timestruct */
  48 +#include <poll.h> /* pollfd */
  49 +#include <limits.h> /* MAX_INT */
  50 +
  51 +struct apr_pollset_private_t
  52 +{
  53 + int msg_q; /* IPC message queue. The z/OS kernel sends messages
  54 + * to this queue when our async polls on individual
  55 + * file descriptors complete
  56 + */
  57 + apr_pollfd_t *result_set;
  58 + apr_uint32_t size;
  59 +
  60 +#if APR_HAS_THREADS
  61 + /* A thread mutex to protect operations on the rings and the hash */
  62 + apr_thread_mutex_t *ring_lock;
  63 +#endif
  64 +
  65 + /* A hash of all active elements used for O(1) garbage collection */
  66 + apr_hash_t *elems;
  67 +
  68 + APR_RING_HEAD(ready_ring_t, asio_elem_t) ready_ring;
  69 + APR_RING_HEAD(prior_ready_ring_t, asio_elem_t) prior_ready_ring;
  70 + APR_RING_HEAD(free_ring_t, asio_elem_t) free_ring;
  71 +
  72 + /* for pipes etc with no asio */
  73 + struct pollfd *pollset;
  74 + apr_pollfd_t *query_set;
  75 +};
  76 +
  77 +typedef enum {
  78 + ASIO_INIT = 0,
  79 + ASIO_CANCELLED
  80 +} asio_state_e;
  81 +
  82 +typedef struct asio_elem_t asio_elem_t;
  83 +
  84 +struct asio_msgbuf_t {
  85 + long msg_type; /* must be > 0 */
  86 + asio_elem_t *msg_elem;
  87 +};
  88 +
  89 +struct asio_elem_t
  90 +{
  91 + APR_RING_ENTRY(asio_elem_t) link;
  92 + apr_pollfd_t pfd;
  93 + struct pollfd os_pfd;
  94 + struct aiocb a;
  95 + asio_state_e state;
  96 + struct asio_msgbuf_t msg;
  97 +};
  98 +
  99 +#define DEBUG 0
  100 +
  101 +/* DEBUG settings: 0 - no debug messages at all,
  102 + * 1 - should not occur messages,
  103 + * 2 - apr_pollset_* entry and exit messages,
  104 + * 3 - state changes, memory usage,
  105 + * 4 - z/OS, APR, and internal calls,
  106 + * 5 - everything else except the timer pop path,
  107 + * 6 - everything, including the Event 1 sec timer pop path
  108 + *
  109 + * each DEBUG level includes all messages produced by lower numbered levels
  110 + */
  111 +
  112 +#if DEBUG
  113 +
  114 +#include <assert.h>
  115 +#include <unistd.h> /* getpid */
  116 +
  117 +#define DBG_BUFF char dbg_msg_buff[256];
  118 +
  119 +#define DBG_TEST(lvl) if (lvl <= DEBUG) {
  120 +
  121 +#define DBG_CORE(msg) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
  122 + " " msg, getpid()), \
  123 + fprintf(stderr, "%s", dbg_msg_buff);
  124 +#define DBG_CORE1(msg, var1) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
  125 + " " msg, getpid(), var1), \
  126 + fprintf(stderr, "%s", dbg_msg_buff);
  127 +#define DBG_CORE2(msg, var1, var2) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
  128 + " " msg, getpid(), var1, var2), \
  129 + fprintf(stderr, "%s", dbg_msg_buff);
  130 +#define DBG_CORE3(msg, var1, var2, var3) \
  131 + sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
  132 + " " msg, getpid(), var1, var2, var3), \
  133 + fprintf(stderr, "%s", dbg_msg_buff);
  134 +#define DBG_CORE4(msg, var1, var2, var3, var4) \
  135 + sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
  136 + " " msg, getpid(), var1, var2, var3, var4),\
  137 + fprintf(stderr, "%s", dbg_msg_buff);
  138 +
  139 +#define DBG_END }
  140 +
  141 +#define DBG(lvl, msg) DBG_TEST(lvl) \
  142 + DBG_CORE(msg) \
  143 + DBG_END
  144 +
  145 +#define DBG1(lvl, msg, var1) DBG_TEST(lvl) \
  146 + DBG_CORE1(msg, var1) \
  147 + DBG_END
  148 +
  149 +#define DBG2(lvl, msg, var1, var2) DBG_TEST(lvl) \
  150 + DBG_CORE2(msg, var1, var2) \
  151 + DBG_END
  152 +
  153 +#define DBG3(lvl, msg, var1, var2, var3) \
  154 + DBG_TEST(lvl) \
  155 + DBG_CORE3(msg, var1, var2, var3) \
  156 + DBG_END
  157 +
  158 +#define DBG4(lvl, msg, var1, var2, var3, var4) \
  159 + DBG_TEST(lvl) \
  160 + DBG_CORE4(msg, var1, var2, var3, var4) \
  161 + DBG_END
  162 +
  163 +#else /* DEBUG is 0 */
  164 +#define DBG_BUFF
  165 +#define DBG(lvl, msg) ((void)0)
  166 +#define DBG1(lvl, msg, var1) ((void)0)
  167 +#define DBG2(lvl, msg, var1, var2) ((void)0)
  168 +#define DBG3(lvl, msg, var1, var2, var3) ((void)0)
  169 +#define DBG4(lvl, msg, var1, var2, var3, var4) ((void)0)
  170 +
  171 +#endif /* DEBUG */
  172 +
  173 +static int asyncio(asio_elem_t *elem)
  174 +{
  175 + DBG_BUFF
  176 + int rv;
  177 +
  178 +#ifdef _LP64
  179 +#define AIO BPX4AIO
  180 +#else
  181 +#define AIO BPX1AIO
  182 +#endif
  183 +
  184 + AIO(sizeof(struct aiocb), &(elem->a), &rv, &errno, __err2ad());
  185 + DBG3(4, "BPX4AIO aiocb %p elem %p rv %d\n",
  186 + &(elem->a), elem, rv);
  187 +#ifdef DEBUG
  188 + if (rv < 0) {
  189 + DBG2(4, "errno %d errnojr %08x\n",
  190 + errno, *__err2ad());
  191 + }
  192 +#endif
  193 + return rv;
  194 +}
  195 +
  196 +static apr_int16_t get_event(apr_int16_t event)
  197 +{
  198 + DBG_BUFF
  199 + apr_int16_t rv = 0;
  200 + DBG(4, "entered\n");
  201 +
  202 + if (event & APR_POLLIN)
  203 + rv |= POLLIN;
  204 + if (event & APR_POLLPRI)
  205 + rv |= POLLPRI;
  206 + if (event & APR_POLLOUT)
  207 + rv |= POLLOUT;
  208 + if (event & APR_POLLERR)
  209 + rv |= POLLERR;
  210 + if (event & APR_POLLHUP)
  211 + rv |= POLLHUP;
  212 + if (event & APR_POLLNVAL)
  213 + rv |= POLLNVAL;
  214 +
  215 + DBG(4, "exiting\n");
  216 + return rv;
  217 +}
  218 +
  219 +static apr_int16_t get_revent(apr_int16_t event)
  220 +{
  221 + DBG_BUFF
  222 + apr_int16_t rv = 0;
  223 + DBG(4, "entered\n");
  224 +
  225 + if (event & POLLIN)
  226 + rv |= APR_POLLIN;
  227 + if (event & POLLPRI)
  228 + rv |= APR_POLLPRI;
  229 + if (event & POLLOUT)
  230 + rv |= APR_POLLOUT;
  231 + if (event & POLLERR)
  232 + rv |= APR_POLLERR;
  233 + if (event & POLLHUP)
  234 + rv |= APR_POLLHUP;
  235 + if (event & POLLNVAL)
  236 + rv |= APR_POLLNVAL;
  237 +
  238 + DBG(4, "exiting\n");
  239 + return rv;
  240 +}
  241 +
  242 +static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset)
  243 +{
  244 + DBG_BUFF
  245 + int rv;
  246 +
  247 + DBG(4, "entered\n");
  248 + rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL);
  249 +
  250 + DBG1(4, "exiting, msgctl(IPC_RMID) returned %d\n", rv);
  251 + return rv;
  252 +}
  253 +
  254 +static apr_status_t asio_pollset_create(apr_pollset_t *pollset,
  255 + apr_uint32_t size,
  256 + apr_pool_t *p,
  257 + apr_uint32_t flags)
  258 +{
  259 + DBG_BUFF
  260 + apr_status_t rv;
  261 + apr_pollset_private_t *priv;
  262 +
  263 + DBG1(2, "entered, flags: %x\n", flags);
  264 +
  265 + priv = pollset->p = apr_palloc(p, sizeof(*priv));
  266 +
  267 + if (flags & APR_POLLSET_THREADSAFE) {
  268 +#if APR_HAS_THREADS
  269 + if (rv = apr_thread_mutex_create(&(priv->ring_lock),
  270 + APR_THREAD_MUTEX_DEFAULT,
  271 + p) != APR_SUCCESS) {
  272 + DBG1(1, "apr_thread_mutex_create returned %d\n", rv);
  273 + pollset = NULL;
  274 + return rv;
  275 + }
  276 + rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */
  277 + if (rv < 0) {
  278 +#if DEBUG
  279 + perror(__FUNCTION__ " msgget returned < 0 ");
  280 +#endif
  281 + pollset = NULL;
  282 + return rv;
  283 + }
  284 +
  285 + DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv);
  286 + priv->msg_q = rv;
  287 + priv->elems = apr_hash_make(p);
  288 +
  289 + APR_RING_INIT(&priv->free_ring, asio_elem_t, link);
  290 + APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
  291 +
  292 +#else /* APR doesn't have threads but caller wants a threadsafe pollset */
  293 + pollset = NULL;
  294 + return APR_ENOTIMPL;
  295 +#endif
  296 +
  297 + } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o,
  298 + * init fields only needed in old style pollset
  299 + */
  300 +
  301 + priv->pollset = apr_palloc(p, size * sizeof(struct pollfd));
  302 + priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
  303 +
  304 + if ((!priv->pollset) || (!priv->query_set)) {
  305 + return APR_ENOMEM;
  306 + }
  307 + }
  308 +
  309 + pollset->nelts = 0;
  310 + pollset->flags = flags;
  311 + pollset->pool = p;
  312 + priv->size = size;
  313 + priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
  314 + if (!priv->result_set) {
  315 + return APR_ENOMEM;
  316 + }
  317 +
  318 + DBG2(2, "exiting, pollset: %p, type: %s\n",
  319 + pollset,
  320 + flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX");
  321 +
  322 +
  323 + return APR_SUCCESS;
  324 +
  325 +} /* end of asio_pollset_create */
  326 +
  327 +static apr_status_t posix_add(apr_pollset_t *pollset,
  328 + const apr_pollfd_t *descriptor)
  329 +{
  330 + DBG_BUFF
  331 + int fd;
  332 + apr_pool_t *p = pollset->pool;
  333 + apr_pollset_private_t *priv = pollset->p;
  334 +
  335 + DBG(4, "entered\n");
  336 +
  337 + if (pollset->nelts == priv->size) {
  338 + return APR_ENOMEM;
  339 + }
  340 +
  341 + priv->query_set[pollset->nelts] = *descriptor;
  342 + if (descriptor->desc_type == APR_POLL_SOCKET) {
  343 + fd = descriptor->desc.s->socketdes;
  344 + }
  345 + else {
  346 + fd = descriptor->desc.f->filedes;
  347 + }
  348 +
  349 + priv->pollset[pollset->nelts].fd = fd;
  350 +
  351 + priv->pollset[pollset->nelts].events =
  352 + get_event(descriptor->reqevents);
  353 +
  354 + pollset->nelts++;
  355 +
  356 + DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset);
  357 +
  358 + return APR_SUCCESS;
  359 +} /* end of posix_add */
  360 +
  361 +
  362 +static apr_status_t asio_pollset_add(apr_pollset_t *pollset,
  363 + const apr_pollfd_t *descriptor)
  364 +{
  365 + DBG_BUFF
  366 + asio_elem_t *elem;
  367 + apr_status_t rv = APR_SUCCESS;
  368 + apr_pollset_private_t *priv = pollset->p;
  369 +
  370 + pollset_lock_rings();
  371 + DBG(2, "entered\n");
  372 +
  373 + if (pollset->flags & APR_POLLSET_THREADSAFE) {
  374 +
  375 + if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) {
  376 + elem = APR_RING_FIRST(&(priv->free_ring));
  377 + APR_RING_REMOVE(elem, link);
  378 + DBG1(3, "used recycled memory at %08p\n", elem);
  379 + elem->state = ASIO_INIT;
  380 + }
  381 + else {
  382 + elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t));
  383 + DBG1(3, "alloced new memory at %08p\n", elem);
  384 +
  385 + elem->a.aio_notifytype = AIO_MSGQ;
  386 + elem->a.aio_msgev_qid = priv->msg_q;
  387 + DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid);
  388 + elem->a.aio_msgev_size = sizeof(asio_elem_t *);
  389 + elem->a.aio_msgev_flag = 0; /* wait if queue is full */
  390 + elem->a.aio_msgev_addr = &(elem->msg);
  391 + elem->a.aio_buf = &(elem->os_pfd);
  392 + elem->a.aio_nbytes = 1; /* number of pfds to poll */
  393 + elem->msg.msg_type = 1;
  394 + elem->msg.msg_elem = elem;
  395 + }
  396 +
  397 + /* z/OS only supports async I/O for sockets for now */
  398 + elem->os_pfd.fd = descriptor->desc.s->socketdes;
  399 +
  400 + APR_RING_ELEM_INIT(elem, link);
  401 + elem->a.aio_cmd = AIO_SELPOLL;
  402 + elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/
  403 + elem->pfd = *descriptor;
  404 + elem->os_pfd.events = get_event(descriptor->reqevents);
  405 +
  406 + if (0 != asyncio(elem)) {
  407 + rv = errno;
  408 + DBG3(4, "pollset %p asio failed fd %d, errno %p\n",
  409 + pollset, elem->os_pfd.fd, rv);
  410 +#if DEBUG
  411 + perror(__FUNCTION__ " asio failure");
  412 +#endif
  413 + }
  414 + else {
  415 + DBG2(4, "good asio call, adding fd %d to pollset %p\n",
  416 + elem->os_pfd.fd, pollset);
  417 +
  418 + pollset->nelts++;
  419 + apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem);
  420 + }
  421 + }
  422 + else {
  423 + /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case
  424 + * pipes or files are used with this pollset
  425 + */
  426 +
  427 + rv = posix_add(pollset, descriptor);
  428 + }
  429 +
  430 + DBG1(2, "exiting, rv = %d\n", rv);
  431 +
  432 + pollset_unlock_rings();
  433 + return rv;
  434 +} /* end of asio_pollset_add */
  435 +
  436 +static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor)
  437 +{
  438 + DBG_BUFF
  439 + apr_uint32_t i;
  440 + apr_pollset_private_t *priv = pollset->p;
  441 +
  442 + DBG(4, "entered\n");
  443 + for (i = 0; i < pollset->nelts; i++) {
  444 + if (descriptor->desc.s == priv->query_set[i].desc.s) {
  445 + /* Found an instance of the fd: remove this and any other copies */
  446 + apr_uint32_t dst = i;
  447 + apr_uint32_t old_nelts = pollset->nelts;
  448 + pollset->nelts--;
  449 + for (i++; i < old_nelts; i++) {
  450 + if (descriptor->desc.s == priv->query_set[i].desc.s) {
  451 + pollset->nelts--;
  452 + }
  453 + else {
  454 + priv->pollset[dst] = priv->pollset[i];
  455 + priv->query_set[dst] = priv->query_set[i];
  456 + dst++;
  457 + }
  458 + }
  459 + DBG(4, "returning OK\n");
  460 + return APR_SUCCESS;
  461 + }
  462 + }
  463 +
  464 + DBG(1, "returning APR_NOTFOUND\n");
  465 + return APR_NOTFOUND;
  466 +
  467 +} /* end of posix_remove */
  468 +
  469 +static apr_status_t asio_pollset_remove(apr_pollset_t *pollset,
  470 + const apr_pollfd_t *descriptor)
  471 +{
  472 + DBG_BUFF
  473 + asio_elem_t *elem;
  474 + apr_status_t rv = APR_SUCCESS;
  475 + apr_pollset_private_t *priv = pollset->p;
  476 +
  477 + int fd;
  478 +
  479 + DBG(2, "entered\n");
  480 +
  481 + if (!(pollset->flags & APR_POLLSET_THREADSAFE)) {
  482 + return posix_remove(pollset, descriptor);
  483 + }
  484 +
  485 + pollset_lock_rings();
  486 +
  487 +#if DEBUG
  488 + assert(descriptor->desc_type == APR_POLL_SOCKET);
  489 +#endif
  490 + /* zOS 1.12 doesn't support files for async i/o */
  491 + fd = descriptor->desc.s->socketdes;
  492 +
  493 + elem = apr_hash_get(priv->elems, &(fd), sizeof(int));
  494 + if (elem == NULL) {
  495 + DBG1(1, "couldn't find fd %d\n", fd);
  496 + rv = APR_NOTFOUND;
  497 + } else {
  498 + DBG1(5, "hash found fd %d\n", fd);
  499 + /* delete this fd from the hash */
  500 + apr_hash_set(priv->elems, &(fd), sizeof(int), NULL);
  501 + /* asyncio call to cancel */
  502 + elem->a.aio_cmd = AIO_CANCEL;
  503 +
  504 + /* elem->a.aio_cflags = AIO_CANCELNONOTIFY; */
  505 + /* we want *msgrcv to collect cancel notifications to remove races
  506 + * in garbage collection */
  507 +
  508 + rv = asyncio(elem);
  509 + DBG1(4, "asyncio returned %d\n", rv);
  510 +
  511 + if (rv == 1) {
  512 + elem->state = ASIO_CANCELLED;
  513 + rv = APR_SUCCESS;
  514 + }
  515 + }
  516 +
  517 + DBG1(2, "exiting, rv: %d\n", rv);
  518 +
  519 + pollset_unlock_rings();
  520 +
  521 + return rv;
  522 +} /* end of asio_pollset_remove */
  523 +
  524 +static posix_poll(apr_pollset_t *pollset,
  525 + apr_interval_time_t timeout,
  526 + apr_int32_t *num,
  527 + const apr_pollfd_t **descriptors)
  528 +{
  529 + DBG_BUFF
  530 + int rv;
  531 + apr_uint32_t i, j;
  532 + apr_pollset_private_t *priv = pollset->p;
  533 +
  534 + DBG(4, "entered\n");
  535 +
  536 + if (timeout > 0) {
  537 + timeout /= 1000;
  538 + }
  539 + rv = poll(priv->pollset, pollset->nelts, timeout);
  540 + (*num) = rv;
  541 + if (rv < 0) {
  542 + return apr_get_netos_error();
  543 + }
  544 + if (rv == 0) {
  545 + return APR_TIMEUP;
  546 + }
  547 + j = 0;
  548 + for (i = 0; i < pollset->nelts; i++) {
  549 + if (priv->pollset[i].revents != 0) {
  550 + priv->result_set[j] = priv->query_set[i];
  551 + priv->result_set[j].rtnevents =
  552 + get_revent(priv->pollset[i].revents);
  553 + j++;
  554 + }
  555 + }
  556 + if (descriptors)
  557 + *descriptors = priv->result_set;
  558 +
  559 + DBG(4, "exiting ok\n");
  560 + return APR_SUCCESS;
  561 +
  562 +} /* end of posix_poll */
  563 +
  564 +static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg)
  565 +{
  566 + DBG_BUFF
  567 + asio_elem_t *elem = msg->msg_elem;
  568 +
  569 + if (elem->state == ASIO_CANCELLED) {
  570 + DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n",
  571 + elem, elem->os_pfd.fd);
  572 + APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem,
  573 + asio_elem_t, link);
  574 + } else {
  575 + DBG2(4, "adding to ready ring: elem %08p, fd %d\n",
  576 + elem, elem->os_pfd.fd);
  577 + APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem,
  578 + asio_elem_t, link);
  579 + }
  580 +}
  581 +
  582 +static apr_status_t asio_pollset_poll(apr_pollset_t *pollset,
  583 + apr_interval_time_t timeout,
  584 + apr_int32_t *num,
  585 + const apr_pollfd_t **descriptors)
  586 +{
  587 + DBG_BUFF
  588 + int i, ret;
  589 + asio_elem_t *elem, *next_elem;
  590 + struct asio_msgbuf_t msg_buff;
  591 + struct timespec tv;
  592 + apr_status_t rv = APR_SUCCESS;
  593 + apr_pollset_private_t *priv = pollset->p;
  594 +
  595 + DBG(6, "entered\n"); /* chatty - traces every second w/Event */
  596 +
  597 + if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) {
  598 + return posix_poll(pollset, timeout, num, descriptors);
  599 + }
  600 +
  601 + pollset_lock_rings();
  602 + APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link);
  603 +
  604 + while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) {
  605 + elem = APR_RING_FIRST(&(priv->prior_ready_ring));
  606 + DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n",
  607 + pollset,
  608 + elem,
  609 + elem->os_pfd.fd);
  610 +
  611 + APR_RING_REMOVE(elem, link);
  612 +
  613 + /*
  614 + * since USS does not remember what's in our pollset, we have
  615 + * to re-add fds which have not been apr_pollset_remove'd
  616 + *
  617 + * there may have been too many ready fd's to return in the
  618 + * result set last time. re-poll inline for both cases
  619 + */
  620 +
  621 + if (elem->state == ASIO_CANCELLED) {
  622 + continue; /* do not re-add if it has been _removed */
  623 + }
  624 +
  625 + elem->a.aio_cflags = AIO_OK2COMPIMD;
  626 +
  627 + if (0 != (ret = asyncio(elem))) {
  628 + if (ret == 1) {
  629 + DBG(4, "asyncio() completed inline\n");
  630 + /* it's ready now */
  631 + APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t,
  632 + link);
  633 + }
  634 + else {
  635 + DBG2(1, "asyncio() failed, ret: %d, errno: %d\n",
  636 + ret, errno);
  637 + pollset_unlock_rings();
  638 + return errno;
  639 + }
  640 + }
  641 + DBG1(4, "asyncio() completed rc %d\n", ret);
  642 + }
  643 +
  644 + DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */
  645 +
  646 + /* Gather async poll completions that have occurred since the last call */
  647 + while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0,
  648 + IPC_NOWAIT)) {
  649 + process_msg(pollset, &msg_buff);
  650 + }
  651 +
  652 + /* Suspend if nothing is ready yet. */
  653 + if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) {
  654 +
  655 + if (timeout >= 0) {
  656 + tv.tv_sec = apr_time_sec(timeout);
  657 + tv.tv_nsec = apr_time_usec(timeout) * 1000;
  658 + } else {
  659 + tv.tv_sec = INT_MAX; /* block until something is ready */
  660 + }
  661 +
  662 + DBG2(6, "nothing on the ready ring "
  663 + "- blocking for %d seconds %d ns\n",
  664 + tv.tv_sec, tv.tv_nsec);
  665 +
  666 + pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */
  667 +
  668 + if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff,
  669 + sizeof(asio_elem_t *), 0, NULL, &tv))) {
  670 +#if DEBUG
  671 + if (errno == EAGAIN) {
  672 + DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */
  673 + }
  674 + else {
  675 + DBG(1, "__msgrcv_timed failed!\n");
  676 + }
  677 +#endif
  678 + return (errno == EAGAIN) ? APR_TIMEUP : errno;
  679 + }
  680 +
  681 + pollset_lock_rings();
  682 +
  683 + process_msg(pollset, &msg_buff);
  684 + }
  685 +
  686 + APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
  687 +
  688 + (*num) = 0;
  689 + elem = APR_RING_FIRST(&(priv->ready_ring));
  690 +
  691 + for (i = 0;
  692 +
  693 + i < priv->size
  694 + && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link);
  695 + i++) {
  696 + DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd);
  697 +
  698 + priv->result_set[i] = elem->pfd;
  699 + priv->result_set[i].rtnevents
  700 + = get_revent(elem->os_pfd.revents);
  701 + (*num)++;
  702 +
  703 + elem = APR_RING_NEXT(elem, link);
  704 +
  705 +#if DEBUG
  706 + if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) {
  707 + DBG(5, "end of ready ring reached\n");
  708 + }
  709 +#endif
  710 + }
  711 +
  712 + if (descriptors) {
  713 + *descriptors = priv->result_set;
  714 + }
  715 +
  716 + /* if the result size is too small, remember which descriptors
  717 + * haven't had results reported yet. we will look
  718 + * at these descriptors on the next apr_pollset_poll call
  719 + */
  720 +
  721 + APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link);
  722 +
  723 + DBG1(2, "exiting, rv = %d\n", rv);
  724 +
  725 + pollset_unlock_rings();
  726 +
  727 + return rv;
  728 +} /* end of asio_pollset_poll */
  729 +
  730 +static apr_pollset_provider_t impl = {
  731 + asio_pollset_create,
  732 + asio_pollset_add,
  733 + asio_pollset_remove,
  734 + asio_pollset_poll,
  735 + asio_pollset_cleanup,
  736 + "asio"
  737 +};
  738 +
  739 +apr_pollset_provider_t *apr_pollset_provider_asio = &impl;
  740 +
  741 +#endif /* __MVS__ */

0 comments on commit b83a2aa

Please sign in to comment.
Something went wrong with that request. Please try again.