Skip to content

Commit

Permalink
Merge pull request #1 from FedeDP/multi_fds_mod
Browse files Browse the repository at this point in the history
Multi fds mod
  • Loading branch information
FedeDP committed Mar 31, 2018
2 parents f22de98 + c2bb95e commit 74254b2
Show file tree
Hide file tree
Showing 17 changed files with 255 additions and 117 deletions.
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ set_target_properties(${PROJECT_NAME} PROPERTIES
PUBLIC_HEADER "${PUBLIC_H}"
)

# build with -DMAX_EVENTS=X to overwrite the default value of 512
# MAX_EVENTS are max number of fds for each context.
if (NOT MAX_EVENTS)
set(MAX_EVENTS 512)
endif (NOT MAX_EVENTS)
message(STATUS "MAX_EVENTS=${MAX_EVENTS}")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Wl,--no-undefined -Wshadow -Wtype-limits -Wstrict-overflow -fno-strict-aliasing -Wformat -Wformat-security")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99 -D_GNU_SOURCE -fvisibility=hidden")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99 -D_GNU_SOURCE -fvisibility=hidden -DMAX_EVENTS=${MAX_EVENTS}")

configure_file(Extra/libmodule.pc.in libmodule.pc @ONLY)

Expand Down
1 change: 1 addition & 0 deletions Lib/hashmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ map_t hashmap_new(void) {
m->data = calloc(INITIAL_SIZE, sizeof(hashmap_element));
if (m->data) {
m->table_size = INITIAL_SIZE;
m->size = 0;
} else {
hashmap_free(m);
m = NULL;
Expand Down
109 changes: 78 additions & 31 deletions Lib/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ static module_ret_code add_subscription(module *mod, const char *topic);
static int tell_if(void *data, void *m);
static pubsub_msg_t *create_pubsub_msg(const char *message, const char *sender, const char *topic);
static module_ret_code tell_pubsub_msg(pubsub_msg_t *m, module *mod, m_context *c);
static int manage_fds(module *mod, m_context *c, int type, int stop);
static module_ret_code start_children(module *m);
static module_ret_code stop_children(module *m);

Expand Down Expand Up @@ -79,6 +80,7 @@ module_ret_code module_register(const char *name, const char *ctx_name, const se
mod->state = IDLE;
mod->self.name = strdup(name);
mod->self.ctx = strdup(ctx_name);
mod->fds = NULL;
mod->subscriptions = hashmap_new();
*self = &mod->self;
evaluate_module(NULL, mod);
Expand Down Expand Up @@ -148,8 +150,8 @@ int evaluate_module(void *data, void *m) {
if (module_is(&mod->self, IDLE)
&& mod->hook->evaluate()) {

int fd = mod->hook->init();
module_start(&mod->self, fd);
mod->hook->init();
module_start(&mod->self);
}
return MAP_OK;
}
Expand Down Expand Up @@ -178,22 +180,61 @@ module_ret_code module_set_userdata(const self_t *self, const void *userdata) {
return MOD_OK;
}

module_ret_code module_update_fd(const self_t *self, int new_fd, int close_old) {
GET_MOD_IN_STATE(self, RUNNING);
/*
* Append this fd to our list of fds and
* if module is in RUNNING state, start listening on its events
*/
module_ret_code module_add_fd(const self_t *self, int fd) {
GET_MOD(self);
MOD_ASSERT(c->num_fds < MAX_EVENTS, "Reached max number of events for this context.", MOD_ERR);

/* De-register this fd from epoll */
int ret = epoll_ctl(c->epollfd, EPOLL_CTL_DEL, mod->fd, NULL);
if (!ret) {
if (close_old) {
close(mod->fd);
module_poll_t *tmp = malloc(sizeof(module_poll_t));
MOD_ASSERT(tmp, "Failed to malloc.", MOD_ERR);

tmp->fd = fd;
tmp->ev.data.ptr = (void *)tmp;
tmp->ev.events = EPOLLIN;
tmp->prev = mod->fds;
tmp->self = (void *)self;
mod->fds = tmp;
c->num_fds++;

/* If a fd is added at runtime, start polling on it */
if (module_is(self, RUNNING)) {
if (!epoll_ctl(c->epollfd, EPOLL_CTL_ADD, tmp->fd, &tmp->ev)) {
return MOD_OK;
}
return MOD_ERR;
}
return MOD_OK;
}

/* Linearly searching for fd */
module_ret_code module_rm_fd(const self_t *self, int fd, int close_fd) {
GET_MOD_IN_STATE(self, RUNNING);

MOD_ASSERT(mod->fds, "No fd registered in this module.", MOD_ERR);
module_poll_t **tmp = &mod->fds;

mod->fd = new_fd;
/* Register new fd */
ret = epoll_ctl(c->epollfd, EPOLL_CTL_ADD, mod->fd, &mod->ev);
if (!ret) {
while (*tmp) {
if ((*tmp)->fd == fd) {
module_poll_t *t = *tmp;
*tmp = (*tmp)->prev;
if (close_fd) {
close(t->fd);
}
free(t);
c->num_fds--;
return MOD_OK;
}
tmp = &(*tmp)->prev;
}
return MOD_ERR;
}

module_ret_code module_update_fd(const self_t *self, int old_fd, int new_fd, int close_old) {
if (module_rm_fd(self, old_fd, close_old) == MOD_OK) {
return module_add_fd(self, new_fd);
}
return MOD_ERR;
}
Expand Down Expand Up @@ -291,22 +332,34 @@ int module_is(const self_t *self, const enum module_states st) {

/** Module state setters **/

module_ret_code module_start(const self_t *self, int fd) {
static int manage_fds(module *mod, m_context *c, int type, int stop) {
module_poll_t *tmp = mod->fds, *t = NULL;
int ret = 0;

while (tmp && !ret) {
if (stop) {
ret = close(tmp->fd);
t = tmp;
} else {
ret = epoll_ctl(c->epollfd, type, tmp->fd, &tmp->ev);
}
tmp = tmp->prev;
free(t); // only used when called with stop: properly free module_poll_t
}
return ret;
}

module_ret_code module_start(const self_t *self) {
GET_MOD_IN_STATE(self, IDLE);

mod->fd = fd;
MODULE_DEBUG("Starting module %s.\n", self->name);
return module_resume(self);
}

module_ret_code module_pause(const self_t *self) {
GET_MOD_IN_STATE(self, RUNNING);

int ret = 0;
if (mod->fd != MODULE_DONT_POLL) {
ret = epoll_ctl(c->epollfd, EPOLL_CTL_DEL, mod->fd, NULL);
}
if (!ret) {
if (!manage_fds(mod, c, EPOLL_CTL_DEL, 0)) {
mod->state = PAUSED;
return MOD_OK;
}
Expand All @@ -316,13 +369,7 @@ module_ret_code module_pause(const self_t *self) {
module_ret_code module_resume(const self_t *self) {
GET_MOD_IN_STATE(self, IDLE | PAUSED);

int ret = 0;
if (mod->fd != MODULE_DONT_POLL) {
mod->ev.data.ptr = (void *)self;
mod->ev.events = EPOLLIN;
ret = epoll_ctl(c->epollfd, EPOLL_CTL_ADD, mod->fd, &mod->ev);
}
if (!ret) {
if (!manage_fds(mod, c, EPOLL_CTL_ADD, 0)) {
mod->state = RUNNING;
return MOD_OK;
}
Expand All @@ -333,17 +380,17 @@ module_ret_code module_stop(const self_t *self) {
GET_MOD_IN_STATE(self, RUNNING);

MODULE_DEBUG("Stopping module %s.\n", self->name);
mod->state = STOPPED;
if (mod->fd == MODULE_DONT_POLL || close(mod->fd) == 0) { // implicitly calls EPOLL_CTL_DEL
if (!manage_fds(mod, c, -1, 1)) { // implicitly calls EPOLL_CTL_DEL
mod->state = STOPPED;
return MOD_OK;
}
return MOD_ERR;
}

static module_ret_code start_children(module *m) {
CHILDREN_LOOP({
int fd = mod->hook->init();
module_start(&mod->self, fd);
mod->hook->init();
module_start(&mod->self);
});
return MOD_OK;
}
Expand Down
42 changes: 21 additions & 21 deletions Lib/module.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#pragma once

#include <limits.h>
#include <module_cmn.h>

/* Convenience macros */
#define MODULE_DONT_POLL INT_MIN

/* Interface Macros */
#define MODULE_CTX(name, ctx) \
static void _ctor2_ module_pre_start(void); \
static int init(void); \
static void init(void); \
static int check(void); \
static int evaluate(void); \
static void receive(const msg_t *msg, const void *userdata); \
Expand All @@ -26,20 +22,22 @@
#define MODULE(name) MODULE_CTX(name, DEFAULT_CTX)

/* Defines for easy API (with no need bothering with both self and ctx) */
#define m_is(state) module_is(self, state)
#define m_start(fd) module_start(self, fd)
#define m_pause() module_pause(self)
#define m_resume() module_resume(self)
#define m_stop() module_stop(self)
#define m_become(x) module_become(self, receive_##x)
#define m_unbecome() module_become(self, receive)
#define m_set_userdata(userdata) module_set_userdata(self, userdata)
#define m_update_fd(fd, close_old) module_update_fd(self, fd, close_old)
#define m_log(fmt, ...) module_log(self, fmt, ##__VA_ARGS__)
#define m_subscribe(topic) module_subscribe(self, topic)
#define m_tell(recipient, msg) module_tell(self, recipient, msg)
#define m_publish(topic, msg) module_publish(self, topic, msg)
#define m_broadcast(msg) module_publish(self, NULL, msg)
#define m_is(state) module_is(self, state)
#define m_start(fd) module_start(self)
#define m_pause() module_pause(self)
#define m_resume() module_resume(self)
#define m_stop() module_stop(self)
#define m_become(x) module_become(self, receive_##x)
#define m_unbecome() module_become(self, receive)
#define m_set_userdata(userdata) module_set_userdata(self, userdata)
#define m_add_fd(fd) module_add_fd(self, fd)
#define m_rm_fd(fd, close_fd) module_rm_fd(self, fd, close_fd)
#define m_update_fd(old, new, close_old) module_update_fd(self, old, new, close_old)
#define m_log(fmt, ...) module_log(self, fmt, ##__VA_ARGS__)
#define m_subscribe(topic) module_subscribe(self, topic)
#define m_tell(recipient, msg) module_tell(self, recipient, msg)
#define m_publish(topic, msg) module_publish(self, topic, msg)
#define m_broadcast(msg) module_publish(self, NULL, msg)

/* Module interface functions */

Expand All @@ -53,7 +51,7 @@ module_ret_code module_binds_to(const self_t *self, const char *parent);
_public_ int module_is(const self_t *self, const enum module_states st);

/* Module state setters */
_public_ module_ret_code module_start(const self_t *self, int fd);
_public_ module_ret_code module_start(const self_t *self);
_public_ module_ret_code module_pause(const self_t *self);
_public_ module_ret_code module_resume(const self_t *self);
_public_ module_ret_code module_stop(const self_t *self);
Expand All @@ -62,7 +60,9 @@ _public_ module_ret_code module_stop(const self_t *self);
_public_ module_ret_code module_become(const self_t *self, recv_cb new_recv);
_public_ module_ret_code module_log(const self_t *self, const char *fmt, ...);
_public_ module_ret_code module_set_userdata(const self_t *self, const void *userdata);
_public_ module_ret_code module_update_fd(const self_t *self, int new_fd, int close_old);
_public_ module_ret_code module_add_fd(const self_t *self, int fd);
_public_ module_ret_code module_rm_fd(const self_t *self, int fd, int close_fd);
_public_ module_ret_code module_update_fd(const self_t *self, int old_fd, int new_fd, int close_old);

/* Module PubSub interface */
_public_ module_ret_code module_subscribe(const self_t *self, const char *topic);
Expand Down
2 changes: 1 addition & 1 deletion Lib/module_cmn.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ typedef struct {
} msg_t;

/* Callbacks typedefs */
typedef int(*init_cb)(void);
typedef void(*init_cb)(void);
typedef int(*evaluate_cb)(void);
typedef void(*recv_cb)(const msg_t *msg, const void *userdata);
typedef void(*destroy_cb)(void);
Expand Down
11 changes: 9 additions & 2 deletions Lib/module_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ struct _self {
const char *ctx; // module's ctx
};

typedef struct _poll_t {
int fd;
struct epoll_event ev; // fd's epoll event struct
self_t *self; // ptr needed to map a fd to a self_t in epoll
struct _poll_t *prev;
} module_poll_t;

typedef struct child {
const self_t *self; // module's name
struct child *next; // module's ctx
Expand All @@ -58,15 +65,15 @@ typedef struct {
const void *userdata; // module's user defined data
enum module_states state; // module's state
self_t self; // module's info available to external world
int fd; // file descriptor to be polled
module_poll_t *fds; // module's fds to be polled
map_t subscriptions; // module's subscriptions
struct epoll_event ev; // module's epoll event struct
child_t *children; // list of children modules
} module;

typedef struct {
int quit;
int epollfd;
int num_fds; // number of fds in this context
log_cb logger;
map_t modules;
} m_context;
Expand Down
35 changes: 13 additions & 22 deletions Lib/modules.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,23 @@ module_ret_code modules_ctx_set_logger(const char *ctx_name, log_cb logger) {
module_ret_code modules_ctx_loop(const char *ctx_name) {
GET_CTX(ctx_name);

int size = hashmap_length(c->modules);
struct epoll_event *pevents = calloc(size, sizeof(struct epoll_event));
MOD_ASSERT(pevents, "Failed to malloc.", MOD_ERR);

int ret = MOD_OK;
struct epoll_event pevents[MAX_EVENTS] = {{ 0 }};
while (!c->quit) {
int nfds = epoll_wait(c->epollfd, pevents, size, -1);
if (nfds < 0) {
ret = MOD_ERR;
break;
} else {
for (int i = 0; i < nfds; i++) {
if (pevents[i].events & EPOLLIN) {
self_t *self = (self_t *)pevents[i].data.ptr;

CTX_GET_MOD(self->name, c);

const msg_t msg = { mod->fd, NULL };
mod->hook->recv(&msg, mod->userdata);
}
int nfds = epoll_wait(c->epollfd, pevents, c->num_fds, -1);
MOD_ASSERT(nfds > 0, "Epoll_wait error.", MOD_ERR);
for (int i = 0; i < nfds; i++) {
if (pevents[i].events & EPOLLIN) {
module_poll_t *p = (module_poll_t *)pevents[i].data.ptr;

CTX_GET_MOD(p->self->name, c);

const msg_t msg = { p->fd, NULL };
mod->hook->recv(&msg, mod->userdata);
}
evaluate_new_state(c);
}
evaluate_new_state(c);
}
free(pevents);
return ret;
return MOD_OK;
}

static void evaluate_new_state(m_context *context) {
Expand Down
3 changes: 1 addition & 2 deletions Samples/Easy/doggo.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ static void module_pre_start(void) {
* Initializes this module's state;
* returns a valid fd to be polled.
*/
static int init(void) {
static void init(void) {
/* Doggo is subscribed to "leaving" topic */
m_subscribe("leaving");
return MODULE_DONT_POLL;
}

/*
Expand Down

0 comments on commit 74254b2

Please sign in to comment.