Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: remove watch handlers & api #2011

Merged
merged 7 commits into from Feb 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 0 additions & 54 deletions src/bindings/python/flux/kvs.py
Expand Up @@ -116,24 +116,6 @@ def dropcache(flux_handle):
return RAW.flux_kvs_dropcache(flux_handle)


def watch_once(flux_handle, key):
"""
Watches the selected key until the next change, then returns the
updated value of the key
"""
if isdir(flux_handle, key):
directory = get_dir(flux_handle)
# The wrapper automatically unpacks directory's handle
RAW.flux_kvs_watch_once_dir(flux_handle, None, directory)
return directory

out_json_str = ffi.new("char *[1]")
RAW.flux_kvs_watch_once(flux_handle, None, key, out_json_str)
if out_json_str[0] == ffi.NULL:
return None
return json.loads(ffi.string(out_json_str[0]).decode("utf-8"))


class KVSDir(WrapperPimpl, collections.MutableMapping):
# pylint: disable=too-many-ancestors, too-many-public-methods

Expand Down Expand Up @@ -294,14 +276,6 @@ def __exit__(self, type_arg, value, tb):
self.commit()
return False

def watch_once(self, key):
"""
Watches the selected key until the next change, then returns the
updated value of the key
"""
full_key = self.key_at(key)
return watch_once(self.fhdl, full_key)


def join(*args):
return ".".join([a for a in args if len(a) > 0])
Expand All @@ -328,31 +302,3 @@ def walk(directory, topdown=False, flux_handle=None):
raise ValueError("If directory is a key, flux_handle must be specified")
directory = KVSDir(flux_handle, directory)
return inner_walk(directory, "", topdown)


@ffi.callback("kvs_set_f")
def kvs_watch_wrapper(key, value, arg, errnum):
(callback, real_arg) = ffi.from_handle(arg)
if errnum == errno.ENOENT:
value = None
else:
value = json.loads(ffi.string(value).decode("utf-8"))
key = ffi.string(key)
ret = callback(key, value, real_arg, errnum)
return ret if ret is not None else 0


KVSWATCHES = {}


def watch(flux_handle, key, fun, arg):
warg = (fun, arg)
KVSWATCHES[key] = warg
return RAW.flux_kvs_watch(
flux_handle, None, key, kvs_watch_wrapper, ffi.new_handle(warg)
)


def unwatch(flux_handle, key):
KVSWATCHES.pop(key, None)
return RAW.flux_kvs_unwatch(flux_handle, key)
225 changes: 0 additions & 225 deletions src/cmd/flux-kvs.c
Expand Up @@ -33,7 +33,6 @@ int cmd_readlink (optparse_t *p, int argc, char **argv);
int cmd_mkdir (optparse_t *p, int argc, char **argv);
int cmd_version (optparse_t *p, int argc, char **argv);
int cmd_wait (optparse_t *p, int argc, char **argv);
int cmd_watch (optparse_t *p, int argc, char **argv);
int cmd_dropcache (optparse_t *p, int argc, char **argv);
int cmd_copy (optparse_t *p, int argc, char **argv);
int cmd_move (optparse_t *p, int argc, char **argv);
Expand Down Expand Up @@ -164,25 +163,6 @@ static struct optparse_option ls_opts[] = {
OPTPARSE_TABLE_END
};

static struct optparse_option watch_opts[] = {
{ .name = "namespace", .key = 'N', .has_arg = 1,
.usage = "Specify KVS namespace to use.",
},
{ .name = "recursive", .key = 'R', .has_arg = 0,
.usage = "Recursively display keys under subdirectories",
},
{ .name = "directory", .key = 'd', .has_arg = 0,
.usage = "List directory entries and not values",
},
{ .name = "current", .key = 'o', .has_arg = 0,
.usage = "Output current value before changes",
},
{ .name = "count", .key = 'c', .has_arg = 1,
.usage = "Display at most count changes",
},
OPTPARSE_TABLE_END
};

static struct optparse_option dropcache_opts[] = {
{ .name = "all", .key = 'a', .has_arg = 0,
.usage = "Drop KVS across all ranks",
Expand Down Expand Up @@ -331,13 +311,6 @@ static struct optparse_subcommand subcommands[] = {
0,
dropcache_opts
},
{ "watch",
"[-N ns] [-R] [-d] [-o] [-c count] key",
"Watch key and output changes",
cmd_watch,
OPTPARSE_OPT_HIDDEN,
watch_opts
},
{ "version",
"[-N ns]",
"Display curent KVS version",
Expand Down Expand Up @@ -1127,204 +1100,6 @@ int cmd_wait (optparse_t *p, int argc, char **argv)
return (0);
}

#define WATCH_DIR_SEPARATOR "======================"

static void watch_dump_key (const char *json_str,
const char *arg,
bool *prev_output_iskey)
{
output_key_json_str (NULL, json_str, arg);
fflush (stdout);
*prev_output_iskey = true;
}

static void watch_dump_kvsdir (flux_kvsdir_t *dir, const char *ns,
bool Ropt, bool dopt, const char *arg) {
if (!dir) {
output_key_json_str (NULL, NULL, arg);
printf ("%s\n", WATCH_DIR_SEPARATOR);
return;
}

dump_kvs_dir (dir, 0, ns, Ropt, dopt);
printf ("%s\n", WATCH_DIR_SEPARATOR);
fflush (stdout);
}

int watch_kvs_lookup_wrapper (flux_t *h, const char *ns, const char *key,
char **valp)
{
flux_future_t *f = NULL;
const char *tmp;
int rc = -1;

if (!(f = flux_kvs_lookup (h, ns, 0, key)))
goto done;
if (flux_kvs_lookup_get (f, &tmp) < 0)
goto done;
if (!(*valp = strdup (tmp))) {
errno = ENOMEM;
goto done;
}
rc = 0;
done:
flux_future_destroy (f);
return rc;
}

int watch_kvs_dir_wrapper (flux_t *h, const char *ns, const char *key,
flux_kvsdir_t **dirp)
{
flux_future_t *f = NULL;
const flux_kvsdir_t *tmp;
int rc = -1;

if (!(f = flux_kvs_lookup (h, ns, FLUX_KVS_READDIR, key)))
goto done;
if (flux_kvs_lookup_get_dir (f, &tmp) < 0)
goto done;
if (!((*dirp) = flux_kvsdir_copy (tmp))) {
goto done;
}
rc = 0;
done:
flux_future_destroy (f);
return rc;
}

int cmd_watch (optparse_t *p, int argc, char **argv)
{
flux_t *h;
flux_kvsdir_t *dir = NULL;
char *json_str = NULL;
char *key;
int count;
const char *ns = NULL;
bool Ropt;
bool dopt;
bool oopt;
bool isdir = false;
bool prev_output_iskey = false;
int optindex;
int rc;

h = (flux_t *)optparse_get_data (p, "flux_handle");

optindex = optparse_option_index (p);

if ((optindex - argc) == 0) {
optparse_print_usage (p);
exit (1);
}
if (optindex != (argc - 1))
log_msg_exit ("watch: specify one key");

ns = optparse_get_str (p, "namespace", NULL);
Ropt = optparse_hasopt (p, "recursive");
dopt = optparse_hasopt (p, "directory");
oopt = optparse_hasopt (p, "current");
count = optparse_get_int (p, "count", -1);

key = argv[optindex];

rc = watch_kvs_lookup_wrapper (h, ns, key, &json_str);
if (rc < 0 && (errno != ENOENT && errno != EISDIR))
log_err_exit ("%s", key);

/* key is a directory, setup for dir logic appropriately */
if (rc < 0 && errno == EISDIR) {
rc = watch_kvs_dir_wrapper (h, ns, key, &dir);
if (rc < 0 && errno != ENOENT)
log_err_exit ("%s", key);
isdir = true;
free (json_str);
json_str = NULL;
}

if (oopt) {
if (isdir)
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
else
watch_dump_key (json_str, key, &prev_output_iskey);
}

while (count && (rc == 0 || (rc < 0 && errno == ENOENT))) {
if (isdir) {
rc = flux_kvs_watch_once_dir (h, ns, &dir, "%s", key);
if (rc < 0 && (errno != ENOENT && errno != ENOTDIR)) {
printf ("%s: %s\n", key, flux_strerror (errno));
if (dir)
flux_kvsdir_destroy (dir);
dir = NULL;
}
else if (rc < 0 && errno == ENOENT) {
if (dir)
flux_kvsdir_destroy (dir);
dir = NULL;
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
}
else if (!rc) {
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
}
else { /* rc < 0 && errno == ENOTDIR */
/* We were watching a dir that is now a key, need to
* reset logic to the 'key' part of this loop */
isdir = false;
if (dir)
flux_kvsdir_destroy (dir);
dir = NULL;

rc = watch_kvs_lookup_wrapper (h, ns, key, &json_str);
if (rc < 0 && errno != ENOENT)
printf ("%s: %s\n", key, flux_strerror (errno));
else
watch_dump_key (json_str, key, &prev_output_iskey);
}
}
else {
rc = flux_kvs_watch_once (h, ns, key, &json_str);
if (rc < 0 && (errno != ENOENT && errno != EISDIR)) {
printf ("%s: %s\n", key, flux_strerror (errno));
free (json_str);
json_str = NULL;
}
else if (rc < 0 && errno == ENOENT) {
free (json_str);
json_str = NULL;
watch_dump_key (NULL, key, &prev_output_iskey);
}
else if (!rc) {
watch_dump_key (json_str, key, &prev_output_iskey);
}
else { /* rc < 0 && errno == EISDIR */
/* We were watching a key that is now a dir. So we
* have to move to the directory branch of this loop.
*/
isdir = true;
free (json_str);
json_str = NULL;

/* Output dir separator from prior key */
if (prev_output_iskey) {
printf ("%s\n", WATCH_DIR_SEPARATOR);
prev_output_iskey = false;
}

rc = watch_kvs_dir_wrapper (h, ns, key, &dir);
if (rc < 0 && errno != ENOENT)
printf ("%s: %s\n", key, flux_strerror (errno));
else /* rc == 0 || (rc < 0 && errno == ENOENT) */
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
}
}
count--;
}
if (dir)
flux_kvsdir_destroy (dir);
free (json_str);
return (0);
}

int cmd_dropcache (optparse_t *p, int argc, char **argv)
{
flux_t *h;
Expand Down
7 changes: 0 additions & 7 deletions src/common/libkvs/Makefile.am
Expand Up @@ -20,7 +20,6 @@ libkvs_la_SOURCES = \
kvs_dir.c \
kvs_dir_private.h \
kvs_classic.c \
kvs_watch.c \
kvs_commit.c \
kvs_txn.c \
kvs_txn_private.h \
Expand All @@ -36,7 +35,6 @@ fluxcoreinclude_HEADERS = \
kvs_lookup.h \
kvs_getroot.h \
kvs_dir.h \
kvs_watch.h \
kvs_classic.h \
kvs_txn.h \
kvs_commit.h \
Expand All @@ -49,7 +47,6 @@ TESTS = \
test_kvs_lookup.t \
test_kvs_dir.t \
test_kvs_commit.t \
test_kvs_watch.t \
test_kvs_getroot.t \
test_treeobj.t \
test_kvs_eventlog.t \
Expand Down Expand Up @@ -97,10 +94,6 @@ test_kvs_commit_t_SOURCES = test/kvs_commit.c
test_kvs_commit_t_CPPFLAGS = $(test_cppflags)
test_kvs_commit_t_LDADD = $(test_ldadd) $(LIBDL)

test_kvs_watch_t_SOURCES = test/kvs_watch.c
test_kvs_watch_t_CPPFLAGS = $(test_cppflags)
test_kvs_watch_t_LDADD = $(test_ldadd) $(LIBDL)

test_kvs_getroot_t_SOURCES = test/kvs_getroot.c
test_kvs_getroot_t_CPPFLAGS = $(test_cppflags)
test_kvs_getroot_t_LDADD = $(test_ldadd) $(LIBDL)
Expand Down
1 change: 0 additions & 1 deletion src/common/libkvs/kvs.h
Expand Up @@ -17,7 +17,6 @@
#include "kvs_lookup.h"
#include "kvs_getroot.h"
#include "kvs_classic.h"
#include "kvs_watch.h"
#include "kvs_txn.h"
#include "kvs_commit.h"
#include "kvs_eventlog.h"
Expand Down