From cd0cbfb349fbbe23f2bb840c3d58f27ec34e4849 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 31 May 2021 21:42:34 -0700 Subject: [PATCH] libflux: use internal implementation for flux msg Problem: The flux_msg_t data structure stores all internal data within a zmsg_t data structure. This leads to libflux always requiring libczmq. Solution: Remove use of zmsg_t when storing data internally within flux_msg_t, instead using lists, memory allocations, etc. Adjust all API functions accordingly. In the few functions that still require zmsg, convert flux_msg_t to/from zmsg_t as needed. This leads libflux to still require libczmq for the time being, but decreases the footprint of functions that need to be migrated out of libflux-core. --- src/common/libflux/Makefile.am | 1 + src/common/libflux/message.c | 1262 ++++++++++++++++---------------- src/common/libflux/message.h | 13 +- 3 files changed, 640 insertions(+), 636 deletions(-) diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index 084e7ab0c9d9..a090b6f7638a 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -8,6 +8,7 @@ AM_LDFLAGS = \ AM_CPPFLAGS = \ -I$(top_srcdir) \ -I$(top_srcdir)/src/include \ + -I$(top_srcdir)/src/common/libccan \ -I$(top_builddir) \ -I$(top_builddir)/src/common/libflux \ $(JANSSON_CFLAGS) \ diff --git a/src/common/libflux/message.c b/src/common/libflux/message.c index 355696688338..fcb2acea9327 100644 --- a/src/common/libflux/message.c +++ b/src/common/libflux/message.c @@ -8,7 +8,8 @@ * SPDX-License-Identifier: LGPL-3.0 \************************************************************/ -/* A flux messages consist of a list of zeromq frames: +/* A flux message contains route, topic, payload protocol information. + * When sent it is formed into the following zeromq frames. * * [route] * [route] @@ -38,19 +39,55 @@ #include "src/common/libutil/aux.h" #include "src/common/libutil/errno_safe.h" +/* czmq and ccan both define streq */ +#ifdef streq +#undef streq +#endif +#include "src/common/libccan/ccan/list/list.h" #include "message.h" struct flux_msg { - zmsg_t *zmsg; + // optional route list, if FLUX_MSGFLAG_ROUTE + struct list_head routes; + int routes_len; /* to avoid looping */ + + // optional topic frame, if FLUX_MSGFLAG_TOPIC + char *topic; + + // optional payload frame, if FLUX_MSGFLAG_PAYLOAD + void *payload; + size_t payload_size; + + // required proto frame data + uint8_t type; + uint8_t flags; + uint32_t userid; + uint32_t rolemask; + union { + uint32_t nodeid; // request + uint32_t sequence; // event + uint32_t errnum; // response, keepalive + uint32_t aux1; // common accessor + }; + union { + uint32_t matchtag; // request, response + uint32_t status; // keepalive + uint32_t aux2; // common accessor + }; + json_t *json; char *lasterr; struct aux_item *aux; int refcount; }; -/* Begin manual codec - * PROTO consists of 4 byte prelude followed by a fixed length +struct route_id { + struct list_node route_id_node; + char id[0]; /* variable length id stored at end of struct */ +}; + +/* PROTO consists of 4 byte prelude followed by a fixed length * array of u32's in network byte order. */ #define PROTO_MAGIC 0x8e @@ -62,6 +99,20 @@ struct flux_msg { #define PROTO_OFF_FLAGS 3 /* 1 byte */ #define PROTO_OFF_U32_ARRAY 4 +/* aux1 + * + * request - nodeid + * response - errnum + * event - sequence + * keepalive - errnum + * + * aux2 + * + * request - matchtag + * response - matchtag + * event - not used + * keepalive - status + */ #define PROTO_IND_USERID 0 #define PROTO_IND_ROLEMASK 1 #define PROTO_IND_AUX1 2 @@ -70,147 +121,74 @@ struct flux_msg { #define PROTO_U32_COUNT 4 #define PROTO_SIZE 4 + (PROTO_U32_COUNT * 4) -/* Helpful aliases */ -#define PROTO_IND_NODEID PROTO_IND_AUX1 // request -#define PROTO_IND_MATCHTAG PROTO_IND_AUX2 // request, response -#define PROTO_IND_SEQUENCE PROTO_IND_AUX1 // event -#define PROTO_IND_ERRNUM PROTO_IND_AUX1 // response, keepalive -#define PROTO_IND_STATUS PROTO_IND_AUX2 // keepalive +static void route_id_destroy (void *data) +{ + if (data) { + struct route_id *r = data; + free (r); + } +} -static int proto_set_u32 (uint8_t *data, int len, int index, uint32_t val); +static struct route_id *route_id_create (const char *id, unsigned int id_len) +{ + struct route_id *r; + if (!(r = calloc (1, sizeof (*r) + id_len + 1))) + return NULL; + if (id && id_len) { + memcpy (r->id, id, id_len); + list_node_init (&(r->route_id_node)); + } + return r; +} -static int proto_set_type (uint8_t *data, int len, int type) +static flux_msg_t *flux_msg_create_common (void) +{ + flux_msg_t *msg; + + if (!(msg = calloc (1, sizeof (*msg)))) + return NULL; + list_head_init (&msg->routes); + msg->refcount = 1; + return msg; +} + +static int msg_setup_type (flux_msg_t *msg, int type) { - if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || data[PROTO_OFF_VERSION] != PROTO_VERSION) - return -1; switch (type) { case FLUX_MSGTYPE_REQUEST: - if (proto_set_u32 (data, len, PROTO_IND_NODEID, - FLUX_NODEID_ANY) < 0) - return -1; - if (proto_set_u32 (data, len, PROTO_IND_MATCHTAG, - FLUX_MATCHTAG_NONE) < 0) - return -1; + msg->nodeid = FLUX_NODEID_ANY; + msg->matchtag = FLUX_MATCHTAG_NONE; break; case FLUX_MSGTYPE_RESPONSE: /* N.B. don't clobber matchtag from request on set_type */ - if (proto_set_u32 (data, len, PROTO_IND_ERRNUM, 0) < 0) - return -1; + msg->errnum = 0; break; case FLUX_MSGTYPE_EVENT: - if (proto_set_u32 (data, len, PROTO_IND_SEQUENCE, 0) < 0) - return -1; - if (proto_set_u32 (data, len, PROTO_IND_AUX2, 0) < 0) - return -1; + msg->sequence = 0; + msg->aux2 = 0; break; case FLUX_MSGTYPE_KEEPALIVE: - if (proto_set_u32 (data, len, PROTO_IND_STATUS, 0) < 0) - return -1; - if (proto_set_u32 (data, len, PROTO_IND_ERRNUM, 0) < 0) - return -1; + msg->errnum = 0; + msg->status = 0; break; default: + errno = EINVAL; return -1; } - data[PROTO_OFF_TYPE] = type; - return 0; -} -static int proto_get_type (uint8_t *data, int len, int *type) -{ - if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || data[PROTO_OFF_VERSION] != PROTO_VERSION) - return -1; - *type = data[PROTO_OFF_TYPE]; - return 0; -} -static int proto_set_flags (uint8_t *data, int len, uint8_t flags) -{ - if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || data[PROTO_OFF_VERSION] != PROTO_VERSION) - return -1; - data[PROTO_OFF_FLAGS] = flags; - return 0; -} -static int proto_get_flags (uint8_t *data, int len, uint8_t *val) -{ - if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || data[PROTO_OFF_VERSION] != PROTO_VERSION) - return -1; - *val = data[PROTO_OFF_FLAGS]; - return 0; -} -static int proto_set_u32 (uint8_t *data, int len, int index, uint32_t val) -{ - uint32_t x = htonl (val); - int offset = PROTO_OFF_U32_ARRAY + index * 4; - - if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || data[PROTO_OFF_VERSION] != PROTO_VERSION - || index < 0 || index >= PROTO_U32_COUNT) - return -1; - memcpy (&data[offset], &x, sizeof (x)); - return 0; -} -static int proto_get_u32 (uint8_t *data, int len, int index, uint32_t *val) -{ - uint32_t x; - int offset = PROTO_OFF_U32_ARRAY + index * 4; - - if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || data[PROTO_OFF_VERSION] != PROTO_VERSION - || index < 0 || index >= PROTO_U32_COUNT) - return -1; - memcpy (&x, &data[offset], sizeof (x)); - *val = ntohl (x); + msg->type = type; return 0; } -static void proto_init (uint8_t *data, int len, uint8_t flags) -{ - int n; - assert (len >= PROTO_SIZE); - memset (data, 0, len); - data[PROTO_OFF_MAGIC] = PROTO_MAGIC; - data[PROTO_OFF_VERSION] = PROTO_VERSION; - data[PROTO_OFF_FLAGS] = flags; - n = proto_set_u32 (data, len, PROTO_IND_USERID, FLUX_USERID_UNKNOWN); - assert (n == 0); - n = proto_set_u32 (data, len, PROTO_IND_ROLEMASK, FLUX_ROLE_NONE); - assert (n == 0); -} -/* End manual codec - */ - -static flux_msg_t *flux_msg_create_common (void) -{ - flux_msg_t *msg; - - if (!(msg = calloc (1, sizeof (*msg)))) - return NULL; - msg->refcount = 1; - return msg; -} flux_msg_t *flux_msg_create (int type) { - uint8_t proto[PROTO_SIZE]; flux_msg_t *msg; if (!(msg = flux_msg_create_common ())) return NULL; - proto_init (proto, PROTO_SIZE, 0); - if (proto_set_type (proto, PROTO_SIZE, type) < 0) { - errno = EINVAL; + msg->userid = FLUX_USERID_UNKNOWN; + msg->rolemask = FLUX_ROLE_NONE; + if (msg_setup_type (msg, type) < 0) goto error; - } - if (!(msg->zmsg = zmsg_new ())) { - errno = ENOMEM; - goto error; - } - if (zmsg_addmem (msg->zmsg, proto, PROTO_SIZE) < 0) { - errno = ENOMEM; - goto error; - } return msg; error: flux_msg_destroy (msg); @@ -221,8 +199,12 @@ void flux_msg_destroy (flux_msg_t *msg) { if (msg && --msg->refcount == 0) { int saved_errno = errno; + struct route_id *r; + while ((r = list_pop (&msg->routes, struct route_id, route_id_node))) + route_id_destroy (r); + free (msg->topic); + free (msg->payload); json_decref (msg->json); - zmsg_destroy (&msg->zmsg); aux_destroy (&msg->aux); free (msg->lasterr); free (msg); @@ -277,71 +259,257 @@ void *flux_msg_aux_get (const flux_msg_t *msg, const char *name) return aux_get (msg->aux, name); } +void encode_count (ssize_t *size, size_t len) +{ + if (len < 255) + (*size) += 1; + else + (*size) += 1 + 4; + (*size) += len; +} + ssize_t flux_msg_encode_size (const flux_msg_t *msg) { - zframe_t *zf; ssize_t size = 0; if (!msg) { errno = EINVAL; return -1; } - zf = zmsg_first (msg->zmsg); - while (zf) { - size_t n = zframe_size (zf); - if (n < 255) - size += 1; - else - size += 1 + 4; - size += n; - zf = zmsg_next (msg->zmsg); + + encode_count (&size, PROTO_SIZE); + if (msg->flags & FLUX_MSGFLAG_PAYLOAD) + encode_count (&size, msg->payload_size); + if (msg->flags & FLUX_MSGFLAG_TOPIC) + encode_count (&size, strlen (msg->topic)); + if (msg->flags & FLUX_MSGFLAG_ROUTE) { + struct route_id *r = NULL; + /* route delimeter */ + encode_count (&size, 0); + list_for_each (&msg->routes, r, route_id_node) + encode_count (&size, strlen (r->id)); } return size; } +static void proto_set_u32 (uint8_t *data, int index, uint32_t val) +{ + uint32_t x = htonl (val); + int offset = PROTO_OFF_U32_ARRAY + index * 4; + memcpy (&data[offset], &x, sizeof (x)); +} + +static void msg_proto_setup (const flux_msg_t *msg, uint8_t *data, int len) +{ + assert (len >= PROTO_SIZE); + memset (data, 0, len); + data[PROTO_OFF_MAGIC] = PROTO_MAGIC; + data[PROTO_OFF_VERSION] = PROTO_VERSION; + data[PROTO_OFF_TYPE] = msg->type; + data[PROTO_OFF_FLAGS] = msg->flags; + proto_set_u32 (data, PROTO_IND_USERID, msg->userid); + proto_set_u32 (data, PROTO_IND_ROLEMASK, msg->rolemask); + proto_set_u32 (data, PROTO_IND_AUX1, msg->aux1); + proto_set_u32 (data, PROTO_IND_AUX2, msg->aux2); +} + +static ssize_t encode_frame (uint8_t *buf, + size_t buf_len, + void *frame, + size_t frame_size) +{ + ssize_t n = 0; + if (frame_size < 0xff) { + if (buf_len < (frame_size + 1)) { + errno = EINVAL; + return -1; + } + *buf++ = (uint8_t)frame_size; + n += 1; + } else { + if (buf_len < (frame_size + 1 + 4)) { + errno = EINVAL; + return -1; + } + *buf++ = 0xff; + *(uint32_t *)buf = htonl (frame_size); + buf += 4; + n += 1 + 4; + } + if (frame && frame_size) + memcpy (buf, frame, frame_size); + return (frame_size + n); +} int flux_msg_encode (const flux_msg_t *msg, void *buf, size_t size) { - uint8_t *p = buf; - zframe_t *zf; + uint8_t proto[PROTO_SIZE]; + ssize_t total = 0; + ssize_t n; if (!msg) { errno = EINVAL; return -1; } - zf = zmsg_first (msg->zmsg); - while (zf) { - size_t n = zframe_size (zf); - if (n < 0xff) { - if (size - (p - (uint8_t *)buf) < n + 1) - goto nospace; - *p++ = (uint8_t)n; - } else { - if (size - (p - (uint8_t *)buf) < n + 1 + 4) - goto nospace; - *p++ = 0xff; - *(uint32_t *)p = htonl (n); - p += 4; + if (msg->flags & FLUX_MSGFLAG_ROUTE) { + struct route_id *r = NULL; + list_for_each (&msg->routes, r, route_id_node) { + if ((n = encode_frame (buf + total, + size - total, + r->id, + strlen (r->id))) < 0) + return -1; + total += n; } - memcpy (p, zframe_data (zf), n); - p += n; - zf = zmsg_next (msg->zmsg); + /* route delimeter */ + if ((n = encode_frame (buf + total, + size - total, + NULL, + 0)) < 0) + return -1; + total += n; + } + if (msg->flags & FLUX_MSGFLAG_TOPIC) { + if ((n = encode_frame (buf + total, + size - total, + msg->topic, + strlen (msg->topic))) < 0) + return -1; + total += n; + } + if (msg->flags & FLUX_MSGFLAG_PAYLOAD) { + if ((n = encode_frame (buf + total, + size - total, + msg->payload, + msg->payload_size)) < 0) + return -1; + total += n; } + msg_proto_setup (msg, proto, PROTO_SIZE); + if ((n = encode_frame (buf + total, + size - total, + proto, + PROTO_SIZE)) < 0) + return -1; + total += n; + return 0; +} + +static void proto_get_u32 (uint8_t *data, int index, uint32_t *val) +{ + uint32_t x; + int offset = PROTO_OFF_U32_ARRAY + index * 4; + memcpy (&x, &data[offset], sizeof (x)); + *val = ntohl (x); +} + +static int msg_append_route (flux_msg_t *msg, + const char *id, + unsigned int id_len) +{ + struct route_id *r; + assert (msg); + assert ((msg->flags & FLUX_MSGFLAG_ROUTE)); + assert (id); + if (!(r = route_id_create (id, id_len))) + return -1; + list_add_tail (&msg->routes, &r->route_id_node); + msg->routes_len++; + return 0; +} + +static int zmsg_to_msg (flux_msg_t *msg, zmsg_t *zmsg) +{ + uint8_t *proto_data; + size_t proto_size; + zframe_t *zf; + + if (!(zf = zmsg_last (zmsg))) { + errno = EPROTO; + return -1; + } + proto_data = zframe_data (zf); + proto_size = zframe_size (zf); + if (proto_size < PROTO_SIZE + || proto_data[PROTO_OFF_MAGIC] != PROTO_MAGIC + || proto_data[PROTO_OFF_VERSION] != PROTO_VERSION) { + errno = EPROTO; + return -1; + } + msg->type = proto_data[PROTO_OFF_TYPE]; + if (msg->type != FLUX_MSGTYPE_REQUEST + && msg->type != FLUX_MSGTYPE_RESPONSE + && msg->type != FLUX_MSGTYPE_EVENT + && msg->type != FLUX_MSGTYPE_KEEPALIVE) { + errno = EPROTO; + return -1; + } + msg->flags = proto_data[PROTO_OFF_FLAGS]; + + zf = zmsg_first (zmsg); + if ((msg->flags & FLUX_MSGFLAG_ROUTE)) { + if (!zf) { + errno = EPROTO; + return -1; + } + while (zf && zframe_size (zf) > 0) { + if (msg_append_route (msg, + (char *)zframe_data (zf), + zframe_size (zf)) < 0) + return -1; + zf = zmsg_next (zmsg); + } + if (zf) + zf = zmsg_next (zmsg); + } + if ((msg->flags & FLUX_MSGFLAG_TOPIC)) { + if (!zf) { + errno = EPROTO; + return -1; + } + if (!(msg->topic = zframe_strdup (zf))) { + errno = ENOMEM; + return -1; + } + if (zf) + zf = zmsg_next (zmsg); + } + if ((msg->flags & FLUX_MSGFLAG_PAYLOAD)) { + if (!zf) { + errno = EPROTO; + return -1; + } + msg->payload_size = zframe_size (zf); + if (!(msg->payload = malloc (msg->payload_size))) { + errno = ENOMEM; + return -1; + } + memcpy (msg->payload, zframe_data (zf), msg->payload_size); + if (zf) + zf = zmsg_next (zmsg); + } + /* proto frame required */ + if (!zf) { + errno = EPROTO; + return -1; + } + proto_get_u32 (proto_data, PROTO_IND_USERID, &msg->userid); + proto_get_u32 (proto_data, PROTO_IND_ROLEMASK, &msg->rolemask); + proto_get_u32 (proto_data, PROTO_IND_AUX1, &msg->aux1); + proto_get_u32 (proto_data, PROTO_IND_AUX2, &msg->aux2); return 0; -nospace: - errno = EINVAL; - return -1; } flux_msg_t *flux_msg_decode (const void *buf, size_t size) { flux_msg_t *msg; uint8_t const *p = buf; + zmsg_t *zmsg = NULL; zframe_t *zf; if (!(msg = flux_msg_create_common ())) return NULL; - if (!(msg->zmsg = zmsg_new ())) + if (!(zmsg = zmsg_new ())) goto nomem; while (p - (uint8_t *)buf < size) { size_t n = *p++; @@ -359,35 +527,40 @@ flux_msg_t *flux_msg_decode (const void *buf, size_t size) } if (!(zf = zframe_new (p, n))) goto nomem; - if (zmsg_append (msg->zmsg, &zf) < 0) + if (zmsg_append (zmsg, &zf) < 0) goto nomem; p += n; } + if (zmsg_to_msg (msg, zmsg) < 0) + goto error; + zmsg_destroy (&zmsg); return msg; nomem: errno = ENOMEM; error: + zmsg_destroy (&zmsg); flux_msg_destroy (msg); return NULL; } int flux_msg_set_type (flux_msg_t *msg, int type) { - zframe_t *zf = zmsg_last (msg->zmsg); - if (!zf || proto_set_type (zframe_data (zf), zframe_size (zf), type) < 0) { + if (!msg) { errno = EINVAL; return -1; } + if (msg_setup_type (msg, type) < 0) + return -1; return 0; } int flux_msg_get_type (const flux_msg_t *msg, int *type) { - zframe_t *zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), type) < 0) { - errno = EPROTO; + if (!msg) { + errno = EINVAL; return -1; } + (*type) = msg->type; return 0; } @@ -403,11 +576,7 @@ int flux_msg_set_flags (flux_msg_t *msg, uint8_t fl) errno = EINVAL; return -1; } - zframe_t *zf = zmsg_last (msg->zmsg); - if (!zf || proto_set_flags (zframe_data (zf), zframe_size (zf), fl) < 0) { - errno = EINVAL; - return -1; - } + msg->flags = fl; return 0; } @@ -417,38 +586,36 @@ int flux_msg_get_flags (const flux_msg_t *msg, uint8_t *fl) errno = EINVAL; return -1; } - zframe_t *zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_flags (zframe_data (zf), zframe_size (zf), fl) < 0) { - errno = EPROTO; - return -1; - } + (*fl) = msg->flags; return 0; } int flux_msg_set_private (flux_msg_t *msg) { - uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) { + errno = EINVAL; return -1; - if (flux_msg_set_flags (msg, flags | FLUX_MSGFLAG_PRIVATE) < 0) + } + if (flux_msg_set_flags (msg, msg->flags | FLUX_MSGFLAG_PRIVATE) < 0) return -1; return 0; } bool flux_msg_is_private (const flux_msg_t *msg) { - uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) return true; - return (flags & FLUX_MSGFLAG_PRIVATE) ? true : false; + return (msg->flags & FLUX_MSGFLAG_PRIVATE) ? true : false; } int flux_msg_set_streaming (flux_msg_t *msg) { - uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) + uint8_t flags; + if (!msg) { + errno = EINVAL; return -1; - flags &= ~FLUX_MSGFLAG_NORESPONSE; + } + flags = msg->flags & ~FLUX_MSGFLAG_NORESPONSE; if (flux_msg_set_flags (msg, flags | FLUX_MSGFLAG_STREAMING) < 0) return -1; return 0; @@ -456,18 +623,19 @@ int flux_msg_set_streaming (flux_msg_t *msg) bool flux_msg_is_streaming (const flux_msg_t *msg) { - uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) return true; - return (flags & FLUX_MSGFLAG_STREAMING) ? true : false; + return (msg->flags & FLUX_MSGFLAG_STREAMING) ? true : false; } int flux_msg_set_noresponse (flux_msg_t *msg) { uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) { + errno = EINVAL; return -1; - flags &= ~FLUX_MSGFLAG_STREAMING; + } + flags = msg->flags & ~FLUX_MSGFLAG_STREAMING; if (flux_msg_set_flags (msg, flags | FLUX_MSGFLAG_NORESPONSE) < 0) return -1; return 0; @@ -475,85 +643,48 @@ int flux_msg_set_noresponse (flux_msg_t *msg) bool flux_msg_is_noresponse (const flux_msg_t *msg) { - uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) return true; - return (flags & FLUX_MSGFLAG_NORESPONSE) ? true : false; + return (msg->flags & FLUX_MSGFLAG_NORESPONSE) ? true : false; } int flux_msg_set_userid (flux_msg_t *msg, uint32_t userid) { - zframe_t *zf; - if (!msg) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_set_u32 (zframe_data (zf), - zframe_size (zf), - PROTO_IND_USERID, - userid) < 0) { - errno = EINVAL; - return -1; - } + msg->userid = userid; return 0; } int flux_msg_get_userid (const flux_msg_t *msg, uint32_t *userid) { - zframe_t *zf; - if (!msg || !userid) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_u32 (zframe_data (zf), - zframe_size (zf), - PROTO_IND_USERID, - userid) < 0) { - errno = EPROTO; - return -1; - } + (*userid) = msg->userid; return 0; } int flux_msg_set_rolemask (flux_msg_t *msg, uint32_t rolemask) { - zframe_t *zf; - if (!msg) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_set_u32 (zframe_data (zf), - zframe_size (zf), - PROTO_IND_ROLEMASK, - rolemask) < 0) { - errno = EINVAL; - return -1; - } + msg->rolemask = rolemask; return 0; } int flux_msg_get_rolemask (const flux_msg_t *msg, uint32_t *rolemask) { - zframe_t *zf; - if (!msg || !rolemask) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_u32 (zframe_data (zf), - zframe_size (zf), - PROTO_IND_ROLEMASK, - rolemask) < 0) { - errno = EPROTO; - return -1; - } + (*rolemask) = msg->rolemask; return 0; } @@ -607,22 +738,13 @@ int flux_msg_authorize (const flux_msg_t *msg, uint32_t userid) int flux_msg_set_nodeid (flux_msg_t *msg, uint32_t nodeid) { - zframe_t *zf; - int type; - if (!msg) goto error; if (nodeid == FLUX_NODEID_UPSTREAM) /* should have been resolved earlier */ goto error; - if (!(zf = zmsg_last (msg->zmsg))) - goto error; - if (proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0) - goto error; - if (type != FLUX_MSGTYPE_REQUEST) - goto error; - if (proto_set_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_NODEID, nodeid) < 0) + if (msg->type != FLUX_MSGTYPE_REQUEST) goto error; + msg->nodeid = nodeid; return 0; error: errno = EINVAL; @@ -631,191 +753,117 @@ int flux_msg_set_nodeid (flux_msg_t *msg, uint32_t nodeid) int flux_msg_get_nodeid (const flux_msg_t *msg, uint32_t *nodeidp) { - zframe_t *zf; - int type; - uint32_t nodeid; - if (!msg || !nodeidp) { errno = EINVAL; return -1; } - if (!(zf = zmsg_last (msg->zmsg))) - goto error; - if (proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0) - goto error; - if (type != FLUX_MSGTYPE_REQUEST) - goto error; - if (proto_get_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_NODEID, &nodeid) < 0) - goto error; - *nodeidp = nodeid; + if (msg->type != FLUX_MSGTYPE_REQUEST) { + errno = EPROTO; + return -1; + } + *nodeidp = msg->nodeid; return 0; -error: - return EPROTO; - return -1; } int flux_msg_set_errnum (flux_msg_t *msg, int e) { - zframe_t *zf;; - int type; - - if (!msg) { - errno = EINVAL; - return -1; - } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || (type != FLUX_MSGTYPE_RESPONSE && type != FLUX_MSGTYPE_KEEPALIVE) - || proto_set_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_ERRNUM, e) < 0) { + if (!msg + || (msg->type != FLUX_MSGTYPE_RESPONSE + && msg->type != FLUX_MSGTYPE_KEEPALIVE)) { errno = EINVAL; return -1; } + msg->errnum = e; return 0; } int flux_msg_get_errnum (const flux_msg_t *msg, int *e) { - zframe_t *zf; - int type; - uint32_t xe; - if (!msg || !e) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || (type != FLUX_MSGTYPE_RESPONSE && type != FLUX_MSGTYPE_KEEPALIVE) - || proto_get_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_ERRNUM, &xe) < 0) { + if (msg->type != FLUX_MSGTYPE_RESPONSE + && msg->type != FLUX_MSGTYPE_KEEPALIVE) { errno = EPROTO; return -1; } - *e = xe; + *e = msg->errnum; return 0; } int flux_msg_set_seq (flux_msg_t *msg, uint32_t seq) { - zframe_t *zf; - int type; - - if (!msg) { - errno = EINVAL; - return -1; - } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || type != FLUX_MSGTYPE_EVENT - || proto_set_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_SEQUENCE, seq) < 0) { + if (!msg || msg->type != FLUX_MSGTYPE_EVENT) { errno = EINVAL; return -1; } + msg->sequence = seq; return 0; } int flux_msg_get_seq (const flux_msg_t *msg, uint32_t *seq) { - zframe_t *zf; - int type; - if (!msg || !seq) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || type != FLUX_MSGTYPE_EVENT - || proto_get_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_SEQUENCE, seq) < 0) { + if (msg->type != FLUX_MSGTYPE_EVENT) { errno = EPROTO; return -1; } + (*seq) = msg->sequence; return 0; } int flux_msg_set_matchtag (flux_msg_t *msg, uint32_t t) { - zframe_t *zf; - int type; - - if (!msg) { - errno = EINVAL; - return -1; - } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || (type != FLUX_MSGTYPE_REQUEST && type != FLUX_MSGTYPE_RESPONSE) - || proto_set_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_MATCHTAG, t) < 0) { + if (!msg + || (msg->type != FLUX_MSGTYPE_REQUEST + && msg->type != FLUX_MSGTYPE_RESPONSE)) { errno = EINVAL; return -1; } + msg->matchtag = t; return 0; } int flux_msg_get_matchtag (const flux_msg_t *msg, uint32_t *t) { - zframe_t *zf; - int type; - if (!msg || !t) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || (type != FLUX_MSGTYPE_REQUEST && type != FLUX_MSGTYPE_RESPONSE) - || proto_get_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_MATCHTAG, t) < 0) { + if (msg->type != FLUX_MSGTYPE_REQUEST + && msg->type != FLUX_MSGTYPE_RESPONSE) { errno = EPROTO; return -1; } + (*t) = msg->matchtag; return 0; } int flux_msg_set_status (flux_msg_t *msg, int s) { - zframe_t *zf; - int type; - - if (!msg) { - errno = EINVAL; - return -1; - } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || type != FLUX_MSGTYPE_KEEPALIVE - || proto_set_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_STATUS, s) < 0) { + if (!msg || msg->type != FLUX_MSGTYPE_KEEPALIVE) { errno = EINVAL; return -1; } + msg->status = s; return 0; } int flux_msg_get_status (const flux_msg_t *msg, int *s) { - zframe_t *zf; - int type; - uint32_t u; - if (!msg || !s) { errno = EINVAL; return -1; } - zf = zmsg_last (msg->zmsg); - if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0 - || type != FLUX_MSGTYPE_KEEPALIVE - || proto_get_u32 (zframe_data (zf), zframe_size (zf), - PROTO_IND_STATUS, &u) < 0) { + if (msg->type != FLUX_MSGTYPE_KEEPALIVE) { errno = EPROTO; return -1; } - *s = u; + (*s) = msg->status; return 0; } @@ -878,218 +926,174 @@ bool flux_msg_cmp (const flux_msg_t *msg, struct flux_match match) int flux_msg_enable_route (flux_msg_t *msg) { - uint8_t flags = 0; - - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - if ((flags & FLUX_MSGFLAG_ROUTE)) - return 0; - if (zmsg_pushmem (msg->zmsg, NULL, 0) < 0) { - errno = ENOMEM; + if (!msg) { + errno = EINVAL; return -1; } - flags |= FLUX_MSGFLAG_ROUTE; - return flux_msg_set_flags (msg, flags); + if ((msg->flags & FLUX_MSGFLAG_ROUTE)) + return 0; + return flux_msg_set_flags (msg, msg->flags |= FLUX_MSGFLAG_ROUTE); } int flux_msg_clear_route (flux_msg_t *msg) { - uint8_t flags = 0; - zframe_t *zf; - int size; - - if (flux_msg_get_flags (msg, &flags) < 0) + struct route_id *r; + if (!msg) { + errno = EINVAL; return -1; - if (!(flags & FLUX_MSGFLAG_ROUTE)) - return 0; - while ((zf = zmsg_pop (msg->zmsg))) { - size = zframe_size (zf); - zframe_destroy (&zf); - if (size == 0) - break; } - flags &= ~(uint8_t)FLUX_MSGFLAG_ROUTE; - return flux_msg_set_flags (msg, flags); + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) + return 0; + while ((r = list_pop (&msg->routes, struct route_id, route_id_node))) + route_id_destroy (r); + list_head_init (&msg->routes); + return flux_msg_set_flags (msg, msg->flags & ~(uint8_t)FLUX_MSGFLAG_ROUTE); } int flux_msg_push_route (flux_msg_t *msg, const char *id) { - uint8_t flags = 0; - if (!id) { + struct route_id *r; + if (!msg || !id) { errno = EINVAL; return -1; } - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - if (!(flags & FLUX_MSGFLAG_ROUTE)) { + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; return -1; } - if (zmsg_pushstr (msg->zmsg, id) < 0) { - errno = ENOMEM; + if (!(r = route_id_create (id, strlen (id)))) return -1; - } + list_add (&msg->routes, &r->route_id_node); + msg->routes_len++; return 0; } int flux_msg_pop_route (flux_msg_t *msg, char **id) { - uint8_t flags = 0; - zframe_t *zf; + struct route_id *r; /* do not check 'id' for NULL, a "pop" is acceptable w/o returning * data to the user. Caller may wish to only "pop" and not look * at the data. */ - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) { + errno = EINVAL; return -1; - if (!(flags & FLUX_MSGFLAG_ROUTE) || !(zf = zmsg_first (msg->zmsg))) { + } + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; return -1; } - if (zframe_size (zf) > 0 && (zf = zmsg_pop (msg->zmsg))) { - if (id) { - char *s = zframe_strdup (zf); - if (!s) { - zframe_destroy (&zf); - errno = ENOMEM; - return -1; - } - *id = s; - } - zframe_destroy (&zf); - } else { + if (list_empty (&msg->routes)) { if (id) - *id = NULL; + (*id) = NULL; + return 0; + } + r = list_pop (&msg->routes, struct route_id, route_id_node); + assert (r); + if (id) { + if (!((*id) = strdup (r->id))) + return -1; } + route_id_destroy (r); + msg->routes_len--; return 0; } /* replaces flux_msg_nexthop */ int flux_msg_get_route_last (const flux_msg_t *msg, char **id) { - uint8_t flags = 0; - zframe_t *zf; - char *s = NULL; + struct route_id *r; - if (!id) { + if (!msg || !id) { errno = EINVAL; return -1; } - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - if (!(flags & FLUX_MSGFLAG_ROUTE) || !(zf = zmsg_first (msg->zmsg))) { + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; return -1; } - if (zframe_size (zf) > 0 && !(s = zframe_strdup (zf))) { - errno = ENOMEM; - return -1; + if ((r = list_top (&msg->routes, struct route_id, route_id_node))) { + if (!((*id) = strdup (r->id))) + return -1; } - *id = s; + else + (*id) = NULL; return 0; } -static zframe_t *find_route_first (const flux_msg_t *msg) +static int find_route_first (const flux_msg_t *msg, struct route_id **r) { - uint8_t flags = 0; - zframe_t *zf, *zf_next; - - if (flux_msg_get_flags (msg, &flags) < 0) - return NULL; - if (!(flags & FLUX_MSGFLAG_ROUTE)) { + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; - return NULL; - } - zf = zmsg_first (msg->zmsg); - while (zf && zframe_size (zf) > 0) { - zf_next = zmsg_next (msg->zmsg); - if (zf_next && zframe_size (zf_next) == 0) - break; - zf = zf_next; + return -1; } - return zf; + (*r) = list_tail (&msg->routes, struct route_id, route_id_node); + return 0; } /* replaces flux_msg_sender */ int flux_msg_get_route_first (const flux_msg_t *msg, char **id) { - zframe_t *zf; - char *s = NULL; + struct route_id *r = NULL; - if (!id) { + if (!msg || !id) { errno = EINVAL; return -1; } - if (!(zf = find_route_first (msg))) - return -1; - if (zframe_size (zf) > 0 && !(s = zframe_strdup (zf))) { - errno = ENOMEM; + if (find_route_first (msg, &r) < 0) return -1; + if (r) { + if (!((*id) = strdup (r->id))) + return -1; } - *id = s; + else + (*id) = NULL; return 0; } int flux_msg_get_route_count (const flux_msg_t *msg) { - uint8_t flags = 0; - zframe_t *zf; - int count = 0; - - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) { + errno = EINVAL; return -1; - if (!(flags & FLUX_MSGFLAG_ROUTE)) { + } + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; return -1; } - zf = zmsg_first (msg->zmsg); - while (zf && zframe_size (zf) > 0) { - zf = zmsg_next (msg->zmsg); - count++; - } - return count; + return msg->routes_len; } /* Get sum of size in bytes of route frames */ static int flux_msg_get_route_size (const flux_msg_t *msg) { - uint8_t flags = 0; - zframe_t *zf; + struct route_id *r = NULL; int size = 0; - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - if (!(flags & FLUX_MSGFLAG_ROUTE)) { + assert (msg); + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; return -1; } - zf = zmsg_first (msg->zmsg); - while (zf && zframe_size (zf) > 0) { - size += zframe_size (zf); - zf = zmsg_next (msg->zmsg); - } + list_for_each (&msg->routes, r, route_id_node) + size += strlen (r->id); return size; } -static zframe_t *flux_msg_get_route_nth (const flux_msg_t *msg, int n) +static char *flux_msg_get_route_nth (const flux_msg_t *msg, int n) { - uint8_t flags = 0; - zframe_t *zf; + struct route_id *r = NULL; int count = 0; - if (flux_msg_get_flags (msg, &flags) < 0) - return NULL; - if (!(flags & FLUX_MSGFLAG_ROUTE)) { + if (!(msg->flags & FLUX_MSGFLAG_ROUTE)) { errno = EPROTO; return NULL; } - zf = zmsg_first (msg->zmsg); - while (zf && zframe_size (zf) > 0) { + list_for_each (&msg->routes, r, route_id_node) { if (count == n) - return zf; - zf = zmsg_next (msg->zmsg); + return r->id; count++; } errno = ENOENT; @@ -1100,7 +1104,6 @@ char *flux_msg_get_route_string (const flux_msg_t *msg) { int hops, len; int n; - zframe_t *zf; char *buf, *cp; if (!msg) { @@ -1114,32 +1117,32 @@ char *flux_msg_get_route_string (const flux_msg_t *msg) if (!(cp = buf = malloc (len + hops + 1))) return NULL; for (n = hops - 1; n >= 0; n--) { + char *id; if (cp > buf) *cp++ = '!'; - if (!(zf = flux_msg_get_route_nth (msg, n))) { + if (!(id = flux_msg_get_route_nth (msg, n))) { ERRNO_SAFE_WRAP (free, buf); return NULL; } - int cpylen = zframe_size (zf); + int cpylen = strlen (id); if (cpylen > 8) /* abbreviate long UUID */ cpylen = 8; assert (cp - buf + cpylen < len + hops); - memcpy (cp, zframe_data (zf), cpylen); + memcpy (cp, id, cpylen); cp += cpylen; } *cp = '\0'; return buf; } -static bool payload_overlap (const void *b, zframe_t *zf) +static bool payload_overlap (flux_msg_t *msg, const void *b) { - return ((char *)b >= (char *)zframe_data (zf) - && (char *)b < (char *)zframe_data (zf) + zframe_size (zf)); + return ((char *)b >= (char *)msg->payload + && (char *)b < (char *)msg->payload + msg->payload_size); } int flux_msg_set_payload (flux_msg_t *msg, const void *buf, int size) { - zframe_t *zf; uint8_t flags = 0; if (!msg) { @@ -1148,50 +1151,47 @@ int flux_msg_set_payload (flux_msg_t *msg, const void *buf, int size) } json_decref (msg->json); /* invalidate cached json object */ msg->json = NULL; - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; + flags = msg->flags; if (!(flags & FLUX_MSGFLAG_PAYLOAD) && (buf == NULL || size == 0)) return 0; - zf = zmsg_first (msg->zmsg); - if ((flags & FLUX_MSGFLAG_ROUTE)) { - while (zf && zframe_size (zf) > 0) - zf = zmsg_next (msg->zmsg); /* skip route frame */ - if (zf) - zf = zmsg_next (msg->zmsg); /* skip route delim */ - } - if ((flags & FLUX_MSGFLAG_TOPIC)) { - if (zf) - zf = zmsg_next (msg->zmsg); /* skip topic frame */ - } - if (!zf) { /* must at least have proto frame */ - errno = EPROTO; - return -1; - } /* Case #1: replace existing payload. */ if ((flags & FLUX_MSGFLAG_PAYLOAD) && (buf != NULL && size > 0)) { - if (zframe_data (zf) != buf || zframe_size (zf) != size) { - if (payload_overlap (buf, zf)) { + assert (msg->payload); + if (msg->payload != buf || msg->payload_size != size) { + if (payload_overlap (msg, buf)) { errno = EINVAL; return -1; } - zframe_reset (zf, buf, size); } + if (size > msg->payload_size) { + void *ptr; + if (!(ptr = realloc (msg->payload, size))) { + errno = ENOMEM; + return -1; + } + msg->payload = ptr; + msg->payload_size = size; + } + memcpy (msg->payload, buf, size); /* Case #2: add payload. */ } else if (!(flags & FLUX_MSGFLAG_PAYLOAD) && (buf != NULL && size > 0)) { - zmsg_remove (msg->zmsg, zf); - if (zmsg_addmem (msg->zmsg, buf, size) < 0 - || zmsg_append (msg->zmsg, &zf) < 0) { + assert (!msg->payload); + if (!(msg->payload = malloc (size))) { errno = ENOMEM; return -1; } + msg->payload_size = size; + memcpy (msg->payload, buf, size); flags |= FLUX_MSGFLAG_PAYLOAD; /* Case #3: remove payload. */ } else if ((flags & FLUX_MSGFLAG_PAYLOAD) && (buf == NULL || size == 0)) { - zmsg_remove (msg->zmsg, zf); - zframe_destroy (&zf); + assert (msg->payload); + free (msg->payload); + msg->payload = NULL; + msg->payload_size = 0; flags &= ~(uint8_t)(FLUX_MSGFLAG_PAYLOAD); } if (flux_msg_set_flags (msg, flags) < 0) @@ -1276,49 +1276,26 @@ int flux_msg_pack (flux_msg_t *msg, const char *fmt, ...) int flux_msg_get_payload (const flux_msg_t *msg, const void **buf, int *size) { - zframe_t *zf; - uint8_t flags = 0; - - if (!buf && !size) { + if (!msg || (!buf && !size)) { errno = EINVAL; return -1; } - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - if (!(flags & FLUX_MSGFLAG_PAYLOAD)) { - errno = EPROTO; - return -1; - } - zf = zmsg_first (msg->zmsg); - if ((flags & FLUX_MSGFLAG_ROUTE)) { - while (zf && zframe_size (zf) > 0) - zf = zmsg_next (msg->zmsg); - if (zf) - zf = zmsg_next (msg->zmsg); - } - if ((flags & FLUX_MSGFLAG_TOPIC)) { - if (zf) - zf = zmsg_next (msg->zmsg); - } - if (!zf) { + if (!(msg->flags & FLUX_MSGFLAG_PAYLOAD)) { errno = EPROTO; return -1; } if (buf) - *buf = zframe_data (zf); + *buf = msg->payload; if (size) - *size = zframe_size (zf); + *size = msg->payload_size; return 0; } bool flux_msg_has_payload (const flux_msg_t *msg) { - uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) { - errno = 0; + if (!msg) return false; - } - return ((flags & FLUX_MSGFLAG_PAYLOAD)); + return ((msg->flags & FLUX_MSGFLAG_PAYLOAD)); } int flux_msg_set_string (flux_msg_t *msg, const char *s) @@ -1422,40 +1399,26 @@ const char *flux_msg_last_error (const flux_msg_t *msg) int flux_msg_set_topic (flux_msg_t *msg, const char *topic) { - zframe_t *zf, *zf2 = NULL; uint8_t flags = 0; - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - zf = zmsg_first (msg->zmsg); - if ((flags & FLUX_MSGFLAG_ROUTE)) { /* skip over routing frames, if any */ - while (zf && zframe_size (zf) > 0) - zf = zmsg_next (msg->zmsg); - if (zf) - zf = zmsg_next (msg->zmsg); - } - if (!zf) { /* must at least have proto frame */ - errno = EPROTO; + if (!msg) { + errno = EINVAL; return -1; } + flags = msg->flags; if ((flags & FLUX_MSGFLAG_TOPIC) && topic) { /* case 1: repl topic */ - zframe_reset (zf, topic, strlen (topic) + 1); + free (msg->topic); + if (!(msg->topic = strdup (topic))) + return -1; } else if (!(flags & FLUX_MSGFLAG_TOPIC) && topic) {/* case 2: add topic */ - zmsg_remove (msg->zmsg, zf); - if ((flags & FLUX_MSGFLAG_PAYLOAD) && (zf2 = zmsg_next (msg->zmsg))) - zmsg_remove (msg->zmsg, zf2); - if (zmsg_addmem (msg->zmsg, topic, strlen (topic) + 1) < 0 - || zmsg_append (msg->zmsg, &zf) < 0 - || (zf2 && zmsg_append (msg->zmsg, &zf2) < 0)) { - errno = ENOMEM; + if (!(msg->topic = strdup (topic))) return -1; - } flags |= FLUX_MSGFLAG_TOPIC; if (flux_msg_set_flags (msg, flags) < 0) return -1; } else if ((flags & FLUX_MSGFLAG_TOPIC) && !topic) { /* case 3: del topic */ - zmsg_remove (msg->zmsg, zf); - zframe_destroy (&zf); + free (msg->topic); + msg->topic = NULL; flags &= ~(uint8_t)FLUX_MSGFLAG_TOPIC; if (flux_msg_set_flags (msg, flags) < 0) return -1; @@ -1463,90 +1426,60 @@ int flux_msg_set_topic (flux_msg_t *msg, const char *topic) return 0; } -static int zf_topic (const flux_msg_t *msg, zframe_t **zfp) -{ - uint8_t flags = 0; - zframe_t *zf = NULL; - - if (flux_msg_get_flags (msg, &flags) < 0) - return -1; - if (!(flags & FLUX_MSGFLAG_TOPIC)) { - errno = EPROTO; - return -1; - } - zf = zmsg_first (msg->zmsg); - if ((flags & FLUX_MSGFLAG_ROUTE)) { - while (zf && zframe_size (zf) > 0) - zf = zmsg_next (msg->zmsg); - if (zf) - zf = zmsg_next (msg->zmsg); - } - if (!zf) { - errno = EPROTO; - return -1; - } - *zfp = zf; - return 0; -} - int flux_msg_get_topic (const flux_msg_t *msg, const char **topic) { - zframe_t *zf; - const char *s; - - if (!topic) { + if (!msg || !topic) { errno = EINVAL; return -1; } - if (zf_topic (msg, &zf) < 0) - return -1; - s = (const char *)zframe_data (zf); - if (s[zframe_size (zf) - 1] != '\0') { + if (!(msg->flags & FLUX_MSGFLAG_TOPIC)) { errno = EPROTO; return -1; } - *topic = s; + *topic = msg->topic; return 0; } flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload) { flux_msg_t *cpy = NULL; - zframe_t *zf; - int count; - uint8_t flags = 0; - bool skip_payload = false; - /* Set skip_payload = true if caller set 'payload' flag false - * AND message contains a payload frame. - */ - if (flux_msg_get_flags (msg, &flags) < 0) + if (!msg) { + errno = EINVAL; return NULL; - if (!payload && (flags & FLUX_MSGFLAG_PAYLOAD)) { - flags &= ~(FLUX_MSGFLAG_PAYLOAD); - skip_payload = true; } + if (!(cpy = flux_msg_create_common ())) return NULL; - if (!(cpy->zmsg = zmsg_new ())) - goto nomem; - /* Copy frames from 'msg' to 'cpy'. - * 'count' indexes frames from 0 to zmsg_size (msg) - 1. - * The payload frame (if it exists) will be in the second to last position. - */ - count = 0; - zf = zmsg_first (msg->zmsg); - while (zf) { - if (!skip_payload || count != zmsg_size (msg->zmsg) - 2) { - if (zmsg_addmem (cpy->zmsg, zframe_data (zf), zframe_size (zf)) < 0) + cpy->type = msg->type; + cpy->flags = msg->flags; + cpy->userid = msg->userid; + cpy->rolemask = msg->rolemask; + cpy->aux1 = msg->aux1; + cpy->aux2 = msg->aux2; + + if (flux_msg_get_route_count (msg) > 0) { + struct route_id *r = NULL; + list_for_each_rev (&msg->routes, r, route_id_node) { + if (flux_msg_push_route (cpy, r->id) < 0) + goto error; + } + } + if (msg->topic) { + if (!(cpy->topic = strdup (msg->topic))) + goto nomem; + } + if (msg->payload) { + if (payload) { + cpy->payload_size = msg->payload_size; + if (!(cpy->payload = malloc (cpy->payload_size))) goto nomem; + memcpy (cpy->payload, msg->payload, msg->payload_size); } - zf = zmsg_next (msg->zmsg); - count++; + else + cpy->flags &= ~FLUX_MSGFLAG_PAYLOAD; } - if (flux_msg_set_flags (cpy, flags) < 0) - goto error; return cpy; nomem: errno = ENOMEM; @@ -1592,22 +1525,16 @@ static const char *msgtype_shortstr (int type) void flux_msg_fprint (FILE *f, const flux_msg_t *msg) { int hops; - int type = 0; - zframe_t *proto; - const char *prefix, *topic = NULL; + const char *prefix; + uint8_t proto[PROTO_SIZE]; + int i; fprintf (f, "--------------------------------------\n"); if (!msg) { fprintf (f, "NULL"); return; } - if (flux_msg_get_type (msg, &type) < 0 - || (!(proto = zmsg_last (msg->zmsg)))) { - fprintf (f, "malformed message"); - return; - } - prefix = msgtype_shortstr (type); - (void)flux_msg_get_topic (msg, &topic); + prefix = msgtype_shortstr (msg->type); /* Route stack */ hops = flux_msg_get_route_count (msg); /* -1 if no route stack */ @@ -1620,8 +1547,8 @@ void flux_msg_fprint (FILE *f, const flux_msg_t *msg) }; /* Topic (keepalive has none) */ - if (topic) - fprintf (f, "%s[%3.3zu] %s\n", prefix, strlen (topic), topic); + if (msg->topic) + fprintf (f, "%s[%3.3zu] %s\n", prefix, strlen (msg->topic), msg->topic); /* Payload */ if (flux_msg_has_payload (msg)) { @@ -1637,32 +1564,91 @@ void flux_msg_fprint (FILE *f, const flux_msg_t *msg) } /* Proto block */ - zframe_fprint (proto, prefix, f); + msg_proto_setup (msg, proto, PROTO_SIZE); + fprintf (f, "%s[%03d] ", prefix, PROTO_SIZE); + for (i = 0; i < PROTO_SIZE; i++) + fprintf (f, "%02X", proto[i]); + fprintf (f, "\n"); +} + +static zmsg_t *msg_to_zmsg (const flux_msg_t *msg) +{ + uint8_t proto[PROTO_SIZE]; + zmsg_t *zmsg = NULL; + + if (!(zmsg = zmsg_new ())) { + errno = ENOMEM; + return NULL; + } + msg_proto_setup (msg, proto, PROTO_SIZE); + if (zmsg_addmem (zmsg, proto, PROTO_SIZE) < 0) { + errno = ENOMEM; + goto error; + } + if (msg->flags & FLUX_MSGFLAG_PAYLOAD) { + if (zmsg_pushmem (zmsg, msg->payload, msg->payload_size) < 0) { + errno = ENOMEM; + goto error; + } + } + if (msg->flags & FLUX_MSGFLAG_TOPIC) { + if (zmsg_pushmem (zmsg, msg->topic, strlen (msg->topic)) < 0) { + errno = ENOMEM; + goto error; + } + } + if (msg->flags & FLUX_MSGFLAG_ROUTE) { + struct route_id *r = NULL; + if (zmsg_pushmem (zmsg, NULL, 0) < 0) { + errno = ENOMEM; + goto error; + } + list_for_each_rev (&msg->routes, r, route_id_node) { + if (zmsg_pushstr (zmsg, r->id) < 0) { + errno = ENOMEM; + goto error; + } + } + } + return zmsg; +error: + zmsg_destroy (&zmsg); + return NULL; } int flux_msg_sendzsock_ex (void *sock, const flux_msg_t *msg, bool nonblock) { - if (!sock || !msg || !zmsg_is (msg->zmsg)) { + void *handle; + int flags = ZFRAME_REUSE | ZFRAME_MORE; + zmsg_t *zmsg = NULL; + zframe_t *zf; + size_t count = 0; + int rc = -1; + + if (!sock || !msg) { errno = EINVAL; return -1; } - void *handle = zsock_resolve (sock); - int flags = ZFRAME_REUSE | ZFRAME_MORE; - zframe_t *zf = zmsg_first (msg->zmsg); - size_t count = 0; + if (!(zmsg = msg_to_zmsg (msg))) + return -1; if (nonblock) flags |= ZFRAME_DONTWAIT; + handle = zsock_resolve (sock); + zf = zmsg_first (zmsg); while (zf) { - if (++count == zmsg_size (msg->zmsg)) + if (++count == zmsg_size (zmsg)) flags &= ~ZFRAME_MORE; if (zframe_send (&zf, handle, flags) < 0) - return -1; - zf = zmsg_next (msg->zmsg); + goto error; + zf = zmsg_next (zmsg); } - return 0; + rc = 0; +error: + zmsg_destroy (&zmsg); + return rc; } int flux_msg_sendzsock (void *sock, const flux_msg_t *msg) @@ -1682,17 +1668,31 @@ flux_msg_t *flux_msg_recvzsock (void *sock) errno = ENOMEM; return NULL; } - msg->zmsg = zmsg; + if (zmsg_to_msg (msg, zmsg) < 0) { + int save_errno = errno; + zmsg_destroy (&zmsg); + errno = save_errno; + return NULL; + } + zmsg_destroy (&zmsg); return msg; } int flux_msg_frames (const flux_msg_t *msg) { + int n = 1; /* 1 for proto frame */ if (!msg) { errno = EINVAL; return -1; } - return zmsg_size (msg->zmsg); + if (msg->flags & FLUX_MSGFLAG_PAYLOAD) + n++; + if (msg->flags & FLUX_MSGFLAG_TOPIC) + n++; + if (msg->flags & FLUX_MSGFLAG_ROUTE) + /* +1 for routes delimeter frame */ + n += msg->routes_len + 1; + return n; } struct flux_match flux_match_init (int typemask, @@ -1721,14 +1721,18 @@ int flux_match_asprintf (struct flux_match *m, const char *topic_glob_fmt, ...) bool flux_msg_match_route_first (const flux_msg_t *msg1, const flux_msg_t *msg2) { - zframe_t *zf1 = find_route_first (msg1); - zframe_t *zf2 = find_route_first (msg2); - int len; + struct route_id *r1 = NULL; + struct route_id *r2 = NULL; - if (!zf1 || !zf2) + if (!msg1 || !msg2) + return false; + if (find_route_first (msg1, &r1) < 0) + return false; + if (find_route_first (msg2, &r2) < 0) + return false; + if (!r1 || !r2) return false; - if ((len = zframe_size (zf1)) != zframe_size (zf2) - || memcmp (zframe_data (zf1), zframe_data (zf2), len) != 0) + if (strcmp (r1->id, r2->id)) return false; return true; } diff --git a/src/common/libflux/message.h b/src/common/libflux/message.h index 47aa0a951a79..d67e19428b88 100644 --- a/src/common/libflux/message.h +++ b/src/common/libflux/message.h @@ -306,15 +306,14 @@ const char *flux_msg_typestr (int type); * routing. */ -/* Prepare a message for routing, which consists of pushing a nil delimiter - * frame and setting FLUX_MSGFLAG_ROUTE. This function is a no-op if the - * flag is already set. +/* Prepare a message for routing by setting FLUX_MSGFLAG_ROUTE. This + * function is a no-op if the flag is already set. * Returns 0 on success, -1 with errno set on failure. */ int flux_msg_enable_route (flux_msg_t *msg); -/* Strip route frames, nil delimiter, and clear FLUX_MSGFLAG_ROUTE flag. - * This function is a no-op if the flag is already clear. +/* Clear routes from msg and clear FLUX_MSGFLAG_ROUTE flag. This + * function is a no-op if the flag is already clear. * Returns 0 on success, -1 with errno set on failure. */ int flux_msg_clear_route (flux_msg_t *msg); @@ -331,14 +330,14 @@ int flux_msg_push_route (flux_msg_t *msg, const char *id); */ int flux_msg_pop_route (flux_msg_t *msg, char **id); -/* Copy the first routing frame (closest to delimiter) contents (or NULL) +/* Copy the first route (e.g. first pushed route) contents (or NULL) * to 'id'. Caller must free 'id'. * For requests, this is the sender; for responses, this is the recipient. * Returns 0 on success, -1 with errno set (e.g. EPROTO) on failure. */ int flux_msg_get_route_first (const flux_msg_t *msg, char **id); -/* Copy the last routing frame (farthest from delimiter) contents (or NULL) +/* Copy the last route (e.g. most recent pushed route) contents (or NULL) * to 'id'. Caller must free 'id'. * For requests, this is the last hop; for responses: this is the next hop. * Returns 0 on success, -1 with errno set (e.g. EPROTO) on failure.