Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add flux --parent option and new 'instance-level' attribute #2326

Merged
merged 9 commits into from Aug 28, 2019
5 changes: 5 additions & 0 deletions doc/man1/flux.adoc
Expand Up @@ -30,6 +30,11 @@ OPTIONS
*-h, --help*::
Display help on options, and a list of the core Flux sub-commands.

*-p, --parent*::
If current instance is a child, connect to parent instead. Also sets
'FLUX_KVS_NAMEPSACE' if current instance is confined to a KVS namespace
in the parent. This option may be specified multiple times.

*-v, --verbose*::
Display command environment, and the path search for 'CMD'.

Expand Down
29 changes: 28 additions & 1 deletion src/broker/boot_pmi.c
Expand Up @@ -144,6 +144,30 @@ static int update_endpoint_attr (attr_t *attrs, const char *name,
return rc;
}

/* If the broker is launched via flux-shell, then the shell may opt
* to set a "flux.instance-level" parameter in the PMI kvs to tell
* the booting instance at what "level" it will be running, i.e. the
* number of parents. If the PMI key is missing, this is not an error,
* instead the level of this instance is considered to be zero.
*/
static int set_instance_level_attr (struct pmi_handle *pmi,
const char *kvsname,
attr_t *attrs)
{
int result;
char val[32];
const char *level = "0";

result = broker_pmi_kvs_get (pmi,
kvsname,
"flux.instance-level",
val,
sizeof (val));
if (result == PMI_SUCCESS)
level = val;
return attr_add (attrs, "instance-level", level, FLUX_ATTRFLAG_IMMUTABLE);
}

int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k)
{
int parent_rank;
Expand All @@ -170,7 +194,10 @@ int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k)
log_msg ("broker_pmi_get_params: %s", pmi_strerror (result));
goto error;
}

if (set_instance_level_attr (pmi, pmi_params.kvsname, attrs) < 0) {
log_err ("set_instance_level_attr");
goto error;
}
if (overlay_init (overlay,
(uint32_t)pmi_params.size,
(uint32_t)pmi_params.rank,
Expand Down
77 changes: 70 additions & 7 deletions src/cmd/flux.c
Expand Up @@ -37,11 +37,15 @@ void setup_path (struct environment *env, const char *argv0);
void setup_keydir (struct environment *env, int flags);
static void print_environment (struct environment *env);
static void register_builtin_subcommands (optparse_t *p);
static void push_parent_environment (optparse_t *p, struct environment *env);

static struct optparse_option opts[] = {
{ .name = "verbose", .key = 'v', .has_arg = 0,
.usage = "Be verbose about environment and command search",
},
{ .name = "parent", .key = 'p', .has_arg = 0,
.usage = "Set environment of parent instead of current instance",
},
OPTPARSE_TABLE_END
};

Expand Down Expand Up @@ -197,7 +201,14 @@ int main (int argc, char *argv[])
if ((flags & CONF_FLAG_INTREE))
environment_push (env, "FLUX_CONF_INTREE", "1");

environment_apply(env);
environment_apply (env);

/* If --parent, push parent environment for each occurence
*/
for (int n = optparse_getopt (p, "parent", NULL); n > 0; n--) {
push_parent_environment (p, env);
environment_apply (env);
}
optparse_set_data (p, "env", env);

if (vopt)
Expand Down Expand Up @@ -378,6 +389,63 @@ void exec_subcommand (const char *searchpath, bool vopt, int argc, char *argv[])
}
}

static flux_t *flux_open_internal (optparse_t *p, const char *uri)
{
flux_t *h = NULL;
if ((h = optparse_get_data (p, "flux_t")))
flux_incref (h);
else if ((h = flux_open (NULL, 0)) == NULL)
log_err_exit ("flux_open");
return h;
}

static void flux_close_internal (optparse_t *p)
{
flux_t *h = optparse_get_data (p, "flux_t");
if (h) {
flux_close (h);
optparse_set_data (p, "flux_t", NULL);
}
}

static void push_parent_environment (optparse_t *p, struct environment *env)
{
const char *uri;
const char *ns;
const char *path;
flux_t *h = flux_open_internal (p, NULL);

if (h == NULL)
log_err_exit ("flux_open");

/* If parent-uri doesn't exist then we are at the root instance,
* just do nothing.
*/
if (!(uri = flux_attr_get (h, "parent-uri")))
return;

environment_set (env, "FLUX_URI", uri, 0);

/* Before closing current instance handle, set FLUX_KVS_NAMESPACE
* if parent-kvs-namespace attr is set.
*/
if ((ns = flux_attr_get (h, "parent-kvs-namespace")))
environment_set (env, "FLUX_KVS_NAMESPACE", ns, 0);
else
environment_unset (env, "FLUX_KVS_NAMESPACE");

/* Now close current handle and connect to parent URI, pushing
* parent exec and module paths onto env
*/
flux_close_internal (p);
if (!(h = flux_open_internal (p, uri)))
log_err_exit ("flux_open (parent)");
if ((path = flux_attr_get (h, "conf.exec_path")))
environment_push (env, "FLUX_EXEC_PATH", path);
if ((path = flux_attr_get (h, "conf.module_path")))
environment_push (env, "FLUX_MODULE_PATH", path);
}

static void print_environment (struct environment *env)
{
const char *val;
Expand All @@ -388,12 +456,7 @@ static void print_environment (struct environment *env)

flux_t *builtin_get_flux_handle (optparse_t *p)
{
flux_t *h = NULL;
if ((h = optparse_get_data (p, "flux_t")))
flux_incref (h);
else if ((h = flux_open (NULL, 0)) == NULL)
log_err_exit ("flux_open");
return h;
return flux_open_internal (p, NULL);
}

static void register_builtin_subcommands (optparse_t *p)
Expand Down
53 changes: 50 additions & 3 deletions src/shell/pmi.c
Expand Up @@ -81,6 +81,7 @@ struct shell_pmi {
flux_shell_t *shell;
struct pmi_simple_server *server;
zhashx_t *kvs;
zhashx_t *locals;
int cycle; // count cycles of put / barrier / get
};

Expand All @@ -95,6 +96,14 @@ static int shell_pmi_kvs_put (void *arg,
return 0;
}

static void pmi_kvs_put_local (struct shell_pmi *pmi,
const char *key,
const char *val)
{
zhashx_update (pmi->kvs, key, (char *)val);
zhashx_update (pmi->locals, key, (void *) 0x1);
}

/* Handle kvs lookup response.
*/
static void kvs_lookup_continuation (flux_future_t *f, void *arg)
Expand Down Expand Up @@ -205,7 +214,12 @@ static int shell_pmi_barrier_enter (void *arg)
val = zhashx_first (pmi->kvs);
while (val) {
key = zhashx_cursor (pmi->kvs);
if (!strcmp (key, "PMI_process_mapping")) {
/* Special case:
* Keys in pmi->locals are not added to the KVS transaction
* because they were locally generated and need not be
* shared with the other shells.
*/
if (zhashx_lookup (pmi->locals, key)) {
val = zhashx_next (pmi->kvs);
continue;
}
Expand Down Expand Up @@ -325,7 +339,7 @@ int init_clique (struct shell_pmi *pmi)
log_err ("pmi_process_mapping_encode");
goto out;
}
zhashx_update (pmi->kvs, "PMI_process_mapping", val);
pmi_kvs_put_local (pmi, "PMI_process_mapping", val);
out:
free (blocks);
return 0;
Expand All @@ -335,12 +349,42 @@ int init_clique (struct shell_pmi *pmi)
return -1;
}

static int set_flux_instance_level (struct shell_pmi *pmi)
{
char *p;
long l;
int n;
int rc = -1;
char val [SIMPLE_KVS_VAL_MAX];
const char *level = flux_attr_get (pmi->shell->h, "instance-level");

if (!level)
return 0;

errno = 0;
l = strtol (level, &p, 10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from strtol(3):

Since strtol() can legitimately return 0, LONG_MAX, or LONG_MIN
(LLONG_MAX or LLONG_MIN for strtoll()) on both success and failure, the
calling program should set errno to 0 before the call, and then deter‐
mine if an error occurred by checking whether errno has a nonzero value
after the call.

might want to treat < 0 as an error too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you see this comment @grondo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, yeah I thought I had added that check? Let me make sure I committed what I thought I committed.

And sorry for the sloppy work!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe because I went ahead and squashed you missed the updated code. I now have the below. Does that address your comments, or did I mess it up again?

    errno = 0;
    l = strtol (level, &p, 10);
    if (errno != 0 || *p != '\0' || l < 0) {
        log_msg ("set_flux_instance_level level=%s invalid", level);
        goto out;
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry. That looks great!

if (errno != 0 || *p != '\0' || l < 0) {
log_msg ("set_flux_instance_level level=%s invalid", level);
goto out;
}
n = snprintf (val, sizeof (val), "%lu", l+1);
if (n >= sizeof (val)) {
log_err ("set_flux_instance_level: snprintf");
goto out;
}
pmi_kvs_put_local (pmi, "flux.instance-level", val);
rc = 0;
out:
return rc;
}

void shell_pmi_destroy (struct shell_pmi *pmi)
{
if (pmi) {
int saved_errno = errno;
pmi_simple_server_destroy (pmi->server);
zhashx_destroy (&pmi->kvs);
zhashx_destroy (&pmi->locals);
free (pmi);
errno = saved_errno;
}
Expand Down Expand Up @@ -389,14 +433,17 @@ struct shell_pmi *shell_pmi_create (flux_shell_t *shell)
flags,
pmi)))
goto error;
if (!(pmi->kvs = zhashx_new ())) {
if (!(pmi->kvs = zhashx_new ())
|| !(pmi->locals = zhashx_new ())) {
errno = ENOMEM;
goto error;
}
zhashx_set_destructor (pmi->kvs, kvs_value_destructor);
zhashx_set_duplicator (pmi->kvs, kvs_value_duplicator);
if (init_clique (pmi) < 0)
goto error;
if (!shell->standalone && set_flux_instance_level (pmi) < 0)
goto error;
return pmi;
error:
shell_pmi_destroy (pmi);
Expand Down
38 changes: 37 additions & 1 deletion t/t3100-flux-in-flux.t
Expand Up @@ -11,7 +11,7 @@ test_under_flux ${SIZE}
echo "# $0: flux session size will be ${SIZE}"

test_expect_success "flux can run flux instance as a job" '
run_timeout 5 flux srun -n1 -N1 \
run_timeout 10 flux srun -n1 -N1 \
flux start flux getattr size >size.out &&
echo 1 >size.exp &&
test_cmp size.exp size.out
Expand All @@ -25,4 +25,40 @@ test_expect_success "flux subinstance leaves local_uri, remote_uri in KVS" '
flux job info $id guest.flux.remote_uri
'

test_expect_success "flux --parent works in subinstance" '
id=$(flux jobspec srun -n1 \
flux start flux --parent kvs put test=ok \
| flux job submit) &&
flux job attach $id &&
flux job info $id guest.test > guest.test &&
cat <<-EOF >guest.test.exp &&
ok
EOF
test_cmp guest.test.exp guest.test
'

test_expect_success "flux --parent --parent works in subinstance" '
id=$(flux jobspec srun -n1 \
flux start flux start flux --parent --parent kvs put test=ok \
| flux job submit) &&
flux job attach $id &&
flux job info $id guest.test > guest2.test &&
cat <<-EOF >guest2.test.exp &&
ok
EOF
test_cmp guest2.test.exp guest2.test
'

test_expect_success "flux sets instance-level attribute" '
level=$(flux srun flux start \
flux getattr instance-level) &&
level2=$(flux srun flux start \
flux srun flux start \
flux getattr instance-level) &&
level0=$(flux start flux getattr instance-level) &&
test "$level" = "1" &&
test "$level2" = "2" &&
test "$level0" = "0"
'

test_done