Skip to content

Commit

Permalink
Added 2 new APIs functions: modules_ctx_get_fd and modules_ctx_dispat…
Browse files Browse the repository at this point in the history
…ch; they are useful to integrate libmodule's loop inside your own loop.
  • Loading branch information
FedeDP committed Mar 10, 2019
1 parent 359dda7 commit eae5dea
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 75 deletions.
192 changes: 122 additions & 70 deletions Lib/modules.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

static _ctor1_ void modules_init(void);
static _dtor0_ void modules_destroy(void);
static void evaluate_new_state(m_context *context);
static void evaluate_new_state(m_context *c);
static module_ret_code loop_start(m_context *c, const int max_events);
static int recv_events(m_context *c);
static int loop_stop(m_context *c);

map_t *ctx;
memalloc_hook memhook;
Expand All @@ -19,8 +22,88 @@ static void modules_destroy(void) {
map_free(ctx);
}

static void evaluate_new_state(m_context *context) {
map_iterate(context->modules, evaluate_module, NULL);
static void evaluate_new_state(m_context *c) {
map_iterate(c->modules, evaluate_module, NULL);
}

static module_ret_code loop_start(m_context *c, const int max_events) {
if (poll_init_pevents(&c->pevents, max_events) == MOD_OK) {
c->looping = true;
c->quit_code = 0;
c->max_events = max_events;

/* Tell every module that loop is started */
tell_system_pubsub_msg(c, LOOP_STARTED, NULL);
return MOD_OK;
}
return MOD_ERR;
}

static int recv_events(m_context *c) {
int nfds = poll_wait(c->fd, c->max_events, c->pevents);
for (int i = 0; i < nfds; i++) {
module_poll_t *p = poll_recv(i, c->pevents);
if (p && p->self && p->self->mod) {
module *mod = p->self->mod;

msg_t msg;
fd_msg_t fd_msg;

if (p->fd == mod->pubsub_fd[0]) {
/* Received on pubsub interface */
*(bool *)&msg.is_pubsub = true;
if (read(p->fd, (void **)&msg.pubsub_msg, sizeof(pubsub_msg_t *)) != sizeof(pubsub_msg_t *)) {
MODULE_DEBUG("Failed to read message for %s: %s\n", mod->name, strerror(errno));
*((pubsub_msg_t **)&msg.pubsub_msg) = NULL;
}
} else {
/* Received from FD */
*(bool *)&msg.is_pubsub = false;
*(int *)&fd_msg.fd = p->fd;
fd_msg.userptr = p->userptr;
*(fd_msg_t **)&msg.fd_msg = &fd_msg;
}

if (!msg.is_pubsub || msg.pubsub_msg) {
run_pubsub_cb(mod, &msg);

/* Properly free pubsub msg */
if (msg.is_pubsub) {
destroy_pubsub_msg((pubsub_msg_t *)msg.pubsub_msg);
}
}
} else {
/* Forward error to below handling code */
errno = ENXIO;
nfds = -1;
}
}

if (nfds > 0) {
evaluate_new_state(c);
} else if (nfds < 0) {
if (errno != EINTR && errno != EAGAIN) {
fprintf(stderr, "Module loop error: %s.\n", strerror(errno));
c->quit = true;
c->quit_code = errno;
} else {
nfds = 0; // return < 0 only for real errors
}
}
return nfds;
}

static int loop_stop(m_context *c) {
/* Tell every module that loop is stopped */
tell_system_pubsub_msg(c, LOOP_STOPPED, NULL);

/* Flush pubsub msg to avoid memleaks */
map_iterate(c->modules, flush_pubsub_msg, NULL);

poll_destroy_pevents(&c->pevents, &c->max_events);
c->looping = false;
c->quit = false;
return c->quit_code;
}

/** Public API **/
Expand Down Expand Up @@ -55,75 +138,11 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_even
MOD_ASSERT(c->num_fds > 0, "No fds to loop on.", MOD_ERR);
MOD_ASSERT(!c->looping, "Context already looping.", MOD_ERR);

if (poll_init_pevents(&c->pevents, max_events) == MOD_OK) {
c->quit = false;
c->looping = true;
c->quit_code = 0;
c->max_events = max_events;

/* Tell every module that loop is started */
tell_system_pubsub_msg(c, LOOP_STARTED, NULL);

if (loop_start(c, max_events) == MOD_OK) {
while (!c->quit) {
int nfds = poll_wait(c->fd, c->max_events, c->pevents);
for (int i = 0; i < nfds; i++) {
module_poll_t *p = poll_recv(i, c->pevents);
if (p && p->self && p->self->mod) {
module *mod = p->self->mod;

msg_t msg;
fd_msg_t fd_msg;

if (p->fd == mod->pubsub_fd[0]) {
/* Received on pubsub interface */
*(bool *)&msg.is_pubsub = true;
if (read(p->fd, (void **)&msg.pubsub_msg, sizeof(pubsub_msg_t *)) != sizeof(pubsub_msg_t *)) {
MODULE_DEBUG("Failed to read message for %s: %s\n", mod->name, strerror(errno));
*((pubsub_msg_t **)&msg.pubsub_msg) = NULL;
}
} else {
/* Received from FD */
*(bool *)&msg.is_pubsub = false;
*(int *)&fd_msg.fd = p->fd;
fd_msg.userptr = p->userptr;
*(fd_msg_t **)&msg.fd_msg = &fd_msg;
}

if (!msg.is_pubsub || msg.pubsub_msg) {
run_pubsub_cb(mod, &msg);

/* Properly free pubsub msg */
if (msg.is_pubsub) {
destroy_pubsub_msg((pubsub_msg_t *)msg.pubsub_msg);
}
}
} else {
/* Forward error to below handling code */
errno = ENXIO;
nfds = -1;
}
}

if (nfds > 0) {
evaluate_new_state(c);
} else if (nfds < 0) {
if (errno != EINTR && errno != EAGAIN) {
fprintf(stderr, "Module loop error: %s.\n", strerror(errno));
c->quit = true;
c->quit_code = errno;
}
}
recv_events(c);
}

/* Tell every module that loop is stopped */
tell_system_pubsub_msg(c, LOOP_STOPPED, NULL);

/* Flush pubsub msg to avoid memleaks */
map_iterate(c->modules, flush_pubsub_msg, NULL);

poll_destroy_pevents(&c->pevents, &c->max_events);
c->looping = false;
return c->quit_code;
return loop_stop(c);
}
return MOD_ERR;
}
Expand All @@ -136,3 +155,36 @@ module_ret_code modules_ctx_quit(const char *ctx_name, const uint8_t quit_code)
c->quit_code = quit_code;
return MOD_OK;
}

module_ret_code modules_ctx_get_fd(const char *ctx_name, int *fd) {
MOD_PARAM_ASSERT(fd);
FIND_CTX(ctx_name);

*fd = dup(c->fd);
return MOD_OK;
}

module_ret_code modules_ctx_dispatch(const char *ctx_name, int *ret) {
MOD_PARAM_ASSERT(ret);
FIND_CTX(ctx_name);

if (!c->looping) {
/* Ok, start now */
*ret = 0;
return loop_start(c, MODULE_MAX_EVENTS);
}

if (c->quit) {
/* We are stopping! */
*ret = loop_stop(c);
/*
* MOD_ERR to let client know it's time to quit.
* You've called dispatch (for the first time) on a quitted ctx.
*/
return MOD_ERR;
}

/* Recv new events */
*ret = recv_events(c);
return *ret >= 0 ? MOD_OK : MOD_ERR;
}
3 changes: 3 additions & 0 deletions Lib/public/module/modules.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ _public_ module_ret_code modules_ctx_set_logger(const char *ctx_name, const log_
_public_ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_events);
_public_ module_ret_code modules_ctx_quit(const char *ctx_name, const uint8_t quit_code);

_public_ module_ret_code modules_ctx_get_fd(const char *ctx_name, int *fd);
_public_ module_ret_code modules_ctx_dispatch(const char *ctx_name, int *ret);

#ifdef __cplusplus
}
#endif
3 changes: 3 additions & 0 deletions Lib/public/module/modules_easy.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@

/* Define for easy looping without having to set a events limit */
#define modules_ctx_loop(ctx) modules_ctx_loop_events(ctx, MODULE_MAX_EVENTS)

#define modules_get_fd(fd) modules_ctx_get_fd(MODULE_DEFAULT_CTX, fd)
#define modules_dispatch(ret) modules_ctx_dispatch(MODULE_DEFAULT_CTX, ret)
3 changes: 3 additions & 0 deletions Samples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ file(GLOB EASY_SRC Easy/*.c)
file(GLOB MULTICTX_SRC MultiCtx/*.c)
file(GLOB SHAREDSRC_SRC SharedSrc/*.c)
file(GLOB CPP_SRC Cpp/*.cpp)
file(GLOB POLL_SRC Poll/*.c)

include_directories("${PROJECT_SOURCE_DIR}/Lib/public/")

add_executable(Easy ${EASY_SRC})
add_executable(MultiCtx ${MULTICTX_SRC})
add_executable(SharedSrc ${SHAREDSRC_SRC})
add_executable(Cpp ${CPP_SRC})
add_executable(Poll ${POLL_SRC})

target_link_libraries(Easy ${PROJECT_NAME})
target_link_libraries(MultiCtx ${PROJECT_NAME} pthread)
target_link_libraries(SharedSrc ${PROJECT_NAME})
target_link_libraries(Cpp ${PROJECT_NAME})
target_link_libraries(Poll ${PROJECT_NAME})
100 changes: 100 additions & 0 deletions Samples/Poll/doggo.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <module/module_easy.h>
#include <unistd.h>
#include <string.h>

static void receive_sleeping(const msg_t *msg, const void *userdata);

/*
* Declare and automagically initialize
* this module as soon as program starts.
*/
MODULE("Doggo");

/*
* This function is automatically called before registering the module.
* Use this to set some global state needed eg: in check() function
*/
static void module_pre_start(void) {
}

/*
* Initializes this module's state;
* returns a valid fd to be polled.
*/
static void init(void) {

}

/*
* Whether this module should be actually created:
* true if module must be created, !true otherwise.
*
* Use this function as a starting filter:
* you may desire that a module is not started in certain conditions.
*/
static bool check(void) {
return true;
}

/*
* Should return not-0 value when module can be actually started (and thus polled).
* Use this to check intra-modules dependencies or any other env variable.
*
* Eg: you can evaluate your global state to make this module start right after
* certain conditions are met.
*/
static bool evaluate(void) {
return true;
}

/*
* Destroyer function, called at module unload (at end of program).
* Note that any module's fds are automatically closed for you.
*/
static void destroy(void) {

}

/*
* Default poll callback
*/
static void receive(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
switch (msg->pubsub_msg->type) {
case USER:
if (!strcmp((char *)msg->pubsub_msg->message, "ComeHere")) {
m_log("Running...\n");
m_tell_str(msg->pubsub_msg->sender, "BauBau");
} else if (!strcmp((char *)msg->pubsub_msg->message, "LetsPlay")) {
m_log("BauBau BauuBauuu!\n");
} else if (!strcmp((char *)msg->pubsub_msg->message, "LetsEat")) {
m_log("Burp!\n");
} else if (!strcmp((char *)msg->pubsub_msg->message, "LetsSleep")) {
m_become(sleeping);
m_log("ZzzZzz...\n");
} else if (!strcmp((char *)msg->pubsub_msg->message, "ByeBye")) {
m_log("Sob...\n");
} else if (!strcmp((char *)msg->pubsub_msg->message, "WakeUp")) {
m_log("???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
m_subscribe(msg->pubsub_msg->topic);
break;
default:
break;
}
}
}

static void receive_sleeping(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub && msg->pubsub_msg->type == USER) {
if (!strcmp((char *)msg->pubsub_msg->message, "WakeUp")) {
m_unbecome();
m_log("Yawn...\n");
} else {
m_log("ZzzZzz...\n");
}
}
}

0 comments on commit eae5dea

Please sign in to comment.