Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve reliability of module unloading #1017

Merged
merged 9 commits into from Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion etc/rc3
Expand Up @@ -21,7 +21,7 @@ flux module remove -r 0 userdb
flux module remove -r 0 cron
flux module remove -r all job
flux module remove -r all resource-hwloc

flux module remove -r all aggregator
flux module remove -r all kvs
flux module remove -r all barrier

Expand Down
226 changes: 113 additions & 113 deletions src/broker/broker.c
Expand Up @@ -84,8 +84,6 @@
#define zsocket_set_immediate zsocket_set_delay_attach_on_connect
#endif

const char *default_modules = "connector-local";

typedef enum {
ERROR_MODE_RESPOND,
ERROR_MODE_RETURN,
Expand Down Expand Up @@ -178,7 +176,11 @@ static void broker_unhandle_signals (zlist_t *sigwatchers);

static void broker_add_services (broker_ctx_t *ctx);

static void load_modules (broker_ctx_t *ctx, const char *default_modules);
static int load_module_byname (broker_ctx_t *ctx, const char *name,
const char *argz, size_t argz_len,
const flux_msg_t *request);
static int unload_module_byname (broker_ctx_t *ctx, const char *name,
const flux_msg_t *request, bool async);

static void update_proctitle (broker_ctx_t *ctx);
static void runlevel_cb (runlevel_t *r, int level, int rc, double elapsed,
Expand All @@ -195,8 +197,6 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec);
static int attr_get_snoop (const char *name, const char **val, void *arg);
static int attr_get_overlay (const char *name, const char **val, void *arg);

static void sigalrm_cb (int signum);

static void init_attrs (broker_ctx_t *ctx);

static const struct flux_handle_ops broker_handle_ops;
Expand Down Expand Up @@ -556,13 +556,13 @@ int main (int argc, char *argv[])
*/
if (ctx.shutdown_grace == 0) {
if (ctx.size < 16)
ctx.shutdown_grace = 0.5;
else if (ctx.size < 128)
ctx.shutdown_grace = 1;
else if (ctx.size < 1024)
else if (ctx.size < 128)
ctx.shutdown_grace = 2;
else if (ctx.size < 1024)
ctx.shutdown_grace = 4;
else
ctx.shutdown_grace = 5;
ctx.shutdown_grace = 10;
}

if (ctx.verbose) {
Expand Down Expand Up @@ -646,15 +646,22 @@ int main (int argc, char *argv[])

broker_add_services (&ctx);

/* Load default modules
/* Initialize comms module infrastructure.
*/
if (ctx.verbose)
log_msg ("loading default modules");
log_msg ("initializing modules");
modhash_set_zctx (ctx.modhash, ctx.zctx);
modhash_set_rank (ctx.modhash, ctx.rank);
modhash_set_flux (ctx.modhash, ctx.h);
modhash_set_heartbeat (ctx.modhash, ctx.heartbeat);
load_modules (&ctx, default_modules);
/* Load the local connector module.
* Other modules will be loaded in rc1 using flux module,
* which uses the local connector.
*/
if (ctx.verbose)
log_msg ("loading connector-local");
if (load_module_byname (&ctx, "connector-local", NULL, 0, NULL) < 0)
log_err_exit ("load_module connector-local");

/* install heartbeat (including timer on rank 0)
*/
Expand Down Expand Up @@ -693,15 +700,6 @@ int main (int argc, char *argv[])
log_err_exit ("sigaction");
if (sigaction (SIGTERM, &old_sigact_term, NULL) < 0)
log_err_exit ("sigaction");
/* Install SIGALRM handler and set timer in case we get stuck.
*/
struct sigaction sigact_alrm = {
.sa_handler = sigalrm_cb,
.sa_flags = 0,
};
if (sigaction (SIGALRM, &sigact_alrm, NULL) < 0)
log_err_exit ("sigaction");
alarm (1); // 1s to tear down

/* remove heartbeat timer, if any
*/
Expand All @@ -710,8 +708,11 @@ int main (int argc, char *argv[])
/* Unload modules.
*/
if (ctx.verbose)
log_msg ("unloading modules");
module_stop_all (ctx.modhash);
log_msg ("unloading connector-local");
if (unload_module_byname (&ctx, "connector-local", NULL, false) < 0)
log_err ("unload connector-local");
if (ctx.verbose)
log_msg ("finalizing modules");
modhash_destroy (ctx.modhash);

/* Unregister builtin services
Expand Down Expand Up @@ -824,13 +825,6 @@ static void hello_update_cb (hello_t *hello, void *arg)
}
}

/* Signal handler teardown timeout.
*/
static void sigalrm_cb (int signum)
{
exit (exit_rc);
}

/* Currently 'expired' is always true.
*/
static void shutdown_cb (shutdown_t *s, bool expired, void *arg)
Expand Down Expand Up @@ -1321,53 +1315,97 @@ static int mod_svc_cb (const flux_msg_t *msg, void *arg)
return rc;
}

/* Load command line/default comms modules. If module name contains
* one or more '/' characters, it refers to a .so path.
*/
static void load_modules (broker_ctx_t *ctx, const char *default_modules)
/* Load and start the comms module whose DSO is at 'path'.
 *
 * ctx      - broker context; supplies the module hash, service table,
 *            and the handle used for logging.
 * path     - path of the module DSO; the module name is derived from it
 *            with flux_modname().
 * argz     - module arguments as an argz vector (may be NULL when
 *            argz_len is 0).
 * argz_len - length of 'argz' in bytes.
 * request  - optional insmod request; if non-NULL it is queued on the
 *            module with module_push_insmod() and the response is deferred.
 *
 * Returns 0 on success, -1 on failure with errno set.  On failure any
 * service registration made here is undone and the module is removed, so
 * the service table never retains a pointer to a removed module.
 */
static int load_module_bypath (broker_ctx_t *ctx, const char *path,
                               const char *argz, size_t argz_len,
                               const flux_msg_t *request)
{
    module_t *p = NULL;
    char *name = NULL;
    char *arg;
    bool svc_registered = false;
    int saved_errno;

    if (!(name = flux_modname (path))) {
        errno = ENOENT;
        goto error;
    }
    if (!(p = module_add (ctx->modhash, path)))
        goto error;
    if (!svc_add (ctx->services, module_get_name (p),
                  module_get_service (p), mod_svc_cb, p)) {
        errno = EEXIST;
        goto error;
    }
    /* From here on the service entry holds 'p' as its callback argument,
     * so it must be dropped before 'p' is removed on any later failure.
     */
    svc_registered = true;
    for (arg = argz_next (argz, argz_len, NULL); arg != NULL;
                            arg = argz_next (argz, argz_len, arg))
        module_add_arg (p, arg);
    module_set_poller_cb (p, module_cb, ctx);
    module_set_status_cb (p, module_status_cb, ctx);
    if (request && module_push_insmod (p, request) < 0) // response deferred
        goto error;
    if (module_start (p) < 0)
        goto error;
    flux_log (ctx->h, LOG_DEBUG, "insmod %s", name);
    free (name);
    return 0;
error:
    /* Cleanup calls may overwrite errno; callers report it to the user. */
    saved_errno = errno;
    if (svc_registered)
        svc_remove (ctx->services, module_get_name (p));
    if (p)
        module_remove (ctx->modhash, p);
    free (name);
    errno = saved_errno;
    return -1;
}

/* Load and start the comms module called 'name'.
 *
 * The module DSO is located by searching the "conf.module_path" broker
 * attribute with flux_modfind(), then handed to load_module_bypath()
 * together with 'argz', 'argz_len', and the optional 'request'.
 *
 * Returns 0 on success, -1 on failure.  A message is logged when the
 * attribute is unset or the module cannot be found in the search path.
 */
static int load_module_byname (broker_ctx_t *ctx, const char *name,
                               const char *argz, size_t argz_len,
                               const flux_msg_t *request)
{
    const char *modpath;
    char *path;
    int rc;
    int saved_errno;

    if (attr_get (ctx->attrs, "conf.module_path", &modpath, NULL) < 0) {
        log_msg ("conf.module_path is not set");
        return -1;
    }
    if (!(path = flux_modfind (modpath, name))) {
        log_msg ("%s: not found in module search path", name);
        return -1;
    }
    rc = load_module_bypath (ctx, path, argz, argz_len, request);
    /* 'path' is owned here; keep the loader's errno intact across free(). */
    saved_errno = errno;
    free (path);
    errno = saved_errno;
    return rc < 0 ? -1 : 0;
}

/* Stop and unload the comms module called 'name'.
 *
 * If 'async' is true, service de-registration and module
 * destruction (including join) are deferred until module keepalive
 * status indicates module main() has exited (via module_status_cb).
 * This allows modules with distributed shutdown to talk to each
 * other while they shut down, and also does not block the reactor
 * from handling other events.  In this mode an optional 'request' is
 * queued with module_push_rmmod() so its response can be deferred too.
 *
 * If 'async' is false, do all that teardown synchronously here;
 * 'request' must then be NULL.
 *
 * Returns 0 on success, -1 on failure (errno is ENOENT when no module
 * named 'name' is loaded).
 */
static int unload_module_byname (broker_ctx_t *ctx, const char *name,
                                 const flux_msg_t *request, bool async)
{
    module_t *p;

    if (!(p = module_lookup_byname (ctx->modhash, name))) {
        errno = ENOENT;
        return -1;
    }
    if (module_stop (p) < 0)
        return -1;
    if (async) {
        if (request && module_push_rmmod (p, request) < 0)
            return -1;
    } else {
        assert (request == NULL);
        /* Drop the service entry first: it holds 'p' as its callback arg. */
        svc_remove (ctx->services, module_get_name (p));
        module_remove (ctx->modhash, p);
    }
    flux_log (ctx->h, LOG_DEBUG, "rmmod %s", name);
    return 0;
}

static void broker_handle_signals (broker_ctx_t *ctx, zlist_t *sigwatchers)
Expand Down Expand Up @@ -1434,7 +1472,6 @@ static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *w,
broker_ctx_t *ctx = arg;
const char *json_str;
char *name = NULL;
module_t *p;

if (flux_request_decode (msg, NULL, &json_str) < 0)
goto error;
Expand All @@ -1444,18 +1481,8 @@ static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *w,
}
if (flux_rmmod_json_decode (json_str, &name) < 0)
goto error;
if (!(p = module_lookup_byname (ctx->modhash, name))) {
errno = ENOENT;
goto error;
}
/* N.B. can't remove 'service' entry here as distributed
* module shutdown may require inter-rank module communication.
*/
if (module_stop (p) < 0)
goto error;
if (module_push_rmmod (p, msg) < 0) // response deferred
if (unload_module_byname (ctx, name, msg, true) < 0)
goto error;
flux_log (h, LOG_DEBUG, "rmmod %s", name);
free (name);
return;
error:
Expand All @@ -1469,11 +1496,9 @@ static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *w,
{
broker_ctx_t *ctx = arg;
const char *json_str;
char *name = NULL;
char *path = NULL;
char *argz = NULL;
size_t argz_len = 0;
module_t *p = NULL;

if (flux_request_decode (msg, NULL, &json_str) < 0)
goto error;
Expand All @@ -1483,39 +1508,14 @@ static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *w,
}
if (flux_insmod_json_decode (json_str, &path, &argz, &argz_len) < 0)
goto error;
if (!(name = flux_modname (path))) {
errno = ENOENT;
goto error;
}
if (!(p = module_add (ctx->modhash, path)))
goto error;
if (!svc_add (ctx->services, module_get_name (p),
module_get_service (p), mod_svc_cb, p)) {
errno = EEXIST;
if (load_module_bypath (ctx, path, argz, argz_len, msg) < 0)
goto error;
}
arg = argz_next (argz, argz_len, NULL);
while (arg) {
module_add_arg (p, arg);
arg = argz_next (argz, argz_len, arg);
}
module_set_poller_cb (p, module_cb, ctx);
module_set_status_cb (p, module_status_cb, ctx);
if (module_push_insmod (p, msg) < 0) // response deferred
goto error;
if (module_start (p) < 0)
goto error;
flux_log (h, LOG_DEBUG, "insmod %s", name);
free (name);
free (path);
free (argz);
return;
error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
if (p)
module_remove (ctx->modhash, p);
free (name);
free (path);
free (argz);
}
Expand Down