Skip to content

Commit

Permalink
Merge pull request #1056 from garlick/libflux_cleanup
Browse files Browse the repository at this point in the history
cleanup of flux_msg_copy(), flux_rpc_aux_set() etc. and tests
  • Loading branch information
grondo committed May 11, 2017
2 parents 79e983a + 5a6ca1f commit 5d36372
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 65 deletions.
14 changes: 10 additions & 4 deletions src/broker/content-cache.c
Expand Up @@ -272,7 +272,7 @@ static void remove_entry (content_cache_t *cache, struct cache_entry *e)
static void cache_load_continuation (flux_rpc_t *rpc, void *arg)
{
content_cache_t *cache = arg;
struct cache_entry *e = flux_rpc_aux_get (rpc);
struct cache_entry *e = flux_rpc_aux_get (rpc, "entry");
void *data = NULL;
int len = 0;
int saved_errno;
Expand Down Expand Up @@ -329,7 +329,10 @@ static int cache_load (content_cache_t *cache, struct cache_entry *e)
flux_log_error (cache->h, "%s: RPC", __FUNCTION__);
goto done;
}
flux_rpc_aux_set (rpc, e, NULL);
if (flux_rpc_aux_set (rpc, "entry", e, NULL) < 0) {
flux_log_error (cache->h, "content load flux_rpc_aux_set");
goto done;
}
if (flux_rpc_then (rpc, cache_load_continuation, cache) < 0) {
saved_errno = errno;
flux_log_error (cache->h, "content load");
Expand Down Expand Up @@ -421,7 +424,7 @@ void content_load_request (flux_t *h, flux_msg_handler_t *w,
static void cache_store_continuation (flux_rpc_t *rpc, void *arg)
{
content_cache_t *cache = arg;
struct cache_entry *e = flux_rpc_aux_get (rpc);
struct cache_entry *e = flux_rpc_aux_get (rpc, "entry");
const char *blobref;
int saved_errno = 0;
int rc = -1;
Expand Down Expand Up @@ -491,7 +494,10 @@ static int cache_store (content_cache_t *cache, struct cache_entry *e)
flux_log_error (cache->h, "content store");
goto done;
}
flux_rpc_aux_set (rpc, e, NULL);
if (flux_rpc_aux_set (rpc, "entry", e, NULL) < 0) {
flux_log_error (cache->h, "content store: flux_rpc_aux_set");
goto done;
}
if (flux_rpc_then (rpc, cache_store_continuation, cache) < 0) {
saved_errno = errno;
flux_log_error (cache->h, "content store");
Expand Down
5 changes: 3 additions & 2 deletions src/cmd/flux-ping.c
Expand Up @@ -102,7 +102,7 @@ void ping_continuation (flux_rpc_t *rpc, void *arg)
int64_t sec, nsec;
struct timespec t0;
int seq;
struct ping_data *pdata = flux_rpc_aux_get (rpc);
struct ping_data *pdata = flux_rpc_aux_get (rpc, "ping");
tstat_t *tstat = pdata->tstat;
uint32_t rolemask, userid;

Expand Down Expand Up @@ -181,7 +181,8 @@ void send_ping (struct ping_ctx *ctx)
"pad", ctx->pad);
if (!rpc)
log_err_exit ("flux_rpcf_multi");
flux_rpc_aux_set (rpc, pdata, ping_data_free);
if (flux_rpc_aux_set (rpc, "ping", pdata, ping_data_free) < 0)
log_err_exit ("flux_rpc_aux_set");
if (flux_rpc_then (rpc, ping_continuation, ctx) < 0)
log_err_exit ("flux_rpc_then");

Expand Down
1 change: 1 addition & 0 deletions src/common/libflux/Makefile.am
Expand Up @@ -48,6 +48,7 @@ intree_conf_cppflags = \

fluxcoreinclude_HEADERS = \
flux.h \
types.h \
handle.h \
connector.h \
reactor.h \
Expand Down
2 changes: 1 addition & 1 deletion src/common/libflux/dispatch.c
Expand Up @@ -361,7 +361,7 @@ static void call_handler (flux_msg_handler_t *w, const flux_msg_t *msg)
if (flux_msg_cmp (msg, FLUX_MATCH_REQUEST)
&& flux_msg_get_matchtag (msg, &matchtag) == 0
&& matchtag != FLUX_MATCHTAG_NONE) {
(void)flux_respond (w->d->h, msg, EPERM, NULL);
(void)flux_respond (w->d->h, msg, EPERM, NULL);
}
return;
}
Expand Down
1 change: 1 addition & 0 deletions src/common/libflux/flux.h
@@ -1,6 +1,7 @@
#ifndef _FLUX_CORE_FLUX_H
#define _FLUX_CORE_FLUX_H

#include "types.h"
#include "handle.h"
#include "reactor.h"
#include "dispatch.h"
Expand Down
4 changes: 2 additions & 2 deletions src/common/libflux/handle.h
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <string.h>

#include "types.h"
#include "message.h"

typedef struct flux_handle_struct flux_t;
Expand Down Expand Up @@ -95,11 +96,10 @@ void flux_fatal_error (flux_t *h, const char *fun, const char *msg);
bool flux_fatality (flux_t *h);

/* A mechanism is provide for users to attach auxiliary state to the flux_t
* handle by name. The flux_free_f, if non-NULL, will be called
* handle by name. The destructor, if non-NULL, will be called
* to destroy this state when the handle is destroyed.
* Key names used internally by flux-core are prefixed with "flux::".
*/
typedef void (*flux_free_f)(void *arg);
void *flux_aux_get (flux_t *h, const char *name);
void flux_aux_set (flux_t *h, const char *name, void *aux, flux_free_f destroy);

Expand Down
85 changes: 63 additions & 22 deletions src/common/libflux/message.c
Expand Up @@ -56,6 +56,7 @@ struct flux_msg {
int magic;
zmsg_t *zmsg;
json_t *json;
zhash_t *aux;
};

static int proto_set_bigint (uint8_t *data, int len, uint32_t bigint);
Expand Down Expand Up @@ -262,11 +263,43 @@ void flux_msg_destroy (flux_msg_t *msg)
json_decref (msg->json);
zmsg_destroy (&msg->zmsg);
msg->magic =~ FLUX_MSG_MAGIC;
zhash_destroy (&msg->aux);
free (msg);
errno = saved_errno;
}
}

/* N.B. const attribute of msg argument is defeated internally to
* allow msg to be "annotated" for convenience.
* The message content is otherwise unchanged.
*/
int flux_msg_aux_set (const flux_msg_t *const_msg, const char *name,
void *aux, flux_free_f destroy)
{
flux_msg_t *msg = (flux_msg_t *)const_msg;
if (!msg->aux)
msg->aux = zhash_new ();
if (!msg->aux) {
errno = ENOMEM;
return -1;
}
zhash_delete (msg->aux, name);
if (zhash_insert (msg->aux, name, aux) < 0) {
errno = ENOMEM;
return -1;
}
zhash_freefn (msg->aux, name, destroy);
return 0;
}

void *flux_msg_aux_get (const flux_msg_t *msg, const char *name)
{
if (!msg->aux)
return NULL;
return zhash_lookup (msg->aux, name);

}

size_t flux_msg_encode_size (const flux_msg_t *msg)
{
zframe_t *zf;
Expand Down Expand Up @@ -834,19 +867,6 @@ int flux_msg_get_route_count (const flux_msg_t *msg)
return count;
}

bool flux_msg_has_route (const flux_msg_t *msg, const char *s)
{
zframe_t *zf;

zf = zmsg_first (msg->zmsg);
while (zf && zframe_size (zf) > 0) {
if (zframe_streq (zf, s))
return true;
zf = zmsg_next (msg->zmsg);
}
return false;
}

/* Get sum of size in bytes of route frames
*/
static int flux_msg_get_route_size (const flux_msg_t *msg)
Expand Down Expand Up @@ -1266,20 +1286,41 @@ int flux_msg_get_topic (const flux_msg_t *msg, const char **topic)
return rc;
}

/* FIXME: this function copies payload and then deletes it if 'payload'
* is false, when the point was to avoid the overhead of copying it in
* the first place.
*/
flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload)
{
assert (msg->magic == FLUX_MSG_MAGIC);
flux_msg_t *cpy = calloc (1, sizeof (*cpy));
if (!cpy)
flux_msg_t *cpy = NULL;
zframe_t *zf;
int count;
uint8_t flags;
bool skip_payload = false;

if (msg->magic != FLUX_MSG_MAGIC) {
errno = EINVAL;
goto error;
}
if (flux_msg_get_flags (msg, &flags) < 0)
goto error;
if (!payload && (flags & FLUX_MSGFLAG_PAYLOAD)) {
flags &= ~(FLUX_MSGFLAG_PAYLOAD | FLUX_MSGFLAG_JSON);
skip_payload = true;
}
if (!(cpy = calloc (1, sizeof (*cpy))))
goto nomem;
cpy->magic = FLUX_MSG_MAGIC;
if (!(cpy->zmsg = zmsg_dup (msg->zmsg)))
if (!(cpy->zmsg = zmsg_new ()))
goto nomem;
if (!payload && flux_msg_set_payload (cpy, 0, NULL, 0) < 0)

count = 0;
zf = zmsg_first (msg->zmsg);
while (zf) {
if (!skip_payload || count != zmsg_size (msg->zmsg) - 2) {
if (zmsg_addmem (cpy->zmsg, zframe_data (zf), zframe_size (zf)) < 0)
goto nomem;
}
zf = zmsg_next (msg->zmsg);
count++;
}
if (flux_msg_set_flags (cpy, flags) < 0)
goto error;
return cpy;
nomem:
Expand Down
13 changes: 8 additions & 5 deletions src/common/libflux/message.h
Expand Up @@ -5,6 +5,7 @@
#include <stdint.h>
#include <stdarg.h>
#include <stdio.h>
#include "types.h"
#include "security.h"

typedef struct flux_msg flux_msg_t;
Expand Down Expand Up @@ -69,6 +70,13 @@ struct flux_msg_iobuf {
flux_msg_t *flux_msg_create (int type);
void flux_msg_destroy (flux_msg_t *msg);

/* Access auxiliary data members in Flux message.
* These are for convenience only - they are not sent over the wire.
*/
int flux_msg_aux_set (const flux_msg_t *msg, const char *name,
void *aux, flux_free_f destroy);
void *flux_msg_aux_get (const flux_msg_t *msg, const char *name);

/* Duplicate msg, omitting payload if 'payload' is false.
*/
flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload);
Expand Down Expand Up @@ -299,11 +307,6 @@ int flux_msg_get_route_count (const flux_msg_t *msg);
*/
char *flux_msg_get_route_string (const flux_msg_t *msg);

/* Return true if route stack contains a frame matching 's'
*/
bool flux_msg_has_route (const flux_msg_t *msg, const char *s);


#endif /* !_FLUX_CORE_MESSAGE_H */

/*
Expand Down
44 changes: 22 additions & 22 deletions src/common/libflux/rpc.c
Expand Up @@ -33,6 +33,7 @@
#include <sys/syscall.h>
#endif
#include <jansson.h>
#include <czmq.h>

#include "request.h"
#include "response.h"
Expand All @@ -58,9 +59,8 @@ struct flux_rpc_struct {
int rx_errnum;
int rx_count;
int rx_expected;
void *aux;
zhash_t *aux;
flux_free_f aux_destroy;
const char *type;
int usecount;
};

Expand Down Expand Up @@ -88,8 +88,7 @@ static void flux_rpc_usecount_decr (flux_rpc_t *rpc)
flux_matchtag_free (rpc->h, rpc->m.matchtag);
}
flux_msg_destroy (rpc->rx_msg);
if (rpc->aux && rpc->aux_destroy)
rpc->aux_destroy (rpc->aux);
zhash_destroy (&rpc->aux);
rpc->magic =~ RPC_MAGIC;
free (rpc);
}
Expand Down Expand Up @@ -593,30 +592,31 @@ flux_rpc_t *flux_rpcf_multi (flux_t *h,
return rc;
}

const char *flux_rpc_type_get (flux_rpc_t *rpc)
{
return rpc->type;
}

void flux_rpc_type_set (flux_rpc_t *rpc, const char *type)
{
assert (rpc->magic == RPC_MAGIC);
rpc->type = type;
}

void *flux_rpc_aux_get (flux_rpc_t *rpc)
void *flux_rpc_aux_get (flux_rpc_t *rpc, const char *name)
{
assert (rpc->magic == RPC_MAGIC);
return rpc->aux;
if (!rpc->aux)
return NULL;
return zhash_lookup (rpc->aux, name);
}

void flux_rpc_aux_set (flux_rpc_t *rpc, void *aux, flux_free_f destroy)
int flux_rpc_aux_set (flux_rpc_t *rpc, const char *name,
void *aux, flux_free_f destroy)
{
assert (rpc->magic == RPC_MAGIC);
if (rpc->aux && rpc->aux_destroy)
rpc->aux_destroy (rpc->aux);
rpc->aux = aux;
rpc->aux_destroy = destroy;
if (!rpc->aux)
rpc->aux = zhash_new ();
if (!rpc->aux) {
errno = ENOMEM;
return -1;
}
zhash_delete (rpc->aux, name);
if (zhash_insert (rpc->aux, name, aux) < 0) {
errno = ENOMEM;
return -1;
}
zhash_freefn (rpc->aux, name, destroy);
return 0;
}

/*
Expand Down
7 changes: 3 additions & 4 deletions src/common/libflux/rpc.h
Expand Up @@ -82,10 +82,9 @@ int flux_rpc_next (flux_rpc_t *rpc);

/* Helper functions for extending flux_rpc_t.
*/
const char *flux_rpc_type_get (flux_rpc_t *rpc);
void flux_rpc_type_set (flux_rpc_t *rpc, const char *type);
void *flux_rpc_aux_get (flux_rpc_t *rpc);
void flux_rpc_aux_set (flux_rpc_t *rpc, void *aux, flux_free_f destroy);
void *flux_rpc_aux_get (flux_rpc_t *rpc, const char *name);
int flux_rpc_aux_set (flux_rpc_t *rpc, const char *name,
void *aux, flux_free_f destroy);

/* Variants of flux_rpc, flux_rpc_multi, and flux_rpc_get that
* encode/decode json payloads using jansson pack/unpack format
Expand Down

0 comments on commit 5d36372

Please sign in to comment.