Skip to content

Commit

Permalink
Add resource manager support for flux
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Wang <wangvsa@gmail.com>
  • Loading branch information
wangvsa committed Sep 19, 2023
1 parent 087bf1b commit 4324449
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 0 deletions.
1 change: 1 addition & 0 deletions common/src/rm_enumerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ENUMITEM(SLURM, "SchedMD SLURM") \
ENUMITEM(LSF, "IBM Spectrum LSF") \
ENUMITEM(LSF_CSM, "IBM Spectrum LSF with Cluster System Management") \
ENUMITEM(FLUX, "Flux") \

#ifdef __cplusplus
extern "C" {
Expand Down
200 changes: 200 additions & 0 deletions util/unifyfs/src/unifyfs-rm.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,93 @@ static int slurm_read_resource(unifyfs_resource_t* resource)
return ret;
}

/**
*
* @brief Get list of nodes using Flux resource
*
* @param resource The job resource record to be filled
*
* @return 0 on success, negative errno otherwise
*/
static int flux_read_resource(unifyfs_resource_t* resource)
{
size_t n_nodes = 0;
char num_nodes_str[10] = {0};
char nodelist_str[1024] = {0};
FILE* pipe_fp = NULL;
char** nodes = NULL;
int node_idx = 0;

// get num nodes using flux resource command
pipe_fp = popen("flux resource list --states=free -no {nnodes}", "r");
fgets(num_nodes_str, sizeof(num_nodes_str), pipe_fp);
n_nodes = (size_t) strtoul(num_nodes_str, NULL, sizeof(num_nodes_str));
pclose(pipe_fp);

nodes = calloc(sizeof(char*), n_nodes);
if (nodes == NULL) {
return -ENOMEM;
}

// get node list using flux resource command
// the returned list is in a condensed format
// e.g., tioga[18-19,21,32]
// TODO: is it safe to assume flux resource only
// returns a single line?
pipe_fp = popen("flux resource list --states=free -no {nodelist}", "r");
fgets(nodelist_str, sizeof(nodelist_str), pipe_fp);
pclose(pipe_fp);

// remove the trailing ']'
nodelist_str[strlen(nodelist_str)-1] = 0;
// get the node ids, i.e., the list in []
char* node_ids = strstr(nodelist_str, "[");
if (node_ids) {
char* host = calloc(1, (node_ids-nodelist_str)+2);
strncpy(host, nodelist_str, (node_ids-nodelist_str));

// separate by ","
char* end_str;
char* token = strtok_r(node_ids+1, ",", &end_str);
while (token) {
// case 1: contiguous node range
// e.g., 3-5, lo=3, hi=5
// case 2: a single node, then lo=hi
int lo, hi;
if (strstr(token, "-")) {
char* end_str2;
char* lo_str = strtok_r(token, "-", &end_str2);
char* hi_str = strtok_r(NULL, "-", &end_str2);
lo = atoi(lo_str);
hi = atoi(hi_str);
} else {
lo = atoi(token);
hi = lo;
}

for (int i = lo; i <= hi; i++) {
char nodename[30] = {0};
sprintf(nodename, "%s%d", host, i);
if (node_idx >= n_nodes) {
return -EINVAL;
}
nodes[node_idx++] = strdup(nodename);
}
token = strtok_r(NULL, ",", &end_str);
}
} else {
// no '[' in the string, meaning it has a single node
if (n_nodes != 1) {
return -EINVAL;
}
nodes[0] = strdup(nodelist_str);
}

resource->n_nodes = n_nodes;
resource->nodes = nodes;
return 0;
}

// construct_server_argv():
// This function is called in two ways.
// Call it once with server_argv==NULL and it
Expand Down Expand Up @@ -1143,6 +1230,107 @@ static int srun_stage(unifyfs_resource_t* resource,
return -errno;
}

/**
* @brief Launch servers using Flux
*
* @param resource The job resource record
* @param args The command-line options
*
* @return
*/
static int flux_launch(unifyfs_resource_t* resource,
unifyfs_args_t* args)
{
size_t argc, flux_argc, server_argc;
char** argv = NULL;
char n_nodes[16];
char n_tasks[16];
char n_cores[8];

snprintf(n_nodes, sizeof(n_nodes), "-N%zu", resource->n_nodes);
// without -n, --ntasks, Flux will schedule the server job
// to use all nodes exclusively
snprintf(n_tasks, sizeof(n_tasks), "-n%zu", resource->n_nodes);
snprintf(n_cores, sizeof(n_cores), "-c%d", resource->n_cores_per_server);

// full command: srun <srun args> <server args>
flux_argc = 5;
server_argc = construct_server_argv(args, NULL);

// setup full command argv
argc = 1 + flux_argc + server_argc;
argv = calloc(argc, sizeof(char*));
argv[0] = strdup("flux");
argv[1] = strdup("run");
argv[2] = strdup(n_nodes);
argv[3] = strdup(n_tasks);
argv[4] = strdup(n_cores);
construct_server_argv(args, argv + flux_argc);

if (args->debug) {
for (int i = 0; i < (argc - 1); i++) {
fprintf(stdout, "UNIFYFS LAUNCH DEBUG: flux argv[%d] = %s\n",
i, argv[i]);
fflush(stdout);
}
}

execvp(argv[0], argv);
perror("failed to execvp() flux to launch unifyfsd");
return -errno;
}

/**
* @brief Terminate servers using Flux
*
* @param resource The job resource record
* @param args The command-line options
*
* @return
*/
static int flux_terminate(unifyfs_resource_t* resource,
unifyfs_args_t* args)
{
size_t argc, flux_argc;
char** argv = NULL;

// full command: flux <flux args> pkill name:unifyfsd
flux_argc = 3;
argc = 1 + flux_argc;
argv = calloc(argc, sizeof(char*));
argv[0] = strdup("flux");
argv[1] = strdup("pkill");
argv[2] = strdup("name:unifyfsd");

execvp(argv[0], argv);
perror("failed to execvp() flux to pkill unifyfsd");
return -errno;
}

/**
* @brief Launch unifyfs-stage using flux run
*
* @param resource The job resource record
* @param args The command-line options
*
* @return
*/
static int flux_stage(unifyfs_resource_t* resource,
unifyfs_args_t* args)
{
size_t flux_argc = 5;
char cmd[200];

// full command: flux run <flux args> <server args>
snprintf(cmd, sizeof(cmd), "flux run -N%zu -n%zu -c1",
resource->n_nodes, resource->n_nodes);

generic_stage(cmd, flux_argc, args);

perror("failed to execvp() flux to launch unifyfsd");
return -errno;
}

/**
* @brief Launch servers using custom script
*
Expand Down Expand Up @@ -1249,6 +1437,13 @@ static _ucr_resource_manager_t resource_managers[] = {
.terminate = &jsrun_terminate,
.stage = &jsrun_stage,
},
{
.type = "flux",
.read_resource = &flux_read_resource,
.launch = &flux_launch,
.terminate = &flux_terminate,
.stage = &flux_stage,
},
};

int unifyfs_detect_resources(unifyfs_resource_t* resource)
Expand All @@ -1257,6 +1452,11 @@ int unifyfs_detect_resources(unifyfs_resource_t* resource)
resource->rm = UNIFYFS_RM_PBS;
} else if (getenv("SLURM_JOBID") != NULL) {
resource->rm = UNIFYFS_RM_SLURM;
} else if (getenv("FLUX_EXEC_PATH") != NULL) {
// TODO: need to use a better environment
// variable or maybe a better way to decide
// whether to use flux scheduler.
resource->rm = UNIFYFS_RM_FLUX;
} else if (getenv("LSB_JOBID") != NULL) {
if (getenv("CSM_ALLOCATION_ID") != NULL) {
resource->rm = UNIFYFS_RM_LSF_CSM;
Expand Down

0 comments on commit 4324449

Please sign in to comment.