Skip to content

Commit

Permalink
libflux: return routes via const in msg API
Browse files Browse the repository at this point in the history
Problem: The functions flux_msg_get_route_first() and
flux_msg_get_route_last() return route IDs via an allocated string
that must be freed by the user.  This API made sense previously
when route IDs were stored internally in a zframe.  However, now
that the route IDs are stored in decoded data structures, we could
return the route IDs directly.

Solution: Return internally stored route IDs via a const char *
instead of an allocated string.  Adjust all callers accordingly.
use 'const char *' instead of 'char *' in several places where
more appropriate.
  • Loading branch information
chu11 committed Jun 28, 2021
1 parent bdd80db commit 51a856e
Show file tree
Hide file tree
Showing 19 changed files with 79 additions and 129 deletions.
22 changes: 6 additions & 16 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1079,11 +1079,10 @@ static void broker_panic_cb (flux_t *h, flux_msg_handler_t *mh,
static void broker_disconnect_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
char *sender = NULL;
const char *sender;

if (flux_msg_get_route_first (msg, &sender) == 0) {
exec_terminate_subprocesses_by_uuid (h, sender);
free (sender);
}
/* no response */
}
Expand All @@ -1092,7 +1091,7 @@ static void broker_sub_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
broker_ctx_t *ctx = arg;
char *uuid = NULL;
const char *uuid;
const char *topic;

if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0)
Expand All @@ -1107,19 +1106,17 @@ static void broker_sub_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
free (uuid);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
free (uuid);
}

static void broker_unsub_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
broker_ctx_t *ctx = arg;
char *uuid = NULL;
const char *uuid;
const char *topic;

if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0)
Expand All @@ -1134,12 +1131,10 @@ static void broker_unsub_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
free (uuid);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
free (uuid);
}

static int route_to_handle (const flux_msg_t *msg, void *arg)
Expand Down Expand Up @@ -1175,7 +1170,7 @@ static void service_add_cb (flux_t *h, flux_msg_handler_t *w,
{
broker_ctx_t *ctx = arg;
const char *name = NULL;
char *sender = NULL;
const char *sender;
module_t *p;
struct flux_msg_cred cred;

Expand All @@ -1194,12 +1189,10 @@ static void service_add_cb (flux_t *h, flux_msg_handler_t *w,
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "service_add: flux_respond");
free (sender);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "service_add: flux_respond_error");
free (sender);
}

static void service_remove_cb (flux_t *h, flux_msg_handler_t *w,
Expand All @@ -1208,7 +1201,7 @@ static void service_remove_cb (flux_t *h, flux_msg_handler_t *w,
broker_ctx_t *ctx = arg;
const char *name;
const char *uuid;
char *sender = NULL;
const char *sender;
struct flux_msg_cred cred;

if (flux_request_unpack (msg, NULL, "{ s:s }", "service", &name) < 0
Expand All @@ -1229,12 +1222,10 @@ static void service_remove_cb (flux_t *h, flux_msg_handler_t *w,
service_remove (ctx->services, name);
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "service_remove: flux_respond");
free (sender);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "service_remove: flux_respond_error");
free (sender);
}


Expand Down Expand Up @@ -1693,7 +1684,7 @@ static void broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
{
int rc;
char *uuid = NULL;
const char *uuid;

if (flux_msg_get_route_last (msg, &uuid) < 0)
return -1;
Expand All @@ -1705,7 +1696,6 @@ static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
rc = overlay_sendmsg (ctx->overlay, msg, OVERLAY_DOWNSTREAM);
else
rc = module_response_sendmsg (ctx->modhash, msg);
ERRNO_SAFE_WRAP (free, uuid);
return rc;
}

Expand Down
15 changes: 5 additions & 10 deletions src/broker/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,24 +596,20 @@ static void dmesg_request_cb (flux_t *h, flux_msg_handler_t *mh,

static int cmp_sender (const flux_msg_t *msg, const char *uuid)
{
char *sender = NULL;
int rc = 0;
const char *sender;

if (flux_msg_get_route_first (msg, &sender) < 0)
goto done;
return 0;
if (!sender || strcmp (sender, uuid) != 0)
goto done;
rc = 1;
done:
free (sender);
return rc;
return 0;
return 1;
}

static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
logbuf_t *logbuf = arg;
char *sender = NULL;
const char *sender;
const flux_msg_t *msgp;
zlist_t *tmp = NULL;

Expand All @@ -635,7 +631,6 @@ static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh,
zlist_remove (logbuf->followers, (flux_msg_t *)msgp);
}
done:
free (sender);
zlist_destroy (&tmp);
/* no response */
}
Expand Down
14 changes: 5 additions & 9 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,26 +340,22 @@ int module_sendmsg (module_t *p, const flux_msg_t *msg)

int module_response_sendmsg (modhash_t *mh, const flux_msg_t *msg)
{
char *uuid = NULL;
int rc = -1;
const char *uuid;
module_t *p;

if (!msg)
return 0;
if (flux_msg_get_route_last (msg, &uuid) < 0)
goto done;
return -1;
if (!uuid) {
errno = EPROTO;
goto done;
return -1;
}
if (!(p = zhash_lookup (mh->zh_byuuid, uuid))) {
errno = ENOSYS;
goto done;
return -1;
}
rc = module_sendmsg (p, msg);
done:
free (uuid);
return rc;
return module_sendmsg (p, msg);
}

int module_disconnect_arm (module_t *p,
Expand Down
8 changes: 2 additions & 6 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ int overlay_sendmsg (struct overlay *ov,
int type;
uint8_t flags;
flux_msg_t *cpy = NULL;
char *uuid = NULL;
const char *uuid;
uint32_t nodeid;
struct child *child;
int rc;
Expand Down Expand Up @@ -466,13 +466,11 @@ int overlay_sendmsg (struct overlay *ov,
default:
goto inval;
}
free (uuid);
flux_msg_decref (cpy);
return 0;
inval:
errno = EINVAL;
error:
ERRNO_SAFE_WRAP (free, uuid);
flux_msg_decref (cpy);
return -1;
}
Expand Down Expand Up @@ -567,7 +565,7 @@ static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
flux_msg_t *msg;
int type = -1;
const char *topic = NULL;
char *sender = NULL;
const char *sender = NULL;
struct child *child;
int status;

Expand Down Expand Up @@ -612,7 +610,6 @@ static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
}
ov->recv_cb (msg, OVERLAY_DOWNSTREAM, ov->recv_arg);
handled:
free (sender);
flux_msg_decref (msg);
return;
drop:
Expand All @@ -623,7 +620,6 @@ static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
type != -1 ? flux_msg_typestr (type) : "message",
topic ? topic : "-",
sender != NULL ? sender : "unknown");
free (sender);
flux_msg_decref (msg);
}

Expand Down
4 changes: 1 addition & 3 deletions src/broker/test/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ void trio (flux_t *h)
zsock_t *zsock_none;
zsock_t *zsock_curve;
zcert_t *cert;
char *sender;
const char *sender;

ctx[0] = ctx_create (h, "trio", size, 0, recv_cb);

Expand Down Expand Up @@ -313,7 +313,6 @@ void trio (flux_t *h)
ok (flux_msg_get_route_first (rmsg, &sender) == 0
&& sender != NULL && !strcmp (sender, ctx[1]->uuid),
"%s: received message sender is rank 1", ctx[0]->name);
free (sender);

/* Send request 0->1
* Side effect: during recvmsg_timeout(), reactor allows hello response
Expand All @@ -336,7 +335,6 @@ void trio (flux_t *h)
ok (flux_msg_get_route_first (rmsg, &sender) == 0
&& sender != NULL && !strcmp (sender, ctx[0]->uuid),
"%s: request sender is rank 0", ctx[1]->name);
free (sender);

/* Response 1->0
*/
Expand Down
6 changes: 2 additions & 4 deletions src/cmd/flux-start.c
Original file line number Diff line number Diff line change
Expand Up @@ -860,14 +860,12 @@ void disconnect_cb (flux_t *h,
const flux_msg_t *msg,
void *arg)
{
char *uuid = NULL;
const char *uuid;

if (flux_msg_get_route_first (msg, &uuid) < 0)
goto done;
return;
if (ctx.verbose >= 1)
log_msg ("disconnect from %.5s", uuid);
done:
free (uuid);
}

const struct flux_msg_handler_spec htab[] = {
Expand Down
4 changes: 1 addition & 3 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ flux_msg_t *flux_recv (flux_t *h, struct flux_match match, int flags)
cali_begin_int (h->prof.msg_match_tag, match.matchtag);
cali_begin_string (h->prof.msg_match_glob,
match.topic_glob ? match.topic_glob : "NONE");
char *sender = NULL;
const char *sender = NULL;
flux_msg_get_route_first (msg, &sender);
if (sender)
cali_begin_string (h->prof.msg_sender, sender);
Expand All @@ -694,8 +694,6 @@ flux_msg_t *flux_recv (flux_t *h, struct flux_match match, int flags)
cali_end (h->prof.msg_match_type);
cali_end (h->prof.msg_match_tag);
cali_end (h->prof.msg_match_glob);

free (sender);
#endif
return msg;
fatal:
Expand Down
20 changes: 8 additions & 12 deletions src/common/libflux/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ int flux_msg_pop_route (flux_msg_t *msg, char **id)
}

/* replaces flux_msg_nexthop */
int flux_msg_get_route_last (const flux_msg_t *msg, char **id)
int flux_msg_get_route_last (const flux_msg_t *msg, const char **id)
{
struct route_id *r;

Expand All @@ -1013,10 +1013,8 @@ int flux_msg_get_route_last (const flux_msg_t *msg, char **id)
errno = EPROTO;
return -1;
}
if ((r = list_top (&msg->routes, struct route_id, route_id_node))) {
if (!((*id) = strdup (r->id)))
return -1;
}
if ((r = list_top (&msg->routes, struct route_id, route_id_node)))
(*id) = r->id;
else
(*id) = NULL;
return 0;
Expand All @@ -1033,7 +1031,7 @@ static int find_route_first (const flux_msg_t *msg, struct route_id **r)
}

/* replaces flux_msg_sender */
int flux_msg_get_route_first (const flux_msg_t *msg, char **id)
int flux_msg_get_route_first (const flux_msg_t *msg, const char **id)
{
struct route_id *r = NULL;

Expand All @@ -1043,10 +1041,8 @@ int flux_msg_get_route_first (const flux_msg_t *msg, char **id)
}
if (find_route_first (msg, &r) < 0)
return -1;
if (r) {
if (!((*id) = strdup (r->id)))
return -1;
}
if (r)
(*id) = r->id;
else
(*id) = NULL;
return 0;
Expand Down Expand Up @@ -1082,7 +1078,7 @@ static int flux_msg_get_route_size (const flux_msg_t *msg)
return size;
}

static char *flux_msg_get_route_nth (const flux_msg_t *msg, int n)
static const char *flux_msg_get_route_nth (const flux_msg_t *msg, int n)
{
struct route_id *r = NULL;
int count = 0;
Expand Down Expand Up @@ -1117,7 +1113,7 @@ char *flux_msg_get_route_string (const flux_msg_t *msg)
if (!(cp = buf = malloc (len + hops + 1)))
return NULL;
for (n = hops - 1; n >= 0; n--) {
char *id;
const char *id;
if (cp > buf)
*cp++ = '!';
if (!(id = flux_msg_get_route_nth (msg, n))) {
Expand Down
12 changes: 6 additions & 6 deletions src/common/libflux/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,19 +330,19 @@ int flux_msg_push_route (flux_msg_t *msg, const char *id);
*/
int flux_msg_pop_route (flux_msg_t *msg, char **id);

/* Copy the first route (e.g. first pushed route) contents (or NULL)
* to 'id'. Caller must free 'id'.
/* Return the first route (e.g. first pushed route) contents (or NULL)
* to 'id'.
* For requests, this is the sender; for responses, this is the recipient.
* Returns 0 on success, -1 with errno set (e.g. EPROTO) on failure.
*/
int flux_msg_get_route_first (const flux_msg_t *msg, char **id);
int flux_msg_get_route_first (const flux_msg_t *msg, const char **id);

/* Copy the last route (e.g. most recent pushed route) contents (or NULL)
* to 'id'. Caller must free 'id'.
/* Return the last route (e.g. most recent pushed route) contents (or NULL)
* to 'id'.
* For requests, this is the last hop; for responses: this is the next hop.
* Returns 0 on success, -1 with errno set (e.g. EPROTO) on failure.
*/
int flux_msg_get_route_last (const flux_msg_t *msg, char **id);
int flux_msg_get_route_last (const flux_msg_t *msg, const char **id);

/* Return the number of route frames in the message.
* It is an EPROTO error if there is no route stack.
Expand Down
Loading

0 comments on commit 51a856e

Please sign in to comment.