Skip to content

Commit

Permalink
Lots of changes and improvements:
Browse files Browse the repository at this point in the history
Pubsub API:
* Add a module_poisonpill() function, to send a system message to a module to stop it -> this will be enqueued in module's message queue, conversely to module_stop that should stop module right away freeing all its enqueued messages
* Always destroy messages in flush_pubsub_msg(), when a module gets stopped or a loop stops looping

Map API:
* Fixed value update in map implementation and added a specific test
* Improved hashmap implementation, switching to https://github.com/DavidLeeds/hashmap as base
* Added a stress test for hashmap, with 1mln of key,value pairs
* Added map_iterate tests

Stack API:
* Stack_iterate() follows same logic as map_iterate

Generic:
* Renamed modules_set_memalloc_hook to modules_set_memhook()
* Renamed memalloc_hook to memhook_t
  • Loading branch information
FedeDP committed Jul 17, 2019
1 parent 4623cf1 commit ff976c9
Show file tree
Hide file tree
Showing 17 changed files with 413 additions and 309 deletions.
465 changes: 244 additions & 221 deletions Lib/map.c

Large diffs are not rendered by default.

79 changes: 41 additions & 38 deletions Lib/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ static void default_logger(const self_t *self, const char *fmt, va_list args, co
static module_ret_code _register_fd(module *mod, const int fd, const bool autoclose, const void *userptr);
static module_ret_code _deregister_fd(module *mod, const int fd);
static int manage_fds(module *mod, m_context *c, const int flag, const bool stop);
static module_ret_code start(module *mod, const char *err_str);
static module_ret_code stop(module *mod, const char *err_str, const bool stop);
static map_ret_code subscribtions_dump(void *data, const char *key, void *value);

static module_ret_code init_ctx(const char *ctx_name, m_context **context) {
Expand Down Expand Up @@ -160,9 +158,31 @@ static int manage_fds(module *mod, m_context *c, const int flag, const bool stop
return ret;
}

static module_ret_code start(module *mod, const char *err_str) {
GET_CTX_PRIV((&mod->self));
static map_ret_code subscribtions_dump(void *data, const char *key, void *value) {
const self_t *self = (self_t *) data;

module_log(self, "-> %s: %p\n", key, value);
return MAP_OK;
}

/** Private API **/

bool _module_is(const module *mod, const enum module_states st) {
return mod->state & st;
}

map_ret_code evaluate_module(void *data, const char *key, void *value) {
module *mod = (module *)value;
if (_module_is(mod, IDLE) && mod->hook.evaluate()) {
start(mod, true);
}
return MAP_OK;
}

module_ret_code start(module *mod, const bool start) {
static const char *errors[2] = { "Failed to resume module.", "Failed to start module." };

GET_CTX_PRIV((&mod->self));

const bool was_idle = _module_is(mod, IDLE);

Expand All @@ -171,28 +191,31 @@ static module_ret_code start(module *mod, const char *err_str) {
* or after it was stopped.
* Properly add back its pubsub fds.
*/
if (!_module_is(mod, PAUSED)) {
if (start) {
/* THIS IS NOT A RESUME */
if (init_pubsub_fd(mod) != MOD_OK) {
return MOD_ERR;
}
}

int ret = manage_fds(mod, c, ADD, false);
MOD_ASSERT(!ret, err_str, MOD_ERR);
MOD_ASSERT(!ret, errors[start], MOD_ERR);

mod->state = RUNNING;

/* If this is first time module is started, call its init() callback */
if (was_idle) {
mod->hook.init();
}

tell_system_pubsub_msg(c, MODULE_STARTED, &mod->self, NULL);
MODULE_DEBUG("%s '%s'.\n", start ? "Started" : "Resumed", mod->name);
tell_system_pubsub_msg(NULL, c, MODULE_STARTED, &mod->self, NULL);
return MOD_OK;
}

static module_ret_code stop(module *mod, const char *err_str, const bool stop) {
module_ret_code stop(module *mod, const bool stop) {
static const char *errors[2] = { "Failed to pause module.", "Failed to stop module." };

GET_CTX_PRIV((&mod->self));

if (stop) {
Expand All @@ -203,11 +226,11 @@ static module_ret_code stop(module *mod, const char *err_str, const bool stop) {
* we will close and reopen its pubsub_fds,
* thus we are not able to retrieve its old messages anymore.
*/
flush_pubsub_msg(mod, NULL, mod);
flush_pubsub_msgs(NULL, NULL, mod);
}

int ret = manage_fds(mod, c, RM, stop);
MOD_ASSERT(!ret, err_str, MOD_ERR);
MOD_ASSERT(!ret, errors[stop], MOD_ERR);

mod->state = stop ? STOPPED : PAUSED;
/*
Expand All @@ -220,31 +243,11 @@ static module_ret_code stop(module *mod, const char *err_str, const bool stop) {
mod->pubsub_fd[1] = -1;
}

tell_system_pubsub_msg(c, MODULE_STOPPED, &mod->self, NULL);
MODULE_DEBUG("%s '%s'.\n", stop ? "Stopped" : "Paused", mod->name);
tell_system_pubsub_msg(NULL, c, MODULE_STOPPED, &mod->self, NULL);
return MOD_OK;
}

static map_ret_code subscribtions_dump(void *data, const char *key, void *value) {
const self_t *self = (self_t *) data;

module_log(self, "-> %s: %p\n", key, value);
return MAP_OK;
}

/** Private API **/

bool _module_is(const module *mod, const enum module_states st) {
return mod->state & st;
}

map_ret_code evaluate_module(void *data, const char *key, void *value) {
module *mod = (module *)value;
if (_module_is(mod, IDLE) && mod->hook.evaluate()) {
start(mod, "Failed to start module.");
}
return MAP_OK;
}

/** Public API **/

module_ret_code module_register(const char *name, const char *ctx_name, self_t **self, const userhook *hook) {
Expand Down Expand Up @@ -318,7 +321,7 @@ module_ret_code module_deregister(self_t **self) {
GET_MOD(tmp);
MODULE_DEBUG("Deregistering module '%s'.\n", mod->name);

stop(mod, "Failed to stop module.", true);
stop(mod, true);

/* Remove the module from the context */
map_remove(tmp->ctx->modules, mod->name);
Expand Down Expand Up @@ -482,23 +485,23 @@ module_ret_code module_dump(const self_t *self) {
module_ret_code module_start(const self_t *self) {
GET_MOD_IN_STATE(self, IDLE | STOPPED);

return start(mod, "Failed to start module.");
return start(mod, true);
}

module_ret_code module_pause(const self_t *self) {
GET_MOD_IN_STATE(self, RUNNING);

return stop(mod, "Failed to pause module.", false);
return stop(mod, false);
}

module_ret_code module_resume(const self_t *self) {
GET_MOD_IN_STATE(self, PAUSED);

return start(mod, "Failed to resume module.");
return start(mod, false);
}

module_ret_code module_stop(const self_t *self) {
GET_MOD_IN_STATE(self, RUNNING | PAUSED);

return stop(mod, "Failed to stop module.", true);
return stop(mod, true);
}
21 changes: 12 additions & 9 deletions Lib/modules.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ static inline int loop_quit(m_context *c, const uint8_t quit_code);
static int recv_events(m_context *c, int timeout);

map_t *ctx;
memalloc_hook memhook;
memhook_t memhook;

void modules_pre_start(void) {
MODULE_DEBUG("Pre-starting libmodule.");
Expand Down Expand Up @@ -51,7 +51,7 @@ int main(int argc, char *argv[]) {

/* If there is more than 1 registered ctx, alloc as many pthreads as needed */
if (map_length(ctx) > 1) {
MODULE_DEBUG("Allocating %d pthreads.\n", map_length(ctx));
MODULE_DEBUG("Allocating %ld pthreads.\n", map_length(ctx));
th = memhook._calloc(map_length(ctx), sizeof(pthread_t));
}

Expand All @@ -77,7 +77,7 @@ int main(int argc, char *argv[]) {

static void modules_init(void) {
MODULE_DEBUG("Initializing libmodule %d.%d.%d.\n", MODULE_VERSION_MAJ, MODULE_VERSION_MIN, MODULE_VERSION_PAT);
modules_set_memalloc_hook(NULL);
modules_set_memhook(NULL);
ctx = map_new();
}

Expand Down Expand Up @@ -107,18 +107,18 @@ static module_ret_code loop_start(m_context *c, const int max_events) {
evaluate_new_state(c);

/* Tell every RUNNING module that loop is started */
tell_system_pubsub_msg(c, LOOP_STARTED, NULL, NULL);
tell_system_pubsub_msg(NULL, c, LOOP_STARTED, NULL, NULL);
return MOD_OK;
}
return MOD_ERR;
}

static int loop_stop(m_context *c) {
/* Tell every module that loop is stopped */
tell_system_pubsub_msg(c, LOOP_STOPPED, NULL, NULL);
tell_system_pubsub_msg(NULL, c, LOOP_STOPPED, NULL, NULL);

/* Flush pubsub msg to avoid memleaks */
map_iterate(c->modules, flush_pubsub_msg, NULL);
map_iterate(c->modules, flush_pubsub_msgs, NULL);

poll_destroy_pevents(&c->pevents, &c->max_events);
c->looping = false;
Expand Down Expand Up @@ -160,8 +160,11 @@ static int recv_events(m_context *c, int timeout) {
msg.fd_msg = &fd_msg;
}

if (!msg.is_pubsub || msg.ps_msg) {
if (!msg.is_pubsub || (msg.ps_msg && msg.ps_msg->type != POISON_PILL)) {
run_pubsub_cb(mod, &msg);
} else if (msg.ps_msg->type == POISON_PILL) {
MODULE_DEBUG("PoisonPilling '%s'.\n", mod->name);
stop(mod, true);
}
} else {
/* Forward error to below handling code */
Expand All @@ -185,13 +188,13 @@ static int recv_events(m_context *c, int timeout) {

/** Public API **/

module_ret_code modules_set_memalloc_hook(const memalloc_hook *hook) {
module_ret_code modules_set_memhook(const memhook_t *hook) {
if (hook) {
MOD_ASSERT(hook->_malloc, "NULL malloc fn.", MOD_ERR);
MOD_ASSERT(hook->_realloc, "NULL realloc fn.", MOD_ERR);
MOD_ASSERT(hook->_calloc, "NULL calloc fn.", MOD_ERR);
MOD_ASSERT(hook->_free, "NULL free fn.", MOD_ERR);
memcpy(&memhook, hook, sizeof(memalloc_hook));
memcpy(&memhook, hook, sizeof(memhook_t));
} else {
memhook._malloc = malloc;
memhook._realloc = realloc;
Expand Down
8 changes: 5 additions & 3 deletions Lib/priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,17 @@ struct _context {
/* Defined in module.c */
_pure_ bool _module_is(const module *mod, const enum module_states st);
map_ret_code evaluate_module(void *data, const char *key, void *value);
module_ret_code start(module *mod, const bool start);
module_ret_code stop(module *mod, const bool stop);

/* Defined in pubsub.c */
module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type,
module_ret_code tell_system_pubsub_msg(module *mod, m_context *c, enum msg_type type,
const self_t *sender, const char *topic);
map_ret_code flush_pubsub_msg(void *data, const char *key, void *value);
map_ret_code flush_pubsub_msgs(void *data, const char *key, void *value);
void run_pubsub_cb(module *mod, msg_t *msg);

/* Defined in priv.c */
char *mem_strdup(const char *s);

extern map_t *ctx;
extern memalloc_hook memhook;
extern memhook_t memhook;
7 changes: 4 additions & 3 deletions Lib/public/module/map.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
/** Hashmap interface **/

typedef enum {
MAP_WRONG_PARAM = -5,
MAP_EPERM = -6,
MAP_WRONG_PARAM,
MAP_ERR,
MAP_MISSING,
MAP_FULL,
Expand All @@ -32,7 +33,7 @@ extern "C"{
_public_ map_t *map_new(void);
_public_ map_itr_t *map_itr_new(const map_t *m);
_public_ map_itr_t *map_itr_next(map_itr_t *itr);
_public_ map_ret_code map_itr_remove(const map_itr_t *itr);
_public_ map_ret_code map_itr_remove(map_itr_t *itr);
_public_ const char *map_itr_get_key(const map_itr_t *itr);
_public_ void *map_itr_get_data(const map_itr_t *itr);
_public_ map_ret_code map_itr_set_data(const map_itr_t *itr, void *value);
Expand All @@ -43,7 +44,7 @@ _public_ bool map_has_key(const map_t *m, const char *key);
_public_ map_ret_code map_remove(map_t *m, const char *key);
_public_ map_ret_code map_clear(map_t *m);
_public_ map_ret_code map_free(map_t *m);
_public_ int map_length(const map_t *m);
_public_ ssize_t map_length(const map_t *m);
_public_ map_ret_code map_set_dtor(map_t *m, map_dtor fn);

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions Lib/public/module/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ _public_ module_ret_code module_publish(const self_t *self, const char *topic, c
const ssize_t size, const bool autofree);
_public_ module_ret_code module_broadcast(const self_t *self, const void *message,
const ssize_t size, const bool autofree, bool global);
_public_ module_ret_code module_poisonpill(const self_t *self, const self_t *recipient);

#ifdef __cplusplus
}
Expand Down
4 changes: 2 additions & 2 deletions Lib/public/module/module_cmn.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ typedef struct _self self_t;
enum module_states { IDLE = 0x1, RUNNING = 0x2, PAUSED = 0x4, STOPPED = 0x8 };

/* PubSub message types */
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED, MODULE_STARTED, MODULE_STOPPED };
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED, MODULE_STARTED, MODULE_STOPPED, POISON_PILL };

typedef struct {
const char *topic;
Expand Down Expand Up @@ -97,7 +97,7 @@ typedef struct {
realloc_fn _realloc;
calloc_fn _calloc;
free_fn _free;
} memalloc_hook;
} memhook_t;

/* Module return codes */
typedef enum {
Expand Down
1 change: 1 addition & 0 deletions Lib/public/module/module_easy.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ static void _ctor2_ module_pre_start(void)
#define m_tell(recipient, msg, size, free) module_tell(self(), recipient, msg, size, free)
#define m_publish(topic, msg, size, free) module_publish(self(), topic, msg, size, free)
#define m_broadcast(msg, size, free, global) module_broadcast(self(), msg, size, free, global)
#define m_poisonpill(recipient) module_poisonpill(self(), recipient)
#define m_tell_str(recipient, msg) module_tell(self(), recipient, (const void *)msg, strlen(msg), false)
#define m_publish_str(topic, msg) module_publish(self(), topic, (const void *)msg, strlen(msg), false)
#define m_broadcast_str(msg, global) module_broadcast(self(), (const void *)msg, strlen(msg), false, global)
2 changes: 1 addition & 1 deletion Lib/public/module/modules.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ extern "C"{

_public_ void _ctor0_ _weak_ modules_pre_start(void);
_public_ int _weak_ main(int argc, char *argv[]);
_public_ module_ret_code modules_set_memalloc_hook(const memalloc_hook *hook);
_public_ module_ret_code modules_set_memhook(const memhook_t *hook);
_public_ module_ret_code modules_ctx_set_logger(const char *ctx_name, const log_cb logger);
_public_ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_events);
_public_ module_ret_code modules_ctx_quit(const char *ctx_name, const uint8_t quit_code);
Expand Down
2 changes: 1 addition & 1 deletion Lib/public/module/stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ _public_ void *stack_pop(stack_t *s);
_public_ void *stack_peek(const stack_t *s);
_public_ stack_ret_code stack_clear(stack_t *s);
_public_ stack_ret_code stack_free(stack_t *s);
_public_ int stack_length(const stack_t *s);
_public_ ssize_t stack_length(const stack_t *s);
_public_ stack_ret_code stack_set_dtor(stack_t *s, stack_dtor fn);

#ifdef __cplusplus
Expand Down

0 comments on commit ff976c9

Please sign in to comment.