Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: Support FLUX_KVS_WATCH_FULL #1848

Merged
merged 4 commits into from Nov 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions doc/man1/flux-kvs.adoc
Expand Up @@ -68,18 +68,21 @@ Remove a kvs namespace.
*namespace-list*::
List all current namespaces and info on each namespace.

*get* [-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-c count] 'key' ['key...']::
*get* [-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-f] [-c count] 'key' ['key...']::
Retrieve the value stored under 'key'. If nothing has been stored under
'key', display an error message. If no options, value is displayed with
a newline appended (if value length is nonzero). If '-l', a 'key=' prefix is
added. If '-j', value is interpreted as encoded JSON and formatted accordingly.
If '-r', value is displayed without a newline. If '-t', the RFC 11 object
is displayed. '-a treeobj' causes the lookup to be relative to an RFC 11
snapshot reference. If '-w', after the initial value, display the
new value each time the key changes until interrupted, or if '-c count'
new value each time the key is written to until interrupted, or if '-c count'
is specified, until 'count' values have been displayed. If the
initial value does not yet exist, `-W` can be specified to wait for it
to be created.
to be created. By default, only a direct write to a key is monitored,
which may miss several unique situations, such as the replacement of
an entire parent directory. The '-f' option can be specified to
monitor for many of these unique situations.

*put* [-j|-r|-t] [-n] [-A] 'key=value' ['key=value...']::
Store 'value' under 'key' and commit it. If it already has a value,
Expand Down
7 changes: 6 additions & 1 deletion src/cmd/flux-kvs.c
Expand Up @@ -107,6 +107,9 @@ static struct optparse_option get_opts[] = {
{ .name = "waitcreate", .key = 'W', .has_arg = 0,
.usage = "Wait for creation to occur on watch",
},
{ .name = "full", .key = 'f', .has_arg = 0,
.usage = "Monitor key changes with more complete accuracy",
},
{ .name = "count", .key = 'c', .has_arg = 1, .arginfo = "COUNT",
.usage = "Display at most COUNT changes",
},
Expand Down Expand Up @@ -245,7 +248,7 @@ static struct optparse_subcommand subcommands[] = {
NULL
},
{ "get",
"[-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-c COUNT] key [key...]",
"[-j|-r|-t] [-a treeobj] [-l] [-w] [-W] [-f] [-c COUNT] key [key...]",
"Get value stored under key",
cmd_get,
0,
Expand Down Expand Up @@ -687,6 +690,8 @@ void cmd_get_one (flux_t *h, const char *key, struct lookup_ctx *ctx)
flags |= FLUX_KVS_WATCH;
if (optparse_hasopt (ctx->p, "waitcreate"))
flags |= FLUX_KVS_WATCH_WAITCREATE;
if (optparse_hasopt (ctx->p, "full"))
flags |= FLUX_KVS_WATCH_FULL;
}
if (optparse_hasopt (ctx->p, "at")) {
const char *reference = optparse_get_str (ctx->p, "at", "");
Expand Down
1 change: 1 addition & 0 deletions src/common/libkvs/kvs.h
Expand Up @@ -50,6 +50,7 @@ enum kvs_op {
FLUX_KVS_WATCH_WAITCREATE = 8,
FLUX_KVS_TREEOBJ = 16,
FLUX_KVS_APPEND = 32,
FLUX_KVS_WATCH_FULL = 64
};

typedef struct flux_kvs_namespace_itr flux_kvs_namespace_itr_t;
Expand Down
4 changes: 4 additions & 0 deletions src/common/libkvs/kvs_lookup.c
Expand Up @@ -91,9 +91,13 @@ static int validate_lookup_flags (int flags, bool watch_ok)
if (flags & FLUX_KVS_WATCH_WAITCREATE
&& !(flags & FLUX_KVS_WATCH))
return -1;
if (flags & FLUX_KVS_WATCH_FULL
&& !(flags & FLUX_KVS_WATCH))
return -1;

flags &= ~FLUX_KVS_WATCH;
flags &= ~FLUX_KVS_WATCH_WAITCREATE;
flags &= ~FLUX_KVS_WATCH_FULL;
switch (flags) {
case 0:
case FLUX_KVS_TREEOBJ:
Expand Down
82 changes: 78 additions & 4 deletions src/modules/kvs-watch/kvs-watch.c
Expand Up @@ -52,6 +52,8 @@ struct watcher {
zlist_t *lookups; // list of futures, in commit order

struct namespace *ns; // back pointer for removal
void *prev; // previous watch for KVS_WATCH_FULL
int prev_len; // previous watch len for KVS_WATCH_FULL
};

/* Current KVS root.
Expand Down Expand Up @@ -103,6 +105,7 @@ static void watcher_destroy (struct watcher *w)
flux_future_destroy (f);
zlist_destroy (&w->lookups);
}
free (w->prev);
free (w);
errno = saved_errno;
}
Expand Down Expand Up @@ -252,9 +255,70 @@ static bool array_match (json_t *a, const char *key)
return false;
}

static int handle_full_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (!w->responded) {
/* this is the first response case, simply store the first
* data */
if (!(w->prev = malloc (len))) {
errno = ENOMEM;
return -1;
}
memcpy (w->prev, data, len);
w->prev_len = len;

if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}
}
else {
/* not first response case, compare to previous to see if
* respond should be done, update data if necessary */
if (w->prev_len == len
&& !memcmp (w->prev, data, len))
return 0;

if (len > w->prev_len) {
if (!(w->prev = realloc (w->prev, len)))
return -1;
}
memcpy (w->prev, data, len);
w->prev_len = len;

if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}
}

w->responded = true;
return 0;
}

static int handle_normal_response (flux_t *h,
struct watcher *w,
const void *data,
int len)
{
if (flux_respond_raw (h, w->request, data, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
return -1;
}

w->responded = true;
return 0;
}

/* New value of key is available in future 'f' container.
* Send response to watcher using raw payload from lookup response.
* Return 0 on success, -1 on error (caller should destroy watcher).
*
* Exception for FLUX_KVS_WATCH_FULL, must check if value is
* different than old value.
*/
static int handle_lookup_response (flux_future_t *f, struct watcher *w)
{
Expand All @@ -270,9 +334,14 @@ static int handle_lookup_response (flux_future_t *f, struct watcher *w)
goto error;
}
if (!w->mute) {
if (flux_respond_raw (h, w->request, data, len) < 0)
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
w->responded = true;
if (w->flags & FLUX_KVS_WATCH_FULL) {
if (handle_full_response (h, w, data, len) < 0)
goto error;
}
else {
if (handle_normal_response (h, w, data, len) < 0)
goto error;
}
}
return 0;
error:
Expand Down Expand Up @@ -416,8 +485,13 @@ static void watcher_respond (struct namespace *ns, struct watcher *w)
* kvs.lookupat request with the requestor's creds, in case the key lookup
* traverses to a new namespace. Leave it up to the KVS module to ensure
* the requestor is permitted to access *that* namespace.
*
* Note on FLUX_KVS_WATCH_FULL: A lookup / comparison is done on every
* change.
*/
else if (w->rootseq == -1 || array_match (ns->commit->keys, w->key)) {
else if (w->rootseq == -1
|| (w->flags & FLUX_KVS_WATCH_FULL)
|| array_match (ns->commit->keys, w->key)) {
flux_future_t *f;
if (!(f = lookupat (ns->ctx->h,
w->flags,
Expand Down
152 changes: 152 additions & 0 deletions t/t1007-kvs-lookup-watch.t
Expand Up @@ -144,6 +144,158 @@ test_expect_success NO_CHAIN_LINT 'flux kvs get, --watch & --waitcreate, create
grep "Operation not supported" waitcreate4.out
'

# full checks

# in full checks, we create a directory that we will use to
# get a treeobj. We then use that treeobj to overwrite another
# directory.

# to handle racy issues, wait until a value has been seen by a get
# --watch. Note that we can't use waitfile or flux kvs get here, b/c
# we are specifically testing against --watch.
wait_kvs_value() {
key=$1
value=$2
i=0
while [ "$(flux kvs get --watch --count=1 $key 2> /dev/null)" != "$value" ] \
&& [ $i -lt ${KVS_WAIT_ITERS} ]
do
sleep 0.1
i=$((i + 1))
done
return $(loophandlereturn $i)
}

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/o --full doesnt detect change' '
flux kvs unlink -Rf test &&
flux kvs put test.dir_orig.a="abc" &&
flux kvs get --watch --count=2 test.dir_orig.a > full1.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir_new.a="xyz" &&
DIRREF=$(flux kvs get --treeobj test.dir_new) &&
flux kvs put --treeobj test.dir_orig="${DIRREF}" &&
wait_kvs_value test.dir_orig.a xyz &&
flux kvs put test.dir_orig.a="def" &&
$waitfile --count=1 --timeout=10 \
--pattern="def" full1.out >/dev/null &&
wait $pid
'

# to handle racy issues, wait until ENOENT has been seen by a get
# --watch. Note that we can't use waitfile or flux kvs get here, b/c
# we are specifically testing against --watch
wait_kvs_enoent() {
key=$1
i=0
while flux kvs get --watch --count=1 $key 2> /dev/null \
&& [ $i -lt ${KVS_WAIT_ITERS} ]
do
sleep 0.1
i=$((i + 1))
done
return $(loophandlereturn $i)
}

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/o --full doesnt detect ENOENT' '
flux kvs unlink -Rf test &&
flux kvs put test.dir_orig.a="abc" &&
flux kvs get --watch --count=2 test.dir_orig.a > full2.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir_new.b="xyz" &&
DIRREF=$(flux kvs get --treeobj test.dir_new) &&
flux kvs put --treeobj test.dir_orig="${DIRREF}" &&
wait_kvs_enoent test.dir_orig.a &&
flux kvs put test.dir_orig.a="def" &&
$waitfile --count=1 --timeout=10 \
--pattern="def" full2.out >/dev/null &&
wait $pid
'

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/ --full detects change' '
flux kvs unlink -Rf test &&
flux kvs put test.dir_orig.a="abc" &&
flux kvs get --watch --full --count=2 test.dir_orig.a > full3.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir_new.a="xyz" &&
DIRREF=$(flux kvs get --treeobj test.dir_new) &&
flux kvs put --treeobj test.dir_orig="${DIRREF}" &&
$waitfile --count=1 --timeout=10 \
--pattern="xyz" full3.out >/dev/null &&
wait $pid
'

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/ --full detects ENOENT' '
flux kvs unlink -Rf test &&
flux kvs put test.dir_orig.a="abc" &&
flux kvs get --watch --full --count=2 test.dir_orig.a > full4.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir_new.b="xyz" &&
DIRREF=$(flux kvs get --treeobj test.dir_new) &&
flux kvs put --treeobj test.dir_orig="${DIRREF}" &&
! wait $pid
'

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/ --full works with changing data sizes' '
flux kvs unlink -Rf test &&
flux kvs put test.dir.a="abc" &&
flux kvs get --watch --full --count=5 test.dir.a > full5.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir.a="abcdefghijklmnopqrstuvwxyz" &&
flux kvs put test.dir.a="xyz" &&
flux kvs put test.dir.a="abcdefghijklmnopqrstuvwxyz" &&
flux kvs put test.dir.a="abc" &&
wait $pid &&
cat >expected <<-EOF &&
abc
abcdefghijklmnopqrstuvwxyz
xyz
abcdefghijklmnopqrstuvwxyz
abc
EOF
test_cmp expected full5.out
'

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/ --full doesnt work with non-changing data' '
flux kvs unlink -Rf test &&
flux kvs put test.dir.a="abc" &&
flux kvs get --watch --full --count=3 test.dir.a > full6.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir.a="abc" &&
flux kvs put test.dir.a="abcdefghijklmnopqrstuvwxyz" &&
flux kvs put test.dir.a="abcdefghijklmnopqrstuvwxyz" &&
flux kvs put test.dir.a="xyz" &&
wait $pid &&
cat >expected <<-EOF &&
abc
abcdefghijklmnopqrstuvwxyz
xyz
EOF
test_cmp expected full6.out
'

test_expect_success NO_CHAIN_LINT 'flux kvs get --watch w/ --full & --waitcreate works' '
flux kvs unlink -Rf test &&
flux kvs get --watch --full --waitcreate --count=3 test.dir.a > full7.out 2>&1 &
pid=$! &&
wait_watcherscount_nonzero primary &&
flux kvs put test.dir.a="abc" &&
flux kvs put test.dir.a="def" &&
flux kvs put test.dir.a="xyz" &&
wait $pid &&
cat >expected <<-EOF &&
abc
def
xyz
EOF
test_cmp expected full7.out
'

# Security checks

test_expect_success 'flux kvs get --watch denies guest access to primary namespace' '
Expand Down