Skip to content

Commit

Permalink
Merge pull request #1850 from chu11/issue1847
Browse files Browse the repository at this point in the history
kvs-watch: Support FLUX_KVS_WATCH_UNIQ
  • Loading branch information
garlick committed Nov 20, 2018
2 parents 3f6a417 + b43e332 commit bb5105d
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 17 deletions.
11 changes: 6 additions & 5 deletions doc/man1/flux-kvs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Remove a kvs namespace.
*namespace-list*::
List all current namespaces and info on each namespace.

*get* [-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-f] [-c count] 'key' ['key...']::
*get* [-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-u] [-f] [-c count] 'key' ['key...']::
Retrieve the value stored under 'key'. If nothing has been stored under
'key', display an error message. If no options, value is displayed with
a newline appended (if value length is nonzero). If '-l', a 'key=' prefix is
Expand All @@ -79,10 +79,11 @@ snapshot reference. If '-w', after the initial value, display the
new value each time the key is written to until interrupted, or if '-c count'
is specified, until 'count' values have been displayed. If the
initial value does not yet exist, `-W` can be specified to wait for it
to be created. By default, only a direct write to a key is monitored,
which may miss several unique situations, such as the replacement of
an entire parent directory. The '-f' option can be specified to
monitor for many of these unique situations.
to be created. If '-u' is specified, only writes that change the key
value will be displayed. By default, only a direct write to a key is
monitored, which may miss several unique situations, such as the
replacement of an entire parent directory. The '-f' option can be
specified to monitor for many of these special situations.

*put* [-j|-r|-t] [-n] [-A] 'key=value' ['key=value...']::
Store 'value' under 'key' and commit it. If it already has a value,
Expand Down
7 changes: 6 additions & 1 deletion src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,14 @@ static struct optparse_option get_opts[] = {
.usage = "Print key= before value",
},
{ .name = "watch", .key = 'w', .has_arg = 0,
.usage = "Monitor key changes",
.usage = "Monitor key writes",
},
{ .name = "waitcreate", .key = 'W', .has_arg = 0,
.usage = "Wait for creation to occur on watch",
},
{ .name = "uniq", .key = 'u', .has_arg = 0,
.usage = "Only monitor key writes if values have changed",
},
{ .name = "full", .key = 'f', .has_arg = 0,
.usage = "Monitor key changes with more complete accuracy",
},
Expand Down Expand Up @@ -692,6 +695,8 @@ void cmd_get_one (flux_t *h, const char *key, struct lookup_ctx *ctx)
flags |= FLUX_KVS_WATCH_WAITCREATE;
if (optparse_hasopt (ctx->p, "full"))
flags |= FLUX_KVS_WATCH_FULL;
if (optparse_hasopt (ctx->p, "uniq"))
flags |= FLUX_KVS_WATCH_UNIQ;
}
if (optparse_hasopt (ctx->p, "at")) {
const char *reference = optparse_get_str (ctx->p, "at", "");
Expand Down
3 changes: 2 additions & 1 deletion src/common/libkvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ enum kvs_op {
FLUX_KVS_WATCH_WAITCREATE = 8,
FLUX_KVS_TREEOBJ = 16,
FLUX_KVS_APPEND = 32,
FLUX_KVS_WATCH_FULL = 64
FLUX_KVS_WATCH_FULL = 64,
FLUX_KVS_WATCH_UNIQ = 128
};

typedef struct flux_kvs_namespace_itr flux_kvs_namespace_itr_t;
Expand Down
10 changes: 6 additions & 4 deletions src/common/libkvs/kvs_lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ struct lookup_ctx {

static const char *auxkey = "flux::lookup_ctx";

#define FLUX_KVS_WATCH_FLAGS (FLUX_KVS_WATCH_WAITCREATE \
| FLUX_KVS_WATCH_FULL \
| FLUX_KVS_WATCH_UNIQ)

static void free_ctx (struct lookup_ctx *ctx)
{
if (ctx) {
Expand Down Expand Up @@ -88,16 +92,14 @@ static int validate_lookup_flags (int flags, bool watch_ok)
{
if ((flags & FLUX_KVS_WATCH) && !watch_ok)
return -1;
if (flags & FLUX_KVS_WATCH_WAITCREATE
&& !(flags & FLUX_KVS_WATCH))
return -1;
if (flags & FLUX_KVS_WATCH_FULL
if (flags & FLUX_KVS_WATCH_FLAGS
&& !(flags & FLUX_KVS_WATCH))
return -1;

flags &= ~FLUX_KVS_WATCH;
flags &= ~FLUX_KVS_WATCH_WAITCREATE;
flags &= ~FLUX_KVS_WATCH_FULL;
flags &= ~FLUX_KVS_WATCH_UNIQ;
switch (flags) {
case 0:
case FLUX_KVS_TREEOBJ:
Expand Down
13 changes: 7 additions & 6 deletions src/modules/kvs-watch/kvs-watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ static bool array_match (json_t *a, const char *key)
return false;
}

static int handle_full_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
static int handle_compare_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (!w->responded) {
/* this is the first response case, simply store the first
Expand Down Expand Up @@ -334,8 +334,9 @@ static int handle_lookup_response (flux_future_t *f, struct watcher *w)
goto error;
}
if (!w->mute) {
if (w->flags & FLUX_KVS_WATCH_FULL) {
if (handle_full_response (h, w, data, len) < 0)
if (w->flags & FLUX_KVS_WATCH_FULL
|| w->flags & FLUX_KVS_WATCH_UNIQ) {
if (handle_compare_response (h, w, data, len) < 0)
goto error;
}
else {
Expand Down
25 changes: 25 additions & 0 deletions t/t1007-kvs-lookup-watch.t
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,31 @@ test_expect_success NO_CHAIN_LINT 'flux kvs get --watch sees duplicate commited
wait $pid
'

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch and --uniq do not see duplicate commited values' '
flux kvs put test.f=1 &&
flux kvs get --count=3 --watch --uniq test.f >seq4.out &
pid=$! &&
$waitfile --count=1 --timeout=10 \
--pattern="[0-9]+" seq4.out >/dev/null &&
for i in $(seq 2 20); \
do flux kvs put --no-merge test.f=1; \
done &&
for i in $(seq 2 20); \
do flux kvs put --no-merge test.f=2; \
done &&
for i in $(seq 2 20); \
do flux kvs put --no-merge test.f=3; \
done &&
cat >expected <<-EOF &&
1
2
3
EOF
wait $pid &&
test_cmp expected seq4.out
'

# in --watch & --waitcreate tests, call wait_watcherscount_nonzero to
# ensure background watcher has started, otherwise test can be racy

Expand Down

0 comments on commit bb5105d

Please sign in to comment.