Skip to content

Commit

Permalink
add core event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Fredrik Widlund committed Jan 8, 2020
1 parent d316f43 commit 2feb7bc
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 8 deletions.
6 changes: 4 additions & 2 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ src/dynamic/string.c \
src/dynamic/map.c \
src/dynamic/maps.c \
src/dynamic/mapi.c \
src/dynamic/pool.c
src/dynamic/pool.c \
src/dynamic/core.c

HEADER_FILES = \
src/dynamic/hash.h \
Expand All @@ -29,7 +30,8 @@ src/dynamic/string.h \
src/dynamic/map.h \
src/dynamic/maps.h \
src/dynamic/mapi.h \
src/dynamic/pool.c
src/dynamic/pool.h \
src/dynamic/core.h

AUTOMAKE_OPTIONS = subdir-objects
lib_LTLIBRARIES= libdynamic.la
Expand Down
1 change: 1 addition & 0 deletions src/dynamic.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extern "C" {
#include <dynamic/maps.h>
#include <dynamic/mapi.h>
#include <dynamic/pool.h>
#include <dynamic/core.h>

#ifdef __cplusplus
}
Expand Down
121 changes: 121 additions & 0 deletions src/dynamic/core.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/epoll.h>

#include "buffer.h"
#include "vector.h"
#include "core.h"

static core_status core_default_callback(core_event *);
static core_handler core_handler_default = {.callback = core_default_callback};

static core_status core_default_callback(core_event *event __attribute__((unused)))
{
return CORE_OK;
}

void core_construct(core *core)
{
*core = (struct core) {0};
vector_construct(&core->handlers, sizeof(core_handler));
vector_construct(&core->next, sizeof(core_handler));
core->fd = epoll_create1(EPOLL_CLOEXEC);
if (core->fd == -1)
core->errors ++;
}

void core_destruct(core *core)
{
if (core->fd >= 0)
(void) close(core->fd);
vector_destruct(&core->handlers, NULL);
vector_destruct(&core->next, NULL);
}

core_status core_dispatch(core_handler *handler, int type, uintptr_t data)
{
return handler->callback((core_event[]){{.state = handler->state, .type = type, .data = data}});
}

void core_loop(core *core)
{
struct epoll_event events[CORE_MAX_EVENTS];
core_handler *handlers;
int n, i;

while (core->handlers_active || vector_size(&core->next))
{
handlers = vector_data(&core->next);
for (i = 0; (size_t) i < vector_size(&core->next); i ++)
(void) core_dispatch(&handlers[i], 0, 0);
vector_clear(&core->next, NULL);

n = epoll_wait(core->fd, events, CORE_MAX_EVENTS, -1);
if (n == -1)
{
core->errors ++;
break;
}

handlers = vector_data(&core->handlers);
for (i = 0; i < n; i ++)
(void) core_dispatch(&handlers[events[i].data.fd], 0, events[i].events);
}
}

void core_add(core *core, core_callback *callback, void *state, int fd, int events)
{
int e;
core_handler *handlers;

while (vector_size(&core->handlers) <= (size_t) fd)
vector_push_back(&core->handlers, &core_handler_default);

handlers = vector_data(&core->handlers);
handlers[fd] = (core_handler) {.callback = callback, .state = state};
e = epoll_ctl(core->fd, EPOLL_CTL_ADD, fd, (struct epoll_event[]){{.events = events, .data.fd = fd}});
if (e == -1)
{
handlers[fd] = core_handler_default;
core->errors ++;
}
else
core->handlers_active ++;
}

void core_delete(core *core, int fd)
{
core_handler *handlers;
int e;

if (fd < 0)
return;

e = epoll_ctl(core->fd, EPOLL_CTL_DEL, fd, NULL);
if (e == -1)
core->errors ++;
handlers = vector_data(&core->handlers);
handlers[fd] = core_handler_default;
core->handlers_active --;
}

int core_next(core *core,core_callback *callback, void *state)
{
core_handler handler = {.callback = callback, .state = state};

vector_push_back(&core->next, &handler);
return vector_size(&core->next);
}

void core_cancel(core *core, int id)
{
core_handler *handlers;

if (id == 0)
return;

handlers = vector_data(&core->next);
handlers[id - 1] = core_handler_default;
}
49 changes: 49 additions & 0 deletions src/dynamic/core.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef CORE_H_INCLUDED
#define CORE_H_INCLUDED

#define CORE_MAX_EVENTS 16

enum core_status
{
CORE_OK = 0,
CORE_ABORT = -1
};

typedef enum core_status core_status;
typedef struct core_event core_event;
typedef core_status core_callback(core_event *);
typedef struct core_handler core_handler;
typedef struct core core;

struct core_event
{
void *state;
int type;
uintptr_t data;
};

struct core_handler
{
core_callback *callback;
void *state;
};

struct core
{
int fd;
int errors;
vector handlers;
size_t handlers_active;
vector next;
};

void core_construct(core *);
void core_destruct(core *);
core_status core_dispatch(core_handler *, int, uintptr_t);
void core_loop(core *);
void core_add(core *, core_callback *, void *, int, int);
void core_delete(core *, int);
int core_queue(core *, core_callback *, void *);
void core_cancel(core *, int);

#endif /* CORE_H_INCLUDED */
10 changes: 5 additions & 5 deletions src/dynamic/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static void pool_flush(pool *pool)
if (n == -1)
{
if (errno != EAGAIN)
pool->error ++;
pool->errors ++;
break;
}
list_splice(list_front(&pool->messages_transit), message);
Expand All @@ -77,7 +77,7 @@ static void pool_maintain(pool *pool)
if (e == -1)
{
list_erase(worker, NULL);
pool->error ++;
pool->errors ++;
return;
}
pool->workers_count ++;
Expand Down Expand Up @@ -105,7 +105,7 @@ void pool_construct(pool *pool)

e = socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fd);
if (e == -1)
pool->error ++;
pool->errors ++;

pool->socket = fd[0];
pool->workers_socket = fd[1];
Expand Down Expand Up @@ -156,7 +156,7 @@ int pool_fd(pool *pool)

int pool_error(pool *pool)
{
return pool->error != 0;
return pool->errors != 0;
}

void pool_enqueue(pool *pool, pool_callback *callback, void *state)
Expand Down Expand Up @@ -184,7 +184,7 @@ void *pool_collect(pool *pool, int flags)
if (n == -1)
{
if (errno != EAGAIN)
pool->error ++;
pool->errors ++;
return NULL;
}

Expand Down
2 changes: 1 addition & 1 deletion src/dynamic/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct pool_message
struct pool
{
int socket;
int error;
int errors;

size_t workers_min;
size_t workers_max;
Expand Down

0 comments on commit 2feb7bc

Please sign in to comment.