Skip to content

Commit

Permalink
* Renamed pubsub_priv_t to ps_priv_t.
Browse files Browse the repository at this point in the history
  • Loading branch information
FedeDP committed Jul 13, 2019
1 parent d7b8bf8 commit a25e536
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 20 deletions.
4 changes: 2 additions & 2 deletions Lib/modules.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ static int recv_events(m_context *c, int timeout) {

msg_t msg;
fd_msg_t fd_msg;
pubsub_priv_t *ps_msg;
ps_priv_t *ps_msg;

if (p->fd == mod->pubsub_fd[0]) {
/* Received on pubsub interface */
msg.is_pubsub = true;
if (read(p->fd, (void **)&ps_msg, sizeof(pubsub_priv_t *)) != sizeof(pubsub_priv_t *)) {
if (read(p->fd, (void **)&ps_msg, sizeof(ps_priv_t *)) != sizeof(ps_priv_t *)) {
MODULE_DEBUG("Failed to read message for %s: %s\n", mod->name, strerror(errno));
msg.ps_msg = NULL;
} else {
Expand Down
2 changes: 1 addition & 1 deletion Lib/priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ typedef struct {
ps_msg_t msg;
uint64_t refs;
bool autofree;
} pubsub_priv_t;
} ps_priv_t;

/* Struct that holds data for each module */
struct _module {
Expand Down
30 changes: 15 additions & 15 deletions Lib/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
/** Actor-like PubSub interface **/

static map_ret_code tell_if(void *data, const char *key, void *value);
static pubsub_priv_t *create_pubsub_msg(const void *message, const self_t *sender, const char *topic,
static ps_priv_t *create_pubsub_msg(const void *message, const self_t *sender, const char *topic,
enum msg_type type, const size_t size, const bool autofree);
static void destroy_pubsub_msg(pubsub_priv_t *pubsub_msg);
static module_ret_code tell_pubsub_msg(pubsub_priv_t *m, module *mod, m_context *c);
static void destroy_pubsub_msg(ps_priv_t *pubsub_msg);
static module_ret_code tell_pubsub_msg(ps_priv_t *m, module *mod, m_context *c);
static module_ret_code publish_msg(const module *mod, const char *topic, const void *message,
const ssize_t size, const bool autofree);

static map_ret_code tell_if(void *data, const char *key, void *value) {
module *mod = (module *)value;
pubsub_priv_t *msg = (pubsub_priv_t *)data;
ps_priv_t *msg = (ps_priv_t *)data;

if (_module_is(mod, RUNNING | PAUSED) && // mod is running or paused
((msg->msg.type != USER && msg->msg.sender != &mod->self) || // system messages with sender != this module (avoid sending ourselves system messages produced by us)
Expand All @@ -22,7 +22,7 @@ static map_ret_code tell_if(void *data, const char *key, void *value) {

MODULE_DEBUG("Telling a message to %s.\n", mod->name);

if (write(mod->pubsub_fd[1], &msg, sizeof(pubsub_priv_t *)) != sizeof(pubsub_priv_t *)) {
if (write(mod->pubsub_fd[1], &msg, sizeof(ps_priv_t *)) != sizeof(ps_priv_t *)) {
MODULE_DEBUG("Failed to write message for %s: %s\n", mod->name, strerror(errno));
} else {
msg->refs++;
Expand All @@ -31,9 +31,9 @@ static map_ret_code tell_if(void *data, const char *key, void *value) {
return MAP_OK;
}

static pubsub_priv_t *create_pubsub_msg(const void *message, const self_t *sender, const char *topic,
static ps_priv_t *create_pubsub_msg(const void *message, const self_t *sender, const char *topic,
enum msg_type type, const size_t size, const bool autofree) {
pubsub_priv_t *m = memhook._malloc(sizeof(pubsub_priv_t));
ps_priv_t *m = memhook._malloc(sizeof(ps_priv_t));
if (m) {
m->msg.message = message;
m->msg.sender = sender;
Expand All @@ -46,7 +46,7 @@ static pubsub_priv_t *create_pubsub_msg(const void *message, const self_t *sende
return m;
}

static void destroy_pubsub_msg(pubsub_priv_t *pubsub_msg) {
static void destroy_pubsub_msg(ps_priv_t *pubsub_msg) {
/* Properly free pubsub msg if its ref count reaches 0 and autofree bit is true */
if (pubsub_msg->refs == 0 || --pubsub_msg->refs == 0) {
if (pubsub_msg->autofree) {
Expand All @@ -56,7 +56,7 @@ static void destroy_pubsub_msg(pubsub_priv_t *pubsub_msg) {
}
}

static module_ret_code tell_pubsub_msg(pubsub_priv_t *m, module *mod, m_context *c) {
static module_ret_code tell_pubsub_msg(ps_priv_t *m, module *mod, m_context *c) {
if (mod) {
tell_if(m, NULL, mod);
} else {
Expand Down Expand Up @@ -84,7 +84,7 @@ static module_ret_code publish_msg(const module *mod, const char *topic, const v
* Moreover, a publish can only be made on existent topic.
*/
if (!topic || ((tmp = map_get(c->topics, topic)) && tmp == mod)) {
pubsub_priv_t *m = create_pubsub_msg(message, &mod->self, topic, USER, size, autofree);
ps_priv_t *m = create_pubsub_msg(message, &mod->self, topic, USER, size, autofree);
return tell_pubsub_msg(m, NULL, c);
}
return MOD_ERR;
Expand All @@ -93,15 +93,15 @@ static module_ret_code publish_msg(const module *mod, const char *topic, const v
/** Private API **/

module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const self_t *sender, const char *topic) {
pubsub_priv_t *m = create_pubsub_msg(NULL, sender, topic, type, 0, false);
ps_priv_t *m = create_pubsub_msg(NULL, sender, topic, type, 0, false);
return tell_pubsub_msg(m, NULL, c);
}

map_ret_code flush_pubsub_msg(void *data, const char *key, void *value) {
module *mod = (module *)value;
pubsub_priv_t *mm = NULL;
ps_priv_t *mm = NULL;

while (read(mod->pubsub_fd[0], &mm, sizeof(pubsub_priv_t *)) == sizeof(pubsub_priv_t *)) {
while (read(mod->pubsub_fd[0], &mm, sizeof(ps_priv_t *)) == sizeof(ps_priv_t *)) {
/*
* Actually tell msg ONLY if we are not deregistering module,
* ie: we are stopping looping on the context.
Expand All @@ -128,7 +128,7 @@ void run_pubsub_cb(module *mod, msg_t *msg) {
cb(msg, mod->userdata);

if (msg->is_pubsub) {
destroy_pubsub_msg((pubsub_priv_t *)msg->ps_msg);
destroy_pubsub_msg((ps_priv_t *)msg->ps_msg);
}
}

Expand Down Expand Up @@ -212,7 +212,7 @@ module_ret_code module_tell(const self_t *self, const self_t *recipient, const v
/* only same ctx modules can talk */
MOD_PARAM_ASSERT(self->ctx == recipient->ctx);

pubsub_priv_t *m = create_pubsub_msg(message, &mod->self, NULL, USER, size, autofree);
ps_priv_t *m = create_pubsub_msg(message, &mod->self, NULL, USER, size, autofree);
return tell_pubsub_msg(m, recipient->mod, recipient->ctx);
}

Expand Down
1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- [ ] Add a new parameter "bool global" to module_broadcast?
- [x] Rename pubsub_msg to ps_msg inside msg_t
- [x] Rename pubsub_msg_t to ps_msg_t
- [x] Rename pubsub_priv_t to ps_priv_t

### Generic
- [x] Add some diagnostic API, eg: module_dump() (to dump each module's state)
Expand Down
2 changes: 1 addition & 1 deletion docs/src/callbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Moreover, a module_pre_start function is declared too, but it is not needed by l
.. c:function:: receive(msg, userdata)
Poll callback, called when any event is ready on module's fd or when a PubSub message is received by a module. |br|
Use msg->is_pubsub to decide which internal message should be read (ie: pubsub_msg_t or fd_msg_t).
Use msg->is_pubsub to decide which internal message should be read (ie: ps_msg_t or fd_msg_t).

:param: :c:type:`const msg_t * const` msg: pointer to msg_t struct.
:param: :c:type:`const void *` userdata: pointer to userdata as set by m_set_userdata.
Expand Down
2 changes: 1 addition & 1 deletion docs/src/pubsub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Only module_is(), module_get_name/context() functions can be called passing as s
System messages
---------------

Beside USER messages (pubsub_msg_t.type), there are 6 system messages, respectively: LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED, MODULE_STARTED, MODULE_STOPPED. |br|
Beside USER messages (ps_msg_t.type), there are 6 system messages, respectively: LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED, MODULE_STARTED, MODULE_STOPPED. |br|
These pubsub messages are automatically sent by libmodule when matching functions are called, eg: |br|
* LOOP_STARTED(STOPPED) is sent whenever a loop starts(stops) looping. It is useful to actually start(stop) your pubsub messagging (eg: one module waits on LOOP_STARTED to send a pubsub message to another module, and so on...). It won't have any valued fields, except for type. |br|
* TOPIC_(DE)REGISTERED message is sent when any module registers a topic; it is useful for other modules to (un)subscribe to it. |br|
Expand Down

0 comments on commit a25e536

Please sign in to comment.