Skip to content

Commit

Permalink
Merge pull request #3107 from garlick/broker_cleanup
Browse files Browse the repository at this point in the history
broker: state machine refactoring
  • Loading branch information
mergify[bot] committed Aug 5, 2020
2 parents 63b1f16 + 55c9385 commit 762bf46
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 204 deletions.
124 changes: 45 additions & 79 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,14 @@ static int create_rundir (attr_t *attrs);
static int create_broker_rundir (struct overlay *ov, void *arg);
static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size);

static int create_runat_phases (broker_ctx_t *ctx, uint32_t rank);
static int create_runat_phases (broker_ctx_t *ctx);

static int handle_event (broker_ctx_t *ctx, const flux_msg_t *msg);

static void init_attrs (attr_t *attrs, pid_t pid);

static const struct flux_handle_ops broker_handle_ops;

static int exit_rc = 1;

#define OPTIONS "+vs:X:k:H:g:S:c:"
static const struct option longopts[] = {
{"verbose", no_argument, 0, 'v'},
Expand Down Expand Up @@ -268,6 +266,8 @@ int main (int argc, char *argv[])
memset (&ctx, 0, sizeof (ctx));
log_init (argv[0]);

ctx.exit_rc = 1;

if (!(ctx.sigwatchers = zlist_new ()))
oom ();
if (!(ctx.modhash = modhash_create ()))
Expand All @@ -293,7 +293,6 @@ int main (int argc, char *argv[])
ctx.cred.rolemask = FLUX_ROLE_OWNER;
ctx.heartbeat_rate = 2;
ctx.sec_typemask = ZSECURITY_TYPE_CURVE;
ctx.state = STATE_NONE;

init_attrs (ctx.attrs, getpid ());

Expand Down Expand Up @@ -423,12 +422,11 @@ int main (int argc, char *argv[])
flux_log (ctx.h, LOG_INFO, "pmi: bootstrap time %.1fs", elapsed_sec);

}
uint32_t rank = overlay_get_rank (ctx.overlay);
uint32_t size = overlay_get_size (ctx.overlay);
char rank_str[16];
snprintf (rank_str, sizeof (rank_str), "%"PRIu32, rank);
ctx.rank = overlay_get_rank (ctx.overlay);
ctx.size = overlay_get_size (ctx.overlay);
snprintf (ctx.uuid, sizeof (ctx.uuid), "%"PRIu32, ctx.rank);

assert (size > 0);
assert (ctx.size > 0);

/* Must be called after overlay setup */
if (overlay_register_attrs (ctx.overlay, ctx.attrs) < 0) {
Expand All @@ -437,19 +435,19 @@ int main (int argc, char *argv[])
}

if (ctx.verbose)
log_msg ("boot: rank=%d size=%d", rank, size);
log_msg ("boot: rank=%d size=%d", ctx.rank, ctx.size);

// Setup profiling
setup_profiling (argv[0], rank);
setup_profiling (argv[0], ctx.rank);

/* Initialize logging.
* OK to call flux_log*() after this.
*/
logbuf_initialize (ctx.h, rank, ctx.attrs);
logbuf_initialize (ctx.h, ctx.rank, ctx.attrs);

/* Allow flux_get_rank() and flux_get_size() to work in the broker.
*/
if (create_dummyattrs (ctx.h, rank, size) < 0) {
if (create_dummyattrs (ctx.h, ctx.rank, ctx.size) < 0) {
log_err ("creating dummy attributes");
goto cleanup;
}
Expand All @@ -472,9 +470,9 @@ int main (int argc, char *argv[])
log_msg ("child: %s", child ? child : "none");
}

set_proctitle (rank);
set_proctitle (ctx.rank);

if (create_runat_phases (&ctx, rank) < 0)
if (create_runat_phases (&ctx) < 0)
goto cleanup;

/* If Flux was launched by Flux, now that PMI bootstrap and runat
Expand All @@ -487,7 +485,7 @@ int main (int argc, char *argv[])

/* Wire up the overlay.
*/
if (rank > 0) {
if (ctx.rank > 0) {
if (ctx.verbose)
log_msg ("initializing overlay connect");
if (overlay_connect (ctx.overlay) < 0) {
Expand All @@ -498,7 +496,7 @@ int main (int argc, char *argv[])

if (!(ctx.shutdown = shutdown_create (ctx.h,
ctx.shutdown_grace,
size,
ctx.size,
ctx.tbon_k,
ctx.overlay))) {
log_err ("shutdown_create");
Expand All @@ -516,11 +514,11 @@ int main (int argc, char *argv[])
log_err ("heaptrace_initialize");
goto cleanup;
}
if (exec_initialize (ctx.h, rank, ctx.attrs) < 0) {
if (exec_initialize (ctx.h, ctx.rank, ctx.attrs) < 0) {
log_err ("exec_initialize");
goto cleanup;
}
if (ping_initialize (ctx.h, "cmb", rank_str) < 0) {
if (ping_initialize (ctx.h, "cmb", ctx.uuid) < 0) {
log_err ("ping_initialize");
goto cleanup;
}
Expand All @@ -538,7 +536,7 @@ int main (int argc, char *argv[])
*/
if (ctx.verbose)
log_msg ("initializing modules");
modhash_set_rank (ctx.modhash, rank);
modhash_set_rank (ctx.modhash, ctx.rank);
modhash_set_flux (ctx.modhash, ctx.h);
modhash_set_heartbeat (ctx.modhash, ctx.heartbeat);

Expand All @@ -557,10 +555,17 @@ int main (int argc, char *argv[])
log_err ("heartbeat_start");
goto cleanup;
}
if (rank == 0 && ctx.verbose)
if (ctx.rank == 0 && ctx.verbose)
log_msg ("installing session heartbeat: T=%0.1fs",
heartbeat_get_rate (ctx.heartbeat));

/* Configure broker state machine
*/
if (!(ctx.state_machine = state_machine_create (&ctx))) {
log_err ("error creating broker state machine");
goto cleanup;
}

/* Send hello message to parent.
* N.B. uses tbon topology attributes set above.
* hello_cb() tracks progress on rank 0.
Expand Down Expand Up @@ -590,7 +595,7 @@ int main (int argc, char *argv[])
if (ctx.verbose)
log_msg ("entering event loop");
/* Once we enter the reactor, default exit_rc is now 0 */
exit_rc = 0;
ctx.exit_rc = 0;
if (flux_reactor_run (ctx.reactor, 0) < 0)
log_err ("flux_reactor_run");
if (ctx.verbose)
Expand Down Expand Up @@ -626,6 +631,7 @@ int main (int argc, char *argv[])

modhash_destroy (ctx.modhash);
zlist_destroy (&ctx.sigwatchers);
state_machine_destroy (ctx.state_machine);
overlay_destroy (ctx.overlay);
heartbeat_destroy (ctx.heartbeat);
service_switch_destroy (ctx.services);
Expand All @@ -640,7 +646,7 @@ int main (int argc, char *argv[])
zlist_destroy (&ctx.subscriptions);
free (ctx.init_shell_cmd);

return exit_rc;
return ctx.exit_rc;
}

struct attrmap {
Expand Down Expand Up @@ -742,7 +748,7 @@ static void hello_cb (struct hello *hello, void *arg)

if (hello_complete (hello)) {
overlay_set_idle_warning (ctx->overlay, 3);
state_machine (ctx, "wireup-complete");
state_machine_post (ctx->state_machine, "wireup-complete");
}

free (s);
Expand All @@ -758,8 +764,7 @@ static void shutdown_cb (struct shutdown *s, void *arg)
broker_ctx_t *ctx = arg;

if (shutdown_is_expired (s)) {
log_msg ("shutdown timer expired on rank %"PRIu32,
overlay_get_rank (ctx->overlay));
log_msg ("shutdown timer expired on rank %"PRIu32, ctx->rank);
_exit (1);
}
if (!shutdown_is_complete (s)) {
Expand All @@ -780,36 +785,6 @@ static void set_proctitle (uint32_t rank)
(void)prctl (PR_SET_NAME, proctitle, 0, 0, 0);
}

/* Completion callback handed to runat_create().
 * Looks up the exit code of the runat entry 'name'.  If 'name' is one of
 * "rc1", "rc2", "cleanup", or "rc3", a nonzero code is stored in exit_rc
 * and "<name>-success" or "<name>-fail" is passed to state_machine().
 * Any other name produces no event.
 */
static void runat_completion_cb (struct runat *r, const char *name, void *arg)
{
    /* One row per recognized entry: the name and the two events it can
     * generate.  Event strings are spelled out so they can be grepped.
     */
    static const struct {
        const char *phase;
        const char *success;
        const char *fail;
    } events[] = {
        { "rc1",     "rc1-success",     "rc1-fail" },
        { "rc2",     "rc2-success",     "rc2-fail" },
        { "cleanup", "cleanup-success", "cleanup-fail" },
        { "rc3",     "rc3-success",     "rc3-fail" },
    };
    broker_ctx_t *ctx = arg;
    int code = 1; // value used if runat_get_exit_code() does not set it
    size_t i;

    if (runat_get_exit_code (r, name, &code) < 0)
        log_err ("runat_get_exit_code %s", name);

    for (i = 0; i < sizeof (events) / sizeof (events[0]); i++) {
        if (strcmp (name, events[i].phase) != 0)
            continue;
        if (code != 0)
            exit_rc = code;
        state_machine (ctx, code != 0 ? events[i].fail : events[i].success);
        return;
    }
}

static int create_runat_rc2 (struct runat *r, const char *argz, size_t argz_len)
{
if (argz == NULL) { // run interactive shell
Expand All @@ -827,9 +802,9 @@ static int create_runat_rc2 (struct runat *r, const char *argz, size_t argz_len)
return 0;
}

static int create_runat_phases (broker_ctx_t *ctx, uint32_t rank)
static int create_runat_phases (broker_ctx_t *ctx)
{
if (rank == 0) {
if (ctx->rank == 0) {
const char *rc1, *rc3, *local_uri;
bool rc2_none = false;

Expand All @@ -848,10 +823,7 @@ static int create_runat_phases (broker_ctx_t *ctx, uint32_t rank)
if (attr_get (ctx->attrs, "broker.rc2_none", NULL, NULL) == 0)
rc2_none = true;

if (!(ctx->runat = runat_create (ctx->h,
local_uri,
runat_completion_cb,
ctx))) {
if (!(ctx->runat = runat_create (ctx->h, local_uri))) {
log_err ("runat_create");
return -1;
}
Expand Down Expand Up @@ -1525,7 +1497,7 @@ static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx)
flux_msg_handler_t **handlers;
struct internal_service *svc;
for (svc = &services[0]; svc->name != NULL; svc++) {
if (!nodeset_member (svc->nodeset, overlay_get_rank (ctx->overlay)))
if (!nodeset_member (svc->nodeset, ctx->rank))
continue;
if (service_add (ctx->services, svc->name, NULL,
route_to_handle, ctx) < 0) {
Expand Down Expand Up @@ -1844,8 +1816,8 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
int signum = flux_signal_watcher_get_signum (w);

flux_log (ctx->h, LOG_INFO, "signal %d", signum);
if (overlay_get_rank (ctx->overlay) == 0)
state_abort (ctx);
if (ctx->rank == 0)
state_machine_kill (ctx->state_machine, signum);
}

/* Send a request message down the TBON.
Expand All @@ -1860,12 +1832,10 @@ static int sendmsg_child_request (broker_ctx_t *ctx,
uint32_t nodeid)
{
flux_msg_t *cpy = flux_msg_copy (msg, true);
int saved_errno;
char uuid[16];
int rc = -1;

snprintf (uuid, sizeof (uuid), "%"PRIu32, overlay_get_rank (ctx->overlay));
if (flux_msg_push_route (cpy, uuid) < 0)
if (flux_msg_push_route (cpy, ctx->uuid) < 0)
goto done;
snprintf (uuid, sizeof (uuid), "%"PRIu32, nodeid);
if (flux_msg_push_route (cpy, uuid) < 0)
Expand All @@ -1874,9 +1844,7 @@ static int sendmsg_child_request (broker_ctx_t *ctx,
goto done;
rc = 0;
done:
saved_errno = errno;
flux_msg_destroy (cpy);
errno = saved_errno;
return rc;
}

Expand All @@ -1886,8 +1854,6 @@ static int sendmsg_child_request (broker_ctx_t *ctx,
static int broker_request_sendmsg_internal (broker_ctx_t *ctx,
const flux_msg_t *msg)
{
uint32_t rank = overlay_get_rank (ctx->overlay);
uint32_t size = overlay_get_size (ctx->overlay);
uint32_t nodeid;
uint8_t flags;

Expand All @@ -1897,14 +1863,14 @@ static int broker_request_sendmsg_internal (broker_ctx_t *ctx,
return -1;
/* Route up TBON if destination is upstream of this broker.
*/
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == rank) {
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == ctx->rank) {
if (overlay_sendmsg_parent (ctx->overlay, msg) < 0)
return -1;
}
/* Deliver to local service if destination *could* be this broker.
* If there is no such service locally (ENOSYS), route up TBON.
*/
else if (((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != rank)
else if (((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != ctx->rank)
|| nodeid == FLUX_NODEID_ANY) {
if (service_send (ctx->services, msg) < 0) {
if (errno != ENOSYS)
Expand All @@ -1918,15 +1884,15 @@ static int broker_request_sendmsg_internal (broker_ctx_t *ctx,
}
/* Deliver to local service if this broker is the addressed rank.
*/
else if (nodeid == rank) {
else if (nodeid == ctx->rank) {
if (service_send (ctx->services, msg) < 0)
return -1;
}
/* Send the request up or down TBON as addressed.
*/
else {
uint32_t down_rank;
down_rank = kary_child_route (ctx->tbon_k, size, rank, nodeid);
down_rank = kary_child_route (ctx->tbon_k, ctx->size, ctx->rank, nodeid);
if (down_rank == KARY_NONE) { // up
if (overlay_sendmsg_parent (ctx->overlay, msg) < 0)
return -1;
Expand Down Expand Up @@ -1990,7 +1956,7 @@ static bool uuid_to_rank (const char *s, uint32_t size, uint32_t *rank)
*/
static bool is_my_parent (broker_ctx_t *ctx, uint32_t rank)
{
if (kary_parentof (ctx->tbon_k, overlay_get_rank (ctx->overlay)) == rank)
if (kary_parentof (ctx->tbon_k, ctx->rank) == rank)
return true;
return false;
}
Expand All @@ -2012,7 +1978,7 @@ static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
if (flux_requeue (ctx->h, msg, FLUX_RQ_TAIL) < 0)
goto done;
}
else if (uuid_to_rank (uuid, overlay_get_size (ctx->overlay), &rank)) {
else if (uuid_to_rank (uuid, ctx->size, &rank)) {
if (is_my_parent (ctx, rank)) {
/* N.B. this message is going from DEALER socket to ROUTER socket.
* Instead of popping a route off the stack, ROUTER pushes one
Expand Down Expand Up @@ -2046,7 +2012,7 @@ static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
static int broker_event_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
{

if (overlay_get_rank (ctx->overlay) > 0) {
if (ctx->rank > 0) {
flux_msg_t *cpy;
if (!(cpy = flux_msg_copy (msg, true)))
return -1;
Expand Down
7 changes: 6 additions & 1 deletion src/broker/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ struct broker {
flux_reactor_t *reactor;

struct overlay *overlay;
uint32_t rank;
uint32_t size;
char uuid[16];

struct broker_attr *attrs;
struct flux_msg_cred cred; /* instance owner */
Expand All @@ -40,10 +43,12 @@ struct broker {

struct hello *hello;
struct runat *runat;
broker_state_t state;
struct state_machine *state_machine;

char *init_shell_cmd;
size_t init_shell_cmd_len;

int exit_rc;
};

typedef struct broker broker_ctx_t;
Expand Down

0 comments on commit 762bf46

Please sign in to comment.