Skip to content

Commit

Permalink
Properly honor current module's callback when flushing pubsub messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
FedeDP committed Jan 7, 2019
1 parent 3b64d0f commit 4d74942
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Lib/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ module_ret_code module_register(const char *name, const char *ctx_name, const se
break;
}

mod->hook = *hook;
memcpy(&mod->hook, hook, sizeof(userhook));
mod->state = IDLE;
mod->fds = NULL;

Expand Down
30 changes: 9 additions & 21 deletions Lib/modules.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ module_ret_code modules_set_memalloc_hook(const memalloc_hook *hook) {
MOD_ASSERT(hook->_realloc, "NULL realloc fn.", MOD_ERR);
MOD_ASSERT(hook->_calloc, "NULL calloc fn.", MOD_ERR);
MOD_ASSERT(hook->_free, "NULL free fn.", MOD_ERR);

memhook._malloc = hook->_malloc;
memhook._realloc = hook->_realloc;
memhook._calloc = hook->_calloc;
memhook._free = hook->_free;
memcpy(&memhook, hook, sizeof(memalloc_hook));
} else {
memhook._malloc = malloc;
memhook._realloc = realloc;
Expand Down Expand Up @@ -94,13 +90,7 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_even
}

if (!msg.is_pubsub || msg.pubsub_msg) {
/* If module is using some different receive function, honor it. */
recv_cb cb = stack_peek(mod->recvs);
if (!cb) {
/* Fallback to module default receive */
cb = mod->hook.recv;
}
cb(&msg, mod->userdata);
run_pubsub_cb(mod, &msg);

/* Properly free pubsub msg */
if (msg.is_pubsub) {
Expand All @@ -127,9 +117,10 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_even

/* Tell every module that loop is stopped */
tell_system_pubsub_msg(c, LOOP_STOPPED, NULL);

/* Flush pubsub msg to avoid memleaks */
map_iterate(c->modules, flush_pubsub_msg, NULL);

poll_destroy_pevents(&c->pevents, &c->max_events);
c->looping = false;
return c->quit_code;
Expand All @@ -139,12 +130,9 @@ module_ret_code modules_ctx_loop_events(const char *ctx_name, const int max_even

module_ret_code modules_ctx_quit(const char *ctx_name, const uint8_t quit_code) {
FIND_CTX(ctx_name);

if (c->looping) {
c->quit = true;
c->quit_code = quit_code;
return MOD_OK;
}
MODULE_DEBUG("Context not looping.\n");
return MOD_ERR;
MOD_ASSERT(c->looping, "Context not looping.", MOD_ERR);

c->quit = true;
c->quit_code = quit_code;
return MOD_OK;
}
3 changes: 3 additions & 0 deletions Lib/priv.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <stdlib.h>
#include "map.h"
#include "stack.h"
Expand Down Expand Up @@ -113,6 +115,7 @@ int evaluate_module(void *data, void *m);
module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic);
int flush_pubsub_msg(void *data, void *m);
void destroy_pubsub_msg(pubsub_msg_t *m);
void run_pubsub_cb(module *mod, const msg_t *msg);

/* Defined in priv.c */
char *mem_strdup(const char *s);
Expand Down
15 changes: 13 additions & 2 deletions Lib/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ int flush_pubsub_msg(void *data, void *m) {
*/
if (!data) {
MODULE_DEBUG("Flushing pubsub message for module '%s'.\n", mod->name);
const msg_t msg = { .is_pubsub = 1, .pubsub_msg = mm };
mod->hook.recv(&msg, mod->userdata);
const msg_t msg = { .is_pubsub = true, .pubsub_msg = mm };
run_pubsub_cb(mod, &msg);
}
destroy_pubsub_msg(mm);
}
Expand All @@ -98,6 +98,17 @@ void destroy_pubsub_msg(pubsub_msg_t *m) {
memhook._free(m);
}


void run_pubsub_cb(module *mod, const msg_t *msg) {
/* If module is using some different receive function, honor it. */
recv_cb cb = stack_peek(mod->recvs);
if (!cb) {
/* Fallback to module default receive */
cb = mod->hook.recv;
}
cb(msg, mod->userdata);
}

/** Public API **/

module_ret_code module_ref(const self_t *self, const char *name, const self_t **modref) {
Expand Down
3 changes: 3 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ module_ref should then return a new self_t* object, add it to a stack in module.
- [x] Fix issue in map_new if m->data calloc fails: map object was not memsetted to 0; it would lead to a crash when map_free was called
- [x] Update doc!

### Fix
- [x] Actually honor current module's callback when flushing pubsub messages

### Examples
- [x] Update examples!

Expand Down
4 changes: 2 additions & 2 deletions docs/src/lifecycle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

<br />

Module Lifecycle
================
Lifecycle
=========

Easy API
--------
Expand Down
20 changes: 10 additions & 10 deletions docs/src/pubsub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

<br />

PubSub interface
================
PubSub
======

PubSub concept
--------------
Concepts
--------

Those unfamiliar with actor like messaging, may wonder what a pubsub messaging is. |br|
PubSub (Publisher-Subscriber) messaging is much like a producer/consumer architecture: an entity registers a "topic" on which it will send messages. |br|
Other entities can then subscribe to the topic to receive those messages. |br|

PubSub implementation
---------------------
Implementation
--------------

Since libmodule 2.1, pubsub implementation is async and makes use of unix pipes. |br|
When sending a message to other modules, a pubsub message is allocated and its address is written in recipient module's writable end of pipe. |br|
Expand All @@ -23,15 +23,15 @@ Since libmodule 4.0.0, module_tell() makes use of module references: it means re
Note that you cannot call any function on a module's reference as you cannot impersonate another module. |br|
Only module_is(), module_get_name/context() functions can be called passing as self_t handler a module's reference.

PubSub system messages
----------------------
System messages
---------------

Beside USER messages (pubsub_msg_t.type), there are 4 system messages, with type respectively: LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED. |br|
These pubsub messages are automatically sent by libmodule (note that sender will be NULL) when matching functions are called. |br|
For example, you can use TOPIC_REGISTERED message (note that pubsub_msg_t.topic will be valued matching newly created topic) to subscribe to a topic as soon as it appears in current context.

PubSub notes
------------
Notes
-----

Note that a context must be looping to receive any pubsub message. |br|
Moreover, when a context stops looping, all pubsub messages will be flushed and thus delivered to each RUNNING module. |br|
Expand Down

0 comments on commit 4d74942

Please sign in to comment.