Skip to content

Commit

Permalink
Merge pull request #3175 from SteVwonder/extend-jobspec-parsing
Browse files Browse the repository at this point in the history
shell: allow multiple resources per level in jobspec
  • Loading branch information
mergify[bot] committed Sep 18, 2020
2 parents 4fee45e + 0aca61d commit fde68d3
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 103 deletions.
192 changes: 92 additions & 100 deletions src/shell/jobspec.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,111 +74,134 @@ void jobspec_destroy (struct jobspec *job)
static int recursive_parse_helper (struct jobspec *job,
json_t *curr_resource,
json_error_t *error,
struct res_level *res,
int level,
int with_multiplier,
int num_nodes
)
int with_multiplier)
{
size_t index;
json_t *value;
size_t size = json_array_size (curr_resource);
struct res_level res;
int curr_multiplier;

if (size == 0) {
set_error (error, "Malformed jobspec: resource entry is not a list");
return -1;
}

json_array_foreach (curr_resource, index, value) {
if (parse_res_level (value, level, res, error) < 0) {
return -1;
}
if (!strcmp (res->type, "core")) {
// We allow multiple resources to be defined at the core-level.
// We also allow resources below the core-level to be defined.
// Neither of these allowances adds much complication given that
// the finest granularity info we need is cores-per-slot.
break;
}
if ((index > 0) && (res->with != NULL)) {
set_error (error,
"Unsupported jobspec: multiple resources at non-leaf level %d",
level);
if (parse_res_level (value, level, &res, error) < 0) {
return -1;
}
}

with_multiplier = with_multiplier * res->count;
curr_multiplier = with_multiplier * res.count;

if (!strcmp (res->type, "node")) {
if (job->slot_count > 0) {
set_error (error, "node resource encountered after slot resource");
return -1;
}
if (!strcmp (res.type, "node")) {
if (job->slot_count > 0) {
set_error (error, "node resource encountered after slot resource");
return -1;
}
if (job->cores_per_slot > 0) {
set_error (error, "node resource encountered after core resource");
return -1;
}
if (job->node_count > 0) {
set_error (error, "node resource encountered after node resource");
return -1;
}

num_nodes = with_multiplier;
} else if (!strcmp (res->type, "slot")) {
job->slot_count = with_multiplier;
job->node_count = curr_multiplier;
} else if (!strcmp (res.type, "slot")) {
if (job->cores_per_slot > 0) {
set_error (error, "slot resource encountered after core resource");
return -1;
}
if (job->slot_count > 0) {
set_error (error, "slot resource encountered after slot resource");
return -1;
}

// Reset the with_multiplier since we are now looking
// to calculate the cores_per_slot value
with_multiplier = 1;
job->slot_count = curr_multiplier;

// Check if we already encountered the `node` resource
if (num_nodes > 0) {
// N.B.: with a strictly enforced ordering of node then slot
// (with arbitrary non-core resources in between)
// the slots_per_node will always be a perfectly round integer
// (i.e., job->slot_count % num_nodes == 0)
job->slots_per_node = job->slot_count / num_nodes;
}
} else if (!strcmp (res->type, "core")) {
if (job->slot_count < 1) {
set_error (error, "core resource encountered before slot resource");
return -1;
// Reset the multiplier since we are now looking
// to calculate the cores_per_slot value
curr_multiplier = 1;

// Check if we already encountered the `node` resource
if (job->node_count > 0) {
// N.B.: with a strictly enforced ordering of node then slot
// (with arbitrary non-core resources in between)
// the slots_per_node will always be a perfectly round integer
// (i.e., job->slot_count % job->node_count == 0)
job->slots_per_node = job->slot_count / job->node_count;
}
} else if (!strcmp (res.type, "core")) {
if (job->slot_count < 1) {
set_error (error, "core resource encountered before slot resource");
return -1;
}
if (job->cores_per_slot > 0) {
set_error (error, "core resource encountered after core resource");
return -1;
}

job->cores_per_slot = curr_multiplier;
// N.B.: despite having found everything we were looking for (i.e.,
// node, slot, and core resources), we have to keep recursing to
// make sure their aren't additional erroneous node/slot/core
// resources in the jobspec
}

job->cores_per_slot = with_multiplier;
// We've found everything we needed, we can stop recursing
return 0;
}
if (res.with != NULL) {
if (recursive_parse_helper (job,
res.with,
error,
level+1,
curr_multiplier)
< 0) {
return -1;
}
}

if (res->with != NULL) {
return recursive_parse_helper (job,
res->with,
error,
res,
level+1,
with_multiplier,
num_nodes);
if (!strcmp (res.type, "node")) {
if ((job->slot_count <= 0) || (job->cores_per_slot <= 0)) {
set_error (error,
"node encountered without slot&core below it");
return -1;
}
} else if (!strcmp (res.type, "slot")) {
if (job->cores_per_slot <= 0) {
set_error (error, "slot encountered without core below it");
return -1;
}
}
}
return 0;
}

/* This function requires that the jobspec resource ordering is the
* same as the ordering specified in V1, but it allows additional
* resources before and in between the V1 resources
* (i.e., node, slot, and core).
* In shorthand, it requires that the jobspec follows the form
* ...->[node]->...->slot->...->core. Where `node` is optional,
* and `...` represents any non-V1 resources.
/* This function requires that the jobspec resource ordering is the same as the
* ordering specified in V1, but it allows additional resources before and in
* between the V1 resources (i.e., node, slot, and core). In shorthand, it
* requires that the jobspec follows the form ...->[node]->...->slot->...->core.
* Where `node` is optional, and `...` represents any non-V1
* resources. Additionally, this function also allows multiple resources at any
* level, as long as there is only a single node, slot, and core within the
* entire jobspec.
*/
static int recursive_parse_jobspec_resources (struct jobspec *job,
json_t *curr_resource,
json_error_t *error)
{
struct res_level res;

if (curr_resource == NULL) {
set_error (error, "jobspec top-level resources empty");
return -1;
}

// Set slots_per_node to -1 ahead of time, if the recursive descent
// encounters node->...->slot, it will overwrite this value
// Set node-related values to -1 ahead of time, if the recursive descent
// encounters node in the jobspec, it will overwrite these values
job->slots_per_node = -1;
job->node_count = -1;

int rc = recursive_parse_helper (job, curr_resource, error, &res, 0, 1, -1);
int rc = recursive_parse_helper (job, curr_resource, error, 0, 1);

if ((rc == 0) && (job->cores_per_slot < 1)) {
set_error (error, "Missing core resource");
Expand All @@ -193,7 +216,6 @@ struct jobspec *jobspec_parse (const char *jobspec, json_error_t *error)
int version;
json_t *tasks;
json_t *resources;
struct res_level res[3];

if (!(job = calloc (1, sizeof (*job)))) {
set_error (error, "Out of memory");
Expand Down Expand Up @@ -242,41 +264,11 @@ struct jobspec *jobspec_parse (const char *jobspec, json_error_t *error)
goto error;
}

/* For jobspec version 1, expect either:
* - node->slot->core->NIL
* - slot->core->NIL
* Set job->slot_count and job->cores_per_slot.
*/
memset (res, 0, sizeof (res));
if (parse_res_level (json_array_get(resources, 0), 0, &res[0], error) < 0)
goto error;
if (res[0].with &&
(parse_res_level (json_array_get(res[0].with, 0), 1, &res[1], error)
< 0))
goto error;
if (res[1].with &&
(parse_res_level (json_array_get(res[1].with, 0), 2, &res[2], error)
< 0))
goto error;
if (res[0].type != NULL && !strcmp (res[0].type, "slot")
&& res[1].type != NULL && !strcmp (res[1].type, "core")
&& res[1].with == NULL) {
job->slot_count = res[0].count;
job->cores_per_slot = res[1].count;
job->slots_per_node = -1; // unspecified
}
else if (res[0].type != NULL && !strcmp (res[0].type, "node")
&& res[1].type != NULL && !strcmp (res[1].type, "slot")
&& res[2].type != NULL && !strcmp (res[2].type, "core")
&& res[2].with == NULL) {
job->slot_count = res[0].count * res[1].count;
job->cores_per_slot = res[2].count;
job->slots_per_node = res[1].count;
}
else if (recursive_parse_jobspec_resources (job, resources, error) < 0) {
if (recursive_parse_jobspec_resources (job, resources, error) < 0) {
// recursive_parse_jobspec_resources calls set_error
goto error;
}

/* Set job->task_count
*/
if (json_unpack_ex (tasks, NULL, 0,
Expand Down
1 change: 1 addition & 0 deletions src/shell/jobspec.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ struct jobspec {
int slot_count; // number of task slots
int cores_per_slot; // number of cores per task slot
int slots_per_node; // number of slots per node (-1=unspecified)
int node_count; // number of nodes (-1=unspecified)
json_t *command;
const char *cwd;
json_t *environment;
Expand Down
49 changes: 46 additions & 3 deletions src/shell/test/jobspec.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct output {

struct input good_input[] = {
{
"flux jobspec srun hostname",
"flux jobspec srun hostname (slot->core)",
"{\"tasks\": [{\"slot\": \"task\", \"count\": {\"per_slot\": 1}, \"command\": [\"hostname\"], \"attributes\": {}}], \"attributes\": {\"system\": {\"cwd\": \"/home/garlick/proj/flux-core/src/cmd\"}}, \"version\": 1, \"resources\": [{\"count\": 1, \"with\": [{\"count\": 1, \"type\": \"core\"}], \"type\": \"slot\", \"label\": \"task\"}]}",
},
{
Expand All @@ -58,6 +58,18 @@ struct input good_input[] = {
"node->socket->slot->(core[2]->PU,gpu)",
"{\"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"socket\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"task\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 2, \"with\": [{\"type\": \"PU\", \"count\": 1}]}, {\"type\": \"gpu\", \"count\": 1}]}]}]}], \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"task\", \"count\": {\"per_slot\": 1}}], \"attributes\": {\"system\": {\"duration\": 0, \"cwd\": \"/usr/libexec/flux\", \"environment\": {}}}, \"version\": 1}",
},
{
"node[2]->(storage,slot[3]->core[5])",
"{\"resources\": [{\"type\": \"node\", \"count\": 2, \"with\": [{\"type\": \"storage\", \"count\": 1}, {\"type\": \"slot\", \"label\": \"task\", \"count\": 3, \"with\": [{\"type\": \"core\", \"count\": 5}]}]}], \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"task\", \"count\": {\"per_slot\": 1}}], \"attributes\": {\"system\": {\"duration\": 0, \"cwd\": \"/usr/libexec/flux\", \"environment\": {}}}, \"version\": 1}",
},
{
"(storage,node->slot->core)",
"{\"version\": 1, \"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}]}, {\"type\": \"storage\", \"count\": 1562, \"exclusive\": true}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"cluster->(storage,node->slot->core)",
"{\"version\": 1, \"resources\": [{\"type\": \"cluster\", \"count\": 1, \"with\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}]}, {\"type\": \"storage\", \"count\": 1562, \"exclusive\": true}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{ NULL, NULL },
};
struct output good_output[] =
Expand All @@ -69,6 +81,9 @@ struct output good_output[] =
{1, 1, 2, 1},
{1, 1, 1, 1},
{1, 1, 2, 1},
{6, 6, 5, 3},
{1, 1, 1, 1},
{1, 1, 1, 1},
{0, 0, 0, 0},
};
struct input bad_input[] = {
Expand Down Expand Up @@ -121,13 +136,41 @@ struct input bad_input[] = {
"{\"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"task\", \"count\": 1}]}]}], \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"task\", \"count\": {\"per_slot\": 1}}], \"attributes\": {\"system\": {\"duration\": 0, \"cwd\": \"/usr/libexec/flux\", \"environment\": {}}}, \"version\": 1}",
},
{
"node->(storage,slot->core)",
"{\"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"storage\", \"count\": 1}, {\"type\": \"slot\", \"label\": \"task\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}]}], \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"task\", \"count\": {\"per_slot\": 1}}], \"attributes\": {\"system\": {\"duration\": 0, \"cwd\": \"/usr/libexec/flux\", \"environment\": {}}}, \"version\": 1}",
"node->(storage,slot->PU)",
"{\"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"storage\", \"count\": 1}, {\"type\": \"slot\", \"label\": \"task\", \"count\": 1, \"with\": [{\"type\": \"PU\", \"count\": 1}]}]}], \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"task\", \"count\": {\"per_slot\": 1}}], \"attributes\": {\"system\": {\"duration\": 0, \"cwd\": \"/usr/libexec/flux\", \"environment\": {}}}, \"version\": 1}",
},
{
"node->slot->(PU,gpu)",
"{\"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"task\", \"count\": 1, \"with\": [{\"type\": \"PU\", \"count\": 1}, {\"type\": \"gpu\", \"count\": 1}]}]}], \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"task\", \"count\": {\"per_slot\": 1}}], \"attributes\": {\"system\": {\"duration\": 0, \"cwd\": \"/usr/libexec/flux\", \"environment\": {}}}, \"version\": 1}",
},
{
"node->(storage->core,slot->PU)",
"{\"version\": 1, \"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"storage\", \"count\": 1562, \"exclusive\": true, \"with\": [{\"type\": \"core\", \"count\": 1}]}, {\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"PU\", \"count\": 1}]}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"node->(slot,core)",
"{\"version\": 1, \"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1}, {\"type\": \"core\", \"count\": 1}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"(node,slot->core)",
"{\"version\": 1, \"resources\": [{\"type\": \"node\", \"count\": 1}, {\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"cluster->(node->slot->core,node)",
"{\"version\": 1, \"resources\": [{\"type\": \"cluster\", \"count\": 1, \"with\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}]}, {\"type\": \"node\", \"count\": 1}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"cluster->(slot->core,node)",
"{\"version\": 1, \"resources\": [{\"type\": \"cluster\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}, {\"type\": \"node\", \"count\": 1}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"cluster->(slot->core,slot->core)",
"{\"version\": 1, \"resources\": [{\"type\": \"cluster\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}, {\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{
"node->(slot->core,storage->core)",
"{\"version\": 1, \"resources\": [{\"type\": \"node\", \"count\": 1, \"with\": [{\"type\": \"slot\", \"label\": \"default\", \"count\": 1, \"with\": [{\"type\": \"core\", \"count\": 1}]}, {\"type\": \"storage\", \"count\": 1562, \"exclusive\": true, \"with\": [{\"type\": \"core\", \"count\": 1}]}]}], \"attributes\": {\"system\": {\"duration\": 57600}}, \"tasks\": [{\"command\": [\"hostname\"], \"slot\": \"default\", \"count\": {\"per_slot\": 1}}]}",
},
{ NULL, NULL },
};

Expand Down

0 comments on commit fde68d3

Please sign in to comment.