diff --git a/Lib/module.c b/Lib/module.c index 5d9ce1e..84fd4b8 100644 --- a/Lib/module.c +++ b/Lib/module.c @@ -306,36 +306,9 @@ module_ret_code module_unsubscribe(const self_t *self, const char *topic) { return MOD_ERR; } -module_ret_code tell_system_pubsub_msg(m_context *c, enum sys_msg_t type, ...) { - pubsub_msg_t m = { .topic = NULL, .sender = NULL, .message = NULL, .type = SYSTEM }; - switch (type) { - case LOOP_STARTED: - m.message = "LOOP_STARTED"; - break; - case LOOP_STOPPED: - m.message = "LOOP_STOPPED"; - break; - case TOPIC_REGISTERED: - case TOPIC_DEREGISTERED:{ - char name[256] = { 0 }; - - va_list args; - va_start(args, type); - - char *topic = va_arg(args, char *); - snprintf(name, sizeof(name) - 1, "TOPIC_%s: %s", type == TOPIC_REGISTERED ? "REGISTERED" : "DEREGISTERED", topic); - m.message = name; - - va_end(args); - } - break; - default: - break; - } - if (m.message) { - return tell_pubsub_msg(&m, NULL, c); - } - return MOD_ERR; +module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic) { + pubsub_msg_t m = { .topic = topic, .sender = NULL, .message = NULL, .type = type }; + return tell_pubsub_msg(&m, NULL, c); } int flush_pubsub_msg(void *data, void *m) { @@ -364,9 +337,10 @@ static int tell_if(void *data, void *m) { /* * Only if mod is actually running and + * or it is a SYSTEM message. or * if topic is null or this module is subscribed to topic */ - if (module_is(&mod->self, RUNNING) && (!msg->topic || + if (module_is(&mod->self, RUNNING) && (msg->type != USER || !msg->topic || hashmap_get(mod->subscriptions, msg->topic, (void **)&tmp) == MAP_OK)) { MODULE_DEBUG("Telling a message to %s.\n", mod->self.name); diff --git a/Lib/module_priv.h b/Lib/module_priv.h index ca0dda2..289bad4 100644 --- a/Lib/module_priv.h +++ b/Lib/module_priv.h @@ -67,10 +67,8 @@ typedef struct { map_t topics; } m_context; -enum sys_msg_t { LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED }; - int evaluate_module(void *data, void *m); -module_ret_code tell_system_pubsub_msg(m_context *c, enum sys_msg_t type, ...); +module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic); int flush_pubsub_msg(void *data, void *m); void destroy_pubsub_msg(pubsub_msg_t *m); diff --git a/Lib/modules.c b/Lib/modules.c index d6abd21..5616e85 100644 --- a/Lib/modules.c +++ b/Lib/modules.c @@ -59,7 +59,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, int max_events) { c->max_events = max_events; /* Tell every module that loop is started */ - tell_system_pubsub_msg(c, LOOP_STARTED); + tell_system_pubsub_msg(c, LOOP_STARTED, NULL); while (!c->quit) { int nfds = poll_wait(c->fd, c->max_events, c->pevents); @@ -73,7 +73,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, int max_events) { pubsub_msg_t *m = NULL; if (p->fd == mod->pubsub_fd[0]) { *(int *)&msg.is_pubsub = 1; - read(p->fd, &m, sizeof(struct pubsub_msg_t *)); + read(p->fd, &m, sizeof(pubsub_msg_t *)); *(pubsub_msg_t **)&msg.msg = m; } else { *(int *)&msg.is_pubsub = 0; @@ -90,7 +90,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, int max_events) { } /* Tell every module that loop is stopped */ - tell_system_pubsub_msg(c, LOOP_STOPPED); + tell_system_pubsub_msg(c, LOOP_STOPPED, NULL); /* Flush pubsub msg to avoid memleaks */ hashmap_iterate(c->modules, flush_pubsub_msg, NULL); diff --git a/Lib/public/module/module_cmn.h.in b/Lib/public/module/module_cmn.h.in index 8e211ea..39d6c81 100644 --- a/Lib/public/module/module_cmn.h.in +++ b/Lib/public/module/module_cmn.h.in @@ -35,7 +35,7 @@ typedef struct _self self_t; /* Modules states */ enum module_states { IDLE = 0x1, RUNNING = 0x2, PAUSED = 0x4, STOPPED = 0x8 }; -enum msg_type { USER, SYSTEM }; +enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED }; typedef struct { const char *topic; diff --git a/Samples/Cpp/doggo.cpp b/Samples/Cpp/doggo.cpp index 4c8afdc..fc3191a 100644 --- a/Samples/Cpp/doggo.cpp +++ b/Samples/Cpp/doggo.cpp @@ -22,8 +22,7 @@ static void module_pre_start(void) { * returns a valid fd to be polled. */ static void init(void) { - /* Doggo is subscribed to "leaving" topic */ - m_subscribe("leaving"); + } /* @@ -50,7 +49,7 @@ static int evaluate(void) { /* * Destroyer function, called at module unload (at end of program). - * Note that module's FD is automatically closed for you. + * Note that any module's fds are automatically closed for you. */ static void destroy(void) { @@ -61,26 +60,36 @@ static void destroy(void) { */ static void receive(const msg_t *msg, const void *userdata) { if (msg->is_pubsub) { - if (!strcmp(msg->msg->message, "ComeHere")) { - m_log("Running...\n"); - m_reply(msg->msg->sender, "BauBau"); - } else if (!strcmp(msg->msg->message, "LetsPlay")) { - m_log("BauBau BauuBauuu!\n"); - } else if (!strcmp(msg->msg->message, "LetsEat")) { - m_log("Burp!\n"); - } else if (!strcmp(msg->msg->message, "LetsSleep")) { - m_become(sleeping); - m_log("ZzzZzz...\n"); - } else if (!strcmp(msg->msg->message, "ByeBye")) { - m_log("Sob...\n"); - } else if (!strcmp(msg->msg->message, "WakeUp")) { - m_log("???\n"); + switch (msg->msg->type) { + case USER: + if (!strcmp(msg->msg->message, "ComeHere")) { + m_log("Running...\n"); + m_reply(msg->msg->sender, "BauBau"); + } else if (!strcmp(msg->msg->message, "LetsPlay")) { + m_log("BauBau BauuBauuu!\n"); + } else if (!strcmp(msg->msg->message, "LetsEat")) { + m_log("Burp!\n"); + } else if (!strcmp(msg->msg->message, "LetsSleep")) { + m_become(sleeping); + m_log("ZzzZzz...\n"); + } else if (!strcmp(msg->msg->message, "ByeBye")) { + m_log("Sob...\n"); + } else if (!strcmp(msg->msg->message, "WakeUp")) { + m_log("???\n"); + } + break; + case TOPIC_REGISTERED: + /* Doggo should subscribe to "leaving" topic */ + m_subscribe(msg->msg->topic); + break; + default: + break; } } } static void receive_sleeping(const msg_t *msg, const void *userdata) { - if (msg->is_pubsub) { + if (msg->is_pubsub && msg->msg->type == USER) { if (!strcmp(msg->msg->message, "WakeUp")) { m_unbecome(); m_log("Yawn...\n"); diff --git a/Samples/Cpp/main.cpp b/Samples/Cpp/main.cpp index c74c2de..3e43c42 100644 --- a/Samples/Cpp/main.cpp +++ b/Samples/Cpp/main.cpp @@ -13,6 +13,5 @@ void modules_pre_start() { int main() { /* Loop on our modules' events */ - modules_loop(); - return 0; + return modules_loop(); } diff --git a/Samples/Cpp/pippo.cpp b/Samples/Cpp/pippo.cpp index ea11ccc..a35dac7 100644 --- a/Samples/Cpp/pippo.cpp +++ b/Samples/Cpp/pippo.cpp @@ -4,8 +4,8 @@ #include #include #ifdef __linux__ - #include - #include +#include +#include #endif /* @@ -21,7 +21,7 @@ static void receive_ready(const msg_t *msg, const void *userdata); * Use this to set some global state needed eg: in check() function */ static void module_pre_start(void) { - + } /* @@ -29,7 +29,7 @@ static void module_pre_start(void) { * returns a valid fd to be polled. */ static void init(void) { -#ifdef __linux__ + #ifdef __linux__ /* Add signal fd */ sigset_t mask; @@ -39,11 +39,11 @@ static void init(void) { sigprocmask(SIG_BLOCK, &mask, NULL); int fd = signalfd(-1, &mask, 0); - m_add_fd(fd); -#endif + m_register_fd(fd); + #endif /* Add stdin fd */ - m_add_fd(STDIN_FILENO); + m_register_fd(STDIN_FILENO); } /* @@ -108,8 +108,10 @@ static void receive(const msg_t *msg, const void *userdata) { break; } } else { - if (!strcmp(msg->msg->message, "BauBau")) { + if (msg->msg->type == USER && !strcmp(msg->msg->message, "BauBau")) { m_become(ready); + /* Finally register Leaving topic */ + m_register_topic("leaving"); m_log("Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n"); } } diff --git a/Samples/Easy/doggo.c b/Samples/Easy/doggo.c index 78e12ef..15cb453 100644 --- a/Samples/Easy/doggo.c +++ b/Samples/Easy/doggo.c @@ -60,7 +60,8 @@ static void destroy(void) { */ static void receive(const msg_t *msg, const void *userdata) { if (msg->is_pubsub) { - if (msg->msg->type == USER) { + switch (msg->msg->type) { + case USER: if (!strcmp(msg->msg->message, "ComeHere")) { m_log("Running...\n"); m_reply(msg->msg->sender, "BauBau"); @@ -76,18 +77,19 @@ static void receive(const msg_t *msg, const void *userdata) { } else if (!strcmp(msg->msg->message, "WakeUp")) { m_log("???\n"); } - } else { - if (strstr(msg->msg->message, "TOPIC_REGISTERED: ")) { - const char *topic = msg->msg->message + strlen("TOPIC_REGISTERED: "); - /* Doggo should subscribe to "leaving" topic */ - m_subscribe(topic); - } + break; + case TOPIC_REGISTERED: + /* Doggo should subscribe to "leaving" topic */ + m_subscribe(msg->msg->topic); + break; + default: + break; } } } static void receive_sleeping(const msg_t *msg, const void *userdata) { - if (msg->is_pubsub) { + if (msg->is_pubsub && msg->msg->type == USER) { if (!strcmp(msg->msg->message, "WakeUp")) { m_unbecome(); m_log("Yawn...\n"); diff --git a/Samples/Easy/main.c b/Samples/Easy/main.c index c74c2de..3e43c42 100644 --- a/Samples/Easy/main.c +++ b/Samples/Easy/main.c @@ -13,6 +13,5 @@ void modules_pre_start() { int main() { /* Loop on our modules' events */ - modules_loop(); - return 0; + return modules_loop(); } diff --git a/Samples/Easy/pippo.c b/Samples/Easy/pippo.c index 9040a61..c6e3a8d 100644 --- a/Samples/Easy/pippo.c +++ b/Samples/Easy/pippo.c @@ -108,7 +108,7 @@ static void receive(const msg_t *msg, const void *userdata) { break; } } else { - if (!strcmp(msg->msg->message, "BauBau")) { + if (msg->msg->type == USER && !strcmp(msg->msg->message, "BauBau")) { m_become(ready); /* Finally register Leaving topic */ m_register_topic("leaving"); diff --git a/Samples/MultiCtx/a.c b/Samples/MultiCtx/a.c index 7f90e8b..428f81c 100644 --- a/Samples/MultiCtx/a.c +++ b/Samples/MultiCtx/a.c @@ -36,7 +36,7 @@ static void init(void) { timerValue.it_value.tv_sec = 1; timerValue.it_interval.tv_sec = 1; timerfd_settime(fd, 0, &timerValue, NULL); - m_add_fd(fd); + m_register_fd(fd); #endif } diff --git a/Samples/MultiCtx/b.c b/Samples/MultiCtx/b.c index 10e8e68..8ef07f1 100644 --- a/Samples/MultiCtx/b.c +++ b/Samples/MultiCtx/b.c @@ -35,7 +35,7 @@ static void init(void) { sigprocmask(SIG_BLOCK, &mask, NULL); int fd = signalfd(-1, &mask, 0); - m_add_fd(fd); + m_register_fd(fd); #endif } diff --git a/Samples/SharedSrc/mod.c b/Samples/SharedSrc/mod.c index e6651e5..39f0289 100644 --- a/Samples/SharedSrc/mod.c +++ b/Samples/SharedSrc/mod.c @@ -47,7 +47,7 @@ void destroy_modules(void) { * returns a valid fd to be polled. */ static void A_init(void) { - module_add_fd(selfA, STDIN_FILENO); + module_register_fd(selfA, STDIN_FILENO); } /* @@ -55,8 +55,7 @@ static void A_init(void) { * returns a valid fd to be polled. */ static void B_init(void) { - /* Doggo is subscribed to "leaving" topic */ - module_subscribe(selfB, "leaving"); + } /* @@ -91,6 +90,11 @@ static void A_recv(const msg_t *msg, const void *userdata) { module_log(selfA, "Doggo, come here!\n"); module_tell(selfA, "Doggo", "ComeHere"); break; + case 'q': + module_log(selfA, "I have to go now!\n"); + module_publish(selfA, "leaving", "ByeBye"); + modules_ctx_quit("test"); + break; default: /* Avoid newline */ if (c != 10) { @@ -99,7 +103,7 @@ static void A_recv(const msg_t *msg, const void *userdata) { break; } } else { - if (!strcmp(msg->msg->message, "BauBau")) { + if (msg->msg->type == USER && !strcmp(msg->msg->message, "BauBau")) { module_become(selfA, A_recv_ready); module_log(selfA, "Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n"); } @@ -148,26 +152,36 @@ static void A_recv_ready(const msg_t *msg, const void *userdata) { */ static void B_recv(const msg_t *msg, const void *userdata) { if (msg->is_pubsub) { - if (!strcmp(msg->msg->message, "ComeHere")) { - module_log(selfB, "Running...\n"); - module_reply(selfB, msg->msg->sender, "BauBau"); - } else if (!strcmp(msg->msg->message, "LetsPlay")) { - module_log(selfB, "BauBau BauuBauuu!\n"); - } else if (!strcmp(msg->msg->message, "LetsEat")) { - module_log(selfB, "Burp!\n"); - } else if (!strcmp(msg->msg->message, "LetsSleep")) { - module_become(selfB, B_recv_sleeping); - module_log(selfB, "ZzzZzz...\n"); - } else if (!strcmp(msg->msg->message, "ByeBye")) { - module_log(selfB, "Sob...\n"); - } else if (!strcmp(msg->msg->message, "WakeUp")) { - module_log(selfB, "???\n"); + switch (msg->msg->type) { + case USER: + if (!strcmp(msg->msg->message, "ComeHere")) { + module_log(selfB, "Running...\n"); + module_reply(selfB, msg->msg->sender, "BauBau"); + } else if (!strcmp(msg->msg->message, "LetsPlay")) { + module_log(selfB, "BauBau BauuBauuu!\n"); + } else if (!strcmp(msg->msg->message, "LetsEat")) { + module_log(selfB, "Burp!\n"); + } else if (!strcmp(msg->msg->message, "LetsSleep")) { + module_become(selfB, B_recv_sleeping); + module_log(selfB, "ZzzZzz...\n"); + } else if (!strcmp(msg->msg->message, "ByeBye")) { + module_log(selfB, "Sob...\n"); + } else if (!strcmp(msg->msg->message, "WakeUp")) { + module_log(selfB, "???\n"); + } + break; + case TOPIC_REGISTERED: + /* Doggo should subscribe to "leaving" topic */ + module_subscribe(selfB, msg->msg->topic); + break; + default: + break; } } } static void B_recv_sleeping(const msg_t *msg, const void *userdata) { - if (msg->is_pubsub) { + if (msg->is_pubsub && msg->msg->type == USER) { if (!strcmp(msg->msg->message, "WakeUp")) { module_become(selfB, B_recv); module_log(selfB, "Yawn...\n"); diff --git a/TODO.md b/TODO.md index 678667b..b45159d 100644 --- a/TODO.md +++ b/TODO.md @@ -41,7 +41,7 @@ - [x] Fixed bug when passing a locally scoped variable as key in hashmap_put() - [x] Rename module_{add, rm}_fd to module_{register, deregister}_fd and add a compatibility macro -- [ ] Update examples -> Cpp, MultiCtx, SharedSrc +- [x] Update examples -> Cpp, MultiCtx, SharedSrc - [x] Update doc: now modules_loop is always needed, even in case on pubsub only messaging diff --git a/docs/src/data_structures.rst b/docs/src/data_structures.rst index 8c46d80..c7e402f 100644 --- a/docs/src/data_structures.rst +++ b/docs/src/data_structures.rst @@ -24,8 +24,8 @@ Types /* Modules states */ enum module_states { IDLE = 0x1, RUNNING = 0x2, PAUSED = 0x4, STOPPED = 0x8 }; - enum msg_type { USER, SYSTEM }; - + enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED }; + typedef struct { const char *topic; const char *message; diff --git a/docs/src/pubsub.rst b/docs/src/pubsub.rst index 5c8e585..5598fa7 100644 --- a/docs/src/pubsub.rst +++ b/docs/src/pubsub.rst @@ -19,6 +19,13 @@ Since libmodule 2.1, pubsub implementation is async and makes use of unix pipes. When sending a message to other modules, a pubsub message is allocated and its address is written in recipient module's writable end of pipe. |br| The message will then get caught by modules_loop, the address read from readable end of pipe and callback called with the message. +PubSub system messages +---------------------- + +Beside USER messages (pubsub_msg_t.type), there are 4 system messages, with type respectively: LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED. |br| +These pubsub messages are automatically sent by libmodule (note that sender will be NULL) when matching functions are called. |br| +For example, you can use TOPIC_REGISTERED message (note that pubsub_msg_t.topic will be valued matching newly created topic) to subscribe to a topic as soon as it appears in current context. + PubSub notes ------------