Skip to content

Commit

Permalink
Merge fa3d15d into b877c17
Browse files Browse the repository at this point in the history
  • Loading branch information
chu11 committed Feb 17, 2017
2 parents b877c17 + fa3d15d commit 3dbfb83
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 21 deletions.
57 changes: 46 additions & 11 deletions src/modules/kvs/kvs.c
Expand Up @@ -95,6 +95,7 @@ typedef struct {
zlist_t *requests;
json_object *names;
int nprocs;
bool mergeable;
int count;
int errnum;
ctx_t *ctx;
Expand Down Expand Up @@ -598,22 +599,44 @@ void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w,
flux_watcher_start (ctx->idle_w);
}

/* Merge ready commits, where merging consists of popping the "donor"
* commit off the ready list, and appending its ops to the top commit.
* The top commit can be appended to if it hasn't started, or is still
* building the rootcpy, e.g. stalled walking the namespace.
/* Merge ready commits that are mergeable, where merging consists of
* popping the "donor" commit off the ready list, and appending its
* ops to the top commit. The top commit can be appended to if it
* hasn't started, or is still building the rootcpy, e.g. stalled
* walking the namespace.
*
* Break when an unmergeable commit is discovered. We do not wish to
* merge non-adjacent fences, as it can create undesirable out-of-order
* scenarios, e.g.
*
* commit #1 is mergeable: set A=1
* commit #2 is non-mergeable: set A=2
* commit #3 is mergeable: set A=3
*
* If we were to merge commit #1 and commit #3, A=2 would be set after
* A=3.
*/
void commit_merge_all (ctx_t *ctx)
{
fence_t *f = zlist_first (ctx->ready);

if (f && f->errnum == 0 && !f->rootcpy_stored) {
if (f
&& f->mergeable
&& f->errnum == 0
&& !f->rootcpy_stored) {
fence_t *nf;
nf = zlist_pop (ctx->ready);
assert (nf == f);
while ((nf = zlist_pop (ctx->ready))) {
while ((nf = zlist_first (ctx->ready))) {
int i, len;

/* We've merged as many as we currently can */
if (!nf->mergeable)
break;

/* Fence is mergeable, pop off list */
nf = zlist_pop (ctx->ready);

if (Jget_ar_len (nf->names, &len)) {
for (i = 0; i < len; i++) {
const char *name;
Expand Down Expand Up @@ -1096,7 +1119,8 @@ static void fence_destroy (fence_t *f)
}
}

static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs)
static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs,
bool mergeable)
{
fence_t *f;

Expand All @@ -1106,6 +1130,7 @@ static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs)
goto error;
}
f->nprocs = nprocs;
f->mergeable = mergeable;
f->ctx = ctx;
f->names = Jnew_ar ();
Jadd_ar_str (f->names, name);
Expand Down Expand Up @@ -1210,6 +1235,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w,
ctx_t *ctx = arg;
const char *json_str, *name;
int nprocs;
bool mergeable;
json_object *in = NULL;
json_object *ops = NULL;
fence_t *f;
Expand All @@ -1219,7 +1245,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w,
goto done;
}
if (!(in = Jfromstr (json_str))
|| kp_tfence_dec (in, &name, &nprocs, &ops) < 0) {
|| kp_tfence_dec (in, &name, &nprocs, &mergeable, &ops) < 0) {
errno = EPROTO;
flux_log_error (h, "%s payload decode", __FUNCTION__);
goto done;
Expand All @@ -1228,7 +1254,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w,
* occurs after we know the fence name
*/
if (!(f = fence_lookup (ctx, name))) {
if (!(f = fence_create (ctx, name, nprocs))) {
if (!(f = fence_create (ctx, name, nprocs, mergeable))) {
flux_log_error (h, "%s fence_create %s", __FUNCTION__, name);
goto done;
}
Expand All @@ -1238,6 +1264,10 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w,
goto done;
}
}
else {
if (!mergeable)
f->mergeable = false;
}
if (fence_append_ops (f, ops) < 0) {
flux_log_error (h, "%s fence_append_ops %s", __FUNCTION__, name);
goto done;
Expand All @@ -1262,6 +1292,7 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *w,
ctx_t *ctx = arg;
const char *json_str, *name;
int nprocs;
bool mergeable;
json_object *in = NULL;
json_object *ops = NULL;
fence_t *f;
Expand All @@ -1272,18 +1303,22 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *w,
errno = EPROTO;
goto error;
}
if (kp_tfence_dec (in, &name, &nprocs, &ops) < 0) {
if (kp_tfence_dec (in, &name, &nprocs, &mergeable, &ops) < 0) {
errno = EPROTO;
goto error;
}
if (!(f = fence_lookup (ctx, name))) {
if (!(f = fence_create (ctx, name, nprocs)))
if (!(f = fence_create (ctx, name, nprocs, mergeable)))
goto error;
if (fence_add (ctx, f) < 0) {
fence_destroy (f);
goto error;
}
}
else {
if (!mergeable)
f->mergeable = false;
}
if (fence_append_request (f, msg) < 0)
goto error;
if (ctx->rank == 0) {
Expand Down
11 changes: 11 additions & 0 deletions src/modules/kvs/kvs.h
Expand Up @@ -5,6 +5,12 @@
#include <stdint.h>
#include <flux/core.h>

/* Flag bits accepted by the *_flags variants of the commit and fence
 * calls declared in this header.
 */
enum {
    KVS_NO_MERGE = 1 << 0, /* do not merge this commit with other commits */
};

typedef struct kvsdir_struct kvsdir_t;

typedef int (*kvs_set_f)(const char *key, const char *json_str, void *arg,
Expand Down Expand Up @@ -174,12 +180,14 @@ int kvs_mkdir (flux_t *h, const char *key);
* Returns -1 on error (errno set), 0 on success.
*/
int kvs_commit (flux_t *h);
int kvs_commit_flags (flux_t *h, int flags);

/* kvs_commit_begin() sends the commit request and returns immediately.
* kvs_commit_finish() blocks until the response is received, then returns.
* Use flux_rpc_then() to arrange for the commit to complete asynchronously.
*/
flux_rpc_t *kvs_commit_begin (flux_t *h);
flux_rpc_t *kvs_commit_begin_flags (flux_t *h, int flags);
int kvs_commit_finish (flux_rpc_t *rpc);

/* kvs_fence() is a collective commit operation. nprocs tasks make the
Expand All @@ -193,12 +201,15 @@ int kvs_commit_finish (flux_rpc_t *rpc);
* Returns -1 on error (errno set), 0 on success.
*/
int kvs_fence (flux_t *h, const char *name, int nprocs);
int kvs_fence_flags (flux_t *h, const char *name, int nprocs, int flags);

/* kvs_fence_begin() sends the fence request and returns immediately.
* kvs_fence_finish() blocks until the response is received, then returns.
* Use flux_rpc_then() to arrange for the fence to complete asynchronously.
*/
flux_rpc_t *kvs_fence_begin (flux_t *h, const char *name, int nprocs);
flux_rpc_t *kvs_fence_begin_flags (flux_t *h, const char *name, int nprocs,
int flags);
int kvs_fence_finish (flux_rpc_t *rpc);

/* Operations (put, unlink, symlink, mkdir) may be associated with a named
Expand Down
83 changes: 81 additions & 2 deletions src/modules/kvs/libkvs.c
Expand Up @@ -1245,6 +1245,25 @@ flux_rpc_t *kvs_commit_begin (flux_t *h)
return rpc;
}

/* Start a commit, forwarding 'flags' to kvs_fence_begin_flags().
 * The commit is issued as a fence with nprocs = 1 whose name is a newly
 * generated UUID string.
 * Returns the RPC handle from kvs_fence_begin_flags(), or NULL on failure.
 * errno is set to ENOMEM if the UUID cannot be created; otherwise the
 * errno value left by kvs_fence_begin_flags() is preserved across cleanup.
 */
flux_rpc_t *kvs_commit_begin_flags (flux_t *h, int flags)
{
    zuuid_t *id = zuuid_new ();
    flux_rpc_t *result = NULL;
    int errnum;

    if (id) {
        result = kvs_fence_begin_flags (h, zuuid_str (id), 1, flags);
    } else {
        errno = ENOMEM;
    }

    /* Destroying the uuid must not disturb the errno callers will see. */
    errnum = errno;
    zuuid_destroy (&id);
    errno = errnum;
    return result;
}

int kvs_commit_finish (flux_rpc_t *rpc)
{
return flux_rpc_get (rpc, NULL);
Expand All @@ -1265,6 +1284,21 @@ int kvs_commit (flux_t *h)
return rc;
}

/* Synchronous commit with flags: start the commit with
 * kvs_commit_begin_flags(), wait for it with kvs_commit_finish(), then
 * destroy the RPC handle.
 * Returns 0 on success, -1 if either step fails.
 */
int kvs_commit_flags (flux_t *h, int flags)
{
    flux_rpc_t *req = kvs_commit_begin_flags (h, flags);
    int status = -1;

    /* Only wait for a response if the request was actually sent. */
    if (req != NULL && kvs_commit_finish (req) >= 0) {
        status = 0;
    }
    flux_rpc_destroy (req);
    return status;
}

flux_rpc_t *kvs_fence_begin (flux_t *h, const char *name, int nprocs)
{
kvsctx_t *ctx = getctx (h);
Expand All @@ -1276,11 +1310,41 @@ flux_rpc_t *kvs_fence_begin (flux_t *h, const char *name, int nprocs)
if (ctx->fence_ops)
fence_ops = zhash_lookup (ctx->fence_ops, name);
if (fence_ops) {
if (!(in = kp_tfence_enc (name, nprocs, fence_ops)))
if (!(in = kp_tfence_enc (name, nprocs, fence_ops, 0)))
goto done;
zhash_delete (ctx->fence_ops, name);
} else {
if (!(in = kp_tfence_enc (name, nprocs, ctx->ops)))
if (!(in = kp_tfence_enc (name, nprocs, ctx->ops, 0)))
goto done;
Jput (ctx->ops);
ctx->ops = NULL;
}
if (!(rpc = flux_rpc (h, "kvs.fence", Jtostr (in), FLUX_NODEID_ANY, 0)))
goto done;
done:
saved_errno = errno;
Jput (in);
errno = saved_errno;
return rpc;
}

flux_rpc_t *kvs_fence_begin_flags (flux_t *h, const char *name, int nprocs,
int flags)
{
kvsctx_t *ctx = getctx (h);
json_object *in = NULL;
flux_rpc_t *rpc = NULL;
int saved_errno = errno;
json_object *fence_ops = NULL;

if (ctx->fence_ops)
fence_ops = zhash_lookup (ctx->fence_ops, name);
if (fence_ops) {
if (!(in = kp_tfence_enc (name, nprocs, fence_ops, flags)))
goto done;
zhash_delete (ctx->fence_ops, name);
} else {
if (!(in = kp_tfence_enc (name, nprocs, ctx->ops, flags)))
goto done;
Jput (ctx->ops);
ctx->ops = NULL;
Expand Down Expand Up @@ -1336,6 +1400,21 @@ int kvs_fence (flux_t *h, const char *name, int nprocs)
return rc;
}

/* Synchronous fence with flags: start the fence 'name' for 'nprocs'
 * participants with kvs_fence_begin_flags(), wait for it with
 * kvs_fence_finish(), then destroy the RPC handle.
 * Returns 0 on success, -1 if either step fails.
 */
int kvs_fence_flags (flux_t *h, const char *name, int nprocs, int flags)
{
    flux_rpc_t *req = kvs_fence_begin_flags (h, name, nprocs, flags);
    int status = -1;

    /* Only wait for a response if the request was actually sent. */
    if (req != NULL && kvs_fence_finish (req) >= 0) {
        status = 0;
    }
    flux_rpc_destroy (req);
    return status;
}

int kvs_get_version (flux_t *h, int *versionp)
{
flux_rpc_t *rpc;
Expand Down
11 changes: 9 additions & 2 deletions src/modules/kvs/proto.c
Expand Up @@ -206,13 +206,16 @@ int kp_tunwatch_dec (json_object *o, const char **key)

/* kvs.fence
*/
json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops)
json_object *kp_tfence_enc (const char *name, int nprocs,
json_object *ops, int flags)
{
json_object *o = Jnew ();
json_object *empty_ops = NULL;

Jadd_str (o, "name", name);
Jadd_int (o, "nprocs", nprocs);
if (flags & KVS_NO_MERGE)
Jadd_bool (o, "mergeable", false);
if (!ops)
ops = empty_ops = Jnew_ar();
Jadd_obj (o, "ops", ops); /* takes a ref on ops */
Expand All @@ -221,7 +224,7 @@ json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops)
}

int kp_tfence_dec (json_object *o, const char **name, int *nprocs,
json_object **ops)
bool *mergeable, json_object **ops)
{
int rc = -1;

Expand All @@ -234,6 +237,10 @@ int kp_tfence_dec (json_object *o, const char **name, int *nprocs,
errno = EPROTO;
goto done;
}
if (mergeable) {
if (!Jget_bool (o, "mergeable", mergeable))
*mergeable = true;
}
rc = 0;
done:
return rc;
Expand Down
5 changes: 3 additions & 2 deletions src/modules/kvs/proto.h
Expand Up @@ -39,9 +39,10 @@ int kp_tunwatch_dec (json_object *o, const char **key);
/* kvs.fence
* kvs.relayfence
*/
json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops);
json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops,
int flags);
int kp_tfence_dec (json_object *o, const char **name, int *nprocs,
json_object **ops);
bool *mergeable, json_object **ops);

/* kvs.getroot (request)
*/
Expand Down

0 comments on commit 3dbfb83

Please sign in to comment.