Skip to content

Commit

Permalink
new(core): introduced a tokenbucket like API for module.
Browse files Browse the repository at this point in the history
Moreover, cleaned up TODO.

Signed-off-by: Federico Di Pierro <nierro92@gmail.com>
  • Loading branch information
FedeDP committed Sep 6, 2022
1 parent bec233b commit 585f5ec
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 511 deletions.
21 changes: 17 additions & 4 deletions Lib/core/ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,29 @@ static void push_evt(m_mod_t *mod, evt_priv_t *evt) {
ev_src_t *src = evt->src;
m_evt_t *msg = &evt->evt;

const bool is_batch_timer = src && src->type == M_SRC_TYPE_TMR && src->flags & M_SRC_INTERNAL;
const bool is_internal = src && (src->flags & M_SRC_INTERNAL);
bool force = false;

/*
* If it is a batch timer event, unref it as
* If it is an internal event, unref it as
* it does not need to be recved by user;
* else, push it onto the message queue.
*/
if (is_batch_timer) {
if (is_internal) {
const bool is_batch_timer = src->userptr == &mod->batch;
const bool is_tb_timer = src->userptr == &mod->tb;

m_mem_unref(evt);
/* When batch timer elapses, force flush events to user */
force = is_batch_timer;

/* If this is the tokenbucket timer, refill one token */
if (is_tb_timer) {
if (mod->tb.tokens < mod->tb.burst) {
mod->tb.tokens++;
}
}

} else {
m_queue_enqueue(mod->batch.events, evt);
/*
Expand Down Expand Up @@ -164,7 +178,6 @@ static void push_evt(m_mod_t *mod, evt_priv_t *evt) {
* run the pubsub callback!
*/
if (force ||
is_batch_timer ||
m_queue_len(mod->batch.events) >= mod->batch.len) {

/*
Expand Down
28 changes: 13 additions & 15 deletions Lib/core/evts.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,16 @@ _public_ m_mod_t *m_mod_ref(const m_mod_t *mod, const char *name) {
_public_ int m_mod_become(m_mod_t *mod, m_evt_cb new_on_evt) {
M_PARAM_ASSERT(new_on_evt);
M_MOD_ASSERT_STATE(mod, M_MOD_RUNNING);
M_MOD_CONSUME_TOKEN(mod);

int ret = m_stack_push(mod->recvs, new_on_evt);
if (ret == 0) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
}
return ret;
return m_stack_push(mod->recvs, new_on_evt);;
}

_public_ int m_mod_unbecome(m_mod_t *mod) {
M_MOD_ASSERT_STATE(mod, M_MOD_RUNNING);
M_MOD_CONSUME_TOKEN(mod);

if (m_stack_pop(mod->recvs) != NULL) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
return 0;
}
return -EINVAL;
Expand All @@ -60,6 +57,7 @@ _public_ int m_mod_unbecome(m_mod_t *mod) {
_public_ int m_mod_stash(m_mod_t *mod, const m_evt_t *evt) {
M_MOD_ASSERT(mod);
M_PARAM_ASSERT(evt);
M_MOD_CONSUME_TOKEN(mod);

evt_priv_t *priv_evt = (evt_priv_t *)evt;
m_src_flags prio_flags = 0;
Expand All @@ -69,16 +67,13 @@ _public_ int m_mod_stash(m_mod_t *mod, const m_evt_t *evt) {
// Cannot stash HIGH priority evts!
M_RET_ASSERT(!(prio_flags & M_SRC_PRIO_HIGH), -EPERM);

int ret = m_queue_enqueue(mod->stashed, m_mem_ref((void *)evt));
if (ret == 0) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
}
return ret;
return m_queue_enqueue(mod->stashed, m_mem_ref((void *)evt));;
}

_public_ ssize_t m_mod_unstash(m_mod_t *mod, size_t len) {
M_MOD_ASSERT(mod);
M_PARAM_ASSERT(len > 0);
M_MOD_CONSUME_TOKEN(mod);

m_queue_t *unstashed = m_queue_new(mem_dtor);
M_ALLOC_ASSERT(unstashed);
Expand Down Expand Up @@ -106,6 +101,7 @@ _public_ ssize_t m_mod_unstash(m_mod_t *mod, size_t len) {

_public_ int m_mod_set_batch_size(m_mod_t *mod, size_t len) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);

mod->batch.len = len;
return 0;
Expand All @@ -114,19 +110,21 @@ _public_ int m_mod_set_batch_size(m_mod_t *mod, size_t len) {
_public_ int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ms) {
M_MOD_ASSERT(mod);

// src_deregister and src_register already consume a token

/* If it was already set, remove old timer */
if (mod->batch.timer.ms != 0) {
deregister_src(mod, M_SRC_TYPE_TMR, &mod->batch.timer);
m_mod_src_deregister_tmr(mod, &mod->batch.timer);
}
mod->batch.timer.clock_id = CLOCK_MONOTONIC;
mod->batch.timer.ms = timeout_ms;
if (timeout_ms != 0) {
// If batching by size was disabled
// If batching by size is disabled
if (mod->batch.len == 0) {
// Set a maximum value for batching so that only timed batching will be effective
mod->batch.len = -1;
mod->batch.len = SIZE_MAX;
}
return register_src(mod, M_SRC_TYPE_TMR, &mod->batch.timer, M_SRC_INTERNAL | M_SRC_PRIO_HIGH, NULL);
return m_mod_src_register_tmr(mod, &mod->batch.timer, M_SRC_INTERNAL | M_SRC_PRIO_HIGH, &mod->batch);
}
return 0;
}
44 changes: 38 additions & 6 deletions Lib/core/mod.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ static int init_pubsub_fd(m_mod_t *mod) {
static int manage_srcs(m_mod_t *mod, m_ctx_t *c, int flag, bool stop) {
int ret = 0;

fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
for (int i = 0; i < M_SRC_TYPE_END; i++) {
m_itr_foreach(mod->srcs[i], {
ev_src_t *t = m_itr_get(m_itr);
Expand Down Expand Up @@ -520,6 +519,10 @@ _public_ int m_mod_register(const char *name, m_ctx_t *c, m_mod_t **mod_ref, con
break;
}

// Initially disabled token bucket
mod->tb.burst = UINT64_MAX;
mod->tb.tokens = UINT64_MAX;

if (m_map_put(c->modules, mod->name, mod) == 0) {
mod->state = M_MOD_IDLE;

Expand All @@ -543,6 +546,35 @@ _public_ int m_mod_deregister(m_mod_t **mod) {
return mod_deregister(mod, true);
}

_public_ int m_mod_set_tokenbucket(m_mod_t *mod, uint16_t rate, uint64_t burst) {
M_MOD_ASSERT(mod);
M_PARAM_ASSERT(rate <= 1000);

// src_deregister and src_register already consume a token

/* If it was already set, remove old timer */
if (mod->tb.timer.ms != 0) {
m_mod_src_deregister_tmr(mod, &mod->tb.timer);
}

// Rate 0 -> disable tb
if (rate == 0) {
mod->tb.rate = 0;
mod->tb.burst = UINT64_MAX;
mod->tb.tokens = UINT64_MAX;
memset(&mod->tb.timer, 0, sizeof(mod->tb.timer));
return 0;
}

// Store new values and create new token bucket timer src
mod->tb.rate = rate;
mod->tb.burst = burst;
mod->tb.tokens = burst;
mod->tb.timer.clock_id = CLOCK_MONOTONIC;
mod->tb.timer.ms = 1 / rate;
return m_mod_src_register_tmr(mod, &mod->tb.timer, M_SRC_INTERNAL | M_SRC_PRIO_HIGH, &mod->tb);
}

_public_ __attribute__((format (printf, 2, 3))) int m_mod_log(const m_mod_t *mod, const char *fmt, ...) {
M_MOD_ASSERT(mod);
M_MOD_CTX(mod);
Expand Down Expand Up @@ -622,6 +654,7 @@ _public_ m_ctx_t *m_mod_ctx(const m_mod_t *mod) {

_public_ int m_mod_start(m_mod_t *mod) {
M_MOD_ASSERT_STATE(mod, M_MOD_IDLE | M_MOD_STOPPED);
M_MOD_CONSUME_TOKEN(mod);

int ret = start(mod, true);
M_MOD_BOUND(m_mod_start);
Expand All @@ -630,6 +663,7 @@ _public_ int m_mod_start(m_mod_t *mod) {

_public_ int m_mod_pause(m_mod_t *mod) {
M_MOD_ASSERT_STATE(mod, M_MOD_RUNNING);
M_MOD_CONSUME_TOKEN(mod);

int ret = stop(mod, false);
M_MOD_BOUND(m_mod_pause);
Expand All @@ -638,6 +672,7 @@ _public_ int m_mod_pause(m_mod_t *mod) {

_public_ int m_mod_resume(m_mod_t *mod) {
M_MOD_ASSERT_STATE(mod, M_MOD_PAUSED);
M_MOD_CONSUME_TOKEN(mod);

int ret = start(mod, false);
M_MOD_BOUND(m_mod_resume);
Expand All @@ -646,6 +681,7 @@ _public_ int m_mod_resume(m_mod_t *mod) {

_public_ int m_mod_stop(m_mod_t *mod) {
M_MOD_ASSERT_STATE(mod, M_MOD_RUNNING | M_MOD_PAUSED);
M_MOD_CONSUME_TOKEN(mod);

int ret = stop(mod, true);
M_MOD_BOUND(m_mod_stop);
Expand All @@ -655,11 +691,7 @@ _public_ int m_mod_stop(m_mod_t *mod) {
_public_ int m_mod_bind(m_mod_t *mod, m_mod_t *ref) {
M_MOD_ASSERT(mod);
M_MOD_ASSERT(ref);
M_MOD_CONSUME_TOKEN(mod);

/*
* NOTE: dependency cycle is not avoided.
* This is deliberately not too smart to avoid costly checks.
* Be careful.
*/
return m_list_insert(ref->bound_mods, m_mem_ref(mod));
}
13 changes: 13 additions & 0 deletions Lib/core/mod.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
#define M_MOD_ASSERT_STATE(mod, state) \
M_MOD_ASSERT(mod); \
M_RET_ASSERT(m_mod_is(mod, state), -EACCES)

#define M_MOD_CONSUME_TOKEN(mod) \
M_RET_ASSERT(mod->tb.tokens > 0, -EAGAIN) \
mod->tb.tokens--; \
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);

typedef struct {
uint64_t registration_time;
Expand All @@ -31,6 +36,13 @@ typedef struct {
m_queue_t *events;
} mod_batch_t;

typedef struct {
uint16_t rate;
uint64_t burst;
uint64_t tokens;
m_src_tmr_t timer;
} mod_tb_t;

/* Struct that holds data for each module */
/*
* MEM-REFS for mod:
Expand All @@ -52,6 +64,7 @@ struct _mod {
void *fs; // FS module priv data. NULL if unsupported
CONST const char *name; // module's name
mod_batch_t batch; // Events' batching informations
mod_tb_t tb; // Mod's tockenbucket
CONST void *dlhandle; // Handle for plugin (NULL if not a plugin)
m_bst_t *srcs[M_SRC_TYPE_END]; // module's event sources
m_map_t *subscriptions; // module's subscriptions (map of ev_src_t*)
Expand Down
14 changes: 7 additions & 7 deletions Lib/core/ps.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ static int send_msg(m_mod_t *mod, const m_mod_t *recipient, const char *topic,
M_PARAM_ASSERT(message);

mod->stats.sent_msgs++;
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
ps_priv_t m = { { false, mod, topic, message }, flags, NULL };
return tell_pubsub_msg(&m, recipient, mod->ctx);
}
Expand All @@ -146,7 +145,6 @@ int tell_system_pubsub_msg(const m_mod_t *recipient, m_ctx_t *c, m_mod_t *sender
if (sender) {
// A module sent a M_PS_MOD_POISONPILL message to another, or it was stopped
sender->stats.sent_msgs++;
fetch_ms(&sender->stats.last_seen, &sender->stats.action_ctr);
}
ps_priv_t m = { { true, sender, topic, NULL }, 0, NULL };
return tell_pubsub_msg(&m, recipient, c);
Expand Down Expand Up @@ -214,7 +212,8 @@ void call_pubsub_cb(m_mod_t *mod, m_queue_t *evts) {
cb(mod, evts);

mod->stats.recv_msgs += m_queue_len(evts);
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);

fetch_ms(&mod->stats.last_seen, NULL);

end:
/* Destroy events */
Expand All @@ -227,6 +226,7 @@ _public_ int m_mod_ps_subscribe(m_mod_t *mod, const char *topic, m_src_flags fla
M_MOD_ASSERT_PERM(mod, M_MOD_DENY_SUB);
M_PARAM_ASSERT(topic);
M_SRC_ASSERT_PRIO_FLAGS();
M_MOD_CONSUME_TOKEN(mod);

/* Check if it is a valid regex: compile it */
regex_t regex;
Expand Down Expand Up @@ -261,20 +261,17 @@ _public_ int m_mod_ps_subscribe(m_mod_t *mod, const char *topic, m_src_flags fla
memcpy(&ps_src->reg, &regex, sizeof(regex_t));
ps_src->topic = sub->flags & M_SRC_DUP ? mem_strdup(topic) : topic;
ret = m_map_put(mod->subscriptions, ps_src->topic, sub); // M_MAP_VAL_ALLOW_UPDATE -> this will dtor old elem before updating
if (ret == 0) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
}
}
return ret;
}

_public_ int m_mod_ps_unsubscribe(m_mod_t *mod, const char *topic) {
M_MOD_ASSERT_PERM(mod, M_MOD_DENY_SUB);
M_PARAM_ASSERT(topic);
M_MOD_CONSUME_TOKEN(mod);

int ret = m_map_remove(mod->subscriptions, topic);
if (ret == 0) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
if (m_map_len(mod->subscriptions) == 0) {
m_map_free(&mod->subscriptions);
}
Expand All @@ -287,13 +284,15 @@ _public_ int m_mod_ps_tell(m_mod_t *mod, const m_mod_t *recipient, const void *m
M_PARAM_ASSERT(recipient);
/* only same ctx modules can talk */
M_PARAM_ASSERT(mod->ctx == recipient->ctx);
M_MOD_CONSUME_TOKEN(mod);

return send_msg(mod, recipient, NULL, message, flags);
}

_public_ int m_mod_ps_publish(m_mod_t *mod, const char *topic, const void *message, m_ps_flags flags) {
M_MOD_ASSERT_PERM(mod, M_MOD_DENY_PUB);
M_RET_ASSERT(!is_system_message(topic), -EPERM);
M_MOD_CONSUME_TOKEN(mod);

return send_msg(mod, NULL, topic, message, flags);
}
Expand All @@ -304,6 +303,7 @@ _public_ int m_mod_ps_poisonpill(m_mod_t *mod, const m_mod_t *recipient) {
/* only same ctx modules can talk */
M_PARAM_ASSERT(mod->ctx == recipient->ctx);
M_PARAM_ASSERT(m_mod_is(recipient, M_MOD_RUNNING));
M_MOD_CONSUME_TOKEN(mod);

return tell_system_pubsub_msg(recipient, mod->ctx, mod, M_PS_MOD_POISONPILL);
}
3 changes: 3 additions & 0 deletions Lib/core/public/module/mod.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ int m_mod_src_deregister_thresh(m_mod_t *mod, const m_src_thresh_t *thr);
int m_mod_set_batch_size(m_mod_t *mod, size_t len);
int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ms);

/* Mod tokenbucket */
int m_mod_set_tokenbucket(m_mod_t *mod, uint16_t rate, uint64_t burst);

/* Generic event source registering functions */
#define m_mod_src_register(mod, X, flags, userptr) _Generic((X) + 0, \
int: m_mod_src_register_fd, \
Expand Down
9 changes: 3 additions & 6 deletions Lib/core/src.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ int init_src(m_mod_t *mod, m_src_types t) {
int register_src(m_mod_t *mod, m_src_types type, const void *src_data,
m_src_flags flags, const void *userptr) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);
M_SRC_ASSERT_PRIO_FLAGS();
ev_src_t *src = m_mem_new(sizeof(ev_src_t), src_priv_dtor);
M_ALLOC_ASSERT(src);
Expand Down Expand Up @@ -225,7 +226,6 @@ int register_src(m_mod_t *mod, m_src_types type, const void *src_data,

int ret = m_bst_insert(mod->srcs[type], src);
if (ret == 0) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
/* If a src is registered at runtime, start receiving its events */
if (m_mod_is(mod, M_MOD_RUNNING)) {
M_MOD_CTX(mod);
Expand All @@ -244,12 +244,9 @@ int register_src(m_mod_t *mod, m_src_types type, const void *src_data,

int deregister_src(m_mod_t *mod, m_src_types type, void *src_data) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);

int ret = m_bst_remove(mod->srcs[type], src_data);
if (ret == 0) {
fetch_ms(&mod->stats.last_seen, &mod->stats.action_ctr);
}
return ret;
return m_bst_remove(mod->srcs[type], src_data);
}

int start_task(m_ctx_t *c, ev_src_t *src) {
Expand Down

0 comments on commit 585f5ec

Please sign in to comment.