Skip to content

Commit

Permalink
main: keep fd handlers in list
Browse files Browse the repository at this point in the history
Experimental code in order to try to solve
sockets for Windows. On windows the "int fd" is actually
of type "SOCKET fd" and the range goes from 0 to very large.

ref #61
  • Loading branch information
alfredh committed Oct 7, 2017
1 parent 5fc85e9 commit 49834b7
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 59 deletions.
2 changes: 1 addition & 1 deletion mk/re.mk
Expand Up @@ -599,7 +599,7 @@ ifneq ($(HAVE_EPOLL),)
CFLAGS += -DHAVE_EPOLL
endif
ifneq ($(HAVE_KQUEUE),)
CFLAGS += -DHAVE_KQUEUE
#CFLAGS += -DHAVE_KQUEUE # NOTE: Broken in this branch
endif
CFLAGS += -DHAVE_UNAME
CFLAGS += -DHAVE_UNISTD_H
Expand Down
179 changes: 121 additions & 58 deletions src/main/main.c
Expand Up @@ -84,16 +84,19 @@ enum {
};


struct fdh {
struct le le; /**< Linked-list element */
int fd; /**< File Descriptor */
int flags; /**< Polling flags (Read, Write, etc.) */
fd_h *fh; /**< Event handler */
void *arg; /**< Handler argument */
};


/** Polling loop data */
struct re {
/** File descriptor handler set */
struct {
int flags; /**< Polling flags (Read, Write, etc.) */
fd_h *fh; /**< Event handler */
void *arg; /**< Handler argument */
} *fhs;
struct list fdhl; /**< List of File descriptor handlers */
int maxfds; /**< Maximum number of polling fds */
int nfds; /**< Number of active file descriptors */
enum poll_method method; /**< The current polling method */
bool update; /**< File descriptor set need updating */
bool polling; /**< Is polling flag */
Expand Down Expand Up @@ -121,8 +124,7 @@ struct re {
};

static struct re global_re = {
NULL,
0,
LIST_INIT,
0,
METHOD_NULL,
false,
Expand Down Expand Up @@ -230,21 +232,21 @@ static struct re *re_get(void)
* @param fd File descriptor
* @param flags Event flags
*/
static void fd_handler(struct re *re, int fd, int flags)
static void fd_handler(struct fdh *fdh, int flags)
{
const uint64_t tick = tmr_jiffies();
uint32_t diff;

DEBUG_INFO("event on fd=%d (flags=0x%02x)...\n", fd, flags);

re->fhs[fd].fh(flags, re->fhs[fd].arg);
fdh->fh(flags, fdh->arg);

diff = (uint32_t)(tmr_jiffies() - tick);

if (diff > MAX_BLOCKING) {
DEBUG_WARNING("long async blocking: %u>%u ms (h=%p arg=%p)\n",
diff, MAX_BLOCKING,
re->fhs[fd].fh, re->fhs[fd].arg);
fdh->fh, fdh->arg);
}
}
#endif
Expand Down Expand Up @@ -373,6 +375,7 @@ static int set_kqueue_fds(struct re *re, int fd, int flags)
#endif


#if 0
/**
* Rebuild the file descriptor mapping table. This must be done whenever
* the polling method is changed.
Expand Down Expand Up @@ -417,6 +420,7 @@ static int rebuild_fds(struct re *re)

return err;
}
#endif


static int poll_init(struct re *re)
Expand Down Expand Up @@ -496,7 +500,6 @@ static void poll_close(struct re *re)
{
DEBUG_INFO("poll close\n");

re->fhs = mem_deref(re->fhs);
re->maxfds = 0;

#ifdef HAVE_POLL
Expand Down Expand Up @@ -551,6 +554,30 @@ static int poll_setup(struct re *re)
}


static void fdh_destructor(void *data)
{
struct fdh *fdh = data;

list_unlink(&fdh->le);
}


static struct fdh *fdh_lookup(const struct re *re, int fd)
{
struct le *le;

for (le = re->fdhl.head; le; le = le->next) {

struct fdh *fdh = le->data;

if (fdh->fd == fd)
return fdh;
}

return NULL;
}


/**
* Listen for events on a file descriptor
*
Expand Down Expand Up @@ -579,23 +606,30 @@ int fd_listen(int fd, int flags, fd_h *fh, void *arg)
return err;
}

if (fd >= re->maxfds) {
if (flags) {
DEBUG_WARNING("fd_listen: fd=%d flags=0x%02x"
" - Max %d fds\n",
fd, flags, re->maxfds);
}
return EMFILE;
struct fdh *fdh;

fdh = fdh_lookup(re, fd);
if (fdh) {

/* Update fh set */
fdh->fd = fd;
fdh->flags = flags;
fdh->fh = fh;
fdh->arg = arg;
}
else {
fdh = mem_zalloc(sizeof(*fdh), fdh_destructor);

/* Add fh set */
fdh->fd = fd;
fdh->flags = flags;
fdh->fh = fh;
fdh->arg = arg;

/* Update fh set */
if (re->fhs) {
re->fhs[fd].flags = flags;
re->fhs[fd].fh = fh;
re->fhs[fd].arg = arg;
list_append(&re->fdhl, &fdh->le, fdh);
}

re->nfds = max(re->nfds, fd+1);
re->update = true;

switch (re->method) {

Expand Down Expand Up @@ -642,7 +676,18 @@ int fd_listen(int fd, int flags, fd_h *fh, void *arg)
*/
void fd_close(int fd)
{
(void)fd_listen(fd, 0, NULL, NULL);
struct re *re = re_get();

struct fdh *fdh = fdh_lookup(re, fd);
if (fdh) {
mem_deref(fdh);

re->update = true;
}
else {
DEBUG_NOTICE("fd_close: fd=%d not found\n", fd);
}

}


Expand All @@ -656,10 +701,11 @@ void fd_close(int fd)
static int fd_poll(struct re *re)
{
const uint64_t to = tmr_next_timeout(&re->tmrl);
int i, n;
int n;
#ifdef HAVE_SELECT
fd_set rfds, wfds, efds;
#endif
struct le *le;

DEBUG_INFO("next timer: %llu ms\n", to);

Expand All @@ -676,22 +722,29 @@ static int fd_poll(struct re *re)
#ifdef HAVE_SELECT
case METHOD_SELECT: {
struct timeval tv;
int nfds = 0;

/* Clear and update fd sets */
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);

for (i=0; i<re->nfds; i++) {
if (!re->fhs[i].fh)
/* loop through list of file-descriptor handlers */
for (le = re->fdhl.head; le; le = le->next) {

struct fdh *fdh = le->data;

if (!fdh->fh)
continue;

if (re->fhs[i].flags & FD_READ)
FD_SET(i, &rfds);
if (re->fhs[i].flags & FD_WRITE)
FD_SET(i, &wfds);
if (re->fhs[i].flags & FD_EXCEPT)
FD_SET(i, &efds);
if (fdh->flags & FD_READ)
FD_SET(fdh->fd, &rfds);
if (fdh->flags & FD_WRITE)
FD_SET(fdh->fd, &wfds);
if (fdh->flags & FD_EXCEPT)
FD_SET(fdh->fd, &efds);

nfds = max(nfds, fdh->fd+1);
}

#ifdef WIN32
Expand All @@ -701,7 +754,7 @@ static int fd_poll(struct re *re)
#endif
tv.tv_usec = (uint32_t) (to % 1000) * 1000;
re_unlock(re);
n = select(re->nfds, &rfds, &wfds, &efds, to ? &tv : NULL);
n = select(nfds, &rfds, &wfds, &efds, to ? &tv : NULL);
re_lock(re);
}
break;
Expand Down Expand Up @@ -740,9 +793,17 @@ static int fd_poll(struct re *re)
return errno;

/* Check for events */
for (i=0; (n > 0) && (i < re->nfds); i++) {

le = re->fdhl.head;

/* NOTE: callback handler might alter the list */
while (le) {

struct fdh *fdh = le->data;
int fd, flags = 0;

le = le->next;

switch (re->method) {

#ifdef HAVE_POLL
Expand All @@ -767,7 +828,8 @@ static int fd_poll(struct re *re)
#endif
#ifdef HAVE_SELECT
case METHOD_SELECT:
fd = i;
fd = fdh->fd;

if (FD_ISSET(fd, &rfds))
flags |= FD_READ;
if (FD_ISSET(fd, &wfds))
Expand Down Expand Up @@ -838,16 +900,18 @@ static int fd_poll(struct re *re)
if (!flags)
continue;

if (re->fhs[fd].fh) {

if (fdh->fh) {
#if MAIN_DEBUG
fd_handler(re, fd, flags);
fd_handler(fdh, flags);
#else
re->fhs[fd].fh(flags, re->fhs[fd].arg);
fdh->fh(flags, fdh->arg);
#endif
}

/* Check if polling method was changed */
if (re->update) {
re_printf("- fd_set updated\n");
re->update = false;
return 0;
}
Expand Down Expand Up @@ -879,15 +943,6 @@ int fd_setsize(int maxfds)
if (!re->maxfds)
re->maxfds = maxfds;

if (!re->fhs) {
DEBUG_INFO("fd_setsize: maxfds=%d, allocating %u bytes\n",
re->maxfds, re->maxfds * sizeof(*re->fhs));

re->fhs = mem_zalloc(re->maxfds * sizeof(*re->fhs), NULL);
if (!re->fhs)
return ENOMEM;
}

return 0;
}

Expand All @@ -898,20 +953,22 @@ int fd_setsize(int maxfds)
void fd_debug(void)
{
const struct re *re = re_get();
int i;
struct le *le;

if (!re->fhs)
if (!re)
return;

for (i=0; i<re->nfds; i++) {
for (le = re->fdhl.head; le; le = le->next) {

struct fdh *fdh = le->data;

if (!re->fhs[i].flags)
if (!fdh->flags)
continue;

(void)re_fprintf(stderr,
"fd %d in use: flags=%x fh=%p arg=%p\n",
i, re->fhs[i].flags, re->fhs[i].fh,
re->fhs[i].arg);
fdh->fd, fdh->flags, fdh->fh,
fdh->arg);
}
}

Expand Down Expand Up @@ -939,6 +996,8 @@ int re_main(re_signal_h *signalh)
struct re *re = re_get();
int err;

re_printf("$$$ NOTE - USING EXPERIMENTAL RE_MAIN $$$\n");

#ifdef HAVE_SIGNAL
if (signalh) {
(void)signal(SIGINT, signal_handler);
Expand Down Expand Up @@ -1029,7 +1088,7 @@ int re_debug(struct re_printf *pf, void *unused)

err |= re_hprintf(pf, "re main loop:\n");
err |= re_hprintf(pf, " maxfds: %d\n", re->maxfds);
err |= re_hprintf(pf, " nfds: %d\n", re->nfds);
err |= re_hprintf(pf, " nfds: %d\n", list_count(&re->fdhl));
err |= re_hprintf(pf, " method: %d (%s)\n", re->method,
poll_method_name(re->method));

Expand Down Expand Up @@ -1094,7 +1153,11 @@ int poll_method_set(enum poll_method method)
if (err)
return err;

#if 0
return rebuild_fds(re);
#endif

return 0;
}


Expand Down

0 comments on commit 49834b7

Please sign in to comment.