Skip to content

Commit

Permalink
RB: make the "sem" abstraction into a notifier
Browse files Browse the repository at this point in the history
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
  • Loading branch information
asalkeld committed Feb 18, 2013
1 parent 59243fb commit 6ba0547
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 61 deletions.
96 changes: 67 additions & 29 deletions lib/ringbuffer.c
Expand Up @@ -114,6 +114,14 @@ static void _rb_chunk_reclaim(struct qb_ringbuffer_s * rb);
qb_ringbuffer_t *
qb_rb_open(const char *name, size_t size, uint32_t flags,
size_t shared_user_data_size)
{
return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL);
}

qb_ringbuffer_t *
qb_rb_open_2(const char *name, size_t size, uint32_t flags,
size_t shared_user_data_size,
struct qb_rb_notifier *notifiers)
{
struct qb_ringbuffer_s *rb;
size_t real_size;
Expand Down Expand Up @@ -179,9 +187,17 @@ qb_rb_open(const char *name, size_t size, uint32_t flags,
rb->shared_hdr->read_pt = 0;
(void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX);
}
error = qb_rb_sem_create(rb, flags);
if (notifiers && notifiers->post_fn) {
error = 0;
memcpy(&rb->notifier,
notifiers,
sizeof(struct qb_rb_notifier));
} else {
error = qb_rb_sem_create(rb, flags);
}
if (error < 0) {
qb_util_perror(LOG_ERR, "couldn't get a semaphore");
errno = -error;
qb_util_perror(LOG_ERR, "couldn't create a semaphore");
goto cleanup_hdr;
}

Expand Down Expand Up @@ -241,8 +257,8 @@ qb_rb_open(const char *name, size_t size, uint32_t flags,
}
if (rb && (flags & QB_RB_FLAG_CREATE)) {
unlink(rb->shared_hdr->hdr_path);
if (rb->sem_destroy_fn) {
(void)rb->sem_destroy_fn(rb);
if (rb->notifier.destroy_fn) {
(void)rb->notifier.destroy_fn(rb->notifier.instance);
}
}
if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) {
Expand All @@ -253,17 +269,19 @@ qb_rb_open(const char *name, size_t size, uint32_t flags,
return NULL;
}


void
qb_rb_close(struct qb_ringbuffer_s * rb)
{
if (rb == NULL) {
return;
}
qb_enter();

(void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count);
if (rb->flags & QB_RB_FLAG_CREATE) {
if (rb->sem_destroy_fn) {
(void)rb->sem_destroy_fn(rb);
if (rb->notifier.destroy_fn) {
(void)rb->notifier.destroy_fn(rb->notifier.instance);
}
unlink(rb->shared_hdr->data_path);
unlink(rb->shared_hdr->hdr_path);
Expand All @@ -285,9 +303,10 @@ qb_rb_force_close(struct qb_ringbuffer_s * rb)
if (rb == NULL) {
return;
}
qb_enter();

if (rb->sem_destroy_fn) {
(void)rb->sem_destroy_fn(rb);
if (rb->notifier.destroy_fn) {
(void)rb->notifier.destroy_fn(rb->notifier.instance);
}
unlink(rb->shared_hdr->data_path);
unlink(rb->shared_hdr->hdr_path);
Expand Down Expand Up @@ -336,6 +355,10 @@ qb_rb_space_free(struct qb_ringbuffer_s * rb)
if (rb == NULL) {
return -EINVAL;
}
if (rb->notifier.space_used_fn) {
return (rb->shared_hdr->word_size * sizeof(uint32_t)) -
rb->notifier.space_used_fn(rb->notifier.instance);
}
write_size = rb->shared_hdr->write_pt;
read_size = rb->shared_hdr->read_pt;

Expand All @@ -345,7 +368,7 @@ qb_rb_space_free(struct qb_ringbuffer_s * rb)
} else if (write_size < read_size) {
space_free = (read_size - write_size) - 1;
} else {
if (rb->sem_getvalue_fn && rb->sem_getvalue_fn(rb) > 0) {
if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) {
space_free = 0;
} else {
space_free = rb->shared_hdr->word_size;
Expand All @@ -366,6 +389,9 @@ qb_rb_space_used(struct qb_ringbuffer_s * rb)
if (rb == NULL) {
return -EINVAL;
}
if (rb->notifier.space_used_fn) {
return rb->notifier.space_used_fn(rb->notifier.instance);
}
write_size = rb->shared_hdr->write_pt;
read_size = rb->shared_hdr->read_pt;

Expand All @@ -387,11 +413,10 @@ qb_rb_chunks_used(struct qb_ringbuffer_s *rb)
if (rb == NULL) {
return -EINVAL;
}
if (rb->sem_getvalue_fn) {
return rb->sem_getvalue_fn(rb);
} else {
return -ENOTSUP;
if (rb->notifier.q_len_fn) {
return rb->notifier.q_len_fn(rb->notifier.instance);
}
return -ENOTSUP;
}

void *
Expand Down Expand Up @@ -474,7 +499,8 @@ qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len)
QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC);

DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n",
(rb->sem_getvalue_fn ? rb->sem_getvalue_fn(rb) : 0),
(rb->notifier.q_len_fn ?
rb->notifier.q_len_fn(rb->notifier.instance) : 0),
rb->shared_hdr->read_pt,
old_write_pt,
rb->shared_hdr->write_pt,
Expand All @@ -483,11 +509,10 @@ qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len)
/*
* post the notification to the reader
*/
if (rb->sem_post_fn) {
return rb->sem_post_fn(rb);
} else {
return 0;
if (rb->notifier.post_fn) {
return rb->notifier.post_fn(rb->notifier.instance, len);
}
return 0;
}

ssize_t
Expand Down Expand Up @@ -519,6 +544,7 @@ _rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
{
uint32_t old_read_pt;
uint32_t new_read_pt;
uint32_t old_chunk_size;
uint32_t chunk_magic;

old_read_pt = rb->shared_hdr->read_pt;
Expand All @@ -527,6 +553,7 @@ _rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
return;
}

old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt);
new_read_pt = qb_rb_chunk_step(rb, old_read_pt);

/*
Expand All @@ -543,8 +570,18 @@ _rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
*/
rb->shared_hdr->read_pt = new_read_pt;

if (rb->notifier.reclaim_fn) {
int rc = rb->notifier.reclaim_fn(rb->notifier.instance,
old_chunk_size);
if (rc < 0) {
errno = -rc;
qb_util_perror(LOG_WARNING, "reclaim_fn");
}
}

DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n",
(rb->sem_getvalue_fn ? rb->sem_getvalue_fn(rb) : 0),
(rb->notifier.q_len_fn ?
rb->notifier.q_len_fn(rb->notifier.instance) : 0),
old_read_pt,
rb->shared_hdr->read_pt,
rb->shared_hdr->write_pt);
Expand All @@ -570,8 +607,8 @@ qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout)
if (rb == NULL) {
return -EINVAL;
}
if (rb->sem_timedwait_fn) {
res = rb->sem_timedwait_fn(rb, timeout);
if (rb->notifier.timedwait_fn) {
res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
}
if (res < 0 && res != -EIDRM) {
if (res == -ETIMEDOUT) {
Expand All @@ -585,8 +622,8 @@ qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout)
read_pt = rb->shared_hdr->read_pt;
chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
if (chunk_magic != QB_RB_CHUNK_MAGIC) {
if (rb->sem_post_fn) {
(void)rb->sem_post_fn(rb);
if (rb->notifier.post_fn) {
(void)rb->notifier.post_fn(rb->notifier.instance, res);
}
return 0;
}
Expand All @@ -607,11 +644,12 @@ qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len,
if (rb == NULL) {
return -EINVAL;
}
if (rb->sem_timedwait_fn) {
res = rb->sem_timedwait_fn(rb, timeout);
if (rb->notifier.timedwait_fn) {
res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
}
if (res < 0 && res != -EIDRM) {
if (res != -ETIMEDOUT) {
errno = -res;
qb_util_perror(LOG_ERR, "sem_timedwait");
}
return res;
Expand All @@ -621,10 +659,10 @@ qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len,
chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);

if (chunk_magic != QB_RB_CHUNK_MAGIC) {
if (rb->sem_timedwait_fn == NULL) {
if (rb->notifier.timedwait_fn == NULL) {
return -ETIMEDOUT;
} else {
(void)rb->sem_post_fn(rb);
(void)rb->notifier.post_fn(rb->notifier.instance, res);
#ifdef EBADMSG
return -EBADMSG;
#else
Expand All @@ -638,10 +676,10 @@ qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len,
qb_util_log(LOG_ERR,
"trying to recv chunk of size %d but %d available",
len, chunk_size);
(void)rb->sem_post_fn(rb);
(void)rb->notifier.post_fn(rb->notifier.instance, chunk_size);
return -ENOBUFS;
}
;

memcpy(data_out,
QB_RB_CHUNK_DATA_GET(rb, read_pt),
chunk_size);
Expand Down

0 comments on commit 6ba0547

Please sign in to comment.