Skip to content

Commit

Permalink
* Added new "global" parameter to module_broadcast, to actually broad…
Browse files Browse the repository at this point in the history
…cast a message to every module in every context.
  • Loading branch information
FedeDP committed Jul 13, 2019
1 parent a25e536 commit e7abdb1
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Lib/public/module/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ _public_ module_ret_code module_tell(const self_t *self, const self_t *recipient
_public_ module_ret_code module_publish(const self_t *self, const char *topic, const void *message,
const ssize_t size, const bool autofree);
_public_ module_ret_code module_broadcast(const self_t *self, const void *message,
const ssize_t size, const bool autofree);
const ssize_t size, const bool autofree, bool global);

#ifdef __cplusplus
}
Expand Down
4 changes: 2 additions & 2 deletions Lib/public/module/module_easy.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static void _ctor2_ module_pre_start(void)
#define m_unsubscribe(topic) module_unsubscribe(self(), topic)
#define m_tell(recipient, msg, size, free) module_tell(self(), recipient, msg, size, free)
#define m_publish(topic, msg, size, free) module_publish(self(), topic, msg, size, free)
#define m_broadcast(msg, size, free) module_broadcast(self(), msg, size, free)
#define m_broadcast(msg, size, free, global) module_broadcast(self(), msg, size, free, global)
#define m_tell_str(recipient, msg) module_tell(self(), recipient, (const void *)msg, strlen(msg), false)
#define m_publish_str(topic, msg) module_publish(self(), topic, (const void *)msg, strlen(msg), false)
#define m_broadcast_str(msg) module_broadcast(self(), (const void *)msg, strlen(msg), false)
#define m_broadcast_str(msg, global) module_broadcast(self(), (const void *)msg, strlen(msg), false, global)
27 changes: 21 additions & 6 deletions Lib/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ static ps_priv_t *create_pubsub_msg(const void *message, const self_t *sender, c
enum msg_type type, const size_t size, const bool autofree);
static void destroy_pubsub_msg(ps_priv_t *pubsub_msg);
static module_ret_code tell_pubsub_msg(ps_priv_t *m, module *mod, m_context *c);
static map_ret_code tell_global(void *data, const char *key, void *value);
static module_ret_code publish_msg(const module *mod, const char *topic, const void *message,
const ssize_t size, const bool autofree);
const ssize_t size, const bool autofree, const bool global);

static map_ret_code tell_if(void *data, const char *key, void *value) {
module *mod = (module *)value;
Expand Down Expand Up @@ -71,8 +72,17 @@ static module_ret_code tell_pubsub_msg(ps_priv_t *m, module *mod, m_context *c)
return MOD_OK;
}

static map_ret_code tell_global(void *data, const char *key, void *value) {
m_context *c = (m_context *)value;
ps_priv_t *msg = (ps_priv_t *)data;

tell_pubsub_msg(msg, NULL, c);
return MAP_OK;
}


static module_ret_code publish_msg(const module *mod, const char *topic, const void *message,
const ssize_t size, const bool autofree) {
const ssize_t size, const bool autofree, const bool global) {
MOD_PARAM_ASSERT(message);
MOD_PARAM_ASSERT(size > 0);

Expand All @@ -85,7 +95,12 @@ static module_ret_code publish_msg(const module *mod, const char *topic, const v
*/
if (!topic || ((tmp = map_get(c->topics, topic)) && tmp == mod)) {
ps_priv_t *m = create_pubsub_msg(message, &mod->self, topic, USER, size, autofree);
return tell_pubsub_msg(m, NULL, c);
if (!global) {
return tell_pubsub_msg(m, NULL, c);
}
if (map_iterate(ctx, tell_global, m) == MAP_OK) {
return MOD_OK;
}
}
return MOD_ERR;
}
Expand Down Expand Up @@ -221,12 +236,12 @@ module_ret_code module_publish(const self_t *self, const char *topic, const void
MOD_PARAM_ASSERT(topic);
GET_MOD(self);

return publish_msg(mod, topic, message, size, autofree);
return publish_msg(mod, topic, message, size, autofree, false);
}

module_ret_code module_broadcast(const self_t *self, const void *message,
const ssize_t size, const bool autofree) {
const ssize_t size, const bool autofree, bool global) {
GET_MOD(self);

return publish_msg(mod, NULL, message, size, autofree);
return publish_msg(mod, NULL, message, size, autofree, global);
}
7 changes: 7 additions & 0 deletions Samples/MultiCtx/a.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#endif
#include <unistd.h>
#include <stdint.h>
#include <string.h>

static const char *myCtx = "FirstCtx";
/*
Expand Down Expand Up @@ -86,6 +87,9 @@ static void receive(const msg_t *msg, const void *userdata) {
m_become(ready);
m_set_userdata(&counter);
}
} else if (msg->ps_msg->type == USER && !strcmp((const char *)msg->ps_msg->message, "Leave")) {
m_log("Other context left. Leaving...\n");
modules_ctx_quit(myCtx, 0);
}
}

Expand All @@ -108,5 +112,8 @@ static void receive_ready(const msg_t *msg, const void *userdata) {
if (*counter == 10) {
modules_ctx_quit(myCtx, 0);
}
} else if (msg->ps_msg->type == USER && !strcmp((const char *)msg->ps_msg->message, "Leave")) {
m_log("Other context left. Leaving...\n");
modules_ctx_quit(myCtx, 0);
}
}
2 changes: 2 additions & 0 deletions Samples/MultiCtx/b.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <bits/sigaction.h>
#endif
#include <unistd.h>
#include <string.h>

static const char *myCtx = "SecondCtx";
/*
Expand Down Expand Up @@ -80,6 +81,7 @@ static void receive(const msg_t *msg, const void *userdata) {
m_log("an error occurred while getting signalfd data.\n");
}
m_log("received signal %d. Leaving.\n", fdsi.ssi_signo);
m_broadcast_str("Leave", true);
modules_ctx_quit(myCtx, 0);
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- [x] Fix memleaks!
- [x] Unsubscribe shopuld not check if topic is registered in ctx as otherwise umsubscribing from a deregistered topic would not work.
- [x] pubsub interface should take "const void *" instead of "const unsigned char *" as data
- [ ] Add a new parameter "bool global" to module_broadcast?
- [x] Add a new parameter "bool global" to module_broadcast?
- [x] Rename pubsub_msg to ps_msg inside msg_t
- [x] Rename pubsub_msg_t to ps_msg_t
- [x] Rename pubsub_priv_t to ps_priv_t
Expand Down
24 changes: 15 additions & 9 deletions docs/src/module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,18 @@ Where not specified, these functions return a :ref:`module_ret_code <module_ret_
:type size: :c:type:`const ssize_t`
:type autofree: :c:type:`const bool`

.. c:macro:: m_broadcast(msg, size, autofree)
.. c:macro:: m_broadcast(msg, size, autofree, global)
Broadcast a message in module's context.
Broadcast a message.

:param msg: data to be delivered to all modules in a context.
:param size: size of data to be delivered.
:param autofree: whether to autofree msg after last recipient's received it.
:param global: whether to broadcast to every context.
:type msg: :c:type:`const void *`
:type size: :c:type:`const ssize_t`
:type autofree: :c:type:`const bool`
:type global: :c:type:`const bool`

.. c:macro:: m_tell_str(recipient, msg)
Expand All @@ -220,17 +222,19 @@ Where not specified, these functions return a :ref:`module_ret_code <module_ret_
Publish a string message on a topic. Size is automatically computed through strlen, and autofree is set to false.

:param topic: topic on which publish message. NULL to broadcast message to all modules in same context. Note that only topic creator can publish message on topic.
:param topic: topic on which publish message. Note that only topic creator can publish message on topic.
:param msg: message to be published.
:type topic: :c:type:`const char *`
:type msg: :c:type:`const char *`

.. c:macro:: m_broadcast_str(msg)
.. c:macro:: m_broadcast_str(msg, global)
Broadcast a string message in module's context. Same as calling m_publish(NULL, msg). Size is automatically computed through strlen, and autofree is set to false.
Broadcast a string message. Same as calling m_publish(NULL, msg). Size is automatically computed through strlen, and autofree is set to false.

:param msg: message to be delivered to all modules in a context.
:param global: whether to broadcast to every context.
:type msg: :c:type:`const char *`
:type global: :c:type:`const bool`

.. _module_complex:

Expand Down Expand Up @@ -468,15 +472,17 @@ Again, where not specified, these functions return a :ref:`module_ret_code <modu
:type size: :c:type:`const ssize_t`
:type autofree: :c:type:`const bool`

.. c:function:: module_broadcast(self, msg, size, autofree)
.. c:function:: module_broadcast(self, msg, size, autofree, global)
Broadcast a message to all modules inside context.
Broadcast a message.

:param self: pointer to module's handler
:param msg: actual data to be published.
:param size: size of data to be published.
:param msg: data to be delivered to all modules in a context.
:param size: size of data to be delivered.
:param autofree: whether to autofree msg after last recipient's received it.
:param global: whether to broadcast to every context.
:type self: :c:type:`const self_t *`
:type msg: :c:type:`const void *`
:type size: :c:type:`const ssize_t`
:type autofree: :c:type:`const bool`
:type global: :c:type:`const bool`
8 changes: 4 additions & 4 deletions tests/test_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -439,28 +439,28 @@ void test_module_publish(void **state) {
void test_module_broadcast_NULL_self(void **state) {
(void) state; /* unused */

module_ret_code ret = module_broadcast(NULL, (unsigned char *)"hi!", strlen("hi!"), false);
module_ret_code ret = module_broadcast(NULL, (unsigned char *)"hi!", strlen("hi!"), false, false);
assert_false(ret == MOD_OK);
}

void test_module_broadcast_NULL_msg(void **state) {
(void) state; /* unused */

module_ret_code ret = module_broadcast(self, NULL, strlen("hi!"), false);
module_ret_code ret = module_broadcast(self, NULL, strlen("hi!"), false, false);
assert_false(ret == MOD_OK);
}

void test_module_broadcast_wrong_size(void **state) {
(void) state; /* unused */

module_ret_code ret = module_broadcast(self, (unsigned char *)"hi!", -1, false);
module_ret_code ret = module_broadcast(self, (unsigned char *)"hi!", -1, false, false);
assert_false(ret == MOD_OK);
}

void test_module_broadcast(void **state) {
(void) state; /* unused */

module_ret_code ret = module_broadcast(self, (unsigned char *)"hi3!", strlen("hi!"), false);
module_ret_code ret = module_broadcast(self, (unsigned char *)"hi3!", strlen("hi!"), false, false);
assert_true(ret == MOD_OK);
}

Expand Down

0 comments on commit e7abdb1

Please sign in to comment.