Skip to content

Commit

Permalink
Dropped module_register_topic/deregister_topic. Way simpler interface…
Browse files Browse the repository at this point in the history
…. Dropped TOPIC_(DE)REGISTERED sys messages too.
  • Loading branch information
FedeDP committed Aug 3, 2019
1 parent 15a111c commit a954672
Show file tree
Hide file tree
Showing 20 changed files with 37 additions and 213 deletions.
10 changes: 1 addition & 9 deletions Lib/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ static module_ret_code init_ctx(const char *ctx_name, m_context **context) {
(*context)->logger = default_logger;

(*context)->modules = map_new(false, NULL);
(*context)->topics = map_new(false, NULL);

(*context)->name = ctx_name;
if ((*context)->topics && (*context)->modules &&
if ((*context)->modules &&
map_put(ctx, (*context)->name, *context) == MAP_OK) {

return MOD_OK;
Expand All @@ -43,7 +42,6 @@ static module_ret_code init_ctx(const char *ctx_name, m_context **context) {
static void destroy_ctx(m_context *context) {
MODULE_DEBUG("Destroying context '%s'.\n", context->name);
map_free(context->modules);
map_free(context->topics);
poll_close(context->fd, &context->pevents, &context->max_events);
map_remove(ctx, context->name);
memhook._free(context);
Expand Down Expand Up @@ -271,11 +269,6 @@ module_ret_code module_register(const char *name, const char *ctx_name, self_t *
mod->state = IDLE;
mod->fds = NULL;

mod->subscriptions = map_new(false, NULL);
if (!mod->subscriptions) {
break;
}

mod->recvs = stack_new(NULL);
if (!mod->recvs) {
break;
Expand Down Expand Up @@ -303,7 +296,6 @@ module_ret_code module_register(const char *name, const char *ctx_name, self_t *
}
memhook._free(*self);
*self = NULL;
map_free(mod->subscriptions);
stack_free(mod->recvs);
memhook._free(mod);
return ret;
Expand Down
1 change: 0 additions & 1 deletion Lib/priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ struct _context {
map_t *modules; // Context's modules
void *pevents; // Context's polled events structs
int max_events; // Max number of returned events for epoll/kqueue
map_t *topics; // Context's registered topics
size_t running_mods; // Number of RUNNING modules in context
};

Expand Down
2 changes: 0 additions & 2 deletions Lib/public/module/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ _public_ module_ret_code module_ref(const self_t *self, const char *name, const
_public_ module_ret_code module_become(const self_t *self, const recv_cb new_recv);
_public_ module_ret_code module_unbecome(const self_t *self);

_public_ module_ret_code module_register_topic(const self_t *self, const char *topic);
_public_ module_ret_code module_deregister_topic(const self_t *self, const char *topic);
_public_ module_ret_code module_subscribe(const self_t *self, const char *topic);
_public_ module_ret_code module_unsubscribe(const self_t *self, const char *topic);

Expand Down
2 changes: 1 addition & 1 deletion Lib/public/module/module_cmn.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ typedef struct _self self_t;
enum module_states { IDLE = 0x1, RUNNING = 0x2, PAUSED = 0x4, STOPPED = 0x8 };

/* PubSub message types */
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED, MODULE_STARTED, MODULE_STOPPED, MODULE_POISONPILL };
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, MODULE_STARTED, MODULE_STOPPED, MODULE_POISONPILL };

typedef struct {
const char *topic;
Expand Down
3 changes: 1 addition & 2 deletions Lib/public/module/module_easy.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ static void _ctor2_ module_pre_start(void)

#define m_become(x) module_become(self(), receive_##x)
#define m_unbecome() module_unbecome(self())
#define m_register_topic(topic) module_register_topic(self(), topic)
#define m_deregister_topic(topic) module_deregister_topic(self(), topic)

#define m_subscribe(topic) module_subscribe(self(), topic)
#define m_unsubscribe(topic) module_unsubscribe(self(), topic)
#define m_tell(recipient, msg, size, free) module_tell(self(), recipient, msg, size, free)
Expand Down
62 changes: 14 additions & 48 deletions Lib/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ static ps_priv_t *create_pubsub_msg(const void *message, const self_t *sender, c
static map_ret_code tell_global(void *data, const char *key, void *value);
static void destroy_pubsub_msg(ps_priv_t *pubsub_msg);
static module_ret_code tell_pubsub_msg(ps_priv_t *m, module *mod, m_context *c, const bool global);
static module_ret_code publish_msg(const module *mod, const char *topic, const void *message,
static module_ret_code send_msg(const module *mod, module *recipient,
const char *topic, const void *message,
const ssize_t size, const bool autofree, const bool global);

static map_ret_code tell_if(void *data, const char *key, void *value) {
Expand Down Expand Up @@ -82,24 +83,16 @@ static module_ret_code tell_pubsub_msg(ps_priv_t *m, module *mod, m_context *c,
return MOD_OK;
}


static module_ret_code publish_msg(const module *mod, const char *topic, const void *message,
static module_ret_code send_msg(const module *mod, module *recipient,
const char *topic, const void *message,
const ssize_t size, const bool autofree, const bool global) {
MOD_PARAM_ASSERT(message);
MOD_PARAM_ASSERT(size > 0);

GET_CTX_PRIV((&mod->self));

void *tmp = NULL;
/*
* Only module that registered a topic can publish on the topic.
* Moreover, a publish can only be made on existent topic.
*/
if (!topic || ((tmp = map_get(c->topics, topic)) && tmp == mod)) {
ps_priv_t *m = create_pubsub_msg(message, &mod->self, topic, USER, size, autofree);
return tell_pubsub_msg(m, NULL, c, global);
}
return MOD_ERR;
ps_priv_t *m = create_pubsub_msg(message, &mod->self, topic, USER, size, autofree);
return tell_pubsub_msg(m, recipient, c, global);
}

/** Private API **/
Expand Down Expand Up @@ -162,40 +155,16 @@ module_ret_code module_ref(const self_t *self, const char *name, const self_t **
return MOD_OK;
}

module_ret_code module_register_topic(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

if (!map_has_key(c->topics, topic)) {
if (map_put(c->topics, topic, mod) == MAP_OK) {
tell_system_pubsub_msg(NULL, c, TOPIC_REGISTERED, &mod->self, topic);
return MOD_OK;
}
}
return MOD_ERR;
}

module_ret_code module_deregister_topic(const self_t *self, const char *topic) {
module_ret_code module_subscribe(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

void *tmp = map_get(c->topics, topic); // NULL if key is not present
/* Only same mod which registered topic can deregister it */
if (tmp == mod) {
if (map_remove(c->topics, topic) == MAP_OK) {
tell_system_pubsub_msg(NULL, c, TOPIC_DEREGISTERED, &mod->self, topic);
return MOD_OK;
}
/* Lazy subscriptions map init */
if (!mod->subscriptions) {
mod->subscriptions = map_new(false, NULL);
MOD_ALLOC_ASSERT(mod->subscriptions);
}
return MOD_ERR;
}

module_ret_code module_subscribe(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

if (!map_has_key(mod->subscriptions, topic) &&
/* Store pointer to mod as value, even if it will be unused; this should be a hashset */
Expand All @@ -221,29 +190,26 @@ module_ret_code module_unsubscribe(const self_t *self, const char *topic) {
module_ret_code module_tell(const self_t *self, const self_t *recipient, const void *message,
const ssize_t size, const bool autofree) {
GET_MOD(self);
MOD_PARAM_ASSERT(message);
MOD_PARAM_ASSERT(size > 0);
MOD_PARAM_ASSERT(recipient);
/* only same ctx modules can talk */
MOD_PARAM_ASSERT(self->ctx == recipient->ctx);

ps_priv_t *m = create_pubsub_msg(message, &mod->self, NULL, USER, size, autofree);
return tell_pubsub_msg(m, recipient->mod, recipient->ctx, false);
return send_msg(mod, recipient->mod, NULL, message, size, autofree, false);
}

module_ret_code module_publish(const self_t *self, const char *topic, const void *message,
const ssize_t size, const bool autofree) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);

return publish_msg(mod, topic, message, size, autofree, false);
return send_msg(mod, NULL, topic, message, size, autofree, false);
}

module_ret_code module_broadcast(const self_t *self, const void *message,
const ssize_t size, const bool autofree, bool global) {
GET_MOD(self);

return publish_msg(mod, NULL, message, size, autofree, global);
return send_msg(mod, NULL, NULL, message, size, autofree, global);
}

module_ret_code module_poisonpill(const self_t *self, const self_t *recipient) {
Expand Down
7 changes: 2 additions & 5 deletions Samples/Cpp/doggo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ static void module_pre_start(void) {
}

static void init(void) {

/* Doggo should subscribe to "leaving" topic */
m_subscribe("leaving");
}

static bool check(void) {
Expand Down Expand Up @@ -46,10 +47,6 @@ static void receive(const msg_t *msg, const void *userdata) {
m_log("???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
m_subscribe(msg->ps_msg->topic);
break;
default:
break;
}
Expand Down
2 changes: 0 additions & 2 deletions Samples/Cpp/pippo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ static void receive(const msg_t *msg, const void *userdata) {
} else {
if (msg->ps_msg->type == USER && !strcmp((char *)msg->ps_msg->message, "BauBau")) {
m_become(ready);
/* Finally register Leaving topic */
m_register_topic("leaving");
m_log("Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n");
}
}
Expand Down
7 changes: 2 additions & 5 deletions Samples/Easy/doggo.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ static void module_pre_start(void) {
}

static void init(void) {

/* Doggo should subscribe to "leaving" topic */
m_subscribe("leaving");
}

static bool check(void) {
Expand Down Expand Up @@ -55,10 +56,6 @@ static void receive(const msg_t *msg, const void *userdata) {
m_log("???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
m_subscribe(msg->ps_msg->topic);
break;
case MODULE_STOPPED: {
char *name = NULL;
module_get_name(msg->ps_msg->sender, &name);
Expand Down
2 changes: 0 additions & 2 deletions Samples/Easy/pippo.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ static void receive(const msg_t *msg, const void *userdata) {
!strcmp((char *)msg->ps_msg->message, "BauBau")) {

m_become(ready);
/* Finally register Leaving topic */
m_register_topic("leaving");
m_log("Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n");
}
}
Expand Down
7 changes: 2 additions & 5 deletions Samples/Poll/doggo.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ static void module_pre_start(void) {
}

static void init(void) {

/* Doggo should subscribe to "leaving" topic */
m_subscribe("leaving");
}

static bool check(void) {
Expand Down Expand Up @@ -45,10 +46,6 @@ static void receive(const msg_t *msg, const void *userdata) {
m_log("???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
m_subscribe(msg->ps_msg->topic);
break;
default:
break;
}
Expand Down
2 changes: 0 additions & 2 deletions Samples/Poll/pippo.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ static void receive(const msg_t *msg, const void *userdata) {
} else {
if (msg->ps_msg->type == USER && !strcmp((char *)msg->ps_msg->message, "BauBau")) {
m_become(ready);
/* Finally register Leaving topic */
m_register_topic("leaving");
m_log("Press 'p' to play with Doggo! Or 'f' to feed your Doggo. 's' to have a nap. 'w' to wake him up. 'q' to leave him for now.\n");
}
}
Expand Down
7 changes: 2 additions & 5 deletions Samples/SharedSrc/mod.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ static void A_init(void) {
}

static void B_init(void) {

/* Doggo should subscribe to "leaving" topic */
module_subscribe(selfB, "leaving");
}

static bool evaluate(void) {
Expand Down Expand Up @@ -151,10 +152,6 @@ static void B_recv(const msg_t *msg, const void *userdata) {
module_log(selfB, "???\n");
}
break;
case TOPIC_REGISTERED:
/* Doggo should subscribe to "leaving" topic */
module_subscribe(selfB, msg->ps_msg->topic);
break;
default:
break;
}
Expand Down
5 changes: 4 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ conversely to module_stop that should stop module right away freeing all its enq
- [x] Always destroy messages in flush_pubsub_msg()? RIght now it delivers all of them if we're stopping looping on ctx
- [x] module_poisonpill should only be sent to RUNNING modules
- [x] when stop looping on a context, flush all pubsub messages to RUNNING modules only. Destroy messages for non-running modules.
- [x] Drop (de)register_topic?
- [x] mod->subscriptions map lazy creation

### Map
- [x] FIx: avoid incrementing map size on value update
Expand Down Expand Up @@ -82,7 +84,8 @@ conversely to module_stop that should stop module right away freeing all its enq
- [x] New modules_loop behaviour (stop looping when no RUNNING modules)
- [x] New Map API
- [x] New Stack API

- [x] Dropped (de)register_topic!

### Samples
- [x] Fix samples

Expand Down
2 changes: 1 addition & 1 deletion docs/src/data_structures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Types
enum module_states { IDLE = 0x1, RUNNING = 0x2, PAUSED = 0x4, STOPPED = 0x8 };
/* PubSub message types */
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, TOPIC_REGISTERED, TOPIC_DEREGISTERED, MODULE_STARTED, MODULE_STOPPED, MODULE_POISONPILL };
enum msg_type { USER, LOOP_STARTED, LOOP_STOPPED, MODULE_STARTED, MODULE_STOPPED, MODULE_POISONPILL };
typedef struct {
const char *topic;
Expand Down

0 comments on commit a954672

Please sign in to comment.