modules/kvs: Remove kvs.watch & kvs.unwatch
Closes #1980
chu11 committed Feb 14, 2019
1 parent 7457f32 commit 86572ad
Showing 1 changed file with 0 additions and 305 deletions.
305 changes: 0 additions & 305 deletions src/modules/kvs/kvs.c
@@ -1572,307 +1572,6 @@ static void lookup_plus_request_cb (flux_t *h, flux_msg_handler_t *mh,
}


static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
kvs_ctx_t *ctx = arg;
json_t *oval = NULL;
json_t *val = NULL;
flux_msg_t *cpy = NULL;
struct kvsroot *root = NULL;
const char *key;
const char *ns;
int flags;
lookup_t *lh = NULL;
wait_t *wait = NULL;
wait_t *watcher = NULL;
bool isreplay = false;
bool out = false;
lookup_process_t lret;
int rc = -1;
int saved_errno, ret;

    /* if a lookup_handle exists in the msg aux data, this is a replay */
lh = flux_msg_aux_get (msg, "lookup_handle");
if (!lh) {
uint32_t rolemask, userid;

if (flux_request_unpack (msg, NULL, "{ s:s s:s s:o s:i }",
"key", &key,
"namespace", &ns,
"val", &oval,
"flags", &flags) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
goto done;
}

if (get_msg_cred (ctx, msg, &rolemask, &userid) < 0)
goto done;

if (!(lh = lookup_create (ctx->cache,
ctx->krm,
ctx->epoch,
ns,
NULL,
0,
key,
rolemask,
userid,
flags,
h)))
goto done;
}
else {
int err;

        /* an error occurred in a prior load(); we waited for the
         * in-flight RPCs to complete */
if ((err = lookup_get_aux_errnum (lh))) {
errno = err;
goto done;
}

ret = lookup_set_current_epoch (lh, ctx->epoch);
assert (ret == 0);

isreplay = true;
}

lret = lookup (lh);
if (lret == LOOKUP_PROCESS_ERROR) {
errno = lookup_get_errnum (lh);
goto done;
}
else if (lret == LOOKUP_PROCESS_LOAD_MISSING_NAMESPACE) {
bool stall = false;
struct kvsroot *root;

ns = lookup_missing_namespace (lh);
assert (ns);

root = getroot (ctx, ns, mh, msg, lh, lookup_request_cb,
&stall);
assert (!root);

if (stall)
goto stall;
goto done;
}
else if (lret == LOOKUP_PROCESS_LOAD_MISSING_REFS) {
struct kvs_cb_data cbd;

if (!(wait = wait_create_msg_handler (h, mh, msg, ctx,
watch_request_cb)))
goto done;

if (wait_set_error_cb (wait, lookup_wait_error_cb, lh) < 0)
goto done;

        /* do not destroy the lookup_handle on message destruction; we
         * manage it here */
if (wait_msg_aux_set (wait, "lookup_handle", lh, NULL) < 0)
goto done;

cbd.ctx = ctx;
cbd.wait = wait;
cbd.errnum = 0;

if (lookup_iter_missing_refs (lh, lookup_load_cb, &cbd) < 0) {
            /* RPCs are already in flight; stall until they complete */
if (wait_get_usecount (wait) > 0) {
lookup_set_aux_errnum (lh, cbd.errnum);
goto stall;
}

errno = cbd.errnum;
goto done;
}

assert (wait_get_usecount (wait) > 0);
goto stall;
}
/* else lret == LOOKUP_PROCESS_FINISHED, fallthrough */

val = lookup_get_value (lh);

    /* if there is no value, create a JSON null object for the
     * remainder of the code */
if (!val) {
if (!(val = json_null ())) {
errno = ENOMEM;
goto done;
}
}

    /* on a replay these values were not unpacked above, so get them now */
if (isreplay) {
if (flux_request_unpack (msg, NULL, "{ s:s s:s s:o s:i }",
"key", &key,
"namespace", &ns,
"val", &oval,
"flags", &flags) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
goto done;
}
}

/* Value changed or this is the initial request, so there will be
* a reply.
*/
if ((flags & KVS_WATCH_FIRST) || !json_equal (val, oval))
out = true;

/* No reply sent or this is a multi-response watch request.
* Arrange to wait on root->watchlist for each new transaction.
* Reconstruct the payload with 'first' flag clear, and updated
* value.
*/
if (!out || !(flags & KVS_WATCH_ONCE)) {
ns = lookup_get_namespace (lh);
assert (ns);

/* If lookup() succeeded, then namespace must still be valid */

root = kvsroot_mgr_lookup_root_safe (ctx->krm, ns);
assert (root);

if (!(cpy = flux_msg_copy (msg, false)))
goto done;

if (flux_msg_pack (cpy, "{ s:s s:s s:O s:i }",
"key", key,
"namespace", ns,
"val", val,
"flags", flags & ~KVS_WATCH_FIRST) < 0) {
flux_log_error (h, "%s: flux_msg_pack", __FUNCTION__);
goto done;
}

if (!(watcher = wait_create_msg_handler (h, mh, cpy, ctx,
watch_request_cb)))
goto done;
if (wait_addqueue (root->watchlist, watcher) < 0) {
saved_errno = errno;
wait_destroy (watcher);
errno = saved_errno;
goto done;
}
}

if (out) {
if (flux_respond_pack (h, msg, "{ s:O }", "val", val) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto done;
}
}
rc = 0;
done:
if (rc < 0) {
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
}
wait_destroy (wait);
lookup_destroy (lh);
stall:
flux_msg_destroy (cpy);
json_decref (val);
}
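
For context, here is a minimal client-side sketch of the kvs.watch RPC this handler serviced. The payload keys and the KVS_WATCH_FIRST / KVS_WATCH_ONCE flags are taken from the handler above (the flags were defined in the KVS headers of this era); the helper name watch_value_once() and the flux_rpc_pack()/flux_rpc_get_unpack() call pattern are illustrative assumptions, not code removed by this commit.

/* Hypothetical sketch: fetch a value once through the removed
 * kvs.watch service.  Per the handler above, KVS_WATCH_FIRST forces
 * an immediate reply and KVS_WATCH_ONCE keeps the server from queuing
 * a long-lived watcher, so exactly one response is expected.
 * Not code from this commit. */
#include <flux/core.h>
#include <jansson.h>

static int watch_value_once (flux_t *h, const char *ns, const char *key)
{
    flux_future_t *f;
    json_t *val;

    /* "val" carries the last value the client saw; send JSON null
     * ("s:n" consumes no argument) on an initial request */
    if (!(f = flux_rpc_pack (h, "kvs.watch", FLUX_NODEID_ANY, 0,
                             "{s:s s:s s:n s:i}",
                             "key", key,
                             "namespace", ns,
                             "val",
                             "flags", KVS_WATCH_FIRST | KVS_WATCH_ONCE)))
        return -1;
    if (flux_rpc_get_unpack (f, "{s:o}", "val", &val) < 0) {
        flux_future_destroy (f);
        return -1;
    }
    /* val is a borrowed reference owned by the future; use it (or
     * json_incref() it) before destroying f */
    flux_future_destroy (f);
    return 0;
}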

typedef struct {
char *key;
char *sender;
} unwatch_param_t;

static bool unwatch_cmp (const flux_msg_t *msg, void *arg)
{
unwatch_param_t *p = arg;
char *sender = NULL;
char *normkey = NULL;
json_t *val;
const char *key, *topic;
int flags;
bool match = false;

if (flux_request_unpack (msg, &topic, "{ s:s s:o s:i }",
"key", &key,
"val", &val,
"flags", &flags) < 0)
goto done;
if (strcmp (topic, "kvs.watch") != 0)
goto done;
if (flux_msg_get_route_first (msg, &sender) < 0)
goto done;
if (strcmp (sender, p->sender) != 0)
goto done;
if (!(normkey = kvs_util_normalize_key (key, NULL)))
goto done;
if (strcmp (p->key, normkey) != 0)
goto done;
match = true;
done:
free (sender);
free (normkey);
return match;
}

static void unwatch_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
kvs_ctx_t *ctx = arg;
struct kvsroot *root;
const char *ns;
const char *key;
unwatch_param_t p = { NULL, NULL };
int errnum = 0;

if (flux_request_unpack (msg, NULL, "{ s:s s:s }",
"namespace", &ns,
"key", &key) < 0) {
errnum = errno;
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
goto done;
}

    /* if the root is not initialized, succeed automatically
     * - any lingering watches on a namespace that is in the process
     *   of being removed will be cleaned up through other means.
     */
if (!(root = kvsroot_mgr_lookup_root_safe (ctx->krm, ns)))
goto done;

if (check_user (ctx, root, msg) < 0)
goto done;

if (!(p.key = kvs_util_normalize_key (key, NULL))) {
errnum = errno;
goto done;
}
if (flux_msg_get_route_first (msg, &p.sender) < 0) {
errnum = errno;
goto done;
}
    /* N.B. a watch cannot be on the watchlist and a cache waiter at
     * the same time (being on the watchlist means we're watching;
     * being a cache waiter means we're still processing towards being
     * on the watchlist).  So if wait_destroy_msg() on the watchlist
     * succeeds but cache_wait_destroy_msg() fails, it is not a big
     * deal; the current state is still maintained.
     */
if (wait_destroy_msg (root->watchlist, unwatch_cmp, &p) < 0) {
errnum = errno;
flux_log_error (h, "%s: wait_destroy_msg", __FUNCTION__);
goto done;
}
if (cache_wait_destroy_msg (ctx->cache, unwatch_cmp, &p) < 0) {
errnum = errno;
flux_log_error (h, "%s: cache_wait_destroy_msg", __FUNCTION__);
goto done;
}
done:
if (flux_respond (h, msg, errnum, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
free (p.key);
free (p.sender);
}
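
Similarly, a hedged sketch of the kvs.unwatch request, with the payload shape mirroring the flux_request_unpack() format string in unwatch_request_cb() above; the helper name unwatch_key() is an illustrative assumption.

/* Hypothetical sketch: cancel a watch through the removed kvs.unwatch
 * service.  Success is an empty reply; failure sets errno.  Not code
 * from this commit. */
static int unwatch_key (flux_t *h, const char *ns, const char *key)
{
    flux_future_t *f;
    int rc = -1;

    if (!(f = flux_rpc_pack (h, "kvs.unwatch", FLUX_NODEID_ANY, 0,
                             "{s:s s:s}",
                             "namespace", ns,
                             "key", key)))
        return -1;
    rc = flux_rpc_get (f, NULL);
    flux_future_destroy (f);
    return rc;
}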

static int finalize_transaction_req (treq_t *tr,
const flux_msg_t *req,
void *data)
@@ -3184,16 +2883,12 @@ static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "kvs.dropcache", dropcache_event_cb, 0 },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs.disconnect", disconnect_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs.unwatch",
unwatch_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.sync",
sync_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.lookup",
lookup_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.lookup-plus",
lookup_plus_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.watch",
watch_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.commit",
commit_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.relaycommit", relaycommit_request_cb, 0 },
