From 164bb3ce80ab59e76c89b4df79616446c900c608 Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" Date: Fri, 10 Feb 2017 14:18:37 -0800 Subject: [PATCH] Rename all ctx_t structs to make them unique. Rename all ctx_t structs to give them a unique name. While they are ll used in ways that avoid conflict (usually local to a single file), it makes it more difficult for the new developer to navigate through the code when many different structs share the same name (because ools like cscope and ctags will show many definition locations for the struct rather than showing the one correct location). Fixes #972 --- src/broker/broker.c | 94 ++++++++++---------- src/broker/modservice.c | 18 ++-- src/cmd/builtin/proxy.c | 36 ++++---- src/common/libflux/attr.c | 24 +++--- src/connectors/local/local.c | 18 ++-- src/connectors/loop/loop.c | 14 +-- src/connectors/shmem/shmem.c | 26 +++--- src/connectors/ssh/ssh.c | 28 +++--- src/modules/barrier/barrier.c | 28 +++--- src/modules/barrier/libbarrier.c | 10 +-- src/modules/connector-local/local.c | 26 +++--- src/modules/content-sqlite/content-sqlite.c | 24 +++--- src/modules/kvs/kvs.c | 96 ++++++++++----------- src/modules/live/live.c | 80 ++++++++--------- src/modules/resource-hwloc/resource.c | 26 +++--- t/kz/kzutil.c | 10 +-- t/request/req.c | 22 ++--- 17 files changed, 290 insertions(+), 290 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index d8c9107e0d80..42185fddf17c 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -155,11 +155,11 @@ typedef struct { char *init_shell_cmd; size_t init_shell_cmd_len; struct subprocess *init_shell; -} ctx_t; +} broker_ctx_t; -static int broker_event_sendmsg (ctx_t *ctx, const flux_msg_t *msg); -static int broker_response_sendmsg (ctx_t *ctx, const flux_msg_t *msg); -static int broker_request_sendmsg (ctx_t *ctx, const flux_msg_t *msg, +static int broker_event_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg); +static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg); +static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg, request_error_mode_t errmode); static void event_cb (overlay_t *ov, void *sock, void *arg); @@ -172,15 +172,15 @@ static void shutdown_cb (shutdown_t *s, bool expired, void *arg); static void signal_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); static void broker_block_signals (void); -static void broker_handle_signals (ctx_t *ctx, zlist_t *sigwatchers); +static void broker_handle_signals (broker_ctx_t *ctx, zlist_t *sigwatchers); static void broker_unhandle_signals (zlist_t *sigwatchers); -static void broker_add_services (ctx_t *ctx); +static void broker_add_services (broker_ctx_t *ctx); -static void load_modules (ctx_t *ctx, const char *default_modules); +static void load_modules (broker_ctx_t *ctx, const char *default_modules); -static void update_proctitle (ctx_t *ctx); -static void update_pidfile (ctx_t *ctx); +static void update_proctitle (broker_ctx_t *ctx); +static void update_pidfile (broker_ctx_t *ctx); static void runlevel_cb (runlevel_t *r, int level, int rc, double elapsed, const char *state, void *arg); static void runlevel_io_cb (runlevel_t *r, const char *name, @@ -191,12 +191,12 @@ static int create_scratchdir (attr_t *attrs); static int create_rankdir (attr_t *attrs, uint32_t rank); static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size); -static int boot_pmi (ctx_t *ctx, double *elapsed_sec); +static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec); static int attr_get_snoop (const char *name, const char **val, void *arg); static int attr_get_overlay (const char *name, const char **val, void *arg); -static void init_attrs (ctx_t *ctx); +static void init_attrs (broker_ctx_t *ctx); static const struct flux_handle_ops broker_handle_ops; @@ -253,7 +253,7 @@ static int setup_profiling (const char *program, int rank) int main (int argc, char *argv[]) { int c; - ctx_t ctx; + broker_ctx_t ctx; zlist_t *sigwatchers; int security_clr = 0; int security_set = 0; @@ -747,7 +747,7 @@ static void init_attrs_from_environment (attr_t *attrs) } } -static void init_attrs_broker_pid (ctx_t *ctx) +static void init_attrs_broker_pid (broker_ctx_t *ctx) { char *attrname = "broker.pid"; char *pidval; @@ -761,7 +761,7 @@ static void init_attrs_broker_pid (ctx_t *ctx) free (pidval); } -static void init_attrs (ctx_t *ctx) +static void init_attrs (broker_ctx_t *ctx) { /* Initialize config attrs from environment set up by flux(1) */ @@ -774,7 +774,7 @@ static void init_attrs (ctx_t *ctx) static void hello_update_cb (hello_t *hello, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; if (hello_complete (hello)) { flux_log (ctx->h, LOG_INFO, "wireup: %d/%d (complete) %.1fs", @@ -794,12 +794,12 @@ static void hello_update_cb (hello_t *hello, void *arg) */ static void shutdown_cb (shutdown_t *s, bool expired, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; if (expired) exit (ctx->rank == 0 ? shutdown_get_rc (s) : 0); } -static void update_proctitle (ctx_t *ctx) +static void update_proctitle (broker_ctx_t *ctx) { char *s; if (asprintf (&s, "flux-broker-%"PRIu32, ctx->rank) < 0) @@ -810,7 +810,7 @@ static void update_proctitle (ctx_t *ctx) ctx->proctitle = s; } -static void update_pidfile (ctx_t *ctx) +static void update_pidfile (broker_ctx_t *ctx) { const char *rankdir; char *pidfile; @@ -834,7 +834,7 @@ static void update_pidfile (ctx_t *ctx) static void runlevel_io_cb (runlevel_t *r, const char *name, const char *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; int loglevel = !strcmp (name, "stderr") ? LOG_ERR : LOG_INFO; int runlevel = runlevel_get_level (r); @@ -846,7 +846,7 @@ static void runlevel_io_cb (runlevel_t *r, const char *name, static void runlevel_cb (runlevel_t *r, int level, int rc, double elapsed, const char *exit_string, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; int new_level = -1; flux_log (ctx->h, rc == 0 ? LOG_INFO : LOG_ERR, @@ -1078,7 +1078,7 @@ static int create_persistdir (attr_t *attrs, uint32_t rank) return rc; } -static int boot_pmi (ctx_t *ctx, double *elapsed_sec) +static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) { const char *scratch_dir; int spawned, size, rank, appnum; @@ -1364,7 +1364,7 @@ static int mod_svc_cb (const flux_msg_t *msg, void *arg) /* Load command line/default comms modules. If module name contains * one or more '/' characters, it refers to a .so path. */ -static void load_modules (ctx_t *ctx, const char *default_modules) +static void load_modules (broker_ctx_t *ctx, const char *default_modules) { char *cpy = xstrdup (default_modules); char *s, *saveptr = NULL, *a1 = cpy; @@ -1419,7 +1419,7 @@ static void broker_block_signals (void) log_err_exit ("sigprocmask"); } -static void broker_handle_signals (ctx_t *ctx, zlist_t *sigwatchers) +static void broker_handle_signals (broker_ctx_t *ctx, zlist_t *sigwatchers) { int i, sigs[] = { SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGSEGV, SIGFPE }; flux_watcher_t *w; @@ -1480,7 +1480,7 @@ static int attr_get_overlay (const char *name, const char **val, void *arg) static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; const char *json_str; char *name = NULL; module_t *p; @@ -1512,7 +1512,7 @@ static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; const char *json_str; char *name = NULL; char *path = NULL; @@ -1564,7 +1564,7 @@ static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_lsmod_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; flux_modlist_t *mods = NULL; char *json_str = NULL; @@ -1589,7 +1589,7 @@ static void cmb_lsmod_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_lspeer_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; json_object *out = overlay_lspeer_encode (ctx->overlay); if (flux_respond (h, msg, 0, Jtostr (out)) < 0) @@ -1600,7 +1600,7 @@ static void cmb_lspeer_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_reparent_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; json_object *in = NULL; const char *uri; const char *json_str; @@ -1649,7 +1649,7 @@ static void cmb_panic_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_event_mute_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; char *uuid = NULL; if (flux_msg_get_route_last (msg, &uuid) == 0) @@ -1673,7 +1673,7 @@ static void cmb_disconnect_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_sub_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; const char *json_str; char *uuid = NULL; json_object *in = NULL; @@ -1708,7 +1708,7 @@ static void cmb_sub_cb (flux_t *h, flux_msg_handler_t *w, static void cmb_unsub_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; const char *json_str; char *uuid = NULL; json_object *in = NULL; @@ -1742,7 +1742,7 @@ static void cmb_unsub_cb (flux_t *h, flux_msg_handler_t *w, static int route_to_handle (const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; if (flux_requeue (ctx->h, msg, FLUX_RQ_TAIL) < 0) flux_log_error (ctx->h, "%s: flux_requeue\n", __FUNCTION__); return 0; @@ -1787,7 +1787,7 @@ static struct internal_service services[] = { * First loop is for services that are registered in other files. * Second loop is for services registered here. */ -static void broker_add_services (ctx_t *ctx) +static void broker_add_services (broker_ctx_t *ctx) { struct internal_service *svc; for (svc = &services[0]; svc->topic != NULL; svc++) { @@ -1816,7 +1816,7 @@ static void broker_add_services (ctx_t *ctx) */ static void child_cb (overlay_t *ov, void *sock, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; int type; char *uuid = NULL; flux_msg_t *msg = flux_msg_recvzsock (sock); @@ -1862,7 +1862,7 @@ static void child_cb (overlay_t *ov, void *sock, void *arg) } /* helper for event_cb, parent_cb, and (on rank 0) broker_event_sendmsg */ -static int handle_event (ctx_t *ctx, const flux_msg_t *msg) +static int handle_event (broker_ctx_t *ctx, const flux_msg_t *msg) { uint32_t seq; const char *topic, *s; @@ -1925,7 +1925,7 @@ static void send_mute_request (flux_t *h, void *sock) */ static void parent_cb (overlay_t *ov, void *sock, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; flux_msg_t *msg = flux_msg_recvzsock (sock); int type; @@ -1966,7 +1966,7 @@ static void parent_cb (overlay_t *ov, void *sock, void *arg) */ static void module_cb (module_t *p, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; flux_msg_t *msg = module_recvmsg (p); int type; int ka_errnum, ka_status; @@ -2011,7 +2011,7 @@ static void module_cb (module_t *p, void *arg) static void module_status_cb (module_t *p, int prev_status, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; flux_msg_t *msg; int status = module_get_status (p); const char *name = module_get_name (p); @@ -2049,7 +2049,7 @@ static void module_status_cb (module_t *p, int prev_status, void *arg) static void event_cb (overlay_t *ov, void *sock, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; flux_msg_t *msg = overlay_recvmsg_event (ov); int type; @@ -2075,7 +2075,7 @@ static void event_cb (overlay_t *ov, void *sock, void *arg) static void signal_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { - ctx_t *ctx = arg; + broker_ctx_t *ctx = arg; int signum = flux_signal_watcher_get_signum (w); shutdown_arm (ctx->shutdown, ctx->shutdown_grace, 0, @@ -2092,7 +2092,7 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w, * sending end by pushing the identity of the sender onto the stack, * followed by the identity of the peer we want to route the message to. */ -static int subvert_sendmsg_child (ctx_t *ctx, const flux_msg_t *msg, +static int subvert_sendmsg_child (broker_ctx_t *ctx, const flux_msg_t *msg, uint32_t nodeid) { flux_msg_t *cpy = flux_msg_copy (msg, true); @@ -2125,7 +2125,7 @@ static int subvert_sendmsg_child (ctx_t *ctx, const flux_msg_t *msg, * any local errors do not trigger a response, and function * returns -1 with errno set. */ -static int broker_request_sendmsg (ctx_t *ctx, const flux_msg_t *msg, +static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg, request_error_mode_t errmode) { uint32_t nodeid, gw; @@ -2179,7 +2179,7 @@ static int broker_request_sendmsg (ctx_t *ctx, const flux_msg_t *msg, return 0; } -static int broker_response_sendmsg (ctx_t *ctx, const flux_msg_t *msg) +static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg) { int rc = -1; char *uuid = NULL; @@ -2223,7 +2223,7 @@ static int broker_response_sendmsg (ctx_t *ctx, const flux_msg_t *msg) * Rank 0 doesn't (generally) receive the events it transmits so we have * to "loop back" here via handle_event(). */ -static int broker_event_sendmsg (ctx_t *ctx, const flux_msg_t *msg) +static int broker_event_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg) { flux_msg_t *cpy = NULL; int rc = -1; @@ -2256,7 +2256,7 @@ static int broker_event_sendmsg (ctx_t *ctx, const flux_msg_t *msg) static int broker_send (void *impl, const flux_msg_t *msg, int flags) { - ctx_t *ctx = impl; + broker_ctx_t *ctx = impl; int type; int rc = -1; @@ -2284,7 +2284,7 @@ static int broker_send (void *impl, const flux_msg_t *msg, int flags) static int broker_subscribe (void *impl, const char *topic) { - ctx_t *ctx = impl; + broker_ctx_t *ctx = impl; if (zlist_append (ctx->subscriptions, xstrdup (topic))) oom(); return 0; @@ -2292,7 +2292,7 @@ static int broker_subscribe (void *impl, const char *topic) static int broker_unsubscribe (void *impl, const char *topic) { - ctx_t *ctx = impl; + broker_ctx_t *ctx = impl; char *s = zlist_first (ctx->subscriptions); while (s) { if (!strcmp (s, topic)) { diff --git a/src/broker/modservice.c b/src/broker/modservice.c index b1eee3aa9382..cf556589db4f 100644 --- a/src/broker/modservice.c +++ b/src/broker/modservice.c @@ -56,11 +56,11 @@ typedef struct { zlist_t *handlers; flux_watcher_t *w_prepare; flux_watcher_t *w_check; -} ctx_t; +} modservice_ctx_t; static void freectx (void *arg) { - ctx_t *ctx = arg; + modservice_ctx_t *ctx = arg; flux_msg_handler_t *w; while ((w = zlist_pop (ctx->handlers))) flux_msg_handler_destroy (w); @@ -70,9 +70,9 @@ static void freectx (void *arg) free (ctx); } -static ctx_t *getctx (flux_t *h, module_t *p) +static modservice_ctx_t *getctx (flux_t *h, module_t *p) { - ctx_t *ctx = flux_aux_get (h, "flux::modservice"); + modservice_ctx_t *ctx = flux_aux_get (h, "flux::modservice"); if (!ctx) { ctx = xzmalloc (sizeof (*ctx)); @@ -182,7 +182,7 @@ static void shutdown_cb (flux_t *h, flux_msg_handler_t *w, static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { - ctx_t *ctx = arg; + modservice_ctx_t *ctx = arg; flux_msg_t *msg = flux_keepalive_encode (0, FLUX_MODSTATE_SLEEPING); if (!msg || flux_send (ctx->h, msg, 0) < 0) flux_log_error (ctx->h, "error sending keepalive"); @@ -194,14 +194,14 @@ static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w, static void check_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { - ctx_t *ctx = arg; + modservice_ctx_t *ctx = arg; flux_msg_t *msg = flux_keepalive_encode (0, FLUX_MODSTATE_RUNNING); if (!msg || flux_send (ctx->h, msg, 0) < 0) flux_log_error (ctx->h, "error sending keepalive"); flux_msg_destroy (msg); } -static void register_event (ctx_t *ctx, const char *name, +static void register_event (modservice_ctx_t *ctx, const char *name, flux_msg_handler_f cb) { struct flux_match match = FLUX_MATCH_EVENT; @@ -219,7 +219,7 @@ static void register_event (ctx_t *ctx, const char *name, free (match.topic_glob); } -static void register_request (ctx_t *ctx, const char *name, +static void register_request (modservice_ctx_t *ctx, const char *name, flux_msg_handler_f cb) { struct flux_match match = FLUX_MATCH_REQUEST; @@ -236,7 +236,7 @@ static void register_request (ctx_t *ctx, const char *name, void modservice_register (flux_t *h, module_t *p) { - ctx_t *ctx = getctx (h, p); + modservice_ctx_t *ctx = getctx (h, p); flux_reactor_t *r = flux_get_reactor (h); register_request (ctx, "shutdown", shutdown_cb); diff --git a/src/cmd/builtin/proxy.c b/src/cmd/builtin/proxy.c index 2b818ac3b53c..21cf3ee2a1f0 100644 --- a/src/cmd/builtin/proxy.c +++ b/src/cmd/builtin/proxy.c @@ -63,7 +63,7 @@ typedef struct { struct subprocess *p; bool oneshot; int exit_code; -} ctx_t; +} proxy_ctx_t; typedef void (*unsubscribe_f)(void *handle, const char *topic); @@ -82,7 +82,7 @@ typedef struct { struct flux_msg_iobuf inbuf; struct flux_msg_iobuf outbuf; zlist_t *outqueue; /* queue of outbound flux_msg_t */ - ctx_t *ctx; + proxy_ctx_t *ctx; zhash_t *disconnect_notify; zhash_t *subscriptions; zuuid_t *uuid; @@ -101,7 +101,7 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, static void client_write_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); -static void ctx_destroy (ctx_t *ctx) +static void ctx_destroy (proxy_ctx_t *ctx) { if (ctx) { zlist_destroy (&ctx->clients); @@ -114,9 +114,9 @@ static void ctx_destroy (ctx_t *ctx) } } -static ctx_t *ctx_create (flux_t *h) +static proxy_ctx_t *ctx_create (flux_t *h) { - ctx_t *ctx = xzmalloc (sizeof (*ctx)); + proxy_ctx_t *ctx = xzmalloc (sizeof (*ctx)); ctx->h = h; if (!(ctx->reactor = flux_reactor_create (SIGCHLD))) log_err_exit ("flux_reactor_create"); @@ -150,7 +150,7 @@ static int set_nonblock (int fd, bool nonblock) return 0; } -static client_t * client_create (ctx_t *ctx, int rfd, int wfd) +static client_t * client_create (proxy_ctx_t *ctx, int rfd, int wfd) { client_t *c; flux_t *h = ctx->h; @@ -252,7 +252,7 @@ static void subscription_destroy (void *data) free (sub); } -static int global_subscribe (ctx_t *ctx, const char *topic) +static int global_subscribe (proxy_ctx_t *ctx, const char *topic) { subscription_t *sub; int rc = -1; @@ -277,7 +277,7 @@ static int global_subscribe (ctx_t *ctx, const char *topic) return rc; } -static int global_unsubscribe (ctx_t *ctx, const char *topic) +static int global_unsubscribe (proxy_ctx_t *ctx, const char *topic) { subscription_t *sub; int rc = -1; @@ -477,7 +477,7 @@ static void client_write_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { client_t *c = arg; - ctx_t *ctx = c->ctx; + proxy_ctx_t *ctx = c->ctx; if (revents & FLUX_POLLERR) goto disconnect; @@ -529,7 +529,7 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { client_t *c = arg; - ctx_t *ctx = c->ctx; + proxy_ctx_t *ctx = c->ctx; flux_t *h = ctx->h; flux_msg_t *msg = NULL; int type; @@ -595,7 +595,7 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, static void response_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + proxy_ctx_t *ctx = arg; char *uuid = NULL; client_t *c; flux_msg_t *cpy = flux_msg_copy (msg, true); @@ -640,7 +640,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, static void event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + proxy_ctx_t *ctx = arg; client_t *c; const char *topic; int count = 0; @@ -669,7 +669,7 @@ static void event_cb (flux_t *h, flux_msg_handler_t *w, //flux_log (h, LOG_DEBUG, "%s: %s to %d clients", __FUNCTION__, topic, count); } -static int check_cred (ctx_t *ctx, int fd) +static int check_cred (proxy_ctx_t *ctx, int fd) { struct ucred ucred; socklen_t crlen = sizeof (ucred); @@ -696,7 +696,7 @@ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { int fd = flux_fd_watcher_get_fd (w); - ctx_t *ctx = arg; + proxy_ctx_t *ctx = arg; flux_t *h = ctx->h; if (revents & FLUX_POLLIN) { @@ -725,7 +725,7 @@ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, return; } -static int listener_init (ctx_t *ctx, char *sockpath) +static int listener_init (proxy_ctx_t *ctx, char *sockpath) { struct sockaddr_un addr; int fd; @@ -820,7 +820,7 @@ static char *findjob (const char *job) static int child_cb (struct subprocess *p) { - ctx_t *ctx = subprocess_get_context (p, "ctx"); + proxy_ctx_t *ctx = subprocess_get_context (p, "ctx"); ctx->exit_code = subprocess_exit_code (p); flux_reactor_stop (ctx->reactor); @@ -828,7 +828,7 @@ static int child_cb (struct subprocess *p) return 0; } -static int child_create (ctx_t *ctx, int ac, char **av, const char *workpath) +static int child_create (proxy_ctx_t *ctx, int ac, char **av, const char *workpath) { const char *shell = getenv ("SHELL"); char *argz = NULL; @@ -890,7 +890,7 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[]) { flux_t *h = NULL; int n; - ctx_t *ctx; + proxy_ctx_t *ctx; const char *tmpdir = getenv ("TMPDIR"); char workpath[PATH_MAX + 1]; char sockpath[PATH_MAX + 1]; diff --git a/src/common/libflux/attr.c b/src/common/libflux/attr.c index 348c800892dd..6d22af16cd80 100644 --- a/src/common/libflux/attr.c +++ b/src/common/libflux/attr.c @@ -40,7 +40,7 @@ typedef struct { zhash_t *hash; zlist_t *names; flux_t *h; -} ctx_t; +} attr_ctx_t; typedef struct { char *val; @@ -49,15 +49,15 @@ typedef struct { static void freectx (void *arg) { - ctx_t *ctx = arg; + attr_ctx_t *ctx = arg; zhash_destroy (&ctx->hash); zlist_destroy (&ctx->names); free (ctx); } -static ctx_t *getctx (flux_t *h) +static attr_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = flux_aux_get (h, "flux::attr"); + attr_ctx_t *ctx = flux_aux_get (h, "flux::attr"); if (!ctx) { ctx = xzmalloc (sizeof (*ctx)); @@ -84,7 +84,7 @@ static attr_t *attr_create (const char *val, int flags) return attr; } -static int attr_get_rpc (ctx_t *ctx, const char *name, attr_t **attrp) +static int attr_get_rpc (attr_ctx_t *ctx, const char *name, attr_t **attrp) { flux_rpc_t *r; const char *val; @@ -107,7 +107,7 @@ static int attr_get_rpc (ctx_t *ctx, const char *name, attr_t **attrp) return rc; } -static int attr_set_rpc (ctx_t *ctx, const char *name, const char *val) +static int attr_set_rpc (attr_ctx_t *ctx, const char *name, const char *val) { flux_rpc_t *r; attr_t *attr; @@ -150,7 +150,7 @@ static int attr_strcmp (const char *s1, const char *s2) } #endif -static int attr_list_rpc (ctx_t *ctx) +static int attr_list_rpc (attr_ctx_t *ctx) { flux_rpc_t *r; json_t *array, *value; @@ -182,7 +182,7 @@ static int attr_list_rpc (ctx_t *ctx) const char *flux_attr_get (flux_t *h, const char *name, int *flags) { - ctx_t *ctx = getctx (h); + attr_ctx_t *ctx = getctx (h); attr_t *attr; if (!(attr = zhash_lookup (ctx->hash, name)) @@ -196,7 +196,7 @@ const char *flux_attr_get (flux_t *h, const char *name, int *flags) int flux_attr_set (flux_t *h, const char *name, const char *val) { - ctx_t *ctx = getctx (h); + attr_ctx_t *ctx = getctx (h); if (attr_set_rpc (ctx, name, val) < 0) return -1; @@ -205,7 +205,7 @@ int flux_attr_set (flux_t *h, const char *name, const char *val) int flux_attr_fake (flux_t *h, const char *name, const char *val, int flags) { - ctx_t *ctx = getctx (h); + attr_ctx_t *ctx = getctx (h); attr_t *attr = attr_create (val, flags); zhash_update (ctx->hash, name, attr); zhash_freefn (ctx->hash, name, attr_destroy); @@ -214,7 +214,7 @@ int flux_attr_fake (flux_t *h, const char *name, const char *val, int flags) const char *flux_attr_first (flux_t *h) { - ctx_t *ctx = getctx (h); + attr_ctx_t *ctx = getctx (h); if (attr_list_rpc (ctx) < 0) return NULL; @@ -223,7 +223,7 @@ const char *flux_attr_first (flux_t *h) const char *flux_attr_next (flux_t *h) { - ctx_t *ctx = flux_aux_get (h, "flux::attr"); + attr_ctx_t *ctx = flux_aux_get (h, "flux::attr"); return ctx->names ? zlist_next (ctx->names) : NULL; } diff --git a/src/connectors/local/local.c b/src/connectors/local/local.c index 21cdd2a72eb6..5ae900b50b89 100644 --- a/src/connectors/local/local.c +++ b/src/connectors/local/local.c @@ -48,11 +48,11 @@ typedef struct { struct flux_msg_iobuf outbuf; struct flux_msg_iobuf inbuf; flux_t *h; -} ctx_t; +} local_ctx_t; static const struct flux_handle_ops handle_ops; -static int set_nonblock (ctx_t *c, int nonblock) +static int set_nonblock (local_ctx_t *c, int nonblock) { int flags; if (c->fd_nonblock != nonblock) { @@ -68,7 +68,7 @@ static int set_nonblock (ctx_t *c, int nonblock) static int op_pollevents (void *impl) { - ctx_t *c = impl; + local_ctx_t *c = impl; struct pollfd pfd = { .fd = c->fd, .events = POLLIN | POLLOUT | POLLERR | POLLHUP, @@ -95,13 +95,13 @@ static int op_pollevents (void *impl) static int op_pollfd (void *impl) { - ctx_t *c = impl; + local_ctx_t *c = impl; return c->fd; } static int op_send (void *impl, const flux_msg_t *msg, int flags) { - ctx_t *c = impl; + local_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) @@ -113,7 +113,7 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) static flux_msg_t *op_recv (void *impl, int flags) { - ctx_t *c = impl; + local_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) @@ -123,7 +123,7 @@ static flux_msg_t *op_recv (void *impl, int flags) static int op_event (void *impl, const char *topic, const char *msg_topic) { - ctx_t *c = impl; + local_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); flux_rpc_t *rpc = NULL; int rc = 0; @@ -150,7 +150,7 @@ static int op_event_unsubscribe (void *impl, const char *topic) static void op_fini (void *impl) { - ctx_t *c = impl; + local_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); flux_msg_iobuf_clean (&c->outbuf); @@ -189,7 +189,7 @@ static int env_getint (char *name, int dflt) */ flux_t *connector_init (const char *path, int flags) { - ctx_t *c = NULL; + local_ctx_t *c = NULL; struct sockaddr_un addr; char pidfile[PATH_MAX + 1]; char sockfile[PATH_MAX + 1]; diff --git a/src/connectors/loop/loop.c b/src/connectors/loop/loop.c index 549afc278e31..487b637e1835 100644 --- a/src/connectors/loop/loop.c +++ b/src/connectors/loop/loop.c @@ -45,7 +45,7 @@ typedef struct { int pollevents; msglist_t *queue; -} ctx_t; +} loop_ctx_t; static const struct flux_handle_ops handle_ops; @@ -54,7 +54,7 @@ const char *fake_uuid = "12345678123456781234567812345678"; static int op_pollevents (void *impl) { - ctx_t *c = impl; + loop_ctx_t *c = impl; int e, revents = 0; if ((e = msglist_pollevents (c->queue)) < 0) @@ -70,13 +70,13 @@ static int op_pollevents (void *impl) static int op_pollfd (void *impl) { - ctx_t *c = impl; + loop_ctx_t *c = impl; return msglist_pollfd (c->queue); } static int op_send (void *impl, const flux_msg_t *msg, int flags) { - ctx_t *c = impl; + loop_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); int type; flux_msg_t *cpy = NULL; @@ -98,7 +98,7 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) static flux_msg_t *op_recv (void *impl, int flags) { - ctx_t *c = impl; + loop_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); flux_msg_t *msg = msglist_pop (c->queue); if (!msg) @@ -108,7 +108,7 @@ static flux_msg_t *op_recv (void *impl, int flags) static void op_fini (void *impl) { - ctx_t *c = impl; + loop_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); if (c->pollfd >= 0) @@ -120,7 +120,7 @@ static void op_fini (void *impl) flux_t *connector_init (const char *path, int flags) { - ctx_t *c = malloc (sizeof (*c)); + loop_ctx_t *c = malloc (sizeof (*c)); if (!c) { errno = ENOMEM; goto error; diff --git a/src/connectors/shmem/shmem.c b/src/connectors/shmem/shmem.c index 73698308c60a..47acc2123107 100644 --- a/src/connectors/shmem/shmem.c +++ b/src/connectors/shmem/shmem.c @@ -44,15 +44,15 @@ typedef struct { char *uri; flux_t *h; zctx_t *zctx; -} ctx_t; +} shmem_ctx_t; static const struct flux_handle_ops handle_ops; -static int connect_socket (ctx_t *ctx); +static int connect_socket (shmem_ctx_t *ctx); static int op_pollevents (void *impl) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); uint32_t e; size_t esize = sizeof (e); @@ -76,7 +76,7 @@ static int op_pollevents (void *impl) static int op_pollfd (void *impl) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); int fd = -1; size_t fdsize = sizeof (fd); @@ -92,7 +92,7 @@ static int op_pollfd (void *impl) static int op_send (void *impl, const flux_msg_t *msg, int flags) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); flux_msg_t *cpy = NULL; int type; @@ -131,7 +131,7 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) static flux_msg_t *op_recv (void *impl, int flags) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); zmq_pollitem_t zp = { .events = ZMQ_POLLIN, .socket = ctx->sock, .revents = 0, .fd = -1, @@ -155,7 +155,7 @@ static flux_msg_t *op_recv (void *impl, int flags) static int op_event_subscribe (void *impl, const char *topic) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); json_object *in = Jnew (); flux_rpc_t *rpc = NULL; @@ -176,7 +176,7 @@ static int op_event_subscribe (void *impl, const char *topic) static int op_event_unsubscribe (void *impl, const char *topic) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); json_object *in = Jnew (); flux_rpc_t *rpc = NULL; @@ -197,7 +197,7 @@ static int op_event_unsubscribe (void *impl, const char *topic) static int op_getopt (void *impl, const char *option, void *val, size_t size) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); int rc = -1; @@ -219,7 +219,7 @@ static int op_getopt (void *impl, const char *option, void *val, size_t size) static int op_setopt (void *impl, const char *option, const void *val, size_t size) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); size_t val_size; int rc = -1; @@ -244,7 +244,7 @@ static int op_setopt (void *impl, const char *option, static void op_fini (void *impl) { - ctx_t *ctx = impl; + shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); if (ctx->sock) zsocket_destroy (ctx->zctx, ctx->sock); @@ -259,7 +259,7 @@ static void op_fini (void *impl) /* We have to defer connection until the zctx is available to us. * This function is idempotent. */ -static int connect_socket (ctx_t *ctx) +static int connect_socket (shmem_ctx_t *ctx) { if (!ctx->sock) { if (!ctx->zctx) { @@ -289,7 +289,7 @@ flux_t *connector_init (const char *path, int flags) 1, &uuid, (const void **)&path, &length); #endif - ctx_t *ctx = NULL; + shmem_ctx_t *ctx = NULL; if (!path) { errno = EINVAL; goto error; diff --git a/src/connectors/ssh/ssh.c b/src/connectors/ssh/ssh.c index b6b4626ef7a7..c01e3b100351 100644 --- a/src/connectors/ssh/ssh.c +++ b/src/connectors/ssh/ssh.c @@ -59,11 +59,11 @@ typedef struct { int ssh_argc; struct popen2_child *p; flux_t *h; -} ctx_t; +} ssh_ctx_t; static const struct flux_handle_ops handle_ops; -static int set_nonblock (ctx_t *c, int nonblock) +static int set_nonblock (ssh_ctx_t *c, int nonblock) { int flags; if (c->fd_nonblock != nonblock) { @@ -79,7 +79,7 @@ static int set_nonblock (ctx_t *c, int nonblock) static int op_pollevents (void *impl) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; struct pollfd pfd = { .fd = c->fd, .events = POLLIN | POLLOUT | POLLERR | POLLHUP, @@ -106,13 +106,13 @@ static int op_pollevents (void *impl) static int op_pollfd (void *impl) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; return c->fd; } static int op_send (void *impl, const flux_msg_t *msg, int flags) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) @@ -124,7 +124,7 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) static flux_msg_t *op_recv (void *impl, int flags) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); if (set_nonblock (c, (flags & FLUX_O_NONBLOCK)) < 0) @@ -134,7 +134,7 @@ static flux_msg_t *op_recv (void *impl, int flags) static int op_event_subscribe (void *impl, const char *topic) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); flux_rpc_t *rpc = NULL; json_object *in = Jnew (); @@ -153,7 +153,7 @@ static int op_event_subscribe (void *impl, const char *topic) static int op_event_unsubscribe (void *impl, const char *topic) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); flux_rpc_t *rpc = NULL; json_object *in = Jnew (); @@ -172,7 +172,7 @@ static int op_event_unsubscribe (void *impl, const char *topic) static void op_fini (void *impl) { - ctx_t *c = impl; + ssh_ctx_t *c = impl; assert (c->magic == CTX_MAGIC); flux_msg_iobuf_clean (&c->outbuf); @@ -189,7 +189,7 @@ static void op_fini (void *impl) free (c); } -static int parse_ssh_port (ctx_t *c, const char *path) +static int parse_ssh_port (ssh_ctx_t *c, const char *path) { char *p, *cpy = NULL; int rc = -1; @@ -214,7 +214,7 @@ static int parse_ssh_port (ctx_t *c, const char *path) return rc; } -static int parse_ssh_user_at_host (ctx_t *c, const char *path) +static int parse_ssh_user_at_host (ssh_ctx_t *c, const char *path) { char *p, *cpy = NULL; int rc = -1; @@ -289,7 +289,7 @@ static char *which (const char *prog, char *buf, size_t size) return result; } -static int parse_ssh_rcmd (ctx_t *c, const char *path) +static int parse_ssh_rcmd (ssh_ctx_t *c, const char *path) { char *cpy = NULL, *local = NULL, *extra = NULL; char *proxy_argz = NULL; @@ -344,7 +344,7 @@ static int parse_ssh_rcmd (ctx_t *c, const char *path) return rc; } -static int test_broker_connection (ctx_t *c) +static int test_broker_connection (ssh_ctx_t *c) { flux_msg_t *in = NULL; flux_msg_t *out = NULL; @@ -373,7 +373,7 @@ static int test_broker_connection (ctx_t *c) */ flux_t *connector_init (const char *path, int flags) { - ctx_t *c = NULL; + ssh_ctx_t *c = NULL; if (!(c = malloc (sizeof (*c)))) { errno = ENOMEM; diff --git a/src/modules/barrier/barrier.c b/src/modules/barrier/barrier.c index 1098dc8c3d1f..5611098ee5bc 100644 --- a/src/modules/barrier/barrier.c +++ b/src/modules/barrier/barrier.c @@ -46,14 +46,14 @@ typedef struct { bool timer_armed; flux_watcher_t *timer; uint32_t rank; -} ctx_t; +} barrier_ctx_t; typedef struct _barrier_struct { char *name; int nprocs; int count; zhash_t *clients; - ctx_t *ctx; + barrier_ctx_t *ctx; int errnum; flux_watcher_t *debug_timer; } barrier_t; @@ -64,7 +64,7 @@ static void timeout_cb (flux_reactor_t *r, flux_watcher_t *w, static void freectx (void *arg) { - ctx_t *ctx = arg; + barrier_ctx_t *ctx = arg; if (ctx) { zhash_destroy (&ctx->barriers); if (ctx->timer) @@ -73,9 +73,9 @@ static void freectx (void *arg) } } -static ctx_t *getctx (flux_t *h) +static barrier_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = (ctx_t *)flux_aux_get (h, "flux::barrier"); + barrier_ctx_t *ctx = (barrier_ctx_t *)flux_aux_get (h, "flux::barrier"); if (!ctx) { ctx = xzmalloc (sizeof (*ctx)); @@ -123,7 +123,7 @@ static void barrier_destroy (void *arg) return; } -static barrier_t *barrier_create (ctx_t *ctx, const char *name, int nprocs) +static barrier_t *barrier_create (barrier_ctx_t *ctx, const char *name, int nprocs) { barrier_t *b; @@ -163,7 +163,7 @@ static int barrier_add_client (barrier_t *b, char *sender, const flux_msg_t *msg return 0; } -static void send_enter_request (ctx_t *ctx, barrier_t *b) +static void send_enter_request (barrier_ctx_t *ctx, barrier_t *b) { flux_rpc_t *rpc; @@ -185,7 +185,7 @@ static void send_enter_request (ctx_t *ctx, barrier_t *b) static int timeout_reduction (const char *key, void *item, void *arg) { - ctx_t *ctx = arg; + barrier_ctx_t *ctx = arg; barrier_t *b = item; if (b->count > 0) { @@ -205,7 +205,7 @@ static int timeout_reduction (const char *key, void *item, void *arg) static void enter_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + barrier_ctx_t *ctx = arg; barrier_t *b; char *sender = NULL; const char *name; @@ -263,7 +263,7 @@ static void enter_request_cb (flux_t *h, flux_msg_handler_t *w, static int disconnect (const char *key, void *item, void *arg) { barrier_t *b = item; - ctx_t *ctx = b->ctx; + barrier_ctx_t *ctx = b->ctx; char *sender = arg; if (zhash_lookup (b->clients, sender)) { @@ -276,7 +276,7 @@ static int disconnect (const char *key, void *item, void *arg) static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + barrier_ctx_t *ctx = arg; char *sender; if (flux_msg_get_route_first (msg, &sender) < 0) @@ -314,7 +314,7 @@ static int exit_event_send (flux_t *h, const char *name, int errnum) static void exit_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + barrier_ctx_t *ctx = arg; barrier_t *b; const char *name; int errnum; @@ -335,7 +335,7 @@ static void exit_event_cb (flux_t *h, flux_msg_handler_t *w, static void timeout_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { - ctx_t *ctx = arg; + barrier_ctx_t *ctx = arg; assert (ctx->rank != 0); ctx->timer_armed = false; /* one shot */ @@ -353,7 +353,7 @@ static struct flux_msg_handler_spec htab[] = { int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; - ctx_t *ctx = getctx (h); + barrier_ctx_t *ctx = getctx (h); if (!ctx) goto done; diff --git a/src/modules/barrier/libbarrier.c b/src/modules/barrier/libbarrier.c index 30399025a300..80358aa6cbc4 100644 --- a/src/modules/barrier/libbarrier.c +++ b/src/modules/barrier/libbarrier.c @@ -33,17 +33,17 @@ typedef struct { const char *id; int seq; -} ctx_t; +} libbarrier_ctx_t; static void freectx (void *arg) { - ctx_t *ctx = arg; + libbarrier_ctx_t *ctx = arg; free (ctx); } -static ctx_t *getctx (flux_t *h) +static libbarrier_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = flux_aux_get (h, "flux::barrier_client"); + libbarrier_ctx_t *ctx = flux_aux_get (h, "flux::barrier_client"); if (!ctx) { const char *id = getenv ("FLUX_JOB_ID"); if (!id && !(id = getenv ("SLURM_STEPID"))) @@ -62,7 +62,7 @@ int flux_barrier (flux_t *h, const char *name, int nprocs) int ret = -1; if (!name) { - ctx_t *ctx = getctx (h); + libbarrier_ctx_t *ctx = getctx (h); if (!ctx) { errno = EINVAL; goto done; diff --git a/src/modules/connector-local/local.c b/src/modules/connector-local/local.c index 2c142c62c352..8622bfa45513 100644 --- a/src/modules/connector-local/local.c +++ b/src/modules/connector-local/local.c @@ -60,7 +60,7 @@ typedef struct { flux_reactor_t *reactor; uid_t session_owner; zhash_t *subscriptions; -} ctx_t; +} mod_local_ctx_t; typedef void (*unsubscribe_f)(void *handle, const char *topic); @@ -78,7 +78,7 @@ typedef struct { struct flux_msg_iobuf inbuf; struct flux_msg_iobuf outbuf; zlist_t *outqueue; /* queue of outbound flux_msg_t */ - ctx_t *ctx; + mod_local_ctx_t *ctx; zhash_t *disconnect_notify; zhash_t *subscriptions; zuuid_t *uuid; @@ -100,15 +100,15 @@ static void client_write_cb (flux_reactor_t *r, flux_watcher_t *w, static void freectx (void *arg) { - ctx_t *ctx = arg; + mod_local_ctx_t *ctx = arg; zlist_destroy (&ctx->clients); zhash_destroy (&ctx->subscriptions); free (ctx); } -static ctx_t *getctx (flux_t *h) +static mod_local_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = (ctx_t *)flux_aux_get (h, "flux::local_connector"); + mod_local_ctx_t *ctx = (mod_local_ctx_t *)flux_aux_get (h, "flux::local_connector"); if (!ctx) { ctx = xzmalloc (sizeof (*ctx)); @@ -140,7 +140,7 @@ static int set_nonblock (int fd, bool nonblock) return 0; } -static client_t * client_create (ctx_t *ctx, int fd) +static client_t * client_create (mod_local_ctx_t *ctx, int fd) { client_t *c; socklen_t crlen = sizeof (c->ucred); @@ -250,7 +250,7 @@ static void subscription_destroy (void *data) free (sub); } -static int global_subscribe (ctx_t *ctx, const char *topic) +static int global_subscribe (mod_local_ctx_t *ctx, const char *topic) { subscription_t *sub; int rc = -1; @@ -275,7 +275,7 @@ static int global_subscribe (ctx_t *ctx, const char *topic) return rc; } -static int global_unsubscribe (ctx_t *ctx, const char *topic) +static int global_unsubscribe (mod_local_ctx_t *ctx, const char *topic) { subscription_t *sub; int rc = -1; @@ -585,7 +585,7 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, static void response_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + mod_local_ctx_t *ctx = arg; char *uuid = NULL; client_t *c; flux_msg_t *cpy = flux_msg_copy (msg, true); @@ -630,7 +630,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, static void event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + mod_local_ctx_t *ctx = arg; client_t *c; const char *topic; int count = 0; @@ -665,7 +665,7 @@ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { int fd = flux_fd_watcher_get_fd (w); - ctx_t *ctx = arg; + mod_local_ctx_t *ctx = arg; flux_t *h = ctx->h; if (revents & FLUX_POLLIN) { @@ -690,7 +690,7 @@ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, return; } -static int listener_init (ctx_t *ctx, char *sockpath) +static int listener_init (mod_local_ctx_t *ctx, char *sockpath) { struct sockaddr_un addr; int fd; @@ -733,7 +733,7 @@ const int htablen = sizeof (htab) / sizeof (htab[0]); int mod_main (flux_t *h, int argc, char **argv) { - ctx_t *ctx = getctx (h); + mod_local_ctx_t *ctx = getctx (h); char sockpath[PATH_MAX + 1]; const char *local_uri = NULL; char *tmpdir; diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index dbca62e3d370..697b32200790 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -65,14 +65,14 @@ typedef struct { uint32_t blob_size_limit; size_t lzo_bufsize; void *lzo_buf; -} ctx_t; +} sqlite_ctx_t; #define HEAP_ALLOC(var,size) \ lzo_align_t __LZO_MMODEL var [ ((size) + (sizeof(lzo_align_t) - 1)) / sizeof(lzo_align_t) ] static HEAP_ALLOC(lzo_wrkmem, LZO1X_1_MEM_COMPRESS); -static void log_sqlite_error (ctx_t *ctx, const char *fmt, ...) +static void log_sqlite_error (sqlite_ctx_t *ctx, const char *fmt, ...) { va_list ap; va_start (ap, fmt); @@ -86,7 +86,7 @@ static void log_sqlite_error (ctx_t *ctx, const char *fmt, ...) free (s); } -static void set_errno_from_sqlite_error (ctx_t *ctx) +static void set_errno_from_sqlite_error (sqlite_ctx_t *ctx) { switch (sqlite3_errcode (ctx->db)) { case SQLITE_IOERR: /* os io error */ @@ -114,7 +114,7 @@ static void set_errno_from_sqlite_error (ctx_t *ctx) static void freectx (void *arg) { - ctx_t *ctx = arg; + sqlite_ctx_t *ctx = arg; if (ctx) { if (ctx->store_stmt) sqlite3_finalize (ctx->store_stmt); @@ -136,9 +136,9 @@ static void freectx (void *arg) } } -static ctx_t *getctx (flux_t *h) +static sqlite_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = (ctx_t *)flux_aux_get (h, "flux::content-sqlite"); + sqlite_ctx_t *ctx = (sqlite_ctx_t *)flux_aux_get (h, "flux::content-sqlite"); const char *dir; const char *tmp; bool cleanup = false; @@ -228,7 +228,7 @@ static ctx_t *getctx (flux_t *h) return NULL; } -int grow_lzo_buf (ctx_t *ctx, size_t size) +int grow_lzo_buf (sqlite_ctx_t *ctx, size_t size) { size_t newsize = ctx->lzo_bufsize; void *newbuf; @@ -246,7 +246,7 @@ int grow_lzo_buf (ctx_t *ctx, size_t size) void load_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + sqlite_ctx_t *ctx = arg; char *blobref = "-"; int blobref_size; uint8_t hash[BLOBREF_MAX_DIGEST_SIZE]; @@ -322,7 +322,7 @@ void load_cb (flux_t *h, flux_msg_handler_t *w, void store_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + sqlite_ctx_t *ctx = arg; void *data; int size, hash_len; uint8_t hash[BLOBREF_MAX_DIGEST_SIZE]; @@ -417,7 +417,7 @@ int register_backing_store (flux_t *h, bool value, const char *name) void broker_shutdown_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + sqlite_ctx_t *ctx = arg; ctx->broker_shutdown = true; flux_log (h, LOG_DEBUG, "broker shutdown in progress"); } @@ -429,7 +429,7 @@ void broker_shutdown_cb (flux_t *h, flux_msg_handler_t *w, void shutdown_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + sqlite_ctx_t *ctx = arg; flux_rpc_t *rpc; int count = 0; @@ -513,7 +513,7 @@ static struct flux_msg_handler_spec htab[] = { int mod_main (flux_t *h, int argc, char **argv) { int lzo_rc = lzo_init (); - ctx_t *ctx = getctx (h); + sqlite_ctx_t *ctx = getctx (h); if (!ctx) return -1; if (lzo_rc != LZO_E_OK) { diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 57f21637be8d..a8d5fad42ed5 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -88,7 +88,7 @@ typedef struct { flux_watcher_t *check_w; int commit_merge; char *hash_name; -} ctx_t; +} kvs_ctx_t; typedef struct { json_object *ops; @@ -97,15 +97,15 @@ typedef struct { int nprocs; int count; int errnum; - ctx_t *ctx; + kvs_ctx_t *ctx; json_object *rootcpy; /* working copy of root dir */ int blocked:1; int rootcpy_stored:1; href_t newroot; } fence_t; -static int setroot_event_send (ctx_t *ctx, json_object *names); -static int error_event_send (ctx_t *ctx, json_object *names, int errnum); +static int setroot_event_send (kvs_ctx_t *ctx, json_object *names); +static int error_event_send (kvs_ctx_t *ctx, json_object *names, int errnum); void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -113,7 +113,7 @@ void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, static void freectx (void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; if (ctx) { cache_destroy (ctx->cache); zhash_destroy (&ctx->fences); @@ -129,9 +129,9 @@ static void freectx (void *arg) } } -static ctx_t *getctx (flux_t *h) +static kvs_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = (ctx_t *)flux_aux_get (h, "kvssrv"); + kvs_ctx_t *ctx = (kvs_ctx_t *)flux_aux_get (h, "kvssrv"); flux_reactor_t *r; if (!ctx) { @@ -174,7 +174,7 @@ static ctx_t *getctx (flux_t *h) static void content_load_completion (flux_rpc_t *rpc, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; json_object *o; const void *data; int size; @@ -205,7 +205,7 @@ static void content_load_completion (flux_rpc_t *rpc, void *arg) /* If now is true, perform the load rpc synchronously; * otherwise arrange for a continuation to handle the response. */ -static int content_load_request_send (ctx_t *ctx, const href_t ref, bool now) +static int content_load_request_send (kvs_ctx_t *ctx, const href_t ref, bool now) { flux_rpc_t *rpc = NULL; @@ -225,7 +225,7 @@ static int content_load_request_send (ctx_t *ctx, const href_t ref, bool now) return -1; } -static bool load (ctx_t *ctx, const href_t ref, wait_t *wait, json_object **op) +static bool load (kvs_ctx_t *ctx, const href_t ref, wait_t *wait, json_object **op) { struct cache_entry *hp = cache_lookup (ctx->cache, ref, ctx->epoch); bool stall = false; @@ -254,7 +254,7 @@ static bool load (ctx_t *ctx, const href_t ref, wait_t *wait, json_object **op) static int content_store_get (flux_rpc_t *rpc, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; struct cache_entry *hp; const char *blobref; int blobref_size; @@ -283,7 +283,7 @@ static void content_store_completion (flux_rpc_t *rpc, void *arg) (void)content_store_get (rpc, arg); } -static int content_store_request_send (ctx_t *ctx, const href_t ref, +static int content_store_request_send (kvs_ctx_t *ctx, const href_t ref, json_object *val, bool now) { flux_rpc_t *rpc; @@ -311,7 +311,7 @@ static int content_store_request_send (ctx_t *ctx, const href_t ref, * do it asynchronously and push wait onto cache object's wait queue. * FIXME: asynchronous errors need to be propagated back to caller. */ -static int store (ctx_t *ctx, json_object *o, href_t ref, wait_t *wait) +static int store (kvs_ctx_t *ctx, json_object *o, href_t ref, wait_t *wait) { struct cache_entry *hp; const char *s = json_object_to_json_string (o); @@ -351,7 +351,7 @@ static int store (ctx_t *ctx, json_object *o, href_t ref, wait_t *wait) return rc; } -static void setroot (ctx_t *ctx, const char *rootdir, int rootseq) +static void setroot (kvs_ctx_t *ctx, const char *rootdir, int rootseq) { if (rootseq == 0 || rootseq > ctx->rootseq) { memcpy (ctx->rootdir, rootdir, sizeof (href_t)); @@ -379,7 +379,7 @@ static json_object *copydir (json_object *dir) * Store (large) FILEVAL objects, converting them to FILEREFs. * Return false and enqueue wait_t on cache object(s) if any are dirty. */ -static int commit_unroll (ctx_t *ctx, json_object *dir, wait_t *wait) +static int commit_unroll (kvs_ctx_t *ctx, json_object *dir, wait_t *wait) { json_object_iter iter; json_object *subdir, *value; @@ -414,7 +414,7 @@ static int commit_unroll (ctx_t *ctx, json_object *dir, wait_t *wait) /* link (key, dirent) into directory 'dir'. */ -static int commit_link_dirent (ctx_t *ctx, json_object *rootdir, +static int commit_link_dirent (kvs_ctx_t *ctx, json_object *rootdir, const char *key, json_object *dirent, wait_t *wait) { @@ -492,7 +492,7 @@ static int commit_link_dirent (ctx_t *ctx, json_object *rootdir, */ static void commit_apply_fence (fence_t *f) { - ctx_t *ctx = f->ctx; + kvs_ctx_t *ctx = f->ctx; wait_t *wait = NULL; int count; @@ -592,7 +592,7 @@ static void commit_apply_fence (fence_t *f) void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; fence_t *f; if ((f = zlist_first (ctx->ready)) && !f->blocked) flux_watcher_start (ctx->idle_w); @@ -603,7 +603,7 @@ void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w, * The top commit can be appended to if it hasn't started, or is still * building the rootcpy, e.g. stalled walking the namespace. */ -void commit_merge_all (ctx_t *ctx) +void commit_merge_all (kvs_ctx_t *ctx) { fence_t *f = zlist_first (ctx->ready); @@ -637,7 +637,7 @@ void commit_merge_all (ctx_t *ctx) void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; fence_t *f; flux_watcher_stop (ctx->idle_w); @@ -653,7 +653,7 @@ void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, static void dropcache_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; int size, expcount = 0; int rc = -1; @@ -671,7 +671,7 @@ static void dropcache_request_cb (flux_t *h, flux_msg_handler_t *w, static void dropcache_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; int size, expcount = 0; if (flux_event_decode (msg, NULL, NULL) < 0) @@ -684,7 +684,7 @@ static void dropcache_event_cb (flux_t *h, flux_msg_handler_t *w, static void heartbeat_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; if (flux_heartbeat_decode (msg, &ctx->epoch) < 0) { flux_log_error (ctx->h, "%s", __FUNCTION__); @@ -703,7 +703,7 @@ static void heartbeat_cb (flux_t *h, flux_msg_handler_t *w, /* Get dirent containing requested key. */ -static bool walk (ctx_t *ctx, json_object *root, const char *path, +static bool walk (kvs_ctx_t *ctx, json_object *root, const char *path, json_object **direntp, wait_t *wait, int flags, int depth) { char *cpy = xstrdup (path); @@ -768,7 +768,7 @@ static bool walk (ctx_t *ctx, json_object *root, const char *path, return false; } -static bool lookup (ctx_t *ctx, json_object *root, wait_t *wait, +static bool lookup (kvs_ctx_t *ctx, json_object *root, wait_t *wait, int flags, const char *name, json_object **valp, int *ep) { @@ -865,7 +865,7 @@ static bool lookup (ctx_t *ctx, json_object *root, wait_t *wait, static void get_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; json_object *out = NULL; @@ -938,7 +938,7 @@ static bool compare_json (json_object *o1, json_object *o2) static void watch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; json_object *in2 = NULL; @@ -1050,7 +1050,7 @@ static bool unwatch_cmp (const flux_msg_t *msg, void *arg) static void unwatch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; unwatch_param_t p = { NULL, NULL }; @@ -1096,7 +1096,7 @@ static void fence_destroy (fence_t *f) } } -static fence_t *fence_create (ctx_t *ctx, const char *name, int nprocs) +static fence_t *fence_create (kvs_ctx_t *ctx, const char *name, int nprocs) { fence_t *f; @@ -1145,7 +1145,7 @@ static int fence_append_request (fence_t *f, const flux_msg_t *request) return 0; } -static int fence_add (ctx_t *ctx, fence_t *f) +static int fence_add (kvs_ctx_t *ctx, fence_t *f) { const char *name; @@ -1163,12 +1163,12 @@ static int fence_add (ctx_t *ctx, fence_t *f) return -1; } -static fence_t *fence_lookup (ctx_t *ctx, const char *name) +static fence_t *fence_lookup (kvs_ctx_t *ctx, const char *name) { return zhash_lookup (ctx->fences, name); } -static void fence_finalize (ctx_t *ctx, fence_t *f, int errnum) +static void fence_finalize (kvs_ctx_t *ctx, fence_t *f, int errnum) { flux_msg_t *msg; const char *name; @@ -1182,7 +1182,7 @@ static void fence_finalize (ctx_t *ctx, fence_t *f, int errnum) zhash_delete (ctx->fences, name); } -static void fence_finalize_bynames (ctx_t *ctx, json_object *names, int errnum) +static void fence_finalize_bynames (kvs_ctx_t *ctx, json_object *names, int errnum) { int i, len; const char *name; @@ -1207,7 +1207,7 @@ static void fence_finalize_bynames (ctx_t *ctx, json_object *names, int errnum) static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str, *name; int nprocs; json_object *in = NULL; @@ -1259,7 +1259,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, static void fence_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str, *name; int nprocs; json_object *in = NULL; @@ -1319,7 +1319,7 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *w, static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; json_object *out = Jnew (); @@ -1355,7 +1355,7 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, static void getroot_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; json_object *out = NULL; int rc = -1; @@ -1371,7 +1371,7 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *w, Jput (out); } -static int getroot_rpc (ctx_t *ctx, int *rootseq, href_t rootdir) +static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, href_t rootdir) { flux_rpc_t *rpc; const char *json_str; @@ -1405,7 +1405,7 @@ static int getroot_rpc (ctx_t *ctx, int *rootseq, href_t rootdir) static void error_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; const char *json_str; json_object *out = NULL; json_object *names = NULL; @@ -1429,7 +1429,7 @@ static void error_event_cb (flux_t *h, flux_msg_handler_t *w, Jput (out); } -static int error_event_send (ctx_t *ctx, json_object *names, int errnum) +static int error_event_send (kvs_ctx_t *ctx, json_object *names, int errnum) { json_object *in = NULL; flux_msg_t *msg = NULL; @@ -1453,7 +1453,7 @@ static int error_event_send (ctx_t *ctx, json_object *names, int errnum) static void setroot_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; json_object *out = NULL; int rootseq; const char *json_str; @@ -1498,7 +1498,7 @@ static void setroot_event_cb (flux_t *h, flux_msg_handler_t *w, Jput (out); } -static int setroot_event_send (ctx_t *ctx, json_object *names) +static int setroot_event_send (kvs_ctx_t *ctx, json_object *names) { json_object *in = NULL; json_object *root = NULL; @@ -1538,7 +1538,7 @@ static bool disconnect_cmp (const flux_msg_t *msg, void *arg) static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; char *sender = NULL; if (flux_request_decode (msg, NULL, NULL) < 0) @@ -1567,7 +1567,7 @@ static void add_tstat (json_object *o, const char *name, tstat_t *ts, static void stats_get_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; json_object *o = Jnew (); tstat_t ts; int size, incomplete, dirty; @@ -1596,7 +1596,7 @@ static void stats_get_cb (flux_t *h, flux_msg_handler_t *w, static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; memset (&ctx->stats, 0, sizeof (ctx->stats)); } @@ -1604,7 +1604,7 @@ static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *w, static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + kvs_ctx_t *ctx = arg; memset (&ctx->stats, 0, sizeof (ctx->stats)); @@ -1632,7 +1632,7 @@ static struct flux_msg_handler_spec handlers[] = { FLUX_MSGHANDLER_TABLE_END, }; -static void process_args (ctx_t *ctx, int ac, char **av) +static void process_args (kvs_ctx_t *ctx, int ac, char **av) { int i; @@ -1646,7 +1646,7 @@ static void process_args (ctx_t *ctx, int ac, char **av) int mod_main (flux_t *h, int argc, char **argv) { - ctx_t *ctx = getctx (h); + kvs_ctx_t *ctx = getctx (h); if (!ctx) { flux_log_error (h, "error creating KVS context"); diff --git a/src/modules/live/live.c b/src/modules/live/live.c index e821801baa59..4bcca415e59d 100644 --- a/src/modules/live/live.c +++ b/src/modules/live/live.c @@ -127,20 +127,20 @@ typedef struct { ns_t *ns; /* master only */ json_object *topo; /* master only */ flux_t *h; -} ctx_t; +} live_ctx_t; static void parent_destroy (parent_t *p); static void child_destroy (child_t *c); -static int hello (ctx_t *ctx); -static void goodbye (ctx_t *ctx, int parent_rank); -static void manage_subscriptions (ctx_t *ctx); +static int hello (live_ctx_t *ctx); +static void goodbye (live_ctx_t *ctx, int parent_rank); +static void manage_subscriptions (live_ctx_t *ctx); static const int default_max_idle = 5; static const int default_slow_idle = 3; static const double reduce_timeout = 0.800; -static void ns_chg_one (ctx_t *ctx, uint32_t r, cstate_t from, cstate_t to); -static int ns_sync (ctx_t *ctx); +static void ns_chg_one (live_ctx_t *ctx, uint32_t r, cstate_t from, cstate_t to); +static int ns_sync (live_ctx_t *ctx); static void hello_destroy (void *arg); static void hello_forward (flux_reduce_t *r, int batchnum, void *arg); @@ -157,7 +157,7 @@ static struct flux_reduce_ops hello_ops = { static void freectx (void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; parent_t *p; if (ctx) { @@ -174,9 +174,9 @@ static void freectx (void *arg) } } -static ctx_t *getctx (flux_t *h) +static live_ctx_t *getctx (flux_t *h) { - ctx_t *ctx = flux_aux_get (h, "flux::live"); + live_ctx_t *ctx = flux_aux_get (h, "flux::live"); int n; if (!ctx) { @@ -272,7 +272,7 @@ static json_object *parent_tojson (parent_t *p) return o; } -static json_object *parents_tojson (ctx_t *ctx) +static json_object *parents_tojson (live_ctx_t *ctx) { parent_t *p; json_object *el; @@ -294,7 +294,7 @@ static json_object *parents_tojson (ctx_t *ctx) * zmq_connect(), as opposed to the parent which has a zmq_bind() URI * that could be a wildcard. */ -static void parents_fromjson (ctx_t *ctx, json_object *ar) +static void parents_fromjson (live_ctx_t *ctx, json_object *ar) { int i, len; json_object *el; @@ -317,7 +317,7 @@ static void parents_fromjson (ctx_t *ctx, json_object *ar) } } -static int reparent (ctx_t *ctx, int oldrank, parent_t *p) +static int reparent (live_ctx_t *ctx, int oldrank, parent_t *p) { int rc = -1; @@ -342,7 +342,7 @@ static int reparent (ctx_t *ctx, int oldrank, parent_t *p) /* Reparent to next alternate parent. */ -static int failover (ctx_t *ctx) +static int failover (live_ctx_t *ctx) { parent_t *p; int oldrank; @@ -362,7 +362,7 @@ static int failover (ctx_t *ctx) /* Reparent to original parent. */ -static int recover (ctx_t *ctx) +static int recover (live_ctx_t *ctx) { parent_t *p; int oldrank, maxrank = -1; /* max rank will be orig. parent */ @@ -387,7 +387,7 @@ static int recover (ctx_t *ctx) static void cstate_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; const char *json_str; json_object *event = NULL; int epoch, parent, rank; @@ -432,7 +432,7 @@ static void cstate_cb (flux_t *h, flux_msg_handler_t *w, Jput (event); } -static void cstate_change (ctx_t *ctx, child_t *c, cstate_t newstate) +static void cstate_change (live_ctx_t *ctx, child_t *c, cstate_t newstate) { json_object *event = Jnew (); flux_msg_t *msg; @@ -467,7 +467,7 @@ static void cstate_change (ctx_t *ctx, child_t *c, cstate_t newstate) static void hb_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; char *peers_str = NULL; json_object *peers = NULL; zlist_t *keys = NULL; @@ -524,7 +524,7 @@ static void hb_cb (flux_t *h, flux_msg_handler_t *w, free (peers_str); } -static void manage_subscriptions (ctx_t *ctx) +static void manage_subscriptions (live_ctx_t *ctx) { if (ctx->hb_subscribed && zhash_size (ctx->children) == 0) { if (flux_event_unsubscribe (ctx->h, "hb") < 0) @@ -543,7 +543,7 @@ static void manage_subscriptions (ctx_t *ctx) static int max_idle_cb (const char *key, int val, void *arg, int errnum) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; if (errnum != ENOENT && errnum != 0) return 0; @@ -555,7 +555,7 @@ static int max_idle_cb (const char *key, int val, void *arg, int errnum) static int slow_idle_cb (const char *key, int val, void *arg, int errnum) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; if (errnum != ENOENT && errnum != 0) return 0; @@ -570,7 +570,7 @@ static int slow_idle_cb (const char *key, int val, void *arg, int errnum) static void goodbye_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; int n, rank, prank; @@ -599,7 +599,7 @@ static void goodbye_request_cb (flux_t *h, flux_msg_handler_t *w, Jput (in); } -static void goodbye (ctx_t *ctx, int parent_rank) +static void goodbye (live_ctx_t *ctx, int parent_rank) { json_object *in = Jnew (); flux_rpc_t *rpc; @@ -666,7 +666,7 @@ static ns_t *ns_fromjson (json_object *o) return ns; } -static int ns_tokvs (ctx_t *ctx) +static int ns_tokvs (live_ctx_t *ctx) { json_object *o = ns_tojson (ctx->ns); int rc = -1; @@ -681,7 +681,7 @@ static int ns_tokvs (ctx_t *ctx) return rc; } -static int ns_fromkvs (ctx_t *ctx) +static int ns_fromkvs (live_ctx_t *ctx) { char *json_str = NULL; json_object *o = NULL; @@ -702,7 +702,7 @@ static int ns_fromkvs (ctx_t *ctx) /* If ctx->ns is uninitialized, initialize it, using kvs data if any. * If ctx->ns is initialized, write it to kvs. */ -static int ns_sync (ctx_t *ctx) +static int ns_sync (live_ctx_t *ctx) { int rc = -1; bool writekvs = false; @@ -732,7 +732,7 @@ static int ns_sync (ctx_t *ctx) /* N.B. from=CS_UNKNOWN is treated as "from any other state". */ -static void ns_chg_one (ctx_t *ctx, uint32_t r, cstate_t from, cstate_t to) +static void ns_chg_one (live_ctx_t *ctx, uint32_t r, cstate_t from, cstate_t to) { if (from == CS_UNKNOWN) nodeset_delete_rank (ctx->ns->unknown, r); @@ -764,7 +764,7 @@ static void ns_chg_one (ctx_t *ctx, uint32_t r, cstate_t from, cstate_t to) * FIXME: should we generate a live.cstate event if state is * transitioning from CS_SLOW or CS_FAIL e.g. after reparenting? */ -static void ns_chg_hello (ctx_t *ctx, json_object *a) +static void ns_chg_hello (live_ctx_t *ctx, json_object *a) { json_object_iter iter; int i, len, crank; @@ -782,7 +782,7 @@ static void ns_chg_hello (ctx_t *ctx, json_object *a) * Topology in the kvs is a JSON array of arrays. * Topology in ctx->topo is a JSON hash of arrays, for ease of merging. */ -static int topo_fromkvs (ctx_t *ctx) +static int topo_fromkvs (live_ctx_t *ctx) { char *json_str = NULL; json_object *car; @@ -814,7 +814,7 @@ static int topo_fromkvs (ctx_t *ctx) return rc; } -static int topo_tokvs (ctx_t *ctx) +static int topo_tokvs (live_ctx_t *ctx) { json_object_iter iter; json_object *ar = Jnew_ar (); @@ -837,7 +837,7 @@ static int topo_tokvs (ctx_t *ctx) /* If ctx->topo is uninitialized, initialize it, using kvs data if any. * If ctx->topo is initialized, write it to kvs. */ -static int topo_sync (ctx_t *ctx) +static int topo_sync (live_ctx_t *ctx) { int rc = -1; bool writekvs = false; @@ -901,7 +901,7 @@ static void hello_destroy (void *arg) static void hello_forward (flux_reduce_t *r, int batchnum, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; flux_rpc_t *rpc; json_object *o; @@ -917,7 +917,7 @@ static void hello_forward (flux_reduce_t *r, int batchnum, void *arg) static void hello_sink (flux_reduce_t *r, int batchnum, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; json_object *o; while ((o = flux_reduce_pop (r))) { @@ -948,7 +948,7 @@ static void hello_reduce (flux_reduce_t *r, int batchnum, void *arg) /* Source: { "prank":[crank] } */ -static void hello_source (ctx_t *ctx, const char *prank, int crank) +static void hello_source (live_ctx_t *ctx, const char *prank, int crank) { json_object *a = Jnew (); json_object *c = Jnew_ar (); @@ -965,7 +965,7 @@ static void hello_source (ctx_t *ctx, const char *prank, int crank) static void push_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; @@ -989,7 +989,7 @@ static void push_request_cb (flux_t *h, flux_msg_handler_t *w, static void hello_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; const char *json_str; json_object *in = NULL; json_object *out = NULL; @@ -1038,7 +1038,7 @@ static void hello_request_cb (flux_t *h, flux_msg_handler_t *w, /* Request: {"rank":N} * Response: {"parents":[...]} */ -static int hello (ctx_t *ctx) +static int hello (live_ctx_t *ctx) { const char *json_str; json_object *in = Jnew (); @@ -1075,7 +1075,7 @@ static int hello (ctx_t *ctx) static void failover_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; int rc = -1; if (flux_request_decode (msg, NULL, NULL) < 0) { @@ -1093,7 +1093,7 @@ static void failover_request_cb (flux_t *h, flux_msg_handler_t *w, static void recover_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; int rc = -1; if (flux_request_decode (msg, NULL, NULL) < 0) { @@ -1111,7 +1111,7 @@ static void recover_request_cb (flux_t *h, flux_msg_handler_t *w, static void recover_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + live_ctx_t *ctx = arg; if (zlist_size (ctx->parents) > 0 && recover (ctx) < 0) { if (errno == EINVAL) @@ -1136,7 +1136,7 @@ static struct flux_msg_handler_spec htab[] = { int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; - ctx_t *ctx = getctx (h); + live_ctx_t *ctx = getctx (h); int i, barrier_count = 0; const char *barrier_name = "live-init"; diff --git a/src/modules/resource-hwloc/resource.c b/src/modules/resource-hwloc/resource.c index d19dab293a68..a9d7c774686d 100644 --- a/src/modules/resource-hwloc/resource.c +++ b/src/modules/resource-hwloc/resource.c @@ -45,9 +45,9 @@ typedef struct hwloc_topology_t topology; bool loaded; bool walk_topology; -} ctx_t; +} resource_ctx_t; -static int ctx_hwloc_init (flux_t *h, ctx_t *ctx) +static int ctx_hwloc_init (flux_t *h, resource_ctx_t *ctx) { int ret = -1; char *key, *path = NULL; @@ -121,7 +121,7 @@ static int ctx_hwloc_init (flux_t *h, ctx_t *ctx) return ret; } -static void resource_hwloc_ctx_destroy (ctx_t *ctx) +static void resource_hwloc_ctx_destroy (resource_ctx_t *ctx) { if (ctx) { if (ctx->topology) @@ -130,9 +130,9 @@ static void resource_hwloc_ctx_destroy (ctx_t *ctx) } } -static ctx_t *resource_hwloc_ctx_create (flux_t *h) +static resource_ctx_t *resource_hwloc_ctx_create (flux_t *h) { - ctx_t *ctx = xzmalloc (sizeof(ctx_t)); + resource_ctx_t *ctx = xzmalloc (sizeof(resource_ctx_t)); if (flux_get_rank (h, &ctx->rank) < 0) { flux_log_error (h, "flux_get_rank"); goto error; @@ -147,7 +147,7 @@ static ctx_t *resource_hwloc_ctx_create (flux_t *h) return NULL; } -static int load_xml_to_kvs (flux_t *h, ctx_t *ctx) +static int load_xml_to_kvs (flux_t *h, resource_ctx_t *ctx) { char *xml_path = NULL; char *buffer = NULL; @@ -277,7 +277,7 @@ static int put_hostname (flux_t *h, const char *base, const char *hostname) return (rc); } -static int load_info_to_kvs (flux_t *h, ctx_t *ctx) +static int load_info_to_kvs (flux_t *h, resource_ctx_t *ctx) { char *base_path = NULL; int ret = -1, i; @@ -330,7 +330,7 @@ static int load_info_to_kvs (flux_t *h, ctx_t *ctx) return ret; } -static int load_hwloc (flux_t *h, ctx_t *ctx) +static int load_hwloc (flux_t *h, resource_ctx_t *ctx) { uint32_t size; char *completion_path = NULL; @@ -362,7 +362,7 @@ static int load_hwloc (flux_t *h, ctx_t *ctx) return rc; } -static int decode_reload_request (flux_t *h, ctx_t *ctx, +static int decode_reload_request (flux_t *h, resource_ctx_t *ctx, const flux_msg_t *msg) { const char *json_str; @@ -390,7 +390,7 @@ static void reload_request_cb (flux_t *h, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + resource_ctx_t *ctx = arg; int errnum = 0; if ((decode_reload_request (h, ctx, msg) < 0) @@ -406,7 +406,7 @@ static void topo_request_cb (flux_t *h, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = (ctx_t *)arg; + resource_ctx_t *ctx = (resource_ctx_t *)arg; kvsdir_t *kd = NULL; json_object *out = NULL; char *buffer; @@ -492,7 +492,7 @@ static void topo_request_cb (flux_t *h, Jput (out); } -static void process_args (flux_t *h, ctx_t *ctx, int argc, char **argv) +static void process_args (flux_t *h, resource_ctx_t *ctx, int argc, char **argv) { int i; for (i = 0; i < argc; i++) { @@ -511,7 +511,7 @@ static struct flux_msg_handler_spec htab[] = { int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; - ctx_t *ctx; + resource_ctx_t *ctx; if (!(ctx = resource_hwloc_ctx_create (h))) goto done; diff --git a/t/kz/kzutil.c b/t/kz/kzutil.c index 38b27915e57c..9bf8c151f969 100644 --- a/t/kz/kzutil.c +++ b/t/kz/kzutil.c @@ -46,7 +46,7 @@ typedef struct { kz_t *kz[3]; int readers; int blocksize; -} ctx_t; +} t_kzutil_ctx_t; static void copy (flux_t *h, const char *src, const char *dst, int kzoutflags, int blocksize); @@ -203,7 +203,7 @@ static int write_all (int fd, char *buf, int len) static void attach_stdout_ready_cb (kz_t *kz, void *arg) { - ctx_t *ctx = arg; + t_kzutil_ctx_t *ctx = arg; char *data; int len; @@ -225,7 +225,7 @@ static void attach_stdout_ready_cb (kz_t *kz, void *arg) static void attach_stderr_ready_cb (kz_t *kz, void *arg) { - ctx_t *ctx = arg; + t_kzutil_ctx_t *ctx = arg; int len; char *data; @@ -249,7 +249,7 @@ static void attach_stdin_ready_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) { int fd = flux_fd_watcher_get_fd (w); - ctx_t *ctx = arg; + t_kzutil_ctx_t *ctx = arg; char *buf = xzmalloc (ctx->blocksize); int len; @@ -272,7 +272,7 @@ static void attach_stdin_ready_cb (flux_reactor_t *r, flux_watcher_t *w, static void attach (flux_t *h, const char *key, bool rawtty, int kzoutflags, int blocksize) { - ctx_t *ctx = xzmalloc (sizeof (*ctx)); + t_kzutil_ctx_t *ctx = xzmalloc (sizeof (*ctx)); char *name; int fdin = dup (STDIN_FILENO); struct termios saved_tio; diff --git a/t/request/req.c b/t/request/req.c index 047645921b19..7581db5f079c 100644 --- a/t/request/req.c +++ b/t/request/req.c @@ -14,11 +14,11 @@ typedef struct { int ping_seq; zlist_t *clog_requests; uint32_t rank; -} ctx_t; +} t_req_ctx_t; static void freectx (void *arg) { - ctx_t *ctx = arg; + t_req_ctx_t *ctx = arg; flux_msg_t *msg; if (ctx) { @@ -32,10 +32,10 @@ static void freectx (void *arg) } } -static ctx_t *getctx (flux_t *h) +static t_req_ctx_t *getctx (flux_t *h) { int saved_errno; - ctx_t *ctx = (ctx_t *)flux_aux_get (h, "req"); + t_req_ctx_t *ctx = (t_req_ctx_t *)flux_aux_get (h, "req"); if (!ctx) { ctx = xzmalloc (sizeof (*ctx)); @@ -64,7 +64,7 @@ static ctx_t *getctx (flux_t *h) void count_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = getctx (h); + t_req_ctx_t *ctx = getctx (h); json_object *o = Jnew (); Jadd_int (o, "count", zlist_size (ctx->clog_requests)); @@ -78,7 +78,7 @@ void count_request_cb (flux_t *h, flux_msg_handler_t *w, void clog_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = getctx (h); + t_req_ctx_t *ctx = getctx (h); flux_msg_t *cpy = flux_msg_copy (msg, true); if (zlist_push (ctx->clog_requests, cpy) < 0) @@ -90,7 +90,7 @@ void clog_request_cb (flux_t *h, flux_msg_handler_t *w, void flush_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = getctx (h); + t_req_ctx_t *ctx = getctx (h); flux_msg_t *req; while ((req = zlist_pop (ctx->clog_requests))) { @@ -214,7 +214,7 @@ void echo_request_cb (flux_t *h, flux_msg_handler_t *w, void xping_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + t_req_ctx_t *ctx = arg; const char *json_str; int saved_errno; int rank, seq = ctx->ping_seq++; @@ -270,7 +270,7 @@ void xping_request_cb (flux_t *h, flux_msg_handler_t *w, void ping_response_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + t_req_ctx_t *ctx = arg; const char *json_str; json_object *o = NULL; json_object *out = Jnew ();; @@ -313,7 +313,7 @@ void ping_response_cb (flux_t *h, flux_msg_handler_t *w, void null_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - ctx_t *ctx = arg; + t_req_ctx_t *ctx = arg; const char *topic; int type, size; void *buf; @@ -388,7 +388,7 @@ struct flux_msg_handler_spec htab[] = { int mod_main (flux_t *h, int argc, char **argv) { int saved_errno; - ctx_t *ctx = getctx (h); + t_req_ctx_t *ctx = getctx (h); if (!ctx) { saved_errno = errno;