Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource manager support for Flux #798

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/rm_enumerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ENUMITEM(SLURM, "SchedMD SLURM") \
ENUMITEM(LSF, "IBM Spectrum LSF") \
ENUMITEM(LSF_CSM, "IBM Spectrum LSF with Cluster System Management") \
ENUMITEM(FLUX, "Flux") \

#ifdef __cplusplus
extern "C" {
Expand Down
209 changes: 209 additions & 0 deletions util/unifyfs/src/unifyfs-rm.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,102 @@ static int slurm_read_resource(unifyfs_resource_t* resource)
return ret;
}

/**
*
* @brief Get list of nodes using Flux resource
*
* @param resource The job resource record to be filled
*
* @return 0 on success, negative errno otherwise
*/
static int flux_read_resource(unifyfs_resource_t* resource)
{
size_t n_nodes = 0;
char num_nodes_str[10] = {0};
char nodelist_str[1024] = {0};
char* ret = NULL;
FILE* pipe_fp = NULL;
char** nodes = NULL;
int node_idx = 0;

// get num nodes using flux resource command
pipe_fp = popen("flux resource list --states=free -no {nnodes}", "r");
ret = fgets(num_nodes_str, sizeof(num_nodes_str), pipe_fp);
if (ret == NULL) {
pclose(pipe_fp);
return -EINVAL;
}
n_nodes = (size_t) strtoul(num_nodes_str, NULL, sizeof(num_nodes_str));
pclose(pipe_fp);

nodes = calloc(sizeof(char*), n_nodes);
if (nodes == NULL) {
return -ENOMEM;
}

// get node list using flux resource command
// the returned list is in a condensed format
// e.g., tioga[18-19,21,32]
// TODO: is it safe to assume flux resource only
// returns a single line?
pipe_fp = popen("flux resource list --states=free -no {nodelist}", "r");
ret = fgets(nodelist_str, sizeof(nodelist_str), pipe_fp);
if (ret == NULL) {
pclose(pipe_fp);
return -EINVAL;
}
pclose(pipe_fp);

// remove the trailing ']'
nodelist_str[strlen(nodelist_str)-1] = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the trailing ']' still here when only allocating one node?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's missing, actually:

>>: flux alloc -q pdebug -N 1
>>: flux resource list --states=free -no '{nodelist}'
tioga20

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to avoid parsing directly, another option is to rely on /bin/hostlist.

>>: /bin/hostlist -e 'tioga[18-19,21,32]'
tioga18,tioga19,tioga21,tioga32
>>:  /bin/hostlist -e 'tioga20'
tioga20

Though I think that command might just be available on LLNL systems and is not distributed with flux.

I think I've seen that flux has some python packages that parse hostlists, too. I can dig that up if you're interested.

I'm also fine with parsing the hostlist directly.

Copy link
Collaborator Author

@wangvsa wangvsa Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out the last character of the buffer returned from fgets is either \n or EOF. So my code actually removes that instead of ], and it just happens that the rest of the code still works fine even with ] in the buffer. Anyway, I have fixed this and now I will remove the last two characters altogether.

// get the node ids, i.e., the list in []
char* node_ids = strstr(nodelist_str, "[");
if (node_ids) {
char* host = calloc(1, (node_ids-nodelist_str)+2);
strncpy(host, nodelist_str, (node_ids-nodelist_str));

// separate by ","
char* end_str;
char* token = strtok_r(node_ids+1, ",", &end_str);
while (token) {
// case 1: contiguous node range
// e.g., 3-5, lo=3, hi=5
// case 2: a single node, then lo=hi
int lo, hi;
if (strstr(token, "-")) {
char* end_str2;
char* lo_str = strtok_r(token, "-", &end_str2);
char* hi_str = strtok_r(NULL, "-", &end_str2);
lo = atoi(lo_str);
hi = atoi(hi_str);
} else {
lo = atoi(token);
hi = lo;
}

for (int i = lo; i <= hi; i++) {
char nodename[30] = {0};
sprintf(nodename, "%s%d", host, i);
if (node_idx >= n_nodes) {
return -EINVAL;
}
nodes[node_idx++] = strdup(nodename);
}
token = strtok_r(NULL, ",", &end_str);
}
} else {
// no '[' in the string, meaning it has a single node
if (n_nodes != 1) {
return -EINVAL;
}
nodes[0] = strdup(nodelist_str);
wangvsa marked this conversation as resolved.
Show resolved Hide resolved
}

resource->n_nodes = n_nodes;
resource->nodes = nodes;
return 0;
}

// construct_server_argv():
// This function is called in two ways.
// Call it once with server_argv==NULL and it
Expand Down Expand Up @@ -1143,6 +1239,107 @@ static int srun_stage(unifyfs_resource_t* resource,
return -errno;
}

/**
 * @brief Launch servers using Flux
 *
 * Builds the argument vector for
 *     flux run -N<n_nodes> -n<n_nodes> -c<n_cores_per_server> <server args>
 * and replaces the current process image with it via execvp().
 *
 * @param resource  The job resource record (n_nodes and
 *                  n_cores_per_server are read)
 * @param args      The command-line options
 *
 * @return Does not return when execvp() succeeds. Returns -ENOMEM when the
 *         argument vector cannot be allocated, otherwise the negated errno
 *         reported by execvp().
 */
static int flux_launch(unifyfs_resource_t* resource,
                       unifyfs_args_t* args)
{
    size_t argc, flux_argc, server_argc;
    char** argv = NULL;
    char n_nodes[16];
    char n_tasks[16];
    // large enough for "-c" followed by any int value
    char n_cores[16];
    int err;

    snprintf(n_nodes, sizeof(n_nodes), "-N%zu", resource->n_nodes);
    // without -n, --ntasks, Flux will schedule the server job
    // to use all nodes exclusively
    snprintf(n_tasks, sizeof(n_tasks), "-n%zu", resource->n_nodes);
    snprintf(n_cores, sizeof(n_cores), "-c%d", resource->n_cores_per_server);

    // full command: flux run <flux args> <server args>
    flux_argc = 5;
    server_argc = construct_server_argv(args, NULL);

    // setup full command argv; the one extra slot stays zeroed by calloc()
    // and serves as the NULL terminator execvp() requires (assumes
    // construct_server_argv() fills exactly server_argc entries)
    argc = 1 + flux_argc + server_argc;
    argv = calloc(argc, sizeof(char*));
    if (argv == NULL) {
        return -ENOMEM;
    }
    argv[0] = strdup("flux");
    argv[1] = strdup("run");
    argv[2] = strdup(n_nodes);
    argv[3] = strdup(n_tasks);
    argv[4] = strdup(n_cores);

    // a failed strdup() would hand execvp() a truncated (or NULL) command
    for (size_t i = 0; i < flux_argc; i++) {
        if (argv[i] == NULL) {
            for (size_t j = 0; j < flux_argc; j++) {
                free(argv[j]);
            }
            free(argv);
            return -ENOMEM;
        }
    }

    construct_server_argv(args, argv + flux_argc);

    if (args->debug) {
        for (int i = 0; i < (argc - 1); i++) {
            fprintf(stdout, "UNIFYFS LAUNCH DEBUG: flux argv[%d] = %s\n",
                    i, argv[i]);
            fflush(stdout);
        }
    }

    execvp(argv[0], argv);

    // only reached when execvp() failed; capture errno before perror()
    // has a chance to modify it
    err = errno;
    perror("failed to execvp() flux to launch unifyfsd");
    return -err;
}

/**
 * @brief Terminate servers using Flux
 *
 * Replaces the current process image with
 *     flux pkill name:unifyfsd
 * via execvp().
 *
 * @param resource  The job resource record (not used)
 * @param args      The command-line options (not used)
 *
 * @return Does not return when execvp() succeeds; otherwise the negated
 *         errno reported by execvp().
 */
static int flux_terminate(unifyfs_resource_t* resource,
                          unifyfs_args_t* args)
{
    // full command: flux pkill name:unifyfsd
    // The command is fixed, so the argument vector lives on the stack;
    // this removes the heap allocations that could fail unnoticed.
    char arg_flux[] = "flux";
    char arg_pkill[] = "pkill";
    char arg_target[] = "name:unifyfsd";
    char* argv[] = { arg_flux, arg_pkill, arg_target, NULL };
    int err;

    (void) resource;
    (void) args;

    execvp(argv[0], argv);

    // only reached when execvp() failed; capture errno before perror()
    // has a chance to modify it
    err = errno;
    perror("failed to execvp() flux to pkill unifyfsd");
    return -err;
}

/**
 * @brief Launch unifyfs-stage using flux run
 *
 * Formats the launcher prefix
 *     flux run -N<n_nodes> -n<n_nodes> -c1
 * and hands it to generic_stage() together with the number of tokens in
 * that prefix.
 *
 * @param resource  The job resource record (n_nodes is read)
 * @param args      The command-line options
 *
 * @return The negated errno observed after generic_stage() returns; this
 *         function treats any return from generic_stage() as a failure.
 */
static int flux_stage(unifyfs_resource_t* resource,
                      unifyfs_args_t* args)
{
    // number of whitespace-separated tokens in cmd below:
    // "flux", "run", "-N..", "-n..", "-c1"
    size_t flux_argc = 5;
    char cmd[200];
    int err;

    // full command: flux run <flux args> <stage args>
    snprintf(cmd, sizeof(cmd), "flux run -N%zu -n%zu -c1",
             resource->n_nodes, resource->n_nodes);

    generic_stage(cmd, flux_argc, args);

    // reaching this point is reported as an exec failure; capture errno
    // before perror() has a chance to modify it
    err = errno;
    perror("failed to execvp() flux to launch unifyfs-stage");
    return -err;
}

/**
* @brief Launch servers using custom script
*
Expand Down Expand Up @@ -1249,6 +1446,13 @@ static _ucr_resource_manager_t resource_managers[] = {
.terminate = &jsrun_terminate,
.stage = &jsrun_stage,
},
{
.type = "flux",
.read_resource = &flux_read_resource,
.launch = &flux_launch,
.terminate = &flux_terminate,
.stage = &flux_stage,
},
};

int unifyfs_detect_resources(unifyfs_resource_t* resource)
Expand All @@ -1257,6 +1461,11 @@ int unifyfs_detect_resources(unifyfs_resource_t* resource)
resource->rm = UNIFYFS_RM_PBS;
} else if (getenv("SLURM_JOBID") != NULL) {
resource->rm = UNIFYFS_RM_SLURM;
} else if (getenv("FLUX_EXEC_PATH") != NULL) {
// TODO: need to use a better environment
// variable or maybe a better way to decide
// whether to use flux scheduler.
resource->rm = UNIFYFS_RM_FLUX;
} else if (getenv("LSB_JOBID") != NULL) {
if (getenv("CSM_ALLOCATION_ID") != NULL) {
resource->rm = UNIFYFS_RM_LSF_CSM;
Expand Down