Skip to content

Commit

Permalink
Added support for regex topic matching.
Browse files Browse the repository at this point in the history
  • Loading branch information
FedeDP committed Aug 18, 2019
1 parent 973e426 commit 3b70803
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Lib/priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ struct _module {
const void *userdata; // module's user defined data
enum module_states state; // module's state
const char *name; // module's name
fd_priv_t *fds; // module's fds to be polled
fd_priv_t *fds; // module's fds to be polled
map_t *subscriptions; // module's subscriptions
int pubsub_fd[2]; // In and Out pipe for pubsub msg
self_t self; // pointer to self (and thus context)
Expand Down
2 changes: 1 addition & 1 deletion Lib/public/module/map.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ typedef struct _map_itr map_itr_t;
extern "C"{
#endif

_public_ map_t *map_new(const bool autofree, const map_dtor fn);
_public_ map_t *map_new(const bool keysdup, const map_dtor fn);
_public_ map_itr_t *map_itr_new(const map_t *m);
_public_ map_itr_t *map_itr_next(map_itr_t *itr);
_public_ map_ret_code map_itr_remove(map_itr_t *itr);
Expand Down
59 changes: 48 additions & 11 deletions Lib/pubsub.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#include "module.h"
#include "poll_priv.h"
#include <regex.h>

/** Actor-like PubSub interface **/

static void regex_dtor(void *data);
static bool is_subscribed(mod_t *mod, const char *topic);
static map_ret_code tell_if(void *data, const char *key, void *value);
static ps_priv_t *create_pubsub_msg(const void *message, const self_t *sender, const char *topic,
enum msg_type type, const size_t size, const bool autofree);
Expand All @@ -13,14 +16,39 @@ static module_ret_code send_msg(const mod_t *mod, mod_t *recipient,
const char *topic, const void *message,
const ssize_t size, const bool autofree, const bool global);

static void regex_dtor(void *data) {
regfree(data);
memhook._free(data);
}

static bool is_subscribed(mod_t *mod, const char *topic) {
/* Check if module is directly subscribed to topic */
if (map_has_key(mod->subscriptions, topic)) {
return true;
}

/* Check if any stored subscriptions is a regex that matches topic */
map_itr_t *itr = map_itr_new(mod->subscriptions);
for (; itr; itr = map_itr_next(itr)) {
const regex_t *reg = map_itr_get_data(itr);

/* Execute regular expression */
int ret = regexec(reg, topic, 0, NULL, 0);
if (!ret) {
return true;
}
}
return false;
}

static map_ret_code tell_if(void *data, const char *key, void *value) {
mod_t *mod = (mod_t *)value;
ps_priv_t *msg = (ps_priv_t *)data;

if (_module_is(mod, RUNNING | PAUSED) && // mod is running or paused
((msg->msg.type != USER && msg->msg.sender != &mod->self) || // system messages with sender != this module (avoid sending ourselves system messages produced by us)
(msg->msg.type == USER && // it is a publish and mod is subscribed on topic, or it is a broadcast/direct tell message
(!msg->msg.topic || map_has_key(mod->subscriptions, msg->msg.topic))))) {
(!msg->msg.topic || is_subscribed(mod, msg->msg.topic))))) {

MODULE_DEBUG("Telling a message to '%s'.\n", mod->name);

Expand Down Expand Up @@ -160,17 +188,26 @@ module_ret_code module_subscribe(const self_t *self, const char *topic) {
GET_MOD(self);
GET_CTX(self);

/* Lazy subscriptions map init */
if (!mod->subscriptions) {
mod->subscriptions = map_new(false, NULL);
MOD_ALLOC_ASSERT(mod->subscriptions);
}

if (!map_has_key(mod->subscriptions, topic) &&
/* Store pointer to mod as value, even if it will be unused; this should be a hashset */
map_put(mod->subscriptions, topic, mod) == MAP_OK) {
/* Check if it is a valid regex: compile it */
regex_t regex;
int ret = regcomp(&regex, topic, REG_NOSUB);
if (!ret) {
MODULE_DEBUG("'%s' is a valid regex.\n", topic);

return MOD_OK;
/* Lazy subscriptions map init */
if (!mod->subscriptions) {
mod->subscriptions = map_new(false, regex_dtor);
MOD_ALLOC_ASSERT(mod->subscriptions);
}

if (!map_has_key(mod->subscriptions, topic)) {
/* Store regex on heap */
regex_t *reg = memhook._malloc(sizeof(regex_t));
memcpy(reg, &regex, sizeof(regex_t));
if (map_put(mod->subscriptions, topic, reg) == MAP_OK) {
return MOD_OK;
}
}
}
return MOD_ERR;
}
Expand Down
4 changes: 2 additions & 2 deletions Samples/Easy/doggo.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ static void module_pre_start(void) {
}

static void init(void) {
/* Doggo should subscribe to "leaving" topic */
m_subscribe("leaving");
/* Doggo should subscribe to "leaving" topic, as regex */
m_subscribe("leav[i+]ng");
}

static bool check(void) {
Expand Down
1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ conversely to module_stop that should stop module right away freeing all its enq
- [x] when stop looping on a context, flush all pubsub messages to RUNNING modules only. Destroy messages for non-running modules.
- [x] Drop (de)register_topic?
- [x] mod->subscriptions map lazy creation
- [x] Support regex topic subscriptions

### Map
- [x] FIx: avoid incrementing map size on value update
Expand Down
6 changes: 3 additions & 3 deletions docs/src/map.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ API

Where not specified, these functions return a map_ret_code.

.. c:function:: map_new(const bool autofree, const map_dtor fn)
.. c:function:: map_new(const bool keysdup, const map_dtor fn)
Create a new map_t object.

:param autofree: whether keys lifetime should be managed by map
:param keysdup: whether keys lifetime should be managed by map
:param fn: callback called on value destroy. If NULL, values won't be automatically destroyed.
:type autofree: :c:type:`const bool`
:type keysdup: :c:type:`const bool`
:type fn: :c:type:`const map_dtor`

:returns: pointer to newly allocated map_t.
Expand Down
6 changes: 3 additions & 3 deletions docs/src/module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Where not specified, these functions return a :ref:`module_ret_code <module_ret_

.. c:macro:: m_log(fmt, args)
Logger function for this module. Call it the same way you'd call printf
Logger function for this module. Call it the same way you'd call printf.

:param fmt: log's format.
:param args: variadic argument.
Expand All @@ -145,7 +145,7 @@ Where not specified, these functions return a :ref:`module_ret_code <module_ret_

.. c:macro:: m_subscribe(topic)
Subscribes the module to a topic. If module is already subscribed to topic, MODULE_ERR will be returned
Subscribes the module to a topic. If module is already subscribed to topic, MODULE_ERR will be returned. Note that a regex is a valid topic too.

:param topic: topic to which subscribe.
:type topic: :c:type:`const char *`
Expand Down Expand Up @@ -403,7 +403,7 @@ Again, where not specified, these functions return a :ref:`module_ret_code <modu

.. c:function:: module_subscribe(self, topic)
Subscribes the module to a topic. If module is already subscribed to topic, MODULE_ERR will be returned.
Subscribes the module to a topic. If module is already subscribed to topic, MODULE_ERR will be returned. Note that a regex is a valid topic too.

:param self: pointer to module's handler
:param topic: topic to which subscribe.
Expand Down
7 changes: 7 additions & 0 deletions docs/src/pubsub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ It will have valued type and sender fields; sender will be set to started(stoppe

Finally, note that system messages with valued sender won't be sent to modules that actually generated the message.

Topics
------

Topics are just strings. Thus, if subscribing to "foo" topic, a module will receive: broadcasted messages, messages told directly to it and messages published on "foo" topic. |br|
Note that you can subscribe to regex patterns too, eg: "f[o+]" would receive messages published on "fo", "foo", "fooo" etc etc. |br|
When you unsubscribe from a regex topic, you should pass same regex to which you subscribed.

Notes
-----

Expand Down

0 comments on commit 3b70803

Please sign in to comment.