Skip to content

Commit

Permalink
Code arrangement improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
FedeDP committed Jan 6, 2019
1 parent adad475 commit bced612
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 212 deletions.
216 changes: 7 additions & 209 deletions Lib/module.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "module.h"
#include "poll_priv.h"

/** Generic module interface **/

static module_ret_code init_ctx(const char *ctx_name, m_context **context);
static void destroy_ctx(m_context *context);
static m_context *check_ctx(const char *ctx_name);
Expand All @@ -9,12 +11,6 @@ static module_ret_code init_pubsub_fd(module *mod);
static void default_logger(const self_t *self, const char *fmt, va_list args, const void *userdata);
static module_ret_code _register_fd(module *mod, const int fd, const bool autoclose, const void *userptr);
static module_ret_code _deregister_fd(module *mod, const int fd);
static int tell_if(void *data, void *m);
static pubsub_msg_t *create_pubsub_msg(const unsigned char *message, const self_t *sender, const char *topic, enum msg_type type, const size_t size);
static module_ret_code tell_pubsub_msg(pubsub_msg_t *m, module *mod, m_context *c);
static module_ret_code publish_msg(const module *mod, const char *topic,
const unsigned char *message, const ssize_t size);
static inline _pure_ bool _module_is(const module *mod, const enum module_states st);
static int manage_fds(module *mod, m_context *c, const int flag, const bool stop);
static module_ret_code start(module *mod, const char *err_str);
static module_ret_code stop(module *mod, const char *err_str, const bool stop);
Expand Down Expand Up @@ -146,54 +142,6 @@ static module_ret_code _deregister_fd(module *mod, const int fd) {
return MOD_ERR;
}

static int tell_if(void *data, void *m) {
module *mod = (module *)m;
const pubsub_msg_t *msg = (pubsub_msg_t *)data;

/*
* Only if mod is actually running or paused and
* it is a SYSTEM message or
* topic is null or this module is subscribed to topic
*/
if (_module_is(mod, RUNNING | PAUSED) && (msg->type != USER || !msg->topic ||
map_has_key(mod->subscriptions, msg->topic))) {

MODULE_DEBUG("Telling a message to %s.\n", mod->name);

pubsub_msg_t *mm = create_pubsub_msg(msg->message, msg->sender, msg->topic, msg->type, msg->size);
if (!mm || write(mod->pubsub_fd[1], &mm, sizeof(pubsub_msg_t *)) != sizeof(pubsub_msg_t *)) {
MODULE_DEBUG("Failed to write message for %s: %s\n", mod->name, strerror(errno));
}
}
return MAP_OK;
}

static pubsub_msg_t *create_pubsub_msg(const unsigned char *message, const self_t *sender, const char *topic,
enum msg_type type, const size_t size) {
pubsub_msg_t *m = memhook._malloc(sizeof(pubsub_msg_t));
if (m) {
m->message = message;
m->sender = sender;
m->topic = mem_strdup(topic);
*(int *)&m->type = type;
*(size_t *)&m->size = size;
}
return m;
}

static module_ret_code tell_pubsub_msg(pubsub_msg_t *m, module *mod, m_context *c) {
if (mod) {
tell_if(m, mod);
} else {
map_iterate(c->modules, tell_if, m);
}
return MOD_OK;
}

static inline bool _module_is(const module *mod, const enum module_states st) {
return mod->state & st;
}

static int manage_fds(module *mod, m_context *c, const int flag, const bool stop) {
module_poll_t *tmp = mod->fds;
int ret = 0;
Expand Down Expand Up @@ -253,6 +201,11 @@ static module_ret_code stop(module *mod, const char *err_str, const bool stop) {

/** Private API **/

bool _module_is(const module *mod, const enum module_states st) {
return mod->state & st;
}


int evaluate_module(void *data, void *m) {
module *mod = (module *)m;
if (_module_is(mod, IDLE) && mod->hook.evaluate()) {
Expand All @@ -262,42 +215,6 @@ int evaluate_module(void *data, void *m) {
return MAP_OK;
}

char *mem_strdup(const char *s) {
char *new = NULL;
if (s) {
const int len = strlen(s) + 1;
new = memhook._malloc(len);
if (new) {
memcpy(new, s, len);
}
}
return new;
}

int flush_pubsub_msg(void *data, void *m) {
module *mod = (module *)m;
pubsub_msg_t *mm = NULL;

while (read(mod->pubsub_fd[0], &mm, sizeof(pubsub_msg_t *)) == sizeof(pubsub_msg_t *)) {
/*
* Actually tell msg ONLY if we are not deregistering module,
* ie: we are stopping looping on the context.
*/
if (!data) {
MODULE_DEBUG("Flushing pubsub message for module '%s'.\n", mod->name);
const msg_t msg = { .is_pubsub = 1, .pubsub_msg = mm };
mod->hook.recv(&msg, mod->userdata);
}
destroy_pubsub_msg(mm);
}
return 0;
}

void destroy_pubsub_msg(pubsub_msg_t *m) {
memhook._free((char *)m->topic);
memhook._free(m);
}

/** Public API **/

module_ret_code module_register(const char *name, const char *ctx_name, const self_t **self, const userhook *hook) {
Expand Down Expand Up @@ -487,125 +404,6 @@ module_ret_code module_get_context(const self_t *self, char **ctx) {
return MOD_OK;
}

/** Actor-like PubSub interface **/

module_ret_code module_ref(const self_t *self, const char *name, const self_t **modref) {
MOD_PARAM_ASSERT(name);
MOD_PARAM_ASSERT(modref);
MOD_PARAM_ASSERT(!*modref);

GET_CTX(self);
CTX_GET_MOD(name, c);

*modref = mod->self;
return MOD_OK;
}

module_ret_code module_register_topic(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

if (!map_has_key(c->topics, topic)) {
if (map_put(c->topics, topic, mod, true, false) == MAP_OK) {
tell_system_pubsub_msg(c, TOPIC_REGISTERED, topic);
return MOD_OK;
}
}
return MOD_ERR;
}

module_ret_code module_deregister_topic(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

void *tmp = map_get(c->topics, topic); // NULL if key is not present
/* Only same mod which registered topic can deregister it */
if (tmp == mod) {
if (map_remove(c->topics, topic) == MAP_OK) {
tell_system_pubsub_msg(c, TOPIC_DEREGISTERED, topic);
return MOD_OK;
}
}
return MOD_ERR;
}

module_ret_code module_subscribe(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);
GET_CTX(self);

/* If topic exists and we are not already subscribed */
if (map_has_key(c->topics, topic) && !map_has_key(mod->subscriptions, topic)) {
/* Store pointer to mod as value, even if it will be unused; this should be a hashset */
if (map_put(mod->subscriptions, topic, mod, true, false) == MAP_OK) {
return MOD_OK;
}
}
return MOD_ERR;
}

module_ret_code module_unsubscribe(const self_t *self, const char *topic) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);

if (map_remove(mod->subscriptions, topic) == MAP_OK) {
return MOD_OK;
}
return MOD_ERR;
}

module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic) {
pubsub_msg_t m = { .topic = topic, .sender = NULL, .message = NULL, .type = type, .size = 0 };
return tell_pubsub_msg(&m, NULL, c);
}

module_ret_code module_tell(const self_t *self, const self_t *recipient, const unsigned char *message,
const ssize_t size) {
GET_MOD(self);
MOD_PARAM_ASSERT(message);
MOD_PARAM_ASSERT(size > 0);
MOD_PARAM_ASSERT(recipient);
/* only same ctx modules can talk */
MOD_PARAM_ASSERT(self->ctx == recipient->ctx);

pubsub_msg_t m = { .topic = NULL, .message = message, .sender = mod->self, .type = USER, .size = size };
return tell_pubsub_msg(&m, recipient->mod, recipient->ctx);
}

static module_ret_code publish_msg(const module *mod, const char *topic,
const unsigned char *message, const ssize_t size) {
MOD_PARAM_ASSERT(message);
MOD_PARAM_ASSERT(size > 0);

GET_CTX_PURE(mod->self);

void *tmp = NULL;
/*
* Only module that registered a topic can publish on the topic.
* Moreover, a publish can only be made on existent topic.
*/
if (!topic || ((tmp = map_get(c->topics, topic)) && tmp == mod)) {
pubsub_msg_t m = { .topic = topic, .message = message, .sender = mod->self, .type = USER, .size = size };
return tell_pubsub_msg(&m, NULL, c);
}
return MOD_ERR;
}

module_ret_code module_publish(const self_t *self, const char *topic, const unsigned char *message, const ssize_t size) {
MOD_PARAM_ASSERT(topic);
GET_MOD(self);

return publish_msg(mod, topic, message, size);
}

module_ret_code module_broadcast(const self_t *self, const unsigned char *message, const ssize_t size) {
GET_MOD(self);

return publish_msg(mod, NULL, message, size);
}

/** Module state getters **/

bool module_is(const self_t *self, const enum module_states st) {
Expand Down
2 changes: 1 addition & 1 deletion Lib/poll_priv.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "module_priv.h"
#include "priv.h"
#include <errno.h>
#include <string.h>
#include <fcntl.h>
Expand Down
14 changes: 14 additions & 0 deletions Lib/priv.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "priv.h"
#include <string.h>

char *mem_strdup(const char *s) {
char *new = NULL;
if (s) {
const int len = strlen(s) + 1;
new = memhook._malloc(len);
if (new) {
memcpy(new, s, len);
}
}
return new;
}
6 changes: 6 additions & 0 deletions Lib/module_priv.h → Lib/priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,16 @@ struct _self {
const bool is_ref; // is this a reference?
};

/* Defined in module.c */
_pure_ bool _module_is(const module *mod, const enum module_states st);
int evaluate_module(void *data, void *m);

/* Defined in pubsub.c */
module_ret_code tell_system_pubsub_msg(m_context *c, enum msg_type type, const char *topic);
int flush_pubsub_msg(void *data, void *m);
void destroy_pubsub_msg(pubsub_msg_t *m);

/* Defined in priv.c */
char *mem_strdup(const char *s);

extern map_t *ctx;
Expand Down

0 comments on commit bced612

Please sign in to comment.