Skip to content

Commit

Permalink
Dropped SYSTEM pubsub msg type in favor of TOPIC_REGISTERED, TOPIC_DE…
Browse files Browse the repository at this point in the history
…REGISTERED, LOOP_STARTED, LOOP_STOPPED; way easier.

Updated all examples.
Updated docs.
Updated TODO.
  • Loading branch information
FedeDP committed Aug 4, 2018
1 parent 3ea7774 commit 1c9ce72
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 101 deletions.
36 changes: 5 additions & 31 deletions Lib/module.c
Expand Up @@ -306,36 +306,9 @@ module_ret_code module_unsubscribe(const self_t *self, const char *topic) {
return MOD_ERR;
}

module_ret_code tell_system_pubsub_msg(m_context *c, enum sys_msg_t type, ...) {
pubsub_msg_t m = { .topic = NULL, .sender = NULL, .message = NULL, .type = SYSTEM };
switch (type) {
case LOOP_STARTED:
m.message = "LOOP_STARTED";
break;
case LOOP_STOPPED:
m.message = "LOOP_STOPPED";
break;
case TOPIC_REGISTERED:
case TOPIC_DEREGISTERED:{
char name[256] = { 0 };

va_list args;
va_start(args, type);

char *topic = va_arg(args, char *);
snprintf(name, sizeof(name) - 1, "TOPIC_%s: %s", type == TOPIC_REGISTERED ? "REGISTERED" : "DEREGISTERED", topic);
m.message = name;

va_end(args);
}
break;
default:
break;
}
if (m.message) {
return tell_pubsub_msg(&m, NULL, c);
}
return MOD_ERR;
module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic) {
pubsub_msg_t m = { .topic = topic, .sender = NULL, .message = NULL, .type = type };
return tell_pubsub_msg(&m, NULL, c);
}

int flush_pubsub_msg(void *data, void *m) {
Expand Down Expand Up @@ -364,9 +337,10 @@ static int tell_if(void *data, void *m) {

/*
* Only if mod is actually running and
* or it is a SYSTEM message. or
* if topic is null or this module is subscribed to topic
*/
if (module_is(&mod->self, RUNNING) && (!msg->topic ||
if (module_is(&mod->self, RUNNING) && (msg->type != USER || !msg->topic ||
hashmap_get(mod->subscriptions, msg->topic, (void **)&tmp) == MAP_OK)) {

MODULE_DEBUG("Telling a message to %s.\n", mod->self.name);
Expand Down
4 changes: 1 addition & 3 deletions Lib/module_priv.h
Expand Up @@ -67,10 +67,8 @@ typedef struct {
map_t topics;
} m_context;

enum sys_msg_t { LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED };

int evaluate_module(void *data, void *m);
module_ret_code tell_system_pubsub_msg(m_context *c, enum sys_msg_t type, ...);
module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic);
int flush_pubsub_msg(void *data, void *m);
void destroy_pubsub_msg(pubsub_msg_t *m);

Expand Down
6 changes: 3 additions & 3 deletions Lib/modules.c
Expand Up @@ -59,7 +59,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, int max_events) {
c->max_events = max_events;

/* Tell every module that loop is started */
tell_system_pubsub_msg(c, LOOP_STARTED);
tell_system_pubsub_msg(c, LOOP_STARTED, NULL);

while (!c->quit) {
int nfds = poll_wait(c->fd, c->max_events, c->pevents);
Expand All @@ -73,7 +73,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, int max_events) {
pubsub_msg_t *m = NULL;
if (p->fd == mod->pubsub_fd[0]) {
*(int *)&msg.is_pubsub = 1;
read(p->fd, &m, sizeof(struct pubsub_msg_t *));
read(p->fd, &m, sizeof(pubsub_msg_t *));
*(pubsub_msg_t **)&msg.msg = m;
} else {
*(int *)&msg.is_pubsub = 0;
Expand All @@ -90,7 +90,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, int max_events) {
}

/* Tell every module that loop is stopped */
tell_system_pubsub_msg(c, LOOP_STOPPED);
tell_system_pubsub_msg(c, LOOP_STOPPED, NULL);

/* Flush pubsub msg to avoid memleaks */
hashmap_iterate(c->modules, flush_pubsub_msg, NULL);
Expand Down
2 changes: 1 addition & 1 deletion Lib/public/module/module_cmn.h.in
Expand Up @@ -35,7 +35,7 @@ typedef struct _self self_t;
/* Modules states */
enum module_states { IDLE = 0x1, RUNNING = 0x2, PAUSED = 0x4, STOPPED = 0x8 };

enum msg_type { USER, SYSTEM };
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED };

typedef struct {
const char *topic;
Expand Down
45 changes: 27 additions & 18 deletions Samples/Cpp/doggo.cpp
Expand Up @@ -22,8 +22,7 @@ static void module_pre_start(void) {
* returns a valid fd to be polled.
*/
static void init(void) {
/* Doggo is subscribed to "leaving" topic */
m_subscribe("leaving");

}

/*
Expand All @@ -50,7 +49,7 @@ static int evaluate(void) {

/*
* Destroyer function, called at module unload (at end of program).
* Note that module's FD is automatically closed for you.
* Note that any module's fds are automatically closed for you.
*/
static void destroy(void) {

Expand All @@ -61,26 +60,36 @@ static void destroy(void) {
*/
static void receive(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
if (!strcmp(msg->msg->message, "ComeHere")) {
m_log("Running...\n");
m_reply(msg->msg->sender, "BauBau");
} else if (!strcmp(msg->msg->message, "LetsPlay")) {
m_log("BauBau BauuBauuu!\n");
} else if (!strcmp(msg->msg->message, "LetsEat")) {
m_log("Burp!\n");
} else if (!strcmp(msg->msg->message, "LetsSleep")) {
m_become(sleeping);
m_log("ZzzZzz...\n");
} else if (!strcmp(msg->msg->message, "ByeBye")) {
m_log("Sob...\n");
} else if (!strcmp(msg->msg->message, "WakeUp")) {
m_log("???\n");
switch (msg->msg->type) {
case USER:
if (!strcmp(msg->msg->message, "ComeHere")) {
m_log("Running...\n");
m_reply(msg->msg->sender, "BauBau");
} else if (!strcmp(msg->msg->message, "LetsPlay")) {
m_log("BauBau BauuBauuu!\n");
} else if (!strcmp(msg->msg->message, "LetsEat")) {
m_log("Burp!\n");
} else if (!strcmp(msg->msg->message, "LetsSleep")) {
m_become(sleeping);
m_log("ZzzZzz...\n");
} else if (!strcmp(msg->msg->message, "ByeBye")) {
m_log("Sob...\n");
} else if (!strcmp(msg->msg->message, "WakeUp")) {
m_log("???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
m_subscribe(msg->msg->topic);
break;
default:
break;
}
}
}

static void receive_sleeping(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
if (msg->is_pubsub && msg->msg->type == USER) {
if (!strcmp(msg->msg->message, "WakeUp")) {
m_unbecome();
m_log("Yawn...\n");
Expand Down
3 changes: 1 addition & 2 deletions Samples/Cpp/main.cpp
Expand Up @@ -13,6 +13,5 @@ void modules_pre_start() {

int main() {
/* Loop on our modules' events */
modules_loop();
return 0;
return modules_loop();
}
18 changes: 10 additions & 8 deletions Samples/Cpp/pippo.cpp
Expand Up @@ -4,8 +4,8 @@
#include <string.h>
#include <ctype.h>
#ifdef __linux__
#include <sys/signalfd.h>
#include <signal.h>
#include <sys/signalfd.h>
#include <signal.h>
#endif

/*
Expand All @@ -21,15 +21,15 @@ static void receive_ready(const msg_t *msg, const void *userdata);
* Use this to set some global state needed eg: in check() function
*/
static void module_pre_start(void) {

}

/*
* Initializes this module's state;
* returns a valid fd to be polled.
*/
static void init(void) {
#ifdef __linux__
#ifdef __linux__
/* Add signal fd */
sigset_t mask;

Expand All @@ -39,11 +39,11 @@ static void init(void) {
sigprocmask(SIG_BLOCK, &mask, NULL);

int fd = signalfd(-1, &mask, 0);
m_add_fd(fd);
#endif
m_register_fd(fd);
#endif

/* Add stdin fd */
m_add_fd(STDIN_FILENO);
m_register_fd(STDIN_FILENO);
}

/*
Expand Down Expand Up @@ -108,8 +108,10 @@ static void receive(const msg_t *msg, const void *userdata) {
break;
}
} else {
if (!strcmp(msg->msg->message, "BauBau")) {
if (msg->msg->type == USER && !strcmp(msg->msg->message, "BauBau")) {
m_become(ready);
/* Finally register Leaving topic */
m_register_topic("leaving");
m_log("Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n");
}
}
Expand Down
18 changes: 10 additions & 8 deletions Samples/Easy/doggo.c
Expand Up @@ -60,7 +60,8 @@ static void destroy(void) {
*/
static void receive(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
if (msg->msg->type == USER) {
switch (msg->msg->type) {
case USER:
if (!strcmp(msg->msg->message, "ComeHere")) {
m_log("Running...\n");
m_reply(msg->msg->sender, "BauBau");
Expand All @@ -76,18 +77,19 @@ static void receive(const msg_t *msg, const void *userdata) {
} else if (!strcmp(msg->msg->message, "WakeUp")) {
m_log("???\n");
}
} else {
if (strstr(msg->msg->message, "TOPIC_REGISTERED: ")) {
const char *topic = msg->msg->message + strlen("TOPIC_REGISTERED: ");
/* Doggo should subscribe to "leaving" topic */
m_subscribe(topic);
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
m_subscribe(msg->msg->topic);
break;
default:
break;
}
}
}

static void receive_sleeping(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
if (msg->is_pubsub && msg->msg->type == USER) {
if (!strcmp(msg->msg->message, "WakeUp")) {
m_unbecome();
m_log("Yawn...\n");
Expand Down
3 changes: 1 addition & 2 deletions Samples/Easy/main.c
Expand Up @@ -13,6 +13,5 @@ void modules_pre_start() {

int main() {
/* Loop on our modules' events */
modules_loop();
return 0;
return modules_loop();
}
2 changes: 1 addition & 1 deletion Samples/Easy/pippo.c
Expand Up @@ -108,7 +108,7 @@ static void receive(const msg_t *msg, const void *userdata) {
break;
}
} else {
if (!strcmp(msg->msg->message, "BauBau")) {
if (msg->msg->type == USER && !strcmp(msg->msg->message, "BauBau")) {
m_become(ready);
/* Finally register Leaving topic */
m_register_topic("leaving");
Expand Down
2 changes: 1 addition & 1 deletion Samples/MultiCtx/a.c
Expand Up @@ -36,7 +36,7 @@ static void init(void) {
timerValue.it_value.tv_sec = 1;
timerValue.it_interval.tv_sec = 1;
timerfd_settime(fd, 0, &timerValue, NULL);
m_add_fd(fd);
m_register_fd(fd);
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion Samples/MultiCtx/b.c
Expand Up @@ -35,7 +35,7 @@ static void init(void) {
sigprocmask(SIG_BLOCK, &mask, NULL);

int fd = signalfd(-1, &mask, 0);
m_add_fd(fd);
m_register_fd(fd);
#endif
}

Expand Down
52 changes: 33 additions & 19 deletions Samples/SharedSrc/mod.c
Expand Up @@ -47,16 +47,15 @@ void destroy_modules(void) {
* returns a valid fd to be polled.
*/
static void A_init(void) {
module_add_fd(selfA, STDIN_FILENO);
module_register_fd(selfA, STDIN_FILENO);
}

/*
* Initializes B module's state;
* returns a valid fd to be polled.
*/
static void B_init(void) {
/* Doggo is subscribed to "leaving" topic */
module_subscribe(selfB, "leaving");

}

/*
Expand Down Expand Up @@ -91,6 +90,11 @@ static void A_recv(const msg_t *msg, const void *userdata) {
module_log(selfA, "Doggo, come here!\n");
module_tell(selfA, "Doggo", "ComeHere");
break;
case 'q':
module_log(selfA, "I have to go now!\n");
module_publish(selfA, "leaving", "ByeBye");
modules_ctx_quit("test");
break;
default:
/* Avoid newline */
if (c != 10) {
Expand All @@ -99,7 +103,7 @@ static void A_recv(const msg_t *msg, const void *userdata) {
break;
}
} else {
if (!strcmp(msg->msg->message, "BauBau")) {
if (msg->msg->type == USER && !strcmp(msg->msg->message, "BauBau")) {
module_become(selfA, A_recv_ready);
module_log(selfA, "Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n");
}
Expand Down Expand Up @@ -148,26 +152,36 @@ static void A_recv_ready(const msg_t *msg, const void *userdata) {
*/
static void B_recv(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
if (!strcmp(msg->msg->message, "ComeHere")) {
module_log(selfB, "Running...\n");
module_reply(selfB, msg->msg->sender, "BauBau");
} else if (!strcmp(msg->msg->message, "LetsPlay")) {
module_log(selfB, "BauBau BauuBauuu!\n");
} else if (!strcmp(msg->msg->message, "LetsEat")) {
module_log(selfB, "Burp!\n");
} else if (!strcmp(msg->msg->message, "LetsSleep")) {
module_become(selfB, B_recv_sleeping);
module_log(selfB, "ZzzZzz...\n");
} else if (!strcmp(msg->msg->message, "ByeBye")) {
module_log(selfB, "Sob...\n");
} else if (!strcmp(msg->msg->message, "WakeUp")) {
module_log(selfB, "???\n");
switch (msg->msg->type) {
case USER:
if (!strcmp(msg->msg->message, "ComeHere")) {
module_log(selfB, "Running...\n");
module_reply(selfB, msg->msg->sender, "BauBau");
} else if (!strcmp(msg->msg->message, "LetsPlay")) {
module_log(selfB, "BauBau BauuBauuu!\n");
} else if (!strcmp(msg->msg->message, "LetsEat")) {
module_log(selfB, "Burp!\n");
} else if (!strcmp(msg->msg->message, "LetsSleep")) {
module_become(selfB, B_recv_sleeping);
module_log(selfB, "ZzzZzz...\n");
} else if (!strcmp(msg->msg->message, "ByeBye")) {
module_log(selfB, "Sob...\n");
} else if (!strcmp(msg->msg->message, "WakeUp")) {
module_log(selfB, "???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
module_subscribe(selfB, msg->msg->topic);
break;
default:
break;
}
}
}

static void B_recv_sleeping(const msg_t *msg, const void *userdata) {
if (msg->is_pubsub) {
if (msg->is_pubsub && msg->msg->type == USER) {
if (!strcmp(msg->msg->message, "WakeUp")) {
module_become(selfB, B_recv);
module_log(selfB, "Yawn...\n");
Expand Down

0 comments on commit 1c9ce72

Please sign in to comment.