Skip to content

Commit

Permalink
Added a queue implementation. Properly use memhook instead of plain f…
Browse files Browse the repository at this point in the history
…ree in stack.
  • Loading branch information
FedeDP committed Sep 4, 2019
1 parent 86a9466 commit 2a61147
Show file tree
Hide file tree
Showing 13 changed files with 486 additions and 9 deletions.
46 changes: 46 additions & 0 deletions Lib/public/module/queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

#include "module_cmn.h"

/** Queue interface **/

typedef enum {
QUEUE_WRONG_PARAM = -4,
QUEUE_MISSING,
QUEUE_ERR,
QUEUE_OMEM,
QUEUE_OK
} queue_ret_code;

/* Callback for queue_iterate */
typedef queue_ret_code (*queue_cb)(void *, void *);

/* Fn for queue_set_dtor */
typedef void (*queue_dtor)(void *);

/* Incomplete struct declaration for queue */
typedef struct _queue queue_t;

/* Incomplete struct declaration for queue iterator */
typedef struct _queue_itr queue_itr_t;

#ifdef __cplusplus
extern "C"{
#endif

_public_ queue_t *queue_new(const queue_dtor fn);
_public_ queue_itr_t *queue_itr_new(const queue_t *q);
_public_ queue_itr_t *queue_itr_next(queue_itr_t *itr);
_public_ void *queue_itr_get_data(const queue_itr_t *itr);
_public_ queue_ret_code queue_itr_set_data(const queue_itr_t *itr, void *value);
_public_ queue_ret_code queue_iterate(const queue_t *q, const queue_cb fn, void *userptr);
_public_ queue_ret_code queue_enqueue(queue_t *q, void *data);
_public_ void *queue_dequeue(queue_t *q);
_public_ void *queue_peek(const queue_t *q);
_public_ queue_ret_code queue_clear(queue_t *q);
_public_ queue_ret_code queue_free(queue_t *q);
_public_ ssize_t queue_length(const queue_t *q);

#ifdef __cplusplus
}
#endif
2 changes: 1 addition & 1 deletion Lib/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,4 @@ module_ret_code module_poisonpill(const self_t *self, const self_t *recipient) {
MOD_PARAM_ASSERT(module_is(recipient, RUNNING));

return tell_system_pubsub_msg(recipient->mod, c, MODULE_POISONPILL, &mod->self, NULL);
}
}
154 changes: 154 additions & 0 deletions Lib/queue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include "poll_priv.h"
#include "queue.h"

#define QUEUE_PARAM_ASSERT(cond) MOD_RET_ASSERT(cond, QUEUE_WRONG_PARAM);

typedef struct _elem {
void *userptr;
struct _elem *prev;
} queue_elem;

struct _queue {
size_t len;
queue_dtor dtor;
queue_elem *head;
queue_elem *tail;
};

struct _queue_itr {
queue_elem *elem;
};

/** Public API **/

queue_t *queue_new(const queue_dtor fn) {
queue_t *q = memhook._calloc(1, sizeof(queue_t));
if (q) {
q->dtor = fn;
}
return q;
}

queue_itr_t *queue_itr_new(const queue_t *q) {
MOD_RET_ASSERT(queue_length(q) > 0, NULL);

queue_itr_t *itr = memhook._malloc(sizeof(queue_itr_t));
if (itr) {
itr->elem = q->head;
}
return itr;
}

queue_itr_t *queue_itr_next(queue_itr_t *itr) {
MOD_RET_ASSERT(itr, NULL);

itr->elem = itr->elem->prev;
if (!itr->elem) {
memhook._free(itr);
itr = NULL;
}
return itr;
}

void *queue_itr_get_data(const queue_itr_t *itr) {
MOD_RET_ASSERT(itr, NULL);

return itr->elem->userptr;
}

queue_ret_code queue_itr_set_data(const queue_itr_t *itr, void *value) {
QUEUE_PARAM_ASSERT(itr);
QUEUE_PARAM_ASSERT(value);

itr->elem->userptr = value;
return QUEUE_OK;
}

queue_ret_code queue_iterate(const queue_t *q, const queue_cb fn, void *userptr) {
QUEUE_PARAM_ASSERT(fn);
MOD_RET_ASSERT(queue_length(q) > 0, QUEUE_MISSING);

queue_elem *elem = q->head;
while (elem) {
queue_ret_code rc = fn(userptr, elem->userptr);
if (rc < QUEUE_OK) {
/* Stop right now with error */
return rc;
}
if (rc > QUEUE_OK) {
/* Stop right now with MAP_OK */
return QUEUE_OK;
}
elem = elem->prev;
}
return QUEUE_OK;
}

queue_ret_code queue_enqueue(queue_t *q, void *data) {
QUEUE_PARAM_ASSERT(q);
QUEUE_PARAM_ASSERT(data);

queue_elem *elem = memhook._calloc(1, sizeof(queue_elem));
if (elem) {
q->len++;
elem->userptr = data;
if (q->tail) {
q->tail->prev = elem;
}
q->tail = elem;
if (!q->head) {
q->head = q->tail;
}
return QUEUE_OK;
}
return QUEUE_OMEM;
}

void *queue_dequeue(queue_t *q) {
MOD_RET_ASSERT(queue_length(q) > 0, NULL);

queue_elem *elem = q->head;
if (q->tail == q->head) {
q->tail = NULL;
}
q->head = q->head->prev;

void *data = elem->userptr;
memhook._free(elem);
q->len--;
return data;
}

void *queue_peek(const queue_t *q) {
MOD_RET_ASSERT(queue_length(q) > 0, NULL);

return q->head->userptr;
}

queue_ret_code queue_clear(queue_t *q) {
QUEUE_PARAM_ASSERT(q);

queue_elem *elem = NULL;
while ((elem = q->head) && q->len > 0) {
void *data = queue_dequeue(q);
if (q->dtor) {
q->dtor(data);
}
}
return QUEUE_OK;
}

queue_ret_code queue_free(queue_t *q) {
queue_ret_code ret = queue_clear(q);
if (ret == QUEUE_OK) {
memhook._free(q);
}
return ret;
}

ssize_t queue_length(const queue_t *q) {
QUEUE_PARAM_ASSERT(q);

return q->len;
}

2 changes: 1 addition & 1 deletion Lib/stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ stack_ret_code stack_clear(stack_t *s) {
stack_ret_code stack_free(stack_t *s) {
stack_ret_code ret = stack_clear(s);
if (ret == STACK_OK) {
free(s);
memhook._free(s);
}
return ret;
}
Expand Down
5 changes: 5 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ conversely to module_stop that should stop module right away freeing all its enq
- [x] Set autofree in stack_new()
- [x] Drop stack_set_dtor

### Queue
- [x] Add queue API
- [x] Add doc

### Generic
- [x] Add some diagnostic API, eg: module_dump() (to dump each module's state)
- [x] Add a module_load/unload function, to load a module from a compiled object at runtime
Expand Down Expand Up @@ -111,3 +115,4 @@ conversely to module_stop that should stop module right away freeing all its enq

## Ideas
- [ ] Akka-persistence like message store? (ie: store all messages and replay them)
- [ ] module_msg_ref/unref to forcefully keep an autofree message alive?
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Welcome to libmodule's documentation!
src/modules
src/map
src/stack
src/queue

Indices and tables
==================
Expand Down
2 changes: 1 addition & 1 deletion docs/src/map.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Where not specified, these functions return a map_ret_code.

.. c:function:: map_itr_next(itr)
Get next iterator. If next iterator is past last element, iterator will be automatically freed for you.
Get next iterator. If next iterator is past last element, iterator will be automatically freed.

:param itr: pointer to map_itr_t
:type itr: :c:type:`map_itr_t *`
Expand Down
2 changes: 1 addition & 1 deletion docs/src/module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ Again, where not specified, these functions return a :ref:`module_ret_code <modu
:type autofree: :c:type:`const bool`
:type global: :c:type:`const bool`

.. c:macro:: module_poisonpill(self, recipient)
.. c:function:: module_poisonpill(self, recipient)
Enqueue a POISONPILL message to recipient. This allows to stop another module after it flushes its pubsub messages.

Expand Down

0 comments on commit 2a61147

Please sign in to comment.