Skip to content

Commit

Permalink
* Added default main(int, char*[]) implementation that just run modul…
Browse files Browse the repository at this point in the history
…es_ctx_loop on each context. It is a weak symbol thus overridable.

* Allow to unsubscribe from a deregistered topic
* Avoid automatically unsubscribing when topic gets deregistered
* module_(un)load() interface is now simpler
  • Loading branch information
FedeDP committed Jul 4, 2019
1 parent 50d20d3 commit 2a7d11b
Show file tree
Hide file tree
Showing 20 changed files with 156 additions and 126 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake")

include(GNUInstallDirs)

find_package (Threads)

#
# Options
#
Expand Down Expand Up @@ -99,7 +101,7 @@ target_include_directories(${PROJECT_NAME} PRIVATE Lib/ Lib/public/module/)
#
# KQUEUE_LIBRARIES will be empty where native epoll/kqueue are supported
#
target_link_libraries(${PROJECT_NAME} ${KQUEUE_LIBRARIES} dl)
target_link_libraries(${PROJECT_NAME} ${KQUEUE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} dl)

install(TARGETS ${PROJECT_NAME}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
Expand Down
46 changes: 18 additions & 28 deletions Lib/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static module_ret_code _deregister_fd(module *mod, const int fd);
static int manage_fds(module *mod, m_context *c, const int flag, const bool stop);
static module_ret_code start(module *mod, const char *err_str);
static module_ret_code stop(module *mod, const char *err_str, const bool stop);
static int subscribtions_dump(void *data, const char *key, void *value);
static map_ret_code subscribtions_dump(void *data, const char *key, void *value);

static module_ret_code init_ctx(const char *ctx_name, m_context **context) {
MODULE_DEBUG("Creating context '%s'.\n", ctx_name);
Expand Down Expand Up @@ -217,7 +217,7 @@ static module_ret_code stop(module *mod, const char *err_str, const bool stop) {
return MOD_OK;
}

static int subscribtions_dump(void *data, const char *key, void *value) {
static map_ret_code subscribtions_dump(void *data, const char *key, void *value) {
const self_t *self = (self_t *) data;

module_log(self, "-> %s: %p\n", key, value);
Expand Down Expand Up @@ -341,10 +341,11 @@ module_ret_code module_deregister(self_t **self) {
return MOD_OK;
}

module_ret_code module_load(const char *module_path, const char *mod_name, const char *ctx_name) {
module_ret_code module_load(const char *module_path, const char *ctx_name) {
MOD_PARAM_ASSERT(module_path);
MOD_PARAM_ASSERT(mod_name);
MOD_PARAM_ASSERT(ctx_name);
FIND_CTX(ctx_name);

const int module_size = map_length(c->modules);

void *handle = dlopen(module_path, RTLD_NOW);
if (!handle) {
Expand All @@ -353,37 +354,26 @@ module_ret_code module_load(const char *module_path, const char *mod_name, const
}

/*
* Check this after as ctx may be created by loaded module.
* Check that:
* 1) requested ctx exists (or has been created)
* 2) requested module has been created in requested ctx
* Check that requested module has been created in requested ctx,
* by looking at requested ctx number of modules
*/
m_context *c = map_get(ctx, (char *)ctx_name);
if (!c) {
dlclose(handle);
return MOD_NO_CTX;
}

module *mod = map_get(c->modules, (char *)mod_name);
if (!mod) {
if (module_size == map_length(c->modules) || // no new module registered in requested ctx
map_put(c->loaded, module_path, handle, false, false) != MAP_OK) {

dlclose(handle);
return MOD_NO_MOD;
}

if (map_put(c->loaded, mod_name, handle, false, false) == MAP_OK) {
return MOD_OK;
return MOD_ERR;
}
return MOD_ERR;
return MOD_OK;
}

module_ret_code module_unload(const char *mod_name, char *ctx_name) {
MOD_PARAM_ASSERT(mod_name);
module_ret_code module_unload(const char *module_path, const char *ctx_name) {
MOD_PARAM_ASSERT(module_path);
FIND_CTX(ctx_name);

void *handle = map_get(c->loaded, mod_name);
if (handle) {
void *handle = map_get(c->loaded, module_path);
if (handle && map_remove(c->loaded, module_path) == MAP_OK) {
dlclose(handle);
map_remove(c->loaded, mod_name);

return MOD_OK;
}
return MOD_ERR;
Expand Down
65 changes: 64 additions & 1 deletion Lib/modules.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "modules.h"
#include "poll_priv.h"
#include <pthread.h>

static void *thread_loop(void *param);
static map_ret_code main_loop(void *data, const char *key, void *value);
static _ctor1_ void modules_init(void);
static _dtor0_ void modules_destroy(void);
static void evaluate_new_state(m_context *c);
Expand All @@ -12,6 +15,66 @@ static int recv_events(m_context *c, int timeout);
map_t *ctx;
memalloc_hook memhook;

void modules_pre_start(void) {
MODULE_DEBUG("Pre-starting libmodule.");
}

static void *thread_loop(void *param) {
const char *key = (const char *)param;

modules_ctx_loop_events(key, MODULES_MAX_EVENTS);
return NULL;
}

static map_ret_code main_loop(void *data, const char *key, void *value) {
pthread_t *th = *(pthread_t **)data;
if (th) {
static int i = 0;
pthread_create(&th[i++], NULL, thread_loop, (void *)key);
return MAP_OK;
}
*(char **)data = (char *)key;
return MAP_ERR;
}

/*
* This is an exported global weak symbol.
* It means that if a program does not implement any main(int, char *[]),
* this will be used by default.
*
* All it does is:
* if ctx_num > 1 -> allocating ctx_num pthreads and each of them will loop on its context
* else -> just loops on only ctx on main thread
*/
int main(int argc, char *argv[]) {
void *th = NULL;

/* If there is more than 1 registered ctx, alloc as many pthreads as needed */
if (map_length(ctx) > 1) {
MODULE_DEBUG("Allocating %d pthreads.\n", map_length(ctx));
th = memhook._calloc(map_length(ctx), sizeof(pthread_t));
}

/*
* main_loop returns MAP_ERR for single-ctx runs,
* where we only need a pointer to ctx key.
* Ugliness warning: passing a void** ptr that is either an array of pthreads
* or is just a space to point to single-ctx key.
*/
if (map_iterate(ctx, main_loop, &th) == MAP_ERR) {
MODULE_DEBUG("Running in single ctx mode: '%s'\n", (const char *)th);
return modules_ctx_loop_events((const char *)th, MODULES_MAX_EVENTS);
}

/* If more than 1 ctx is registered, we should join all threads */
MODULE_DEBUG("Waiting all threads.\n");
for (int i = 0; i < map_length(ctx); i++) {
pthread_join(((pthread_t *)th)[i], NULL);
}
memhook._free(th);
return MOD_OK;
}

static void modules_init(void) {
MODULE_DEBUG("Initializing libmodule %d.%d.%d.\n", MODULE_VERSION_MAJ, MODULE_VERSION_MIN, MODULE_VERSION_PAT);
modules_set_memalloc_hook(NULL);
Expand Down Expand Up @@ -182,7 +245,7 @@ module_ret_code modules_ctx_dispatch(const char *ctx_name, int *ret) {
if (!c->looping) {
/* Ok, start now */
*ret = 0;
return loop_start(c, MODULE_MAX_EVENTS);
return loop_start(c, MODULES_MAX_EVENTS);
}

if (c->quit) {
Expand Down
4 changes: 2 additions & 2 deletions Lib/public/module/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ _public_ module_ret_code module_register(const char *name, const char *ctx_name,
_public_ module_ret_code module_deregister(self_t **self);

/* External shared object module runtime loading */
_public_ module_ret_code module_load(const char *module_path, const char *mod_name, const char *ctx_name);
_public_ module_ret_code module_unload(const char *mod_name, char *ctx_name);
_public_ module_ret_code module_load(const char *module_path, const char *ctx_name);
_public_ module_ret_code module_unload(const char *module_path, const char *ctx_name);

/* Module state getters */
_public_ _pure_ bool module_is(const self_t *self, const enum module_states st);
Expand Down
16 changes: 10 additions & 6 deletions Lib/public/module/module_cmn.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@

/*
* ctors order:
* 1) modules_pre_start()
* 2) modules_init()
* 3) each module_pre_start()
* 4) each module ctor function
* 0) modules_pre_start()
* 1) modules_init()
* 2) each module_pre_start() (only module_easy API)
* 3) each module ctor function (only module_easy API)
*
* dtors order:
* 1) each module's dtor function (only module_easy API)
* 0) modules_destroy()
*/
#define _ctor0_ __attribute__((constructor (110)))
#define _ctor1_ __attribute__((constructor (111)))
Expand All @@ -29,8 +33,8 @@
#define _weak_ __attribute__((weak))
#define _public_ __attribute__ ((visibility("default")))

#define MODULE_DEFAULT_CTX "default"
#define MODULE_MAX_EVENTS 64
#define MODULES_DEFAULT_CTX "default"
#define MODULES_MAX_EVENTS 64

/** Structs types **/

Expand Down
5 changes: 4 additions & 1 deletion Lib/public/module/module_easy.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ static void _ctor3_ constructor(void) { \
static void _dtor1_ destructor(void) { module_deregister((self_t **)&self()); } \
static void _ctor2_ module_pre_start(void)

#define MODULE(name) MODULE_CTX(name, MODULE_DEFAULT_CTX)
#define MODULE(name) MODULE_CTX(name, MODULES_DEFAULT_CTX)

/* Defines for easy API (with no need bothering with both _self and ctx) */
#define m_load(path) module_load(path, MODULES_DEFAULT_CTX)
#define m_unload(path) module_unload(path, MODULES_DEFAULT_CTX)

#define m_is(state) module_is(self(), state)
#define m_dump() module_dump(self())

Expand Down
1 change: 1 addition & 0 deletions Lib/public/module/modules.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ extern "C"{
#endif

_public_ void _ctor0_ _weak_ modules_pre_start(void);
_public_ int _weak_ main(int argc, char *argv[]);
_public_ module_ret_code modules_set_memalloc_hook(const memalloc_hook *hook);
_public_ module_ret_code modules_ctx_set_logger(const char *ctx_name, const log_cb logger);
_public_ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_events);
Expand Down
12 changes: 6 additions & 6 deletions Lib/public/module/modules_easy.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#include "modules.h"

/* Defines for easy API (with no need bothering with both self and ctx) */
#define modules_set_logger(log) modules_ctx_set_logger(MODULE_DEFAULT_CTX, log)
#define modules_loop() modules_ctx_loop_events(MODULE_DEFAULT_CTX, MODULE_MAX_EVENTS)
#define modules_quit(code) modules_ctx_quit(MODULE_DEFAULT_CTX, code)
#define modules_set_logger(log) modules_ctx_set_logger(MODULES_DEFAULT_CTX, log)
#define modules_loop() modules_ctx_loop_events(MODULES_DEFAULT_CTX, MODULES_MAX_EVENTS)
#define modules_quit(code) modules_ctx_quit(MODULES_DEFAULT_CTX, code)

/* Define for easy looping without having to set a events limit */
#define modules_ctx_loop(ctx) modules_ctx_loop_events(ctx, MODULE_MAX_EVENTS)
#define modules_ctx_loop(ctx) modules_ctx_loop_events(ctx, MODULES_MAX_EVENTS)

#define modules_get_fd(fd) modules_ctx_get_fd(MODULE_DEFAULT_CTX, fd)
#define modules_dispatch(ret) modules_ctx_dispatch(MODULE_DEFAULT_CTX, ret)
#define modules_get_fd(fd) modules_ctx_get_fd(MODULES_DEFAULT_CTX, fd)
#define modules_dispatch(ret) modules_ctx_dispatch(MODULES_DEFAULT_CTX, ret)
19 changes: 2 additions & 17 deletions Lib/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ static void destroy_pubsub_msg(pubsub_priv_t *pubsub_msg);
static module_ret_code tell_pubsub_msg(pubsub_priv_t *m, module *mod, m_context *c);
static module_ret_code publish_msg(const module *mod, const char *topic, const unsigned char *message,
const ssize_t size, const bool autofree);
static map_ret_code unsubscribe(void *data, const char *key, void *value);

static map_ret_code tell_if(void *data, const char *key, void *value) {
module *mod = (module *)value;
Expand Down Expand Up @@ -91,16 +90,6 @@ static module_ret_code publish_msg(const module *mod, const char *topic, const u
return MOD_ERR;
}

static map_ret_code unsubscribe(void *data, const char *key, void *value) {
module *mod = (module *)value;
const char *topic = (const char *)data;

if (map_has_key(mod->subscriptions, topic)) {
map_remove(mod->subscriptions, topic);
}
return MAP_OK;
}

/** Private API **/

module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const self_t *sender, const char *topic) {
Expand Down Expand Up @@ -180,8 +169,6 @@ module_ret_code module_deregister_topic(const self_t *self, const char *topic) {
/* Only same mod which registered topic can deregister it */
if (tmp == mod) {
if (map_remove(c->topics, topic) == MAP_OK) {
/* Automatically unsubscribe any module subscribed to topic */
map_iterate(c->modules, unsubscribe, (void *)topic);
tell_system_pubsub_msg(c, TOPIC_DEREGISTERED, self, topic);
return MOD_OK;
}
Expand All @@ -207,11 +194,9 @@ module_ret_code module_subscribe(const self_t *self, const char *topic) {
module_ret_code module_unsubscribe(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

if (map_has_key(c->topics, topic) &&
(!map_has_key(mod->subscriptions, topic) ||
map_remove(mod->subscriptions, topic) == MAP_OK)) {
if (!map_has_key(mod->subscriptions, topic) ||
map_remove(mod->subscriptions, topic) == MAP_OK) {

return MOD_OK;
}
Expand Down
17 changes: 6 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Indeed, libmodule was heavily inspired by my own actor library experience with [

Unsurprisingly, module is the core concept of libmodule architecture.
A module is an Actor that can listen on socket events too.
Frankly speaking, it is denoted by a MODULE() macro plus a bunch of callbacks, eg:
Frankly speaking, it is denoted by a MODULE() macro plus a bunch of mandatory callbacks, eg:
```
cat pippo.c
Expand All @@ -23,16 +23,18 @@ cat pippo.c
#include <string.h>
#include <ctype.h>
MODULE("Foo");
MODULE("Pippo");
static void init(void) {
m_register_fd(STDIN_FILENO, 0, NULL);
}
/* Should module be registered? */
static bool check(void) {
return true;
}
/* Should module be started? */
static bool evaluate(void) {
return true;
}
Expand Down Expand Up @@ -61,16 +63,9 @@ static void receive(const msg_t *msg, const void *userdata) {
}
}
```
```
cat main.c
#include <module/modules_easy.h>

int main() {
return modules_loop();
}
```
In this example, a "Foo" module is created and will read chars from stdin until 'q' is pressed.
In this example, a "Pippo" module is created and will read chars from stdin until 'q' is pressed.
Note that it does not even need a main function, as libmodule already provides a default one (as a [weak, thus overridable, symbol](https://gcc.gnu.org/onlinedocs/gcc-3.2/gcc/Function-Attributes.html)).

## Is it portable?

Expand Down
1 change: 1 addition & 0 deletions Samples/Cpp/doggo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ MODULE("Doggo");
* Use this to set some global state needed eg: in check() function
*/
static void module_pre_start(void) {
printf("Press 'c' to start playing with your own doggo...\n");
}

/*
Expand Down
17 changes: 0 additions & 17 deletions Samples/Cpp/main.cpp

This file was deleted.

0 comments on commit 2a7d11b

Please sign in to comment.