Skip to content

Commit

Permalink
Merge pull request #5735 from grondo/taskmap-raw
Browse files Browse the repository at this point in the history
libtaskmap: support decode of raw (semicolon-delimited) taskmaps
  • Loading branch information
mergify[bot] committed Feb 14, 2024
2 parents e71f17b + 6778358 commit 5fe073b
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 14 deletions.
7 changes: 5 additions & 2 deletions doc/man1/flux-job.rst
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,12 @@ support task mapping formats:
.. option:: --to=raw|pmi|multiline

Convert the taskmap to *raw* or *pmi* formats (described in RFC 34), or
*multiline* which prints the node ID of each task, one per line.
*multiline* which prints the node ID of each task, one per line. The
default behavior is to print the RFC 34 taskmap. This option can be useful
to convert between mapping forms, since :program:`flux job taskmap` can
take a raw, pmi, or RFC 34 task map on the command line.

One one of the above options may be used per call.
Only one of the above options may be used per call.

timeleft
--------
Expand Down
198 changes: 189 additions & 9 deletions src/common/libtaskmap/taskmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,188 @@ static struct taskmap *taskmap_decode_pmi (const char *s, flux_error_t *errp)
return NULL;
}

struct raw_task {
int taskid;
int nodeid;
int repeat;
};

static void item_destructor (void **item)
{
if (item) {
free (*item);
*item = NULL;
}
}

static int taskid_cmp (const void *a, const void *b)
{
const struct raw_task *t1 = a;
const struct raw_task *t2 = b;
return (t1->taskid - t2->taskid);
}

static int raw_task_append (zlistx_t *l, int taskid, int nodeid, int repeat)
{
struct raw_task *t = calloc (1, sizeof (*t));
if (!t)
return -1;
t->taskid = taskid;
t->nodeid = nodeid;
t->repeat = repeat;
if (!zlistx_add_end (l, t)) {
free (t);
return -1;
}
return 0;
}

static zlistx_t *raw_task_list_create (void)
{
zlistx_t *l;
if (!(l = zlistx_new ())) {
errno = ENOMEM;
return NULL;
}
zlistx_set_destructor (l, item_destructor);
zlistx_set_comparator (l, &taskid_cmp);
return l;
}

static int raw_task_list_append (zlistx_t *l,
const char *s,
int nodeid,
flux_error_t *errp)
{
int rc = -1;
unsigned int id;
idset_error_t error;
struct idset *ids;

if (!(ids = idset_decode_ex (s, -1, 0, IDSET_FLAG_AUTOGROW, &error))) {
errprintf (errp, "%s", error.text);
goto error;
}
id = idset_first (ids);
while (id != IDSET_INVALID_ID) {
unsigned int next = idset_next (ids, id);
int repeat = 1;
while (next == id + repeat) {
next = idset_next (ids, next);
repeat++;
}
if (raw_task_append (l, id, nodeid, repeat) < 0) {
errprintf (errp, "Out of memory");
goto error;
}
id = next;
}
rc = 0;
error:
idset_destroy (ids);
return rc;
}

static int raw_task_check (struct raw_task *a,
struct raw_task *b,
flux_error_t *errp)
{
struct raw_task t_init = { .taskid = -1, .repeat = 1 };
int start, end1, end2, end;

if (a == NULL)
a = &t_init;

/* Note: a->taskid <= b->taskid since taskmap_decode_raw() sorts
* raw_task objects.
*/
start = b->taskid;
end1 = a->taskid + a->repeat - 1;
end2 = b->taskid + b->repeat - 1;
end = end1 <= end2 ? end1 : end2;

/* If end - start is nonzero then we have overlap. report it.
*/
int overlap = end - start;
if (overlap >= 0) {
/* taskid overlap detected, report as error
*/
if (overlap == 0)
errprintf (errp, "duplicate taskid specified: %d", start);
else
errprintf (errp, "duplicate taskids specified: %d-%d", start, end);
return -1;
}
/* Now check that tasks are consecutive. It is an error if not since
* holes in taskids in a taskmap are not allowed
*/
if (overlap != -1) {
if (overlap == -2)
return errprintf (errp, "missing taskid: %d", end + 1);
else
return errprintf (errp,
"missing taskids: %d-%d",
end + 1,
end - overlap - 1);
}
return 0;
}

static struct taskmap *taskmap_decode_raw (const char *s, flux_error_t *errp)
{
char *tok;
char *p;
char *q;
char *cpy = NULL;
struct taskmap *map = NULL;
zlistx_t *l = NULL;
int nodeid = 0;
struct raw_task *t, *prev;

if (!s || strlen (s) == 0) {
errprintf (errp, "Invalid argument");
return NULL;
}
if (!(map = taskmap_create ())
|| !(cpy = strdup (s))
|| !(l = raw_task_list_create ())) {
errprintf (errp, "Out of memory");
goto error;
}

p = cpy;

while ((tok = strtok_r (p, ";", &q))) {
if (raw_task_list_append (l, tok, nodeid++, errp) < 0)
goto error;
p = NULL;
}

/* sort by taskid */
zlistx_sort (l);
t = zlistx_first (l);
prev = NULL;

while (t) {
if (raw_task_check (prev, t, errp) < 0)
goto error;
if (taskmap_append (map, t->nodeid, 1, t->repeat) < 0) {
errprintf (errp, "taskmap_append: %s", strerror (errno));
goto error;
}
prev = t;
t = zlistx_next (l);
}
zlistx_destroy (&l);
free (cpy);
return map;
error:
zlistx_destroy (&l);
taskmap_destroy (map);
free (cpy);
return NULL;
}

struct taskmap *taskmap_decode (const char *s, flux_error_t *errp)
{
struct taskmap *map = NULL;
Expand All @@ -435,11 +617,17 @@ struct taskmap *taskmap_decode (const char *s, flux_error_t *errp)
|| strstr (s, "vector,"))
return taskmap_decode_pmi (s, errp);

/* A string without special characters might be a raw taskmap:
*/
if (!strpbrk (s, "({[]})"))
return taskmap_decode_raw (s, errp);

/* O/w, decode as RFC 34 Taskmap
*/
if (!(o = json_loads (s, JSON_DECODE_ANY, &error))) {
errprintf (errp, "%s", error.text);
goto out;
}

map = taskmap_decode_json (o, errp);
out:
json_decref (o);
Expand Down Expand Up @@ -675,14 +863,6 @@ static char *list_join (zlistx_t *l, char *sep)
return result;
}

static void item_destructor (void **item)
{
if (item) {
free (*item);
*item = NULL;
}
}

static char *taskmap_encode_raw (const struct taskmap *map, int flags)
{
char *result = NULL;
Expand Down
5 changes: 3 additions & 2 deletions src/common/libtaskmap/taskmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ void taskmap_destroy (struct taskmap *map);
int taskmap_append (struct taskmap *map, int nodeid, int nnodes, int ppn);

/* Decode string 'map' into taskmap object.
* The string may be a JSON array, RFC 34 wrapped object, or a mapping
* encoded in PMI-1 PMI_process_mapping form described in RFC 13.
* The string may be a JSON array, RFC 34 wrapped object, a mapping
* encoded in PMI-1 PMI_process_mapping form described in RFC 13, or
* a raw, semicolon-delimited list of taskids.
* Returns taskmap on success, or NULL on error with error string in 'errp'.
*/
struct taskmap *taskmap_decode (const char *map, flux_error_t *errp);
Expand Down
44 changes: 43 additions & 1 deletion src/common/libtaskmap/test/taskmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,22 @@ static void rfc34_tests ()
"taskmap is known");
taskmap_destroy (map);
free (s);

/* Try raw back to taskmap:
*/
map = taskmap_decode (t->expected, NULL);
ok (map != NULL,
"taskmap_decode (%s)",
t->expected);
if (map) {
ok ((s = taskmap_encode (map, 0)) != NULL,
"taskmap_encode works");
is (s, t->taskmap,
"taskmap=%s",
s);
taskmap_destroy (map);
free (s);
}
}
}

Expand Down Expand Up @@ -272,7 +288,6 @@ static void main_tests ()

static const char *invalid[] = {
"}",
"42",
"{}",
"{\"version\":1}",
"{\"version\":1,\"map\":{}}",
Expand Down Expand Up @@ -553,6 +568,32 @@ void test_deranged (void)
taskmap_destroy (map);
}

struct test_vector raw_tests[] = {
{ "-1", "error parsing range '-1'" },
{ "1-3;a-b", "error parsing range 'a-b'" },
{ "1,1", "range '1' is out of order" },
{ "0-1;1-2", "duplicate taskid specified: 1" },
{ "5-15;0-10", "duplicate taskids specified: 5-10" },
{ "1", "missing taskid: 0" },
{ "3-4;0-1", "missing taskid: 2" },
{ "0-1;10-11", "missing taskids: 2-9" },
{ NULL, NULL },
};

static void test_raw_decode_errors (void)
{
struct test_vector *t;
for (t = &raw_tests[0]; t->taskmap != NULL; t++) {
flux_error_t error;
ok (taskmap_decode (t->taskmap, &error) == NULL,
"taskmap_decode (%s) fails",
t->taskmap);
is (error.text, t->expected,
"taskmap_decode: %s",
error.text);
}
}

int main (int ac, char **av)
{
plan (NO_PLAN);
Expand All @@ -565,6 +606,7 @@ int main (int ac, char **av)
append_cyclic_one ();
test_check ();
test_deranged ();
test_raw_decode_errors ();
done_testing ();
}

Expand Down

0 comments on commit 5fe073b

Please sign in to comment.