Merge pull request #2278 from garlick/PMI_process_mapping
flux-shell: populate PMI_process_mapping key
grondo committed Jul 31, 2019
2 parents 94cf968 + b3852e9 commit 505e7c1
Showing 6 changed files with 154 additions and 6 deletions.
6 changes: 3 additions & 3 deletions src/common/libpmi/Makefile.am
@@ -19,16 +19,16 @@ libpmi_common_sources = \
pmi_strerror.c \
pmi_strerror.h \
keyval.c \
- keyval.h
+ keyval.h \
+ clique.c \
+ clique.h

libpmi_client_la_SOURCES = \
simple_client.c \
simple_client.h \
pmi.c \
dgetline.c \
dgetline.h \
- clique.c \
- clique.h \
$(libpmi_common_sources)

libpmi_server_la_SOURCES = \
38 changes: 38 additions & 0 deletions src/common/libpmi/clique.c
@@ -11,6 +11,7 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
@@ -20,6 +21,43 @@
#include "pmi.h"
#include "clique.h"

static int catprintf (char **buf, int *bufsz, const char *fmt, ...)
{
va_list ap;
int n;

va_start (ap, fmt);
n = vsnprintf (*buf, *bufsz, fmt, ap);
va_end (ap);
if (n >= *bufsz)
return -1;
*bufsz -= n;
*buf += n;
return 0;
}

int pmi_process_mapping_encode (struct pmi_map_block *blocks,
int nblocks,
char *buf,
int bufsz)
{
int i;

if (catprintf (&buf, &bufsz, "(vector,") < 0)
return -1;
for (i = 0; i < nblocks; i++) {
if (catprintf (&buf, &bufsz, "%s(%d,%d,%d)",
i > 0 ? "," : "",
blocks[i].nodeid,
blocks[i].nodes,
blocks[i].procs) < 0)
return -1;
}
if (catprintf (&buf, &bufsz, ")") < 0)
return -1;
return 0;
}

static int parse_block (const char *s, struct pmi_map_block *block)
{
char *endptr;
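
For reference, here is a minimal standalone sketch of calling the new encoder directly. It is not part of this commit; the include path, block values, and buffer size are illustrative only.

#include <stdio.h>
#include "src/common/libpmi/clique.h" /* assumed include path */

int main (void)
{
    /* hypothetical mapping: 2 nodes with 4 tasks each, then 1 node with 2 tasks */
    struct pmi_map_block blocks[] = {
        { .nodeid = 0, .nodes = 2, .procs = 4 },
        { .nodeid = 2, .nodes = 1, .procs = 2 },
    };
    char val[64];

    if (pmi_process_mapping_encode (blocks, 2, val, sizeof (val)) < 0)
        return 1;
    printf ("%s\n", val); /* prints (vector,(0,2,4),(2,1,2)) */
    return 0;
}
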
8 changes: 8 additions & 0 deletions src/common/libpmi/clique.h
@@ -41,6 +41,14 @@ struct pmi_map_block {
int pmi_process_mapping_parse (const char *s,
struct pmi_map_block **blocks, int *nblocks);

/* Generate PMI_process_mapping value string from array of pmi_map_blocks,
* and place it in 'buf'. Result will be null terminated.
*/
int pmi_process_mapping_encode (struct pmi_map_block *blocks,
int nblocks,
char *buf,
int bufsz);


/* Determine the nodeid that will start 'rank', and return it in 'nodeid'.
*/
64 changes: 61 additions & 3 deletions src/shell/pmi.c
@@ -27,10 +27,10 @@
*
* Other requests have callbacks from the engine to provide data,
* which is fed back to the engine, which then calls shell_pmi_response_send().
- * These are kvs_get, kvs_put, and barrier. Although the protocol engine
+ * These are kvs_get, kvs_put, and barrier. Although the task
* is effectively blocked while these callbacks are handled, they are
- * implemented with asynchronous continuation callbacks so that the shell's
- * reactor remains live while they are waiting for an answer.
+ * implemented with asynchronous continuation callbacks so that other tasks
+ * and the shell's reactor remain live while the task awaits an answer.
*
* The PMI KVS supports a put / barrier / get pattern. The barrier
* distributes KVS data that was "put" so that it is available to "get".
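
For context, this is the put / barrier / get pattern as a PMI-1 client sees it. The sketch below is not part of this commit; it assumes the standard PMI-1 calls exposed by libpmi's pmi.h and omits error checking and buffer-size queries for brevity.

#include <stdio.h>
#include <pmi.h> /* assumed: PMI-1 API as provided by libpmi */

int main (void)
{
    int spawned, rank, size;
    char kvsname[64], key[64], val[64];

    PMI_Init (&spawned);
    PMI_Get_rank (&rank);
    PMI_Get_size (&size);
    PMI_KVS_Get_my_name (kvsname, sizeof (kvsname));

    snprintf (key, sizeof (key), "data.%d", rank);
    PMI_KVS_Put (kvsname, key, "hello");                 /* put */
    PMI_Barrier ();                                      /* barrier distributes the puts */
    PMI_KVS_Get (kvsname, "data.0", val, sizeof (val));  /* get */

    /* PMI_process_mapping is pre-populated by the shell (this commit),
     * so it can be read without a preceding put/barrier.
     */
    PMI_KVS_Get (kvsname, "PMI_process_mapping", val, sizeof (val));
    printf ("%d: PMI_process_mapping=%s\n", rank, val);

    PMI_Finalize ();
    return 0;
}
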
@@ -68,6 +68,7 @@
#include <flux/core.h>

#include "src/common/libpmi/simple_server.h"
#include "src/common/libpmi/clique.h"
#include "src/common/libutil/log.h"

#include "task.h"
@@ -204,6 +205,10 @@ static int shell_pmi_barrier_enter (void *arg)
val = zhashx_first (pmi->kvs);
while (val) {
key = zhashx_cursor (pmi->kvs);
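/* PMI_process_mapping is pre-populated in each shell's local hash by
 * init_clique () below, so it does not need to be exchanged through
 * the job KVS at the barrier.
 */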
if (!strcmp (key, "PMI_process_mapping")) {
val = zhashx_next (pmi->kvs);
continue;
}
if (shell_pmi_kvs_key (nkey,
sizeof (nkey),
pmi->shell->jobid,
@@ -279,6 +284,57 @@ void shell_pmi_task_ready (struct shell_task *task, void *arg)
}
}

/* Generate 'PMI_process_mapping' key (see RFC 13) for MPI clique computation.
*
* Create an array of pmi_map_block structures, sized for worst case mapping
* (no compression possible). Walk through the rcalc info for each shell rank.
* If shell's mapping looks identical to previous one, increment block->nodes;
* otherwise consume another array slot. Finally, encode to string, put it
* in the local KVS hash, and free array.
*/
int init_clique (struct shell_pmi *pmi)
{
struct pmi_map_block *blocks;
int nblocks;
int i;
char val[SIMPLE_KVS_VAL_MAX];

if (!(blocks = calloc (pmi->shell->info->shell_size, sizeof (*blocks))))
return -1;
nblocks = 0;

for (i = 0; i < pmi->shell->info->shell_size; i++) {
struct rcalc_rankinfo ri;

if (rcalc_get_nth (pmi->shell->info->rcalc, i, &ri) < 0)
goto error;
if (nblocks == 0 || blocks[nblocks - 1].procs != ri.ntasks) {
blocks[nblocks].nodeid = i;
blocks[nblocks].procs = ri.ntasks;
blocks[nblocks].nodes = 1;
nblocks++;
}
else
blocks[nblocks - 1].nodes++;
}
/* If value exceeds SIMPLE_KVS_VAL_MAX, skip setting the key
* without generating an error. The client side will not treat
* a missing key as an error. This should be unusual, though, so log it.
*/
if (pmi_process_mapping_encode (blocks, nblocks, val, sizeof (val)) < 0) {
log_err ("pmi_process_mapping_encode");
goto out;
}
zhashx_update (pmi->kvs, "PMI_process_mapping", val);
out:
free (blocks);
return 0;
error:
free (blocks);
errno = EINVAL;
return -1;
}

void shell_pmi_destroy (struct shell_pmi *pmi)
{
if (pmi) {
@@ -339,6 +395,8 @@ struct shell_pmi *shell_pmi_create (flux_shell_t *shell)
}
zhashx_set_destructor (pmi->kvs, kvs_value_destructor);
zhashx_set_duplicator (pmi->kvs, kvs_value_duplicator);
if (init_clique (pmi) < 0)
goto error;
return pmi;
error:
shell_pmi_destroy (pmi);
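
To make the compression in init_clique () concrete, here is a self-contained sketch (not part of the commit) that applies the same run-length logic to a hypothetical job of four shells running 2, 1, 1, and 1 tasks, then encodes the result. The include path is assumed.

#include <stdio.h>
#include "src/common/libpmi/clique.h" /* assumed include path */

int main (void)
{
    int ntasks[] = { 2, 1, 1, 1 }; /* hypothetical tasks per shell rank */
    int shell_size = 4;
    struct pmi_map_block blocks[4];
    int nblocks = 0;
    char val[64];

    for (int i = 0; i < shell_size; i++) {
        if (nblocks == 0 || blocks[nblocks - 1].procs != ntasks[i]) {
            blocks[nblocks].nodeid = i;
            blocks[nblocks].procs = ntasks[i];
            blocks[nblocks].nodes = 1;
            nblocks++;
        }
        else
            blocks[nblocks - 1].nodes++;
    }
    if (pmi_process_mapping_encode (blocks, nblocks, val, sizeof (val)) == 0)
        printf ("%s\n", val); /* prints (vector,(0,1,2),(1,3,1)) */
    return 0;
}

With that mapping, ranks are assigned block-wise: node 0 runs ranks 0-1 and nodes 1-3 run ranks 2, 3, and 4, which matches the clique grouping the new irregular-ppn test in t2602-job-shell.t expects.
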
7 changes: 7 additions & 0 deletions t/t2601-job-shell-standalone.t
@@ -135,6 +135,13 @@ test_expect_success 'flux-shell: shell PMI works' '
${FLUX_SHELL} -v -s -r 0 -j j8pmi -R R8 51 \
>pmi_info.out 2>pmi_info.err
'
test_expect_success 'flux-shell: shell PMI exports clique info' '
flux jobspec srun -N1 -n8 ${PMI_INFO} -c >j8pmi_clique &&
${FLUX_SHELL} -v -s -r 0 -j j8pmi_clique -R R8 51 \
>pmi_clique.out 2>pmi_clique.err &&
COUNT=$(grep "clique=0,1,2,3,4,5,6,7" pmi_clique.out | wc -l) &&
test ${COUNT} -eq 8
'
test_expect_success 'flux-shell: shell PMI KVS works' '
flux jobspec srun -N1 -n8 ${KVSTEST} >j8kvs &&
${FLUX_SHELL} -v -s -r 0 -j j8kvs -R R8 52 \
37 changes: 37 additions & 0 deletions t/t2602-job-shell.t
@@ -70,6 +70,43 @@ test_expect_success 'job-shell: PMI works' '
flux job attach $id >pmi_info.out 2>pmi_info.err &&
grep size=4 pmi_info.out
'
test_expect_success 'pmi-shell: PMI cliques are correct for 1 ppn' '
id=$(flux jobspec srun -N4 -n4 ${PMI_INFO} -c | flux job submit) &&
flux job attach $id >pmi_clique1.raw 2>pmi_clique1.err &&
sort -snk1 <pmi_clique1.raw >pmi_clique1.out &&
sort >pmi_clique1.exp <<-EOT &&
0: clique=0
1: clique=1
2: clique=2
3: clique=3
EOT
test_cmp pmi_clique1.exp pmi_clique1.out
'
test_expect_success 'pmi-shell: PMI cliques are correct for 2 ppn' '
id=$(flux jobspec srun -N2 -n4 ${PMI_INFO} -c | flux job submit) &&
flux job attach $id >pmi_clique2.raw 2>pmi_clique2.err &&
sort -snk1 <pmi_clique2.raw >pmi_clique2.out &&
sort >pmi_clique2.exp <<-EOT &&
0: clique=0,1
1: clique=0,1
2: clique=2,3
3: clique=2,3
EOT
test_cmp pmi_clique2.exp pmi_clique2.out
'
test_expect_success 'pmi-shell: PMI cliques are correct for irregular ppn' '
id=$(flux jobspec srun -N4 -n5 ${PMI_INFO} -c | flux job submit) &&
flux job attach $id >pmi_cliquex.raw 2>pmi_cliquex.err &&
sort -snk1 <pmi_cliquex.raw >pmi_cliquex.out &&
sort >pmi_cliquex.exp <<-EOT &&
0: clique=0,1
1: clique=0,1
2: clique=2
3: clique=3
4: clique=4
EOT
test_cmp pmi_cliquex.exp pmi_cliquex.out
'
test_expect_success 'job-shell: PMI KVS works' '
id=$(flux jobspec srun -N4 ${KVSTEST} | flux job submit) &&
flux job attach $id >kvstest.out 2>kvstest.err &&
