Skip to content

Commit

Permalink
broker: Move rank and size out of broker_ctx_t
Browse files Browse the repository at this point in the history
Relocate rank and size from broker_ctx_t into overlay_t.
This keeps the size of the overlay network, and our rank within it,
next to the component that actually knows about and manages the
overlay network.

Eventually more will be abstracted away into the overlay portion
of the code, which will make it easier to offer alternate methods
of gathering information to feed to the overlay initialization
code.
  • Loading branch information
morrone committed Oct 13, 2017
1 parent 5914c74 commit 0e47ed5
Showing 1 changed file with 43 additions and 46 deletions.
89 changes: 43 additions & 46 deletions src/broker/broker.c
Expand Up @@ -113,8 +113,6 @@ typedef struct {

/* Session parameters
*/
uint32_t size; /* session size */
uint32_t rank; /* our rank in session */
attr_t *attrs;
uint32_t userid; /* instance owner */
uint32_t rolemask;
Expand Down Expand Up @@ -190,7 +188,6 @@ static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size);
static char *calc_endpoint (attr_t *attrs, const char *endpoint);

static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
uint32_t *rank_out, uint32_t *size_out,
double *elapsed_sec);

static int attr_get_overlay (const char *name, const char **val, void *arg);
Expand Down Expand Up @@ -334,7 +331,6 @@ int main (int argc, char *argv[])
if (!(sigwatchers = zlist_new ()))
oom ();

ctx.rank = FLUX_NODEID_ANY;
ctx.modhash = modhash_create ();
if (!(ctx.services = service_switch_create ()))
log_err_exit ("service_switch_create");
Expand Down Expand Up @@ -439,29 +435,29 @@ int main (int argc, char *argv[])
/* Boot with PMI.
*/
double pmi_elapsed_sec;
if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k,
&ctx.rank, &ctx.size, &pmi_elapsed_sec) < 0)
if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k, &pmi_elapsed_sec) < 0)
log_msg_exit ("bootstrap failed");
uint32_t rank = overlay_get_rank(ctx.overlay);
uint32_t size = overlay_get_size(ctx.overlay);

assert (ctx.rank != FLUX_NODEID_ANY);
assert (ctx.size > 0);
assert (size > 0);
assert (attr_get (ctx.attrs, "session-id", NULL, NULL) == 0);

ctx.tbon.level = kary_levelof (ctx.tbon.k, ctx.rank);
ctx.tbon.maxlevel = kary_levelof (ctx.tbon.k, ctx.size - 1);
ctx.tbon.descendants = kary_sum_descendants (ctx.tbon.k, ctx.size, ctx.rank);
ctx.tbon.level = kary_levelof (ctx.tbon.k, rank);
ctx.tbon.maxlevel = kary_levelof (ctx.tbon.k, size - 1);
ctx.tbon.descendants = kary_sum_descendants (ctx.tbon.k, size, rank);

if (ctx.verbose) {
const char *sid = "unknown";
(void)attr_get (ctx.attrs, "session-id", &sid, NULL);
log_msg ("boot: rank=%d size=%d session-id=%s", ctx.rank, ctx.size, sid);
log_msg ("boot: rank=%d size=%d session-id=%s", rank, size, sid);
}

if (attr_set_flags (ctx.attrs, "session-id", FLUX_ATTRFLAG_IMMUTABLE) < 0)
log_err_exit ("attr_set_flags session-id");

// Setup profiling
setup_profiling (argv[0], ctx.rank);
setup_profiling (argv[0], rank);

/* Create/validate runtime directory (this function is idempotent)
*/
Expand All @@ -470,20 +466,20 @@ int main (int argc, char *argv[])
/* If persist-filesystem or persist-directory are set, initialize those,
* but only on rank 0.
*/
if (create_persistdir (ctx.attrs, ctx.rank) < 0)
if (create_persistdir (ctx.attrs, rank) < 0)
log_err_exit ("create_persistdir");

/* Initialize logging.
* OK to call flux_log*() after this.
*/
logbuf_initialize (ctx.h, ctx.rank, ctx.attrs);
logbuf_initialize (ctx.h, rank, ctx.attrs);

/* Allow flux_get_rank() and flux_get_size() to work in the broker.
*/
if (create_dummyattrs (ctx.h, ctx.rank, ctx.size) < 0)
if (create_dummyattrs (ctx.h, rank, size) < 0)
log_err_exit ("creating dummy attributes");

overlay_set_rank (ctx.overlay, ctx.rank);
overlay_set_rank (ctx.overlay, rank);

/* Registers message handlers and obtains rank.
*/
Expand All @@ -499,9 +495,9 @@ int main (int argc, char *argv[])
|| attr_add_active (ctx.attrs, "mcast.relay-endpoint",
FLUX_ATTRFLAG_IMMUTABLE,
attr_get_overlay, NULL, ctx.overlay) < 0
|| attr_add_uint32 (ctx.attrs, "rank", ctx.rank,
|| attr_add_uint32 (ctx.attrs, "rank", rank,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_uint32 (ctx.attrs, "size", ctx.size,
|| attr_add_uint32 (ctx.attrs, "size", size,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_int (ctx.attrs, "tbon.arity", ctx.tbon.k,
FLUX_ATTRFLAG_IMMUTABLE) < 0
Expand All @@ -517,7 +513,7 @@ int main (int argc, char *argv[])
log_err_exit ("configuring attributes");
}

if (ctx.rank == 0) {
if (rank == 0) {
if (runlevel_register_attrs (ctx.runlevel, ctx.attrs) < 0)
log_err_exit ("configuring runlevel attributes");
}
Expand All @@ -542,11 +538,11 @@ int main (int argc, char *argv[])
* make a guess.
*/
if (ctx.shutdown_grace == 0) {
if (ctx.size < 16)
if (size < 16)
ctx.shutdown_grace = 1;
else if (ctx.size < 128)
else if (size < 128)
ctx.shutdown_grace = 2;
else if (ctx.size < 1024)
else if (size < 1024)
ctx.shutdown_grace = 4;
else
ctx.shutdown_grace = 10;
Expand All @@ -563,9 +559,9 @@ int main (int argc, char *argv[])
log_msg ("relay: %s", relay ? relay : "none");
}

set_proctitle (ctx.rank);
set_proctitle (rank);

if (ctx.rank == 0) {
if (rank == 0) {
const char *rc1, *rc3, *pmi, *uri;
const char *rc2 = ctx.init_shell_cmd;
size_t rc2_len = ctx.init_shell_cmd_len;
Expand All @@ -579,7 +575,7 @@ int main (int argc, char *argv[])
if (attr_get (ctx.attrs, "conf.pmi_library_path", &pmi, NULL) < 0)
log_err_exit ("conf.pmi_library_path is not set");

runlevel_set_size (ctx.runlevel, ctx.size);
runlevel_set_size (ctx.runlevel, size);
runlevel_set_subprocess_manager (ctx.runlevel, ctx.sm);
runlevel_set_callback (ctx.runlevel, runlevel_cb, &ctx);
runlevel_set_io_callback (ctx.runlevel, runlevel_io_cb, &ctx);
Expand Down Expand Up @@ -613,7 +609,7 @@ int main (int argc, char *argv[])
log_msg_exit ("heaptrace_initialize");
if (sequence_hash_initialize (ctx.h) < 0)
log_err_exit ("sequence_hash_initialize");
if (exec_initialize (ctx.h, ctx.sm, ctx.rank, ctx.attrs) < 0)
if (exec_initialize (ctx.h, ctx.sm, rank, ctx.attrs) < 0)
log_err_exit ("exec_initialize");
if (ping_initialize (ctx.h, "cmb") < 0)
log_err_exit ("ping_initialize");
Expand All @@ -626,7 +622,7 @@ int main (int argc, char *argv[])
*/
if (ctx.verbose)
log_msg ("initializing modules");
modhash_set_rank (ctx.modhash, ctx.rank);
modhash_set_rank (ctx.modhash, rank);
modhash_set_flux (ctx.modhash, ctx.h);
modhash_set_heartbeat (ctx.modhash, ctx.heartbeat);
/* Load the local connector module.
Expand All @@ -645,7 +641,7 @@ int main (int argc, char *argv[])
log_err_exit ("initializing heartbeat attributes");
if (heartbeat_start (ctx.heartbeat) < 0)
log_err_exit ("heartbeat_start");
if (ctx.rank == 0 && ctx.verbose)
if (rank == 0 && ctx.verbose)
log_msg ("installing session heartbeat: T=%0.1fs",
heartbeat_get_rate (ctx.heartbeat));

Expand Down Expand Up @@ -812,15 +808,17 @@ static void hello_update_cb (hello_t *hello, void *arg)

if (hello_complete (hello)) {
flux_log (ctx->h, LOG_INFO, "wireup: %d/%d (complete) %.1fs",
hello_get_count (hello), ctx->size, hello_get_time (hello));
hello_get_count (hello), overlay_get_size(ctx->overlay),
hello_get_time (hello));
flux_log (ctx->h, LOG_INFO, "Run level %d starting", 1);
overlay_set_idle_warning (ctx->overlay, 3);
if (runlevel_set_level (ctx->runlevel, 1) < 0)
log_err_exit ("runlevel_set_level 1");
/* FIXME: shutdown hello protocol */
} else {
flux_log (ctx->h, LOG_INFO, "wireup: %d/%d (incomplete) %.1fs",
hello_get_count (hello), ctx->size, hello_get_time (hello));
hello_get_count (hello), overlay_get_size(ctx->overlay),
hello_get_time (hello));
}
}

Expand All @@ -830,7 +828,7 @@ static void shutdown_cb (shutdown_t *s, bool expired, void *arg)
{
broker_ctx_t *ctx = arg;
if (expired) {
if (ctx->rank == 0)
if (overlay_get_rank(ctx->overlay) == 0)
exit_rc = shutdown_get_rc (s);
flux_reactor_stop (flux_get_reactor (ctx->h));
}
Expand Down Expand Up @@ -1105,7 +1103,6 @@ static char * calc_endpoint (attr_t *attrs, const char *endpoint)
}

static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
uint32_t *rank_out, uint32_t *size_out,
double *elapsed_sec)
{
int spawned, size, rank, appnum;
Expand Down Expand Up @@ -1137,18 +1134,16 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
log_msg ("PMI_Get_size: %s", pmi_strerror (e));
goto done;
}
overlay_set_size (overlay, (uint32_t)size);
if ((e = PMI_Get_rank (&rank)) != PMI_SUCCESS) {
log_msg ("PMI_Get_rank: %s", pmi_strerror (e));
goto done;
}
*rank_out = (uint32_t)rank;
overlay_set_rank (overlay, (uint32_t)rank);
if ((e = PMI_Get_appnum (&appnum)) != PMI_SUCCESS) {
log_msg ("PMI_Get_appnum: %s", pmi_strerror (e));
goto done;
}
*size_out = (uint32_t)size;

overlay_set_rank (overlay, (uint32_t)rank);

/* Get id string.
*/
Expand Down Expand Up @@ -1788,7 +1783,7 @@ static void broker_add_services (broker_ctx_t *ctx)
{
struct internal_service *svc;
for (svc = &services[0]; svc->name != NULL; svc++) {
if (!nodeset_member (svc->nodeset, ctx->rank))
if (!nodeset_member (svc->nodeset, overlay_get_rank(ctx->overlay)))
continue;
if (service_add (ctx->services, svc->name, NULL,
route_to_handle, ctx) < 0)
Expand Down Expand Up @@ -2077,7 +2072,7 @@ static int subvert_sendmsg_child (broker_ctx_t *ctx, const flux_msg_t *msg,
char uuid[16];
int rc = -1;

snprintf (uuid, sizeof (uuid), "%"PRIu32, ctx->rank);
snprintf (uuid, sizeof (uuid), "%"PRIu32, overlay_get_rank(ctx->overlay));
if (flux_msg_push_route (cpy, uuid) < 0)
goto done;
snprintf (uuid, sizeof (uuid), "%"PRIu32, nodeid);
Expand Down Expand Up @@ -2108,14 +2103,16 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
uint32_t nodeid, gw;
int flags;
int rc = -1;
uint32_t rank = overlay_get_rank(ctx->overlay);
uint32_t size = overlay_get_size(ctx->overlay);

if (flux_msg_get_nodeid (msg, &nodeid, &flags) < 0)
goto error;
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == ctx->rank) {
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == rank) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
if (rc < 0)
goto error;
} else if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != ctx->rank) {
} else if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != rank) {
rc = service_send (ctx->services, msg);
if (rc < 0 && errno == ENOSYS) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
Expand All @@ -2133,12 +2130,12 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
}
if (rc < 0)
goto error;
} else if (nodeid == ctx->rank) {
} else if (nodeid == rank) {
rc = service_send (ctx->services, msg);
if (rc < 0)
goto error;
} else if ((gw = kary_child_route (ctx->tbon.k, ctx->size,
ctx->rank, nodeid)) != KARY_NONE) {
} else if ((gw = kary_child_route (ctx->tbon.k, size, rank, nodeid))
!= KARY_NONE) {
rc = subvert_sendmsg_child (ctx, msg, gw);
if (rc < 0)
goto error;
Expand Down Expand Up @@ -2173,7 +2170,7 @@ static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
goto done;
}

parent = kary_parentof (ctx->tbon.k, ctx->rank);
parent = kary_parentof (ctx->tbon.k, overlay_get_rank(ctx->overlay));
snprintf (puuid, sizeof (puuid), "%"PRIu32, parent);

/* See if it should go to the parent (backwards!)
Expand Down Expand Up @@ -2207,7 +2204,7 @@ static int broker_event_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)

if (!(cpy = flux_msg_copy (msg, true)))
goto done;
if (ctx->rank > 0) {
if (overlay_get_rank(ctx->overlay) > 0) {
if (flux_msg_enable_route (cpy) < 0)
goto done;
rc = overlay_sendmsg_parent (ctx->overlay, cpy);
Expand Down

0 comments on commit 0e47ed5

Please sign in to comment.