diff --git a/src/broker/content-cache.c b/src/broker/content-cache.c index 64112c7bdc34..f0c4165e7316 100644 --- a/src/broker/content-cache.c +++ b/src/broker/content-cache.c @@ -272,7 +272,7 @@ static void remove_entry (content_cache_t *cache, struct cache_entry *e) static void cache_load_continuation (flux_rpc_t *rpc, void *arg) { content_cache_t *cache = arg; - struct cache_entry *e = flux_rpc_aux_get (rpc); + struct cache_entry *e = flux_rpc_aux_get (rpc, "entry"); void *data = NULL; int len = 0; int saved_errno; @@ -329,7 +329,10 @@ static int cache_load (content_cache_t *cache, struct cache_entry *e) flux_log_error (cache->h, "%s: RPC", __FUNCTION__); goto done; } - flux_rpc_aux_set (rpc, e, NULL); + if (flux_rpc_aux_set (rpc, "entry", e, NULL) < 0) { + flux_log_error (cache->h, "content load flux_rpc_aux_set"); + goto done; + } if (flux_rpc_then (rpc, cache_load_continuation, cache) < 0) { saved_errno = errno; flux_log_error (cache->h, "content load"); @@ -421,7 +424,7 @@ void content_load_request (flux_t *h, flux_msg_handler_t *w, static void cache_store_continuation (flux_rpc_t *rpc, void *arg) { content_cache_t *cache = arg; - struct cache_entry *e = flux_rpc_aux_get (rpc); + struct cache_entry *e = flux_rpc_aux_get (rpc, "entry"); const char *blobref; int saved_errno = 0; int rc = -1; @@ -491,7 +494,10 @@ static int cache_store (content_cache_t *cache, struct cache_entry *e) flux_log_error (cache->h, "content store"); goto done; } - flux_rpc_aux_set (rpc, e, NULL); + if (flux_rpc_aux_set (rpc, "entry", e, NULL) < 0) { + flux_log_error (cache->h, "content store: flux_rpc_aux_set"); + goto done; + } if (flux_rpc_then (rpc, cache_store_continuation, cache) < 0) { saved_errno = errno; flux_log_error (cache->h, "content store"); diff --git a/src/cmd/flux-ping.c b/src/cmd/flux-ping.c index 51bc774dac2c..136f3d2d4dac 100644 --- a/src/cmd/flux-ping.c +++ b/src/cmd/flux-ping.c @@ -102,7 +102,7 @@ void ping_continuation (flux_rpc_t *rpc, void *arg) int64_t sec, nsec; struct timespec t0; int seq; - struct ping_data *pdata = flux_rpc_aux_get (rpc); + struct ping_data *pdata = flux_rpc_aux_get (rpc, "ping"); tstat_t *tstat = pdata->tstat; uint32_t rolemask, userid; @@ -181,7 +181,8 @@ void send_ping (struct ping_ctx *ctx) "pad", ctx->pad); if (!rpc) log_err_exit ("flux_rpcf_multi"); - flux_rpc_aux_set (rpc, pdata, ping_data_free); + if (flux_rpc_aux_set (rpc, "ping", pdata, ping_data_free) < 0) + log_err_exit ("flux_rpc_aux_set"); if (flux_rpc_then (rpc, ping_continuation, ctx) < 0) log_err_exit ("flux_rpc_then"); diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index a4df52654015..1dd0dc360cc3 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -48,6 +48,7 @@ intree_conf_cppflags = \ fluxcoreinclude_HEADERS = \ flux.h \ + types.h \ handle.h \ connector.h \ reactor.h \ diff --git a/src/common/libflux/dispatch.c b/src/common/libflux/dispatch.c index 2820c09c27c0..573eabfb2ed2 100644 --- a/src/common/libflux/dispatch.c +++ b/src/common/libflux/dispatch.c @@ -361,7 +361,7 @@ static void call_handler (flux_msg_handler_t *w, const flux_msg_t *msg) if (flux_msg_cmp (msg, FLUX_MATCH_REQUEST) && flux_msg_get_matchtag (msg, &matchtag) == 0 && matchtag != FLUX_MATCHTAG_NONE) { - (void)flux_respond (w->d->h, msg, EPERM, NULL); + (void)flux_respond (w->d->h, msg, EPERM, NULL); } return; } diff --git a/src/common/libflux/flux.h b/src/common/libflux/flux.h index ce5c66ca6f35..bb3549fcf49f 100644 --- a/src/common/libflux/flux.h +++ b/src/common/libflux/flux.h @@ -1,6 +1,7 @@ #ifndef _FLUX_CORE_FLUX_H #define _FLUX_CORE_FLUX_H +#include "types.h" #include "handle.h" #include "reactor.h" #include "dispatch.h" diff --git a/src/common/libflux/handle.h b/src/common/libflux/handle.h index b1f48870eb2c..e515d184fd45 100644 --- a/src/common/libflux/handle.h +++ b/src/common/libflux/handle.h @@ -5,6 +5,7 @@ #include #include +#include "types.h" #include "message.h" typedef struct flux_handle_struct flux_t; @@ -95,11 +96,10 @@ void flux_fatal_error (flux_t *h, const char *fun, const char *msg); bool flux_fatality (flux_t *h); /* A mechanism is provide for users to attach auxiliary state to the flux_t - * handle by name. The flux_free_f, if non-NULL, will be called + * handle by name. The destructor, if non-NULL, will be called * to destroy this state when the handle is destroyed. * Key names used internally by flux-core are prefixed with "flux::". */ -typedef void (*flux_free_f)(void *arg); void *flux_aux_get (flux_t *h, const char *name); void flux_aux_set (flux_t *h, const char *name, void *aux, flux_free_f destroy); diff --git a/src/common/libflux/message.c b/src/common/libflux/message.c index 91fe00c662c7..63df1e6759eb 100644 --- a/src/common/libflux/message.c +++ b/src/common/libflux/message.c @@ -56,6 +56,7 @@ struct flux_msg { int magic; zmsg_t *zmsg; json_t *json; + zhash_t *aux; }; static int proto_set_bigint (uint8_t *data, int len, uint32_t bigint); @@ -262,11 +263,43 @@ void flux_msg_destroy (flux_msg_t *msg) json_decref (msg->json); zmsg_destroy (&msg->zmsg); msg->magic =~ FLUX_MSG_MAGIC; + zhash_destroy (&msg->aux); free (msg); errno = saved_errno; } } +/* N.B. const attribute of msg argument is defeated internally to + * allow msg to be "annotated" for convenience. + * The message content is otherwise unchanged. + */ +int flux_msg_aux_set (const flux_msg_t *const_msg, const char *name, + void *aux, flux_free_f destroy) +{ + flux_msg_t *msg = (flux_msg_t *)const_msg; + if (!msg->aux) + msg->aux = zhash_new (); + if (!msg->aux) { + errno = ENOMEM; + return -1; + } + zhash_delete (msg->aux, name); + if (zhash_insert (msg->aux, name, aux) < 0) { + errno = ENOMEM; + return -1; + } + zhash_freefn (msg->aux, name, destroy); + return 0; +} + +void *flux_msg_aux_get (const flux_msg_t *msg, const char *name) +{ + if (!msg->aux) + return NULL; + return zhash_lookup (msg->aux, name); + +} + size_t flux_msg_encode_size (const flux_msg_t *msg) { zframe_t *zf; @@ -834,19 +867,6 @@ int flux_msg_get_route_count (const flux_msg_t *msg) return count; } -bool flux_msg_has_route (const flux_msg_t *msg, const char *s) -{ - zframe_t *zf; - - zf = zmsg_first (msg->zmsg); - while (zf && zframe_size (zf) > 0) { - if (zframe_streq (zf, s)) - return true; - zf = zmsg_next (msg->zmsg); - } - return false; -} - /* Get sum of size in bytes of route frames */ static int flux_msg_get_route_size (const flux_msg_t *msg) @@ -1266,20 +1286,41 @@ int flux_msg_get_topic (const flux_msg_t *msg, const char **topic) return rc; } -/* FIXME: this function copies payload and then deletes it if 'payload' - * is false, when the point was to avoid the overhead of copying it in - * the first place. - */ flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload) { - assert (msg->magic == FLUX_MSG_MAGIC); - flux_msg_t *cpy = calloc (1, sizeof (*cpy)); - if (!cpy) + flux_msg_t *cpy = NULL; + zframe_t *zf; + int count; + uint8_t flags; + bool skip_payload = false; + + if (msg->magic != FLUX_MSG_MAGIC) { + errno = EINVAL; + goto error; + } + if (flux_msg_get_flags (msg, &flags) < 0) + goto error; + if (!payload && (flags & FLUX_MSGFLAG_PAYLOAD)) { + flags &= ~(FLUX_MSGFLAG_PAYLOAD | FLUX_MSGFLAG_JSON); + skip_payload = true; + } + if (!(cpy = calloc (1, sizeof (*cpy)))) goto nomem; cpy->magic = FLUX_MSG_MAGIC; - if (!(cpy->zmsg = zmsg_dup (msg->zmsg))) + if (!(cpy->zmsg = zmsg_new ())) goto nomem; - if (!payload && flux_msg_set_payload (cpy, 0, NULL, 0) < 0) + + count = 0; + zf = zmsg_first (msg->zmsg); + while (zf) { + if (!skip_payload || count != zmsg_size (msg->zmsg) - 2) { + if (zmsg_addmem (cpy->zmsg, zframe_data (zf), zframe_size (zf)) < 0) + goto nomem; + } + zf = zmsg_next (msg->zmsg); + count++; + } + if (flux_msg_set_flags (cpy, flags) < 0) goto error; return cpy; nomem: diff --git a/src/common/libflux/message.h b/src/common/libflux/message.h index d02168dadbfa..5240c0311c6e 100644 --- a/src/common/libflux/message.h +++ b/src/common/libflux/message.h @@ -5,6 +5,7 @@ #include #include #include +#include "types.h" #include "security.h" typedef struct flux_msg flux_msg_t; @@ -69,6 +70,13 @@ struct flux_msg_iobuf { flux_msg_t *flux_msg_create (int type); void flux_msg_destroy (flux_msg_t *msg); +/* Access auxiliary data members in Flux message. + * These are for convenience only - they are not sent over the wire. + */ +int flux_msg_aux_set (const flux_msg_t *msg, const char *name, + void *aux, flux_free_f destroy); +void *flux_msg_aux_get (const flux_msg_t *msg, const char *name); + /* Duplicate msg, omitting payload if 'payload' is false. */ flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload); @@ -299,11 +307,6 @@ int flux_msg_get_route_count (const flux_msg_t *msg); */ char *flux_msg_get_route_string (const flux_msg_t *msg); -/* Return true if route stack contains a frame matching 's' - */ -bool flux_msg_has_route (const flux_msg_t *msg, const char *s); - - #endif /* !_FLUX_CORE_MESSAGE_H */ /* diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index e4022e0e5a02..9befc7e8ff91 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -33,6 +33,7 @@ #include #endif #include +#include #include "request.h" #include "response.h" @@ -58,9 +59,8 @@ struct flux_rpc_struct { int rx_errnum; int rx_count; int rx_expected; - void *aux; + zhash_t *aux; flux_free_f aux_destroy; - const char *type; int usecount; }; @@ -88,8 +88,7 @@ static void flux_rpc_usecount_decr (flux_rpc_t *rpc) flux_matchtag_free (rpc->h, rpc->m.matchtag); } flux_msg_destroy (rpc->rx_msg); - if (rpc->aux && rpc->aux_destroy) - rpc->aux_destroy (rpc->aux); + zhash_destroy (&rpc->aux); rpc->magic =~ RPC_MAGIC; free (rpc); } @@ -593,30 +592,31 @@ flux_rpc_t *flux_rpcf_multi (flux_t *h, return rc; } -const char *flux_rpc_type_get (flux_rpc_t *rpc) -{ - return rpc->type; -} - -void flux_rpc_type_set (flux_rpc_t *rpc, const char *type) -{ - assert (rpc->magic == RPC_MAGIC); - rpc->type = type; -} - -void *flux_rpc_aux_get (flux_rpc_t *rpc) +void *flux_rpc_aux_get (flux_rpc_t *rpc, const char *name) { assert (rpc->magic == RPC_MAGIC); - return rpc->aux; + if (!rpc->aux) + return NULL; + return zhash_lookup (rpc->aux, name); } -void flux_rpc_aux_set (flux_rpc_t *rpc, void *aux, flux_free_f destroy) +int flux_rpc_aux_set (flux_rpc_t *rpc, const char *name, + void *aux, flux_free_f destroy) { assert (rpc->magic == RPC_MAGIC); - if (rpc->aux && rpc->aux_destroy) - rpc->aux_destroy (rpc->aux); - rpc->aux = aux; - rpc->aux_destroy = destroy; + if (!rpc->aux) + rpc->aux = zhash_new (); + if (!rpc->aux) { + errno = ENOMEM; + return -1; + } + zhash_delete (rpc->aux, name); + if (zhash_insert (rpc->aux, name, aux) < 0) { + errno = ENOMEM; + return -1; + } + zhash_freefn (rpc->aux, name, destroy); + return 0; } /* diff --git a/src/common/libflux/rpc.h b/src/common/libflux/rpc.h index bd87b679520a..d9873920e346 100644 --- a/src/common/libflux/rpc.h +++ b/src/common/libflux/rpc.h @@ -82,10 +82,9 @@ int flux_rpc_next (flux_rpc_t *rpc); /* Helper functions for extending flux_rpc_t. */ -const char *flux_rpc_type_get (flux_rpc_t *rpc); -void flux_rpc_type_set (flux_rpc_t *rpc, const char *type); -void *flux_rpc_aux_get (flux_rpc_t *rpc); -void flux_rpc_aux_set (flux_rpc_t *rpc, void *aux, flux_free_f destroy); +void *flux_rpc_aux_get (flux_rpc_t *rpc, const char *name); +int flux_rpc_aux_set (flux_rpc_t *rpc, const char *name, + void *aux, flux_free_f destroy); /* Variants of flux_rpc, flux_rpc_multi, and flux_rpc_get that * encode/decode json payloads using jansson pack/unpack format diff --git a/src/common/libflux/test/message.c b/src/common/libflux/test/message.c index 48f43a982861..d650f56bf8e1 100644 --- a/src/common/libflux/test/message.c +++ b/src/common/libflux/test/message.c @@ -3,6 +3,7 @@ #endif #include #include +#include #include "src/common/libflux/message.h" #include "src/common/libtap/tap.h" @@ -135,6 +136,7 @@ void check_payload_json (void) /* RFC 3 - json payload must be an object * Encoding should return EINVAL. */ +/* XXX */ errno = 0; ok (flux_msg_set_json (msg, "[1,2,3]") < 0 && errno == EINVAL, "flux_msg_set_json array fails with EINVAL"); @@ -578,6 +580,131 @@ void check_sendzsock (void) zsock_destroy (&zsock[1]); } +void *myfree_arg = NULL; +void myfree (void *arg) +{ + myfree_arg = arg; +} + +void check_aux (void) +{ + flux_msg_t *msg; + char *test_data = "Hello"; + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL, + "created test message"); + ok (flux_msg_aux_set (msg, "test", test_data, myfree) == 0, + "hang aux data member on message with destructor"); + ok (flux_msg_aux_get (msg, "incorrect") == NULL, + "flux_msg_aux_get for unknown key returns NULL"); + ok (flux_msg_aux_get (msg, "test") == test_data, + "flux_msg_aux_get aux data memeber key returns orig pointer"); + flux_msg_destroy (msg); + ok (myfree_arg == test_data, + "destroyed message and aux destructor was called"); +} + +void check_copy (void) +{ + flux_msg_t *msg, *cpy; + int type; + const char *topic; + int cpylen, flags; + char buf[] = "xxxxxxxxxxxxxxxxxx"; + char *cpybuf; + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_KEEPALIVE)) != NULL, + "created no-payload keepalive"); + ok ((cpy = flux_msg_copy (msg, true)) != NULL, + "flux_msg_copy works"); + flux_msg_destroy (msg); + type = -1; + ok (flux_msg_get_type (cpy, &type) == 0 && type == FLUX_MSGTYPE_KEEPALIVE + && !flux_msg_has_payload (cpy) + && flux_msg_get_route_count (cpy) < 0 + && flux_msg_get_topic (cpy, &topic) < 0, + "copy is keepalive: no routes, topic, or payload"); + flux_msg_destroy (cpy); + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL, + "created request"); + ok (flux_msg_enable_route (msg) == 0, + "added route delim"); + ok (flux_msg_set_topic (msg, "foo") == 0, + "set topic string"); + ok (flux_msg_set_payload (msg, 0, buf, sizeof (buf)) == 0, + "added payload"); + ok ((cpy = flux_msg_copy (msg, true)) != NULL, + "flux_msg_copy works"); + type = -1; + ok (flux_msg_get_type (cpy, &type) == 0 && type == FLUX_MSGTYPE_REQUEST + && flux_msg_has_payload (cpy) + && flux_msg_get_payload (cpy, &flags, &cpybuf, &cpylen) == 0 + && cpylen == sizeof (buf) && memcmp (cpybuf, buf, cpylen) == 0 + && flux_msg_get_route_count (cpy) == 0 + && flux_msg_get_topic (cpy, &topic) == 0 && !strcmp (topic,"foo"), + "copy is request: w/route delim, topic, and payload"); + flux_msg_destroy (cpy); + + ok ((cpy = flux_msg_copy (msg, false)) != NULL, + "flux_msg_copy works (payload=false)"); + type = -1; + ok (flux_msg_get_type (cpy, &type) == 0 && type == FLUX_MSGTYPE_REQUEST + && !flux_msg_has_payload (cpy) + && flux_msg_get_route_count (cpy) == 0 + && flux_msg_get_topic (cpy, &topic) == 0 && !strcmp (topic,"foo"), + "copy is request: w/route delim, topic, and no payload"); + flux_msg_destroy (cpy); + flux_msg_destroy (msg); +} + +void check_print (void) +{ + flux_msg_t *msg; + char buf[] = "xxxxxxxx"; + FILE *f = fopen ("/dev/null", "w"); + if (!f) + BAIL_OUT ("cannot open /dev/null for writing"); + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_KEEPALIVE)) != NULL, + "created test message"); + lives_ok ({flux_msg_fprint (f, msg);}, + "flux_msg_fprint doesn't segfault on keepalive"); + flux_msg_destroy (msg); + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_EVENT)) != NULL, + "created test message"); + ok (flux_msg_set_topic (msg, "foo.bar") == 0, + "set topic string"); + lives_ok ({flux_msg_fprint (f, msg);}, + "flux_msg_fprint doesn't segfault on event with topic"); + flux_msg_destroy (msg); + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL, + "created test message"); + ok (flux_msg_set_topic (msg, "foo.bar") == 0, + "set topic string"); + ok (flux_msg_enable_route (msg) == 0, + "enabled routing"); + ok (flux_msg_push_route (msg, "id1") == 0, + "added one route"); + ok (flux_msg_set_payload (msg, 0, buf, strlen (buf)) == 0, + "added payload"); + lives_ok ({flux_msg_fprint (f, msg);}, + "flux_msg_fprint doesn't segfault on fully loaded request"); + flux_msg_destroy (msg); + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_RESPONSE)) != NULL, + "created test message"); + ok (flux_msg_enable_route (msg) == 0, + "enabled routing"); + lives_ok ({flux_msg_fprint (f, msg);}, + "flux_msg_fprint doesn't segfault on response with empty route stack"); + flux_msg_destroy (msg); + + fclose (f); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); @@ -590,6 +717,8 @@ int main (int argc, char *argv[]) check_payload_json_formatted (); check_matchtag (); check_security (); + check_aux (); + check_copy (); check_cmp (); @@ -597,6 +726,8 @@ int main (int argc, char *argv[]) check_sendfd (); check_sendzsock (); + //check_print (); + done_testing(); return (0); } diff --git a/src/common/libflux/types.h b/src/common/libflux/types.h new file mode 100644 index 000000000000..046791d202e5 --- /dev/null +++ b/src/common/libflux/types.h @@ -0,0 +1,10 @@ +#ifndef _FLUX_CORE_TYPES_H +#define _FLUX_CORE_TYPES_H + +typedef void (*flux_free_f)(void *arg); + +#endif /* !_FLUX_CORE_TYPES_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 469406462870..461f37b7fa30 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -189,7 +189,7 @@ static void content_load_completion (flux_rpc_t *rpc, void *arg) flux_log_error (ctx->h, "%s", __FUNCTION__); goto done; } - blobref = flux_rpc_aux_get (rpc); + blobref = flux_rpc_aux_get (rpc, "ref"); if (!(o = json_tokener_parse_ex (ctx->tok, (char *)data, size))) { errno = EPROTO; flux_log_error (ctx->h, "%s", __FUNCTION__); @@ -217,7 +217,8 @@ static int content_load_request_send (kvs_ctx_t *ctx, const href_t ref, bool now if (!(rpc = flux_rpc_raw (ctx->h, "content.load", ref, strlen (ref) + 1, FLUX_NODEID_ANY, 0))) goto error; - flux_rpc_aux_set (rpc, xstrdup (ref), free); + if (flux_rpc_aux_set (rpc, "ref", xstrdup (ref), free) < 0) + goto error; if (now) { content_load_completion (rpc, ctx); } else if (flux_rpc_then (rpc, content_load_completion, ctx) < 0) { diff --git a/t/loop/rpc.c b/t/loop/rpc.c index 15d87808de5a..faa5a9a089ff 100644 --- a/t/loop/rpc.c +++ b/t/loop/rpc.c @@ -108,26 +108,41 @@ void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *w, (void)flux_respondf (h, msg, "{}"); } +void *auxfree_arg = NULL; +void auxfree (void *arg) +{ + auxfree_arg = arg; +} + void rpctest_begin_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { json_object *o; const char *json_str; flux_rpc_t *r; + char *aux_data = "Hello"; errno = 0; ok (!(r = flux_rpc (h, NULL, NULL, FLUX_NODEID_ANY, 0)) && errno == EINVAL, "flux_rpc with NULL topic fails with EINVAL"); - /* working no-payload RPC */ + /* working no-payload RPC (with aux data) */ ok ((r = flux_rpc (h, "rpctest.hello", NULL, FLUX_NODEID_ANY, 0)) != NULL, "flux_rpc with no payload when none is expected works"); + ok (flux_rpc_aux_set (r, "test", aux_data, auxfree) == 0, + "flux_rpc_aux_set works"); + ok (flux_rpc_aux_get (r, "wrong") == NULL, + "flux_rpc_aux_get on wrong key returns NULL"); + ok (flux_rpc_aux_get (r, "test") == aux_data, + "flux_rpc_aux_get on right key returns orig pointer"); ok (flux_rpc_check (r) == false, "flux_rpc_check says get would block"); ok (flux_rpc_get (r, NULL) == 0, "flux_rpc_get works"); flux_rpc_destroy (r); + ok (auxfree_arg == aux_data, + "destroyed rpc and aux destructor was called with correct arg"); /* cause remote EPROTO (unexpected payload) - will be picked up in _get() */ ok ((r = flux_rpc (h, "rpctest.hello", "{}", FLUX_NODEID_ANY, 0)) != NULL,