Skip to content

Commit

Permalink
Merge pull request #5213 from chu11/issue2864_idsync_state
Browse files Browse the repository at this point in the history
job-list: allow list-id to wait for job state
  • Loading branch information
mergify[bot] committed Jun 6, 2023
2 parents 7693cd7 + 75a64be commit 80c8a69
Show file tree
Hide file tree
Showing 11 changed files with 454 additions and 307 deletions.
31 changes: 28 additions & 3 deletions src/cmd/flux-job.c
Expand Up @@ -145,6 +145,13 @@ static struct optparse_option list_inactive_opts[] = {
OPTPARSE_TABLE_END
};

static struct optparse_option list_ids_opts[] = {
{ .name = "wait-state", .key = 'W', .has_arg = 1, .arginfo = "STATE",
.usage = "Return only after jobid has reached specified state",
},
OPTPARSE_TABLE_END
};

static struct optparse_option urgency_opts[] = {
{ .name = "verbose", .key = 'v', .has_arg = 0,
.usage = "Output old urgency value on success",
Expand Down Expand Up @@ -442,7 +449,7 @@ static struct optparse_subcommand subcommands[] = {
"List job(s) by id",
cmd_list_ids,
OPTPARSE_SUBCMD_HIDDEN,
NULL,
list_ids_opts,
},
{ "urgency",
"[OPTIONS] id urgency",
Expand Down Expand Up @@ -1380,6 +1387,8 @@ int cmd_list_ids (optparse_t *p, int argc, char **argv)
int optindex = optparse_option_index (p);
flux_t *h;
int i, ids_len;
flux_job_state_t state;
const char *state_str;

if (isatty (STDOUT_FILENO)) {
fprintf (stderr,
Expand All @@ -1394,12 +1403,28 @@ int cmd_list_ids (optparse_t *p, int argc, char **argv)
if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");

/* if no job state specified by user, pick first job state of
* depend, which means will return as soon as the job-list module
* is aware of the job
*/
state_str = optparse_get_str (p, "wait-state", "depend");
if (flux_job_strtostate (state_str, &state) < 0)
log_msg_exit ("invalid job state specified");

ids_len = argc - optindex;
for (i = 0; i < ids_len; i++) {
flux_jobid_t id = parse_jobid (argv[optindex + i]);
flux_future_t *f;
if (!(f = flux_job_list_id (h, id, "[\"all\"]")))
log_err_exit ("flux_job_list_id");
if (!(f = flux_rpc_pack (h,
"job-list.list-id",
FLUX_NODEID_ANY,
0,
"{s:I s:[s] s:i}",
"id", id,
"attrs",
"all",
"state", state)))
log_err_exit ("flux_rpc_pack");
if (flux_future_then (f, -1, list_id_continuation, NULL) < 0)
log_err_exit ("flux_future_then");
}
Expand Down
76 changes: 55 additions & 21 deletions src/modules/job-list/idsync.c
Expand Up @@ -22,6 +22,12 @@
#include "idsync.h"
#include "job_util.h"

/* Used in waits hash, need to store job id within data structure for lookup */
struct idsync_wait_list {
zlistx_t *l;
flux_jobid_t id;
};

void idsync_data_destroy (void *data)
{
if (data) {
Expand All @@ -48,6 +54,7 @@ static struct idsync_data *idsync_data_create (flux_t *h,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs,
flux_job_state_t state,
flux_future_t *f_lookup)
{
struct idsync_data *isd = NULL;
Expand All @@ -60,6 +67,7 @@ static struct idsync_data *idsync_data_create (flux_t *h,
if (!(isd->msg = flux_msg_copy (msg, false)))
goto error;
isd->attrs = json_incref (attrs);
isd->state = state;
isd->f_lookup = f_lookup;
return isd;

Expand All @@ -70,10 +78,15 @@ static struct idsync_data *idsync_data_create (flux_t *h,
return NULL;
}

static void idsync_waits_list_destroy (void **data)
static void idsync_wait_list_destroy (void **data)
{
if (data)
zlistx_destroy ((zlistx_t **) data);
if (data) {
struct idsync_wait_list *iwl = *data;
if (iwl) {
zlistx_destroy (&iwl->l);
free (iwl);
}
}
}

struct idsync_ctx *idsync_ctx_create (flux_t *h)
Expand All @@ -93,7 +106,7 @@ struct idsync_ctx *idsync_ctx_create (flux_t *h)
if (!(isctx->waits = job_hash_create ()))
goto error;

zhashx_set_destructor (isctx->waits, idsync_waits_list_destroy);
zhashx_set_destructor (isctx->waits, idsync_wait_list_destroy);

return isctx;

Expand Down Expand Up @@ -126,7 +139,8 @@ void idsync_ctx_destroy (struct idsync_ctx *isctx)
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs)
json_t *attrs,
flux_job_state_t state)
{
flux_future_t *f = NULL;
struct idsync_data *isd = NULL;
Expand All @@ -142,7 +156,7 @@ struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
goto error;
}

if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, f)))
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, f)))
goto error;

/* future now owned by struct idsync_data */
Expand Down Expand Up @@ -174,26 +188,31 @@ void idsync_check_id_valid_cleanup (struct idsync_ctx *isctx,
static int idsync_add_waiter (struct idsync_ctx *isctx,
struct idsync_data *isd)
{
zlistx_t *list_isd;
struct idsync_wait_list *iwl = NULL;

/* isctx->waits holds lists of ids waiting on, b/c multiple callers
* could wait on same id */
if (!(list_isd = zhashx_lookup (isctx->waits, &isd->id))) {
if (!(list_isd = zlistx_new ()))
if (!(iwl = zhashx_lookup (isctx->waits, &isd->id))) {
iwl = calloc (1, sizeof (*iwl));
if (!iwl)
goto enomem;
zlistx_set_destructor (list_isd, idsync_data_destroy_wrapper);

if (zhashx_insert (isctx->waits, &isd->id, list_isd) < 0)
if (!(iwl->l = zlistx_new ()))
goto enomem;
zlistx_set_destructor (iwl->l, idsync_data_destroy_wrapper);
iwl->id = isd->id;

if (zhashx_insert (isctx->waits, &iwl->id, iwl) < 0)
goto enomem;
}

if (!zlistx_add_end (list_isd, isd))
if (!zlistx_add_end (iwl->l, isd))
goto enomem;

return 0;

enomem:
idsync_data_destroy (isd);
idsync_wait_list_destroy ((void **)&iwl);
errno = ENOMEM;
return -1;
}
Expand All @@ -215,11 +234,12 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd)
int idsync_wait_valid_id (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs)
json_t *attrs,
flux_job_state_t state)
{
struct idsync_data *isd = NULL;

if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, NULL)))
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, NULL)))
return -1;

return idsync_add_waiter (isctx, isd);
Expand Down Expand Up @@ -248,16 +268,30 @@ static void idsync_data_respond (struct idsync_ctx *isctx,

void idsync_check_waiting_id (struct idsync_ctx *isctx, struct job *job)
{
zlistx_t *list_isd;
struct idsync_wait_list *iwl;

if ((list_isd = zhashx_lookup (isctx->waits, &job->id))) {
if ((iwl = zhashx_lookup (isctx->waits, &job->id))) {
struct idsync_data *isd;
isd = zlistx_first (list_isd);
isd = zlistx_first (iwl->l);
while (isd) {
idsync_data_respond (isctx, isd, job);
isd = zlistx_next (list_isd);
/* Some job states can be missed. For example a job that
* is canceled before it runs will never reach the
* FLUX_JOB_STATE_RUN state. To ensure jobs waiting on
* states that are missed will eventually get a response, always
* respond once the job has reached the inactive state.
*/
if (!isd->state
|| (isd->state & job->states_mask)
|| (isd->state && job->state == FLUX_JOB_STATE_INACTIVE)) {
struct idsync_data *tmp;
idsync_data_respond (isctx, isd, job);
tmp = zlistx_detach_cur (iwl->l);
idsync_data_destroy (tmp);
}
isd = zlistx_next (iwl->l);
}
zhashx_delete (isctx->waits, &job->id);
if (!zlistx_size (iwl->l))
zhashx_delete (isctx->waits, &job->id);
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/modules/job-list/idsync.h
Expand Up @@ -28,6 +28,7 @@ struct idsync_data {
flux_jobid_t id;
flux_msg_t *msg;
json_t *attrs;
flux_job_state_t state;

flux_future_t *f_lookup;
};
Expand All @@ -45,7 +46,8 @@ void idsync_data_destroy (void *data);
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs);
json_t *attrs,
flux_job_state_t state);


/* free / cleanup 'struct idsync_data' after
Expand All @@ -65,7 +67,8 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd);
int idsync_wait_valid_id (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs);
json_t *attrs,
flux_job_state_t state);

/* check if 'job' is in waits list, if so respond to original
* message */
Expand Down
3 changes: 2 additions & 1 deletion src/modules/job-list/job_state.c
Expand Up @@ -267,6 +267,8 @@ static void update_job_state_and_list (struct job_state_ctx *jsctx,
zlistx_reorder (jsctx->pending,
job->list_handle,
search_direction (job));

idsync_check_waiting_id (jsctx->isctx, job);
}

static void state_depend_lookup_continuation (flux_future_t *f, void *arg)
Expand All @@ -291,7 +293,6 @@ static void state_depend_lookup_continuation (flux_future_t *f, void *arg)
st = zlist_head (job->next_states);
assert (st);
update_job_state_and_list (jsctx, job, st->state, st->timestamp);
idsync_check_waiting_id (jsctx->isctx, job);
zlist_remove (job->next_states, st);
process_next_state (jsctx, job);

Expand Down
35 changes: 27 additions & 8 deletions src/modules/job-list/list.c
Expand Up @@ -32,6 +32,7 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
const flux_msg_t *msg,
flux_jobid_t id,
json_t *attrs,
flux_job_state_t state,
bool *stall);

/* Filter test to determine if job desired by caller */
Expand Down Expand Up @@ -294,7 +295,7 @@ void check_id_valid_continuation (flux_future_t *f, void *arg)
else {
json_t *o;
if (!(o = get_job_by_id (jsctx, NULL, isd->msg,
isd->id, isd->attrs, NULL))) {
isd->id, isd->attrs, isd->state, NULL))) {
flux_log_error (jsctx->h, "%s: get_job_by_id", __FUNCTION__);
goto cleanup;
}
Expand All @@ -314,14 +315,16 @@ void check_id_valid_continuation (flux_future_t *f, void *arg)
int check_id_valid (struct job_state_ctx *jsctx,
const flux_msg_t *msg,
flux_jobid_t id,
json_t *attrs)
json_t *attrs,
flux_job_state_t state)
{
struct idsync_data *isd = NULL;

if (!(isd = idsync_check_id_valid (jsctx->isctx,
id,
msg,
attrs))
attrs,
state))
|| flux_future_aux_set (isd->f_lookup,
"job_state_ctx",
jsctx,
Expand Down Expand Up @@ -349,13 +352,14 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
const flux_msg_t *msg,
flux_jobid_t id,
json_t *attrs,
flux_job_state_t state,
bool *stall)
{
struct job *job;

if (!(job = zhashx_lookup (jsctx->index, &id))) {
if (stall) {
if (check_id_valid (jsctx, msg, id, attrs) < 0) {
if (check_id_valid (jsctx, msg, id, attrs, state) < 0) {
flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__);
return NULL;
}
Expand All @@ -367,7 +371,7 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
if (job->state == FLUX_JOB_STATE_NEW) {
if (stall) {
/* Must wait for job-list to see state change */
if (idsync_wait_valid_id (jsctx->isctx, id, msg, attrs) < 0) {
if (idsync_wait_valid_id (jsctx->isctx, id, msg, attrs, state) < 0) {
flux_log_error (jsctx->h, "%s: idsync_wait_valid_id",
__FUNCTION__);
return NULL;
Expand All @@ -388,11 +392,14 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
json_t *job;
flux_jobid_t id;
json_t *attrs;
int state = 0;
int valid_states = FLUX_JOB_STATE_ACTIVE | FLUX_JOB_STATE_INACTIVE;
bool stall = false;

if (flux_request_unpack (msg, NULL, "{s:I s:o}",
if (flux_request_unpack (msg, NULL, "{s:I s:o s?i}",
"id", &id,
"attrs", &attrs) < 0) {
"attrs", &attrs,
"state", &state) < 0) {
seterror (&err, "invalid payload: %s", flux_msg_last_error (msg));
errno = EPROTO;
goto error;
Expand All @@ -404,7 +411,19 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(job = get_job_by_id (ctx->jsctx, &err, msg, id, attrs, &stall))) {
if (state && (state & ~valid_states)) {
seterror (&err, "invalid payload: invalid state specified");
errno = EPROTO;
goto error;
}

if (!(job = get_job_by_id (ctx->jsctx,
&err,
msg,
id,
attrs,
state,
&stall))) {
/* response handled after KVS lookup complete */
if (stall)
goto stall;
Expand Down

0 comments on commit 80c8a69

Please sign in to comment.