Skip to content

Commit

Permalink
Merge pull request #1622 from garlick/getroot
Browse files Browse the repository at this point in the history
kvs: add kvs-watch module
  • Loading branch information
chu11 committed Aug 22, 2018
2 parents 62a8aa5 + 896ac45 commit 4a2674b
Show file tree
Hide file tree
Showing 30 changed files with 1,605 additions and 9 deletions.
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ AC_CONFIG_FILES( \
src/modules/Makefile \
src/modules/connector-local/Makefile \
src/modules/kvs/Makefile \
src/modules/kvs-watch/Makefile \
src/modules/content-sqlite/Makefile \
src/modules/barrier/Makefile \
src/modules/wreck/Makefile \
Expand Down
7 changes: 7 additions & 0 deletions doc/man1/flux-kvs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ of synchronization between peers is: node A puts a value, commits it,
reads version, sends version to node B. Node B waits for version, gets
value.

*getroot* [-w] [-c count] [-s | -o | -b]::
Retrieve the current KVS root, displaying it as an RFC 11 dirref object.
If '-b' is specified, display it as a blobref. If '-o' is specified,
display the namespace owner. If '-s' is specified, display the root
sequence number. If '-w' is specified, display the current root,
then a new value each time it is updated, up to 'count', if specified.


AUTHOR
------
Expand Down
12 changes: 11 additions & 1 deletion doc/man3/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ MAN3_FILES_PRIMARY = \
flux_kvs_namespace_create.3 \
flux_kvs_namespace_list.3 \
flux_kvs_set_namespace.3 \
flux_kvs_getroot.3 \
idset_create.3

# These files are generated as roff .so includes of a primary page.
Expand Down Expand Up @@ -156,6 +157,11 @@ MAN3_FILES_SECONDARY = \
flux_kvs_lookup_get_dir.3 \
flux_kvs_lookup_get_treeobj.3 \
flux_kvs_lookup_get_symlink.3 \
flux_kvs_getroot_get_treeobj.3 \
flux_kvs_getroot_get_blobref.3 \
flux_kvs_getroot_get_sequence.3 \
flux_kvs_getroot_get_owner.3 \
flux_kvs_getroot_cancel.3 \
flux_kvs_fence.3 \
flux_kvs_txn_destroy.3 \
flux_kvs_txn_put.3 \
Expand Down Expand Up @@ -278,14 +284,18 @@ flux_rpc_raw.3: flux_rpc.3
flux_rpc_get.3: flux_rpc.3
flux_rpc_get_unpack.3: flux_rpc.3
flux_rpc_get_raw.3: flux_rpc.3
flux_rpc_get_error.3: flux_rpc.3
flux_kvs_lookupat.3: flux_kvs_lookup.3
flux_kvs_lookup_get.3: flux_kvs_lookup.3
flux_kvs_lookup_get_unpack.3: flux_kvs_lookup.3
flux_kvs_lookup_get_raw.3: flux_kvs_lookup.3
flux_kvs_lookup_get_dir.3: flux_kvs_lookup.3
flux_kvs_lookup_treeobj.3: flux_kvs_lookup.3
flux_kvs_lookup_symlink.3: flux_kvs_lookup.3
flux_kvs_getroot_get_treeobj.3: flux_kvs_getroot.3
flux_kvs_getroot_get_blobref.3: flux_kvs_getroot.3
flux_kvs_getroot_get_sequence.3: flux_kvs_getroot.3
flux_kvs_getroot_get_owner.3: flux_kvs_getroot.3
flux_kvs_getroot_cancel.3: flux_kvs_getroot.3
flux_kvs_fence.3: flux_kvs_commit.3
flux_kvs_txn_destroy.3: flux_kvs_txn_create.3
flux_kvs_txn_put.3: flux_kvs_txn_create.3
Expand Down
125 changes: 125 additions & 0 deletions doc/man3/flux_kvs_getroot.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
flux_kvs_getroot(3)
===================
:doctype: manpage


NAME
----
flux_kvs_getroot, flux_kvs_getroot_get_treeobj, flux_kvs_getroot_get_blobref, flux_kvs_getroot_get_sequence, flux_kvs_getroot_get_owner, flux_kvs_getroot_cancel - look up KVS root hash


SYNOPSIS
--------
#include <flux/core.h>

flux_future_t *flux_kvs_getroot (flux_t *h,
const char *ns,
int flags);

int flux_kvs_getroot_get_treeobj (flux_future_t *f,
const char **treeobj);

int flux_kvs_getroot_get_blobref (flux_future_t *f,
const char **blobref);

int flux_kvs_getroot_get_sequence (flux_future_t *f,
int *seq);

int flux_kvs_getroot_get_owner (flux_future_t *f,
uint32_t *owner);

int flux_kvs_getroot_cancel (flux_future_t *f);


DESCRIPTION
-----------
`flux_kvs_getroot()` sends a request via handle _h_ to the `kvs` or `kvs-watch`
service to look up the current root hash for namespace _ns_. A `flux_future_t`
object is returned, which acts as handle for synchronization and container
for the response. _flags_ modifies the request as described below.
Upon future fulfillment, these functions can decode the result:
`flux_kvs_getroot_get_treeobj()` obtains the root hash in the form
of an RFC 11 _dirref_ treeobj, suitable to be passed to `flux_kvs_lookupat(3)`.
`flux_kvs_getroot_get_blobref()` obtains the RFC 10 blobref, suitable to
be passed to `flux_content_load(3)`.
`flux_kvs_getroot_get_sequence()` retrieves the monotonic sequence number
for the root.
`flux_kvs_getroot_get_owner()` retrieves the namespace owner.
If the `FLUX_KVS_WATCH` flag is used, the current root is returned immediately.
Thereafter, a new response is returned each time the root is updated.
`flux_future_reset()` must be called after each response to destroy the
current result value and re-arm the future. The stream of responses may
be stopped at any time with `flux_kvs_getroot_cancel()`, after which
the caller should wait for the future to be fulfilled with an ENODATA error.
After an error, the future may be safely destroyed.
FLAGS
-----

The following are valid bits in the _flags_ mask passed as an argument
to `flux_kvs_lookup()`:

FLUX_KVS_WATCH::
The current root is returned immediately. Thereafter, a new response is
returned each time the root is updated, as described above.


RETURN VALUE
------------
`flux_kvs_getroot()` returns a `flux_future_t` on success, or NULL on
failure with errno set appropriately.
The other functions return zero on success, or -1 on failure with errno
set appropriately.
ERRORS
------

EINVAL::
One of the arguments was invalid.

ENOMEM::
Out of memory.

EPROTO::
A request was malformed.

ENOSYS::
The kvs or kvs-watch module is not loaded.

EPERM::
The requesting user is not permitted to access the requested namespace.

ENODATA::
A stream of responses has been terminated by a call to
`flux_kvs_getroot_cancel()`.


AUTHOR
------
This page is maintained by the Flux community.
RESOURCES
---------
Github: <http://github.com/flux-framework>


COPYRIGHT
---------
include::COPYRIGHT.adoc[]
SEE ALSO
---------
flux_kvs_lookup (3), flux_future_get (3), flux_content_load (3).
2 changes: 1 addition & 1 deletion doc/man3/flux_rpc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ flux_rpc(3)

NAME
----
flux_rpc, flux_rpc_pack, flux_rpc_raw, flux_rpc_get, flux_rpc_get_unpack, flux_rpc_get_raw, flux_rpc_get_error - perform a remote procedure call to a Flux service
flux_rpc, flux_rpc_pack, flux_rpc_raw, flux_rpc_get, flux_rpc_get_unpack, flux_rpc_get_raw - perform a remote procedure call to a Flux service


SYNOPSIS
Expand Down
2 changes: 2 additions & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,5 @@ epilog
gpubind
gpus
fulfillments
enodata
getroot
1 change: 1 addition & 0 deletions etc/rc1
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ flux module load -r all barrier
flux module load -r 0 content-sqlite
flux module load -r 0 kvs
flux module load -r all -x 0 kvs
flux module load -r all kvs-watch
flux module load -r all aggregator

flux module load -r all resource-hwloc & pids="$pids $!"
Expand Down
1 change: 1 addition & 0 deletions etc/rc3
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ flux module remove -r 0 cron
flux module remove -r all job
flux module remove -r all resource-hwloc
flux module remove -r all aggregator
flux module remove -r all kvs-watch
flux module remove -r all kvs
flux module remove -r all barrier

Expand Down
112 changes: 112 additions & 0 deletions src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ int cmd_copy (optparse_t *p, int argc, char **argv);
int cmd_move (optparse_t *p, int argc, char **argv);
int cmd_dir (optparse_t *p, int argc, char **argv);
int cmd_ls (optparse_t *p, int argc, char **argv);
int cmd_getroot (optparse_t *p, int argc, char **argv);

static int get_window_width (optparse_t *p, int fd);
static void dump_kvs_dir (const flux_kvsdir_t *dir, int maxcol,
Expand Down Expand Up @@ -185,6 +186,25 @@ static struct optparse_option unlink_opts[] = {
OPTPARSE_TABLE_END
};

static struct optparse_option getroot_opts[] = {
{ .name = "blobref", .key = 'b', .has_arg = 0,
.usage = "Show root as a blobref rather that an RFC 11 dirref",
},
{ .name = "sequence", .key = 's', .has_arg = 0,
.usage = "Show sequence number",
},
{ .name = "owner", .key = 'o', .has_arg = 0,
.usage = "Show owner",
},
{ .name = "watch", .key = 'w', .has_arg = 0,
.usage = "Monitor root changes",
},
{ .name = "count", .key = 'c', .has_arg = 1, .arginfo = "COUNT",
.usage = "Display at most COUNT changes",
},
OPTPARSE_TABLE_END
};

static struct optparse_subcommand subcommands[] = {
{ "namespace-create",
"name [name...]",
Expand Down Expand Up @@ -305,6 +325,13 @@ static struct optparse_subcommand subcommands[] = {
0,
NULL
},
{ "getroot",
"[-w] [-c COUNT] [-s|-o|-b]",
"Get KVS root treeobj",
cmd_getroot,
0,
getroot_opts
},
OPTPARSE_SUBCMD_END
};

Expand Down Expand Up @@ -1563,6 +1590,91 @@ int cmd_move (optparse_t *p, int argc, char **argv)
return (0);
}

struct getroot_ctx {
optparse_t *p;
int count;
int maxcount;
};

void getroot_continuation (flux_future_t *f, void *arg)
{
struct getroot_ctx *ctx = arg;

if (optparse_hasopt (ctx->p, "watch") && flux_rpc_get (f, NULL) < 0
&& errno == ENODATA) {
flux_future_destroy (f);
return; // EOF
}
if (ctx->maxcount == 0 || ctx->count < ctx->maxcount) {
if (optparse_hasopt (ctx->p, "owner")) {
uint32_t owner;

if (flux_kvs_getroot_get_owner (f, &owner) < 0)
log_err_exit ("flux_kvs_getroot_get_owner");
printf ("%lu\n", (unsigned long)owner);
}
else if (optparse_hasopt (ctx->p, "sequence")) {
int sequence;

if (flux_kvs_getroot_get_sequence (f, &sequence) < 0)
log_err_exit ("flux_kvs_getroot_get_sequence");
printf ("%d\n", sequence);
}
else if (optparse_hasopt (ctx->p, "blobref")) {
const char *blobref;

if (flux_kvs_getroot_get_blobref (f, &blobref) < 0)
log_err_exit ("flux_kvs_getroot_get_blobref");
printf ("%s\n", blobref);
}
else {
const char *treeobj;

if (flux_kvs_getroot_get_treeobj (f, &treeobj) < 0)
log_err_exit ("flux_kvs_getroot_get_treeobj");
printf ("%s\n", treeobj);
}
}
fflush (stdout);
if (optparse_hasopt (ctx->p, "watch")) {
flux_future_reset (f);
if (ctx->maxcount > 0 && ++ctx->count == ctx->maxcount) {
if (flux_kvs_getroot_cancel (f) < 0)
log_err_exit ("flux_kvs_getroot_cancel");
}
}
else
flux_future_destroy (f);
}

int cmd_getroot (optparse_t *p, int argc, char **argv)
{
flux_t *h = optparse_get_data (p, "flux_handle");
int optindex = optparse_option_index (p);
flux_future_t *f;
int flags = 0;
struct getroot_ctx ctx;

ctx.p = p;
ctx.count = 0;
ctx.maxcount = optparse_get_int (p, "count", 0);
if (ctx.maxcount < 0)
log_msg_exit ("count value must be >= 0");
if (optindex != argc) {
optparse_print_usage (p);
exit (1);
}
if (optparse_hasopt (p, "watch"))
flags |= FLUX_KVS_WATCH;
if (!(f = flux_kvs_getroot (h, NULL, flags)))
log_err_exit ("flux_kvs_getroot");
if (flux_future_then (f, -1., getroot_continuation, &ctx) < 0)
log_err_exit ("flux_future_then");
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
log_err_exit ("flux_reactor_run");
return (0);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
6 changes: 6 additions & 0 deletions src/common/libflux/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ flux_future_t *flux_rpc_pack (flux_t *h, const char *topic, uint32_t nodeid,
return f;
}

uint32_t flux_rpc_get_matchtag (flux_future_t *f)
{
struct flux_rpc *rpc = flux_future_aux_get (f, "flux::rpc");
return rpc ? rpc->matchtag : FLUX_MATCHTAG_NONE;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
4 changes: 4 additions & 0 deletions src/common/libflux/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ int flux_rpc_get_unpack (flux_future_t *f, const char *fmt, ...);

int flux_rpc_get_raw (flux_future_t *f, const void **data, int *len);

/* Accessor for RPC matchtag (see RFC 6).
*/
uint32_t flux_rpc_get_matchtag (flux_future_t *f);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 4a2674b

Please sign in to comment.