Skip to content

Commit

Permalink
Merge 67e5931 into a4d438a
Browse files Browse the repository at this point in the history
  • Loading branch information
chu11 committed Jan 10, 2018
2 parents a4d438a + 67e5931 commit 69ef139
Showing 1 changed file with 187 additions and 49 deletions.
236 changes: 187 additions & 49 deletions src/modules/kvs/kvs.c
Expand Up @@ -97,6 +97,14 @@ struct kvs_cb_data {
int errnum;
};

struct getroot_handler {
flux_t *h;
flux_msg_handler_t *mh;
flux_msg_t *msg;
void *arg;
flux_msg_handler_f cb;
};

static void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w,
Expand Down Expand Up @@ -397,49 +405,161 @@ static void setroot (kvs_ctx_t *ctx, struct kvsroot *root,
}
}

static int getroot_rpc (kvs_ctx_t *ctx, const char *namespace, int *rootseq,
blobref_t rootref, int *flagsp)
static void getroot_handler_destroy (void *arg)
{
flux_future_t *f;
struct getroot_handler *gh = arg;
if (gh) {
if (gh->msg)
flux_msg_destroy (gh->msg);
free (gh);
}
}

static struct getroot_handler *getroot_handler_create (flux_msg_handler_f cb,
flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct getroot_handler *gh = NULL;
int saved_errno;

if (!(gh = calloc (1, sizeof (*gh)))) {
errno = ENOMEM;
goto error;
}


gh->h = h;
gh->mh = mh;
gh->arg = arg;
gh->cb = cb;

if (!(gh->msg = flux_msg_copy (msg, true)))
goto error;

return gh;
error:
saved_errno = errno;
getroot_handler_destroy (gh);
errno = saved_errno;
return NULL;
}

static void getroot_rpc_response (flux_future_t *f, void *arg)
{
kvs_ctx_t *ctx = arg;
struct getroot_handler *gh;
const char *namespace;
int rootseq, flags;
const char *ref;
int flags;
int saved_errno, rc = -1;
struct kvsroot *root;
int save_errno;

/* XXX: future make asynchronous */
if (!(f = flux_rpc_pack (ctx->h, "kvs.getroot", FLUX_NODEID_UPSTREAM, 0,
"{ s:s }",
"namespace", namespace))) {
saved_errno = errno;
goto done;
gh = flux_future_aux_get (f, "handler");
assert (gh);

if (flux_request_unpack (gh->msg, NULL, "{ s:s }",
"namespace", &namespace) < 0) {
flux_log_error (ctx->h, "%s: flux_request_unpack", __FUNCTION__);
goto error;
}

if (flux_rpc_get_unpack (f, "{ s:i s:s s:i }",
"rootseq", rootseq,
"rootseq", &rootseq,
"rootref", &ref,
"flags", &flags) < 0) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: flux_rpc_get_unpack", __FUNCTION__);
goto done;
goto error;
}
if (strlen (ref) > sizeof (blobref_t) - 1) {
saved_errno = EPROTO;
goto done;

/* possible root initialized by another message before we got this
* response. Not relevant if namespace in process of being removed. */
if (!(root = lookup_root (ctx, namespace))) {
if (!(root = create_root (ctx, namespace, flags))) {
flux_log_error (ctx->h, "%s: create_root", __FUNCTION__);
goto error;
}

if (event_subscribe (ctx, namespace) < 0) {
save_errno = errno;
remove_root (ctx, namespace);
errno = save_errno;
flux_log_error (ctx->h, "%s: event_subscribe", __FUNCTION__);
goto error;
}
}
strcpy (rootref, ref);
if (flagsp)
(*flagsp) = flags;
rc = 0;
done:

/* if root now in process of being removed, error will be handled via
* the original callback
*/
if (!root->remove)
setroot (ctx, root, ref, rootseq);

gh->cb (gh->h, gh->mh, gh->msg, gh->arg);

flux_future_destroy (f);
if (rc < 0)
errno = saved_errno;
return rc;
return;

error:
if (flux_respond (ctx->h, gh->msg, errno, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__);
flux_future_destroy (f);
}

static int getroot_rpc (kvs_ctx_t *ctx,
const char *namespace,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
flux_msg_handler_f cb,
int *rootseq,
blobref_t rootref,
int *flagsp)
{
flux_future_t *f = NULL;
struct getroot_handler *gh = NULL;
int saved_errno;

if (!(f = flux_rpc_pack (ctx->h, "kvs.getroot", FLUX_NODEID_UPSTREAM, 0,
"{ s:s }",
"namespace", namespace)))
goto error;

if (!(gh = getroot_handler_create (cb, ctx->h, mh, msg, ctx))) {
flux_log_error (ctx->h, "%s: getroot_handler_create", __FUNCTION__);
goto error;
}

if (flux_future_aux_set (f, "handler", gh,
(flux_free_f)getroot_handler_destroy) < 0) {
flux_log_error (ctx->h, "%s: flux_future_aux_set", __FUNCTION__);
goto error;
}
gh = NULL; /* owned by future now */

if (flux_future_then (f, -1., getroot_rpc_response, ctx) < 0)
goto error;

return 0;
error:
saved_errno = errno;
getroot_handler_destroy (gh);
flux_future_destroy (f);
errno = saved_errno;
return -1;
}

static struct kvsroot *getroot (kvs_ctx_t *ctx, const char *namespace)
static struct kvsroot *getroot (kvs_ctx_t *ctx, const char *namespace,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
flux_msg_handler_f cb,
bool *stall)
{
struct kvsroot *root;
blobref_t rootref;
int save_errno, rootseq, flags;
int rootseq, flags;

(*stall) = false;

if (!(root = lookup_root_safe (ctx, namespace))) {
if (ctx->rank == 0) {
Expand All @@ -449,25 +569,12 @@ static struct kvsroot *getroot (kvs_ctx_t *ctx, const char *namespace)
return NULL;
}
else {
if (getroot_rpc (ctx, namespace, &rootseq, rootref, &flags) < 0) {
flux_log_error (ctx->h, "getroot_rpc");
return NULL;
}

if (!(root = create_root (ctx, namespace, flags))) {
flux_log_error (ctx->h, "create_root");
return NULL;
}

setroot (ctx, root, rootref, rootseq);

if (event_subscribe (ctx, namespace) < 0) {
save_errno = errno;
remove_root (ctx, namespace);
errno = save_errno;
flux_log_error (ctx->h, "event_subscribe");
if (getroot_rpc (ctx, namespace, mh, msg, cb,
&rootseq, rootref, &flags) < 0) {
flux_log_error (ctx->h, "getroot_rpc_async");
return NULL;
}
(*stall) = true;
}
}
return root;
Expand Down Expand Up @@ -1175,6 +1282,7 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *mh,
lookup_t *lh = NULL;
const char *root_ref = NULL;
wait_t *wait = NULL;
bool stall = false;
int rc = -1;
int ret;

Expand All @@ -1192,8 +1300,12 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto done;
}

if (!(root = getroot (ctx, namespace)))
if (!(root = getroot (ctx, namespace, mh, msg, get_request_cb,
&stall))) {
if (stall)
goto stall;
goto done;
}

/* rootdir is optional */
(void)flux_request_unpack (msg, NULL, "{ s:o }",
Expand Down Expand Up @@ -1332,6 +1444,7 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh,
wait_t *watcher = NULL;
bool isreplay = false;
bool out = false;
bool stall = false;
int rc = -1;
int saved_errno, ret;

Expand All @@ -1348,8 +1461,12 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto done;
}

if (!(root = getroot (ctx, namespace)))
if (!(root = getroot (ctx, namespace, mh, msg, watch_request_cb,
&stall))) {
if (stall)
goto stall;
goto done;
}

if (!(lh = lookup_create (ctx->cache,
ctx->epoch,
Expand Down Expand Up @@ -1706,6 +1823,7 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
const char *namespace;
const char *name;
int saved_errno, nprocs, flags;
bool stall = false;
json_t *ops = NULL;
fence_t *f;

Expand All @@ -1719,8 +1837,12 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(root = getroot (ctx, namespace)))
if (!(root = getroot (ctx, namespace, mh, msg, fence_request_cb,
&stall))) {
if (stall)
goto stall;
goto error;
}

if (!(f = commit_mgr_lookup_fence (root->cm, name))) {
if (!(f = fence_create (name, nprocs, flags))) {
Expand Down Expand Up @@ -1772,6 +1894,8 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
stall:
return;
}


Expand All @@ -1785,6 +1909,7 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh,
struct kvsroot *root;
int saved_errno, rootseq;
wait_t *wait = NULL;
bool stall = false;

if (flux_request_unpack (msg, NULL, "{ s:i s:s }",
"rootseq", &rootseq,
Expand All @@ -1793,8 +1918,12 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(root = getroot (ctx, namespace)))
if (!(root = getroot (ctx, namespace, mh, msg, sync_request_cb,
&stall))) {
if (stall)
goto stall;
goto error;
}

if (root->seq < rootseq) {
if (!(wait = wait_create_msg_handler (h, mh, msg,
Expand All @@ -1820,6 +1949,8 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh,
error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
stall:
return;
}

static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
Expand Down Expand Up @@ -1847,8 +1978,13 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
/* If root is not initialized, we have to intialize ourselves
* first.
*/
if (!(root = getroot (ctx, namespace)))
bool stall = false;
if (!(root = getroot (ctx, namespace, mh, msg, getroot_request_cb,
&stall))) {
if (stall)
goto done;
goto error;
}
}

if (flux_respond_pack (h, msg, "{ s:i s:s s:i }",
Expand All @@ -1858,6 +1994,8 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}

done:
return;

error:
Expand Down

0 comments on commit 69ef139

Please sign in to comment.