Skip to content

Commit

Permalink
modules/kvs: Refactor waitqueue msg handler use
Browse files Browse the repository at this point in the history
Use new msg_cb_handler internal API instead of prior msg handler
code.
  • Loading branch information
chu11 committed Jan 13, 2018
1 parent 22b1db4 commit 0c3dfce
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions src/modules/kvs/waitqueue.c
Expand Up @@ -29,22 +29,15 @@
#include <flux/core.h>

#include "waitqueue.h"

struct handler {
flux_msg_handler_f cb;
flux_t *h;
flux_msg_handler_t *mh;
flux_msg_t *msg;
void *arg;
};
#include "msg_cb_handler.h"

#define WAIT_MAGIC 0xafad7777
struct wait_struct {
int magic;
int usecount;
wait_cb_f cb;
void *cb_arg;
struct handler hand; /* optional special case */
msg_cb_handler_t *mcb; /* optional special case */
};

#define WAITQUEUE_MAGIC 0xafad7778
Expand Down Expand Up @@ -77,13 +70,10 @@ wait_t *wait_create_msg_handler (flux_t *h, flux_msg_handler_t *mh,
{
wait_t *w = wait_create (NULL, NULL);
if (w) {
w->hand.cb = cb;
w->hand.arg = arg;
w->hand.h = h;
w->hand.mh = mh;
if (msg && !(w->hand.msg = flux_msg_copy (msg, true))) {
if (!(w->mcb = msg_cb_handler_create (h, mh, msg, arg, cb))) {
int saved_errno = errno;
wait_destroy (w);
errno = ENOMEM;
errno = saved_errno;
return NULL;
}
}
Expand All @@ -95,7 +85,7 @@ void wait_destroy (wait_t *w)
if (w) {
assert (w->magic == WAIT_MAGIC);
assert (w->usecount == 0);
flux_msg_destroy (w->hand.msg);
msg_cb_handler_destroy (w->mcb);
w->magic = ~WAIT_MAGIC;
free (w);
}
Expand Down Expand Up @@ -155,8 +145,8 @@ static void wait_runone (wait_t *w)
if (--w->usecount == 0) {
if (w->cb)
w->cb (w->cb_arg);
else if (w->hand.cb)
w->hand.cb (w->hand.h, w->hand.mh, w->hand.msg, w->hand.arg);
else if (w->mcb)
msg_cb_handler_call (w->mcb);
wait_destroy (w);
}
}
Expand Down Expand Up @@ -202,7 +192,11 @@ int wait_destroy_msg (waitqueue_t *q, wait_test_msg_f cb, void *arg)

w = zlist_first (q->q);
while (w) {
if (w->hand.msg && cb != NULL && cb (w->hand.msg, arg)) {
const flux_msg_t *msgcpy = NULL;

if (w->mcb)
msgcpy = msg_cb_handler_get_msgcopy (w->mcb);
if (msgcpy && cb != NULL && cb (msgcpy, arg)) {
if (!tmp && !(tmp = zlist_new ())) {
saved_errno = ENOMEM;
goto error;
Expand All @@ -211,7 +205,9 @@ int wait_destroy_msg (waitqueue_t *q, wait_test_msg_f cb, void *arg)
saved_errno = ENOMEM;
goto error;
}
w->hand.cb = NULL; // prevent wait_runone from restarting handler
/* prevent wait_runone from restarting handler by clearing
* callback function */
msg_cb_handler_set_cb (w->mcb, NULL);
count++;
}
w = zlist_next (q->q);
Expand Down

0 comments on commit 0c3dfce

Please sign in to comment.