diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 57f21637be8d..e60102515de5 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -95,6 +95,7 @@ typedef struct { zlist_t *requests; json_object *names; int nprocs; + bool mergeable; int count; int errnum; ctx_t *ctx; @@ -598,22 +599,44 @@ void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w, flux_watcher_start (ctx->idle_w); } -/* Merge ready commits, where merging consists of popping the "donor" - * commit off the ready list, and appending its ops to the top commit. - * The top commit can be appended to if it hasn't started, or is still - * building the rootcpy, e.g. stalled walking the namespace. +/* Merge ready commits that are mergeable, where merging consists of + * popping the "donor" commit off the ready list, and appending its + * ops to the top commit. The top commit can be appended to if it + * hasn't started, or is still building the rootcpy, e.g. stalled + * walking the namespace. + * + * Break when an unmergeable commit is discovered. We do not wish to + * merge non-adjacent fences, as it can create undesireable out of + * order scenarios. e.g. + * + * commit #1 is mergeable: set A=1 + * commit #2 is non-mergeable: set A=2 + * commit #3 is mergeable: set A=3 + * + * If we were to merge commit #1 and commit #3, A=2 would be set after + * A=3. */ void commit_merge_all (ctx_t *ctx) { fence_t *f = zlist_first (ctx->ready); - if (f && f->errnum == 0 && !f->rootcpy_stored) { + if (f + && f->mergeable + && f->errnum == 0 + && !f->rootcpy_stored) { fence_t *nf; nf = zlist_pop (ctx->ready); assert (nf == f); - while ((nf = zlist_pop (ctx->ready))) { + while ((nf = zlist_first (ctx->ready))) { int i, len; + /* We've merged as many as we currently can */ + if (!nf->mergeable) + break; + + /* Fence is mergeable, pop off list */ + nf = zlist_pop (ctx->ready); + if (Jget_ar_len (nf->names, &len)) { for (i = 0; i < len; i++) { const char *name; @@ -1096,7 +1119,8 @@ static void fence_destroy (fence_t *f) } } -static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs) +static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs, + bool mergeable) { fence_t *f; @@ -1106,6 +1130,7 @@ static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs) goto error; } f->nprocs = nprocs; + f->mergeable = mergeable; f->ctx = ctx; f->names = Jnew_ar (); Jadd_ar_str (f->names, name); @@ -1210,6 +1235,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, ctx_t *ctx = arg; const char *json_str, *name; int nprocs; + bool mergeable; json_object *in = NULL; json_object *ops = NULL; fence_t *f; @@ -1219,7 +1245,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, goto done; } if (!(in = Jfromstr (json_str)) - || kp_tfence_dec (in, &name, &nprocs, &ops) < 0) { + || kp_tfence_dec (in, &name, &nprocs, &mergeable, &ops) < 0) { errno = EPROTO; flux_log_error (h, "%s payload decode", __FUNCTION__); goto done; @@ -1228,7 +1254,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, * occurs after we know the fence name */ if (!(f = fence_lookup (ctx, name))) { - if (!(f = fence_create (ctx, name, nprocs))) { + if (!(f = fence_create (ctx, name, nprocs, mergeable))) { flux_log_error (h, "%s fence_create %s", __FUNCTION__, name); goto done; } @@ -1238,6 +1264,10 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, goto done; } } + else { + if (!mergeable) + f->mergeable = false; + } if (fence_append_ops (f, ops) < 0) { flux_log_error (h, "%s fence_append_ops %s", __FUNCTION__, name); goto done; @@ -1262,6 +1292,7 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *w, ctx_t *ctx = arg; const char *json_str, *name; int nprocs; + bool mergeable; json_object *in = NULL; json_object *ops = NULL; fence_t *f; @@ -1272,18 +1303,22 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *w, errno = EPROTO; goto error; } - if (kp_tfence_dec (in, &name, &nprocs, &ops) < 0) { + if (kp_tfence_dec (in, &name, &nprocs, &mergeable, &ops) < 0) { errno = EPROTO; goto error; } if (!(f = fence_lookup (ctx, name))) { - if (!(f = fence_create (ctx, name, nprocs))) + if (!(f = fence_create (ctx, name, nprocs, mergeable))) goto error; if (fence_add (ctx, f) < 0) { fence_destroy (f); goto error; } } + else { + if (!mergeable) + f->mergeable = false; + } if (fence_append_request (f, msg) < 0) goto error; if (ctx->rank == 0) { diff --git a/src/modules/kvs/kvs.h b/src/modules/kvs/kvs.h index ce4ea6c3650c..f19fb9666144 100644 --- a/src/modules/kvs/kvs.h +++ b/src/modules/kvs/kvs.h @@ -5,6 +5,12 @@ #include #include +/* Flags for commit and fence operations + */ +enum { + KVS_NO_MERGE = 1, /* disallow commits to be mergeable with others */ +}; + typedef struct kvsdir_struct kvsdir_t; typedef int (*kvs_set_f)(const char *key, const char *json_str, void *arg, @@ -174,12 +180,14 @@ int kvs_mkdir (flux_t *h, const char *key); * Returns -1 on error (errno set), 0 on success. */ int kvs_commit (flux_t *h); +int kvs_commit_flags (flux_t *h, int flags); /* kvs_commit_begin() sends the commit request and returns immediately. * kvs_commit_finish() blocks until the response is received, then returns. * Use flux_rpc_then() to arrange for the commit to complete asynchronously. */ flux_rpc_t *kvs_commit_begin (flux_t *h); +flux_rpc_t *kvs_commit_begin_flags (flux_t *h, int flags); int kvs_commit_finish (flux_rpc_t *rpc); /* kvs_fence() is a collective commit operation. nprocs tasks make the @@ -193,12 +201,15 @@ int kvs_commit_finish (flux_rpc_t *rpc); * Returns -1 on error (errno set), 0 on success. */ int kvs_fence (flux_t *h, const char *name, int nprocs); +int kvs_fence_flags (flux_t *h, const char *name, int nprocs, int flags); /* kvs_fence_begin() sends the fence request and returns immediately. * kvs_fence_finish() blocks until the response is received, then returns. * Use flux_rpc_then() to arrange for the fence to complete asynchronously. */ flux_rpc_t *kvs_fence_begin (flux_t *h, const char *name, int nprocs); +flux_rpc_t *kvs_fence_begin_flags (flux_t *h, const char *name, int nprocs, + int flags); int kvs_fence_finish (flux_rpc_t *rpc); /* Operations (put, unlink, symlink, mkdir) may be associated with a named diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 208c41652319..8e848b1d1633 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -1245,6 +1245,25 @@ flux_rpc_t *kvs_commit_begin (flux_t *h) return rpc; } +flux_rpc_t *kvs_commit_begin_flags (flux_t *h, int flags) +{ + zuuid_t *uuid = NULL; + flux_rpc_t *rpc = NULL; + int saved_errno; + + if (!(uuid = zuuid_new ())) { + errno = ENOMEM; + goto done; + } + if (!(rpc = kvs_fence_begin_flags (h, zuuid_str (uuid), 1, flags))) + goto done; +done: + saved_errno = errno; + zuuid_destroy (&uuid); + errno = saved_errno; + return rpc; +} + int kvs_commit_finish (flux_rpc_t *rpc) { return flux_rpc_get (rpc, NULL); @@ -1265,6 +1284,21 @@ int kvs_commit (flux_t *h) return rc; } +int kvs_commit_flags (flux_t *h, int flags) +{ + flux_rpc_t *rpc = NULL; + int rc = -1; + + if (!(rpc = kvs_commit_begin_flags (h, flags))) + goto done; + if (kvs_commit_finish (rpc) < 0) + goto done; + rc = 0; +done: + flux_rpc_destroy (rpc); + return rc; +} + flux_rpc_t *kvs_fence_begin (flux_t *h, const char *name, int nprocs) { kvsctx_t *ctx = getctx (h); @@ -1276,11 +1310,41 @@ flux_rpc_t *kvs_fence_begin (flux_t *h, const char *name, int nprocs) if (ctx->fence_ops) fence_ops = zhash_lookup (ctx->fence_ops, name); if (fence_ops) { - if (!(in = kp_tfence_enc (name, nprocs, fence_ops))) + if (!(in = kp_tfence_enc (name, nprocs, fence_ops, 0))) goto done; zhash_delete (ctx->fence_ops, name); } else { - if (!(in = kp_tfence_enc (name, nprocs, ctx->ops))) + if (!(in = kp_tfence_enc (name, nprocs, ctx->ops, 0))) + goto done; + Jput (ctx->ops); + ctx->ops = NULL; + } + if (!(rpc = flux_rpc (h, "kvs.fence", Jtostr (in), FLUX_NODEID_ANY, 0))) + goto done; +done: + saved_errno = errno; + Jput (in); + errno = saved_errno; + return rpc; +} + +flux_rpc_t *kvs_fence_begin_flags (flux_t *h, const char *name, int nprocs, + int flags) +{ + kvsctx_t *ctx = getctx (h); + json_object *in = NULL; + flux_rpc_t *rpc = NULL; + int saved_errno = errno; + json_object *fence_ops = NULL; + + if (ctx->fence_ops) + fence_ops = zhash_lookup (ctx->fence_ops, name); + if (fence_ops) { + if (!(in = kp_tfence_enc (name, nprocs, fence_ops, flags))) + goto done; + zhash_delete (ctx->fence_ops, name); + } else { + if (!(in = kp_tfence_enc (name, nprocs, ctx->ops, flags))) goto done; Jput (ctx->ops); ctx->ops = NULL; @@ -1336,6 +1400,21 @@ int kvs_fence (flux_t *h, const char *name, int nprocs) return rc; } +int kvs_fence_flags (flux_t *h, const char *name, int nprocs, int flags) +{ + flux_rpc_t *rpc = NULL; + int rc = -1; + + if (!(rpc = kvs_fence_begin_flags (h, name, nprocs, flags))) + goto done; + if (kvs_fence_finish (rpc) < 0) + goto done; + rc = 0; +done: + flux_rpc_destroy (rpc); + return rc; +} + int kvs_get_version (flux_t *h, int *versionp) { flux_rpc_t *rpc; diff --git a/src/modules/kvs/proto.c b/src/modules/kvs/proto.c index 0654863585f3..ab60bae29c83 100644 --- a/src/modules/kvs/proto.c +++ b/src/modules/kvs/proto.c @@ -206,13 +206,16 @@ int kp_tunwatch_dec (json_object *o, const char **key) /* kvs.fence */ -json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops) +json_object *kp_tfence_enc (const char *name, int nprocs, + json_object *ops, int flags) { json_object *o = Jnew (); json_object *empty_ops = NULL; Jadd_str (o, "name", name); Jadd_int (o, "nprocs", nprocs); + if (flags & KVS_NO_MERGE) + Jadd_bool (o, "mergeable", false); if (!ops) ops = empty_ops = Jnew_ar(); Jadd_obj (o, "ops", ops); /* takes a ref on ops */ @@ -221,7 +224,7 @@ json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops) } int kp_tfence_dec (json_object *o, const char **name, int *nprocs, - json_object **ops) + bool *mergeable, json_object **ops) { int rc = -1; @@ -234,6 +237,10 @@ int kp_tfence_dec (json_object *o, const char **name, int *nprocs, errno = EPROTO; goto done; } + if (mergeable) { + if (!Jget_bool (o, "mergeable", mergeable)) + *mergeable = true; + } rc = 0; done: return rc; diff --git a/src/modules/kvs/proto.h b/src/modules/kvs/proto.h index 8122ba68cdf9..979029d37fa5 100644 --- a/src/modules/kvs/proto.h +++ b/src/modules/kvs/proto.h @@ -39,9 +39,10 @@ int kp_tunwatch_dec (json_object *o, const char **key); /* kvs.fence * kvs.relayfence */ -json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops); +json_object *kp_tfence_enc (const char *name, int nprocs, json_object *ops, + int flags); int kp_tfence_dec (json_object *o, const char **name, int *nprocs, - json_object **ops); + bool *mergeable, json_object **ops); /* kvs.getroot (request) */ diff --git a/t/kvs/basic.c b/t/kvs/basic.c index 4f5e481fc2d3..1c9cdc5daa18 100644 --- a/t/kvs/basic.c +++ b/t/kvs/basic.c @@ -46,6 +46,7 @@ static const struct option longopts[] = { void cmd_get (flux_t *h, int argc, char **argv); void cmd_type (flux_t *h, int argc, char **argv); void cmd_put (flux_t *h, int argc, char **argv); +void cmd_put_no_merge (flux_t *h, int argc, char **argv); void cmd_unlink (flux_t *h, int argc, char **argv); void cmd_link (flux_t *h, int argc, char **argv); void cmd_readlink (flux_t *h, int argc, char **argv); @@ -76,6 +77,7 @@ void usage (void) "Usage: basic get key\n" " basic type key\n" " basic put key=val\n" +" basic put_no_merge key=val\n" " basic unlink key\n" " basic link target link_name\n" " basic readlink key\n" @@ -133,6 +135,8 @@ int main (int argc, char *argv[]) cmd_type (h, argc - optind, argv + optind); else if (!strcmp (cmd, "put")) cmd_put (h, argc - optind, argv + optind); + else if (!strcmp (cmd, "put_no_merge")) + cmd_put_no_merge (h, argc - optind, argv + optind); else if (!strcmp (cmd, "unlink")) cmd_unlink (h, argc - optind, argv + optind); else if (!strcmp (cmd, "link")) @@ -301,6 +305,24 @@ void cmd_put (flux_t *h, int argc, char **argv) log_err_exit ("kvs_commit"); } +void cmd_put_no_merge (flux_t *h, int argc, char **argv) +{ + if (argc == 0) + log_msg_exit ("put_no_merge: specify one key=value pair"); + char *key = xstrdup (argv[0]); + char *val = strchr (key, '='); + if (!val) + log_msg_exit ("put_no_merge: you must specify a value as key=value"); + *val++ = '\0'; + if (kvs_put (h, key, val) < 0) { + if (errno != EINVAL || kvs_put_string (h, key, val) < 0) + log_err_exit ("%s", key); + } + free (key); + if (kvs_commit_flags (h, KVS_NO_MERGE) < 0) + log_err_exit ("kvs_commit"); +} + void cmd_unlink (flux_t *h, int argc, char **argv) { if (argc != 1) diff --git a/t/kvs/commitmerge.c b/t/kvs/commitmerge.c index 427c5c6cc208..d98f0ff0389f 100644 --- a/t/kvs/commitmerge.c +++ b/t/kvs/commitmerge.c @@ -68,14 +68,21 @@ static int threadcount = -1; static int changecount = 0; static char *prefix = NULL; static char *key = NULL; +static bool nopt = false; static int watch_init = 0; static pthread_cond_t watch_init_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t watch_init_lock = PTHREAD_MUTEX_INITIALIZER; +#define OPTIONS "n" +static const struct option longopts[] = { + {"nomerge", no_argument, 0, 'n'}, + {0, 0, 0, 0}, +}; + static void usage (void) { - fprintf (stderr, "Usage: commitmerge threadcount prefix\n"); + fprintf (stderr, "Usage: commitmerge [--nomerge] threadcount prefix\n"); exit (1); } @@ -193,8 +200,14 @@ void *committhread (void *arg) if (kvs_put_int (t->h, key, t->n) < 0) log_err_exit ("%s", key); - if (kvs_commit (t->h) < 0) - log_err_exit ("kvs_commit"); + if (nopt) { + if (kvs_commit_flags (t->h, KVS_NO_MERGE) < 0) + log_err_exit ("kvs_commit"); + } + else { + if (kvs_commit (t->h) < 0) + log_err_exit ("kvs_commit"); + } flux_close (t->h); return NULL; @@ -203,10 +216,20 @@ void *committhread (void *arg) int main (int argc, char *argv[]) { thd_t *thd; - int i, rc; + int i, rc, ch; log_init (basename (argv[0])); + while ((ch = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { + switch (ch) { + case 'n': + nopt = true; + break; + default: + usage (); + } + } + if (argc - optind != 2) usage (); diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index bfd70f3dfb92..e32d50f9300b 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -209,6 +209,24 @@ EOF test_cmp expected output ' +test_expect_success 'kvs: put using no-merge flag' ' + ${KVSBASIC} unlink $TEST && + ${KVSBASIC} put_no_merge $DIR.a=69 && + ${KVSBASIC} put_no_merge $DIR.b.c.d.e.f.g=70 && + ${KVSBASIC} put_no_merge $DIR.c.a.b=3.14 && + ${KVSBASIC} put_no_merge $DIR.d=\"snerg\" && + ${KVSBASIC} put_no_merge $DIR.e=true && + ${KVSBASIC} dir -r $DIR | sort >output && + cat >expected <