Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job-list: allow list-id to wait for job state #5213

Merged
merged 8 commits into from Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/cmd/flux-job.c
Expand Up @@ -145,6 +145,13 @@ static struct optparse_option list_inactive_opts[] = {
OPTPARSE_TABLE_END
};

static struct optparse_option list_ids_opts[] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit message:

However, the flux job list-ids does not support anything to use it.

Suggestion: "the flux job list-ids command does not support any way to use it" ?

{ .name = "wait-state", .key = 'W', .has_arg = 1, .arginfo = "STATE",
.usage = "Return only after jobid has reached specified state",
},
OPTPARSE_TABLE_END
};

static struct optparse_option urgency_opts[] = {
{ .name = "verbose", .key = 'v', .has_arg = 0,
.usage = "Output old urgency value on success",
Expand Down Expand Up @@ -442,7 +449,7 @@ static struct optparse_subcommand subcommands[] = {
"List job(s) by id",
cmd_list_ids,
OPTPARSE_SUBCMD_HIDDEN,
NULL,
list_ids_opts,
},
{ "urgency",
"[OPTIONS] id urgency",
Expand Down Expand Up @@ -1380,6 +1387,8 @@ int cmd_list_ids (optparse_t *p, int argc, char **argv)
int optindex = optparse_option_index (p);
flux_t *h;
int i, ids_len;
flux_job_state_t state;
const char *state_str;

if (isatty (STDOUT_FILENO)) {
fprintf (stderr,
Expand All @@ -1394,12 +1403,28 @@ int cmd_list_ids (optparse_t *p, int argc, char **argv)
if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");

/* If the user did not specify a job state, default to "depend",
* the first job state, so that the request returns as soon as the
* job-list module is aware of the job.
*/
state_str = optparse_get_str (p, "wait-state", "depend");
if (flux_job_strtostate (state_str, &state) < 0)
log_msg_exit ("invalid job state specified");

ids_len = argc - optindex;
for (i = 0; i < ids_len; i++) {
flux_jobid_t id = parse_jobid (argv[optindex + i]);
flux_future_t *f;
if (!(f = flux_job_list_id (h, id, "[\"all\"]")))
log_err_exit ("flux_job_list_id");
if (!(f = flux_rpc_pack (h,
"job-list.list-id",
FLUX_NODEID_ANY,
0,
"{s:I s:[s] s:i}",
"id", id,
"attrs",
"all",
"state", state)))
log_err_exit ("flux_rpc_pack");
if (flux_future_then (f, -1, list_id_continuation, NULL) < 0)
log_err_exit ("flux_future_then");
}
Expand Down
76 changes: 55 additions & 21 deletions src/modules/job-list/idsync.c
Expand Up @@ -22,6 +22,12 @@
#include "idsync.h"
#include "job_util.h"

/* Value type stored in the idsync 'waits' hash.  The job id is stored
* in the structure itself; its address is passed as the key when the
* entry is inserted into the hash.
*/
struct idsync_wait_list {
zlistx_t *l;        /* list of struct idsync_data waiting on 'id' */
flux_jobid_t id;    /* job id the entries in 'l' are waiting on */
};

void idsync_data_destroy (void *data)
{
if (data) {
Expand All @@ -48,6 +54,7 @@ static struct idsync_data *idsync_data_create (flux_t *h,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs,
flux_job_state_t state,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit message: it might be fine, but I was stumbling over reading of:

Solution: Support an option job state in the job-list.list-id service.

Is it supposed to be either "Support an optional job state in the..." or maybe "Support a job state option in the..."?

flux_future_t *f_lookup)
{
struct idsync_data *isd = NULL;
Expand All @@ -60,6 +67,7 @@ static struct idsync_data *idsync_data_create (flux_t *h,
if (!(isd->msg = flux_msg_copy (msg, false)))
goto error;
isd->attrs = json_incref (attrs);
isd->state = state;
isd->f_lookup = f_lookup;
return isd;

Expand All @@ -70,10 +78,15 @@ static struct idsync_data *idsync_data_create (flux_t *h,
return NULL;
}

static void idsync_waits_list_destroy (void **data)
static void idsync_wait_list_destroy (void **data)
{
if (data)
zlistx_destroy ((zlistx_t **) data);
if (data) {
struct idsync_wait_list *iwl = *data;
if (iwl) {
zlistx_destroy (&iwl->l);
free (iwl);
}
}
}

struct idsync_ctx *idsync_ctx_create (flux_t *h)
Expand All @@ -93,7 +106,7 @@ struct idsync_ctx *idsync_ctx_create (flux_t *h)
if (!(isctx->waits = job_hash_create ()))
goto error;

zhashx_set_destructor (isctx->waits, idsync_waits_list_destroy);
zhashx_set_destructor (isctx->waits, idsync_wait_list_destroy);

return isctx;

Expand Down Expand Up @@ -126,7 +139,8 @@ void idsync_ctx_destroy (struct idsync_ctx *isctx)
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs)
json_t *attrs,
flux_job_state_t state)
{
flux_future_t *f = NULL;
struct idsync_data *isd = NULL;
Expand All @@ -142,7 +156,7 @@ struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
goto error;
}

if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, f)))
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, f)))
goto error;

/* future now owned by struct idsync_data */
Expand Down Expand Up @@ -174,26 +188,31 @@ void idsync_check_id_valid_cleanup (struct idsync_ctx *isctx,
static int idsync_add_waiter (struct idsync_ctx *isctx,
struct idsync_data *isd)
{
zlistx_t *list_isd;
struct idsync_wait_list *iwl = NULL;

/* isctx->waits maps a job id to the list of requests waiting on
* that id, b/c multiple callers could wait on the same id */
if (!(list_isd = zhashx_lookup (isctx->waits, &isd->id))) {
if (!(list_isd = zlistx_new ()))
if (!(iwl = zhashx_lookup (isctx->waits, &isd->id))) {
iwl = calloc (1, sizeof (*iwl));
if (!iwl)
goto enomem;
zlistx_set_destructor (list_isd, idsync_data_destroy_wrapper);

if (zhashx_insert (isctx->waits, &isd->id, list_isd) < 0)
if (!(iwl->l = zlistx_new ()))
goto enomem;
zlistx_set_destructor (iwl->l, idsync_data_destroy_wrapper);
iwl->id = isd->id;

if (zhashx_insert (isctx->waits, &iwl->id, iwl) < 0)
goto enomem;
}

if (!zlistx_add_end (list_isd, isd))
if (!zlistx_add_end (iwl->l, isd))
goto enomem;

return 0;

enomem:
idsync_data_destroy (isd);
idsync_wait_list_destroy ((void **)&iwl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cast to (void **) is unnecessary here I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get an error/warning with it :-(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, ok, my bad then!

errno = ENOMEM;
return -1;
}
Expand All @@ -215,11 +234,12 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd)
int idsync_wait_valid_id (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs)
json_t *attrs,
flux_job_state_t state)
{
struct idsync_data *isd = NULL;

if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, NULL)))
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, NULL)))
return -1;

return idsync_add_waiter (isctx, isd);
Expand Down Expand Up @@ -248,16 +268,30 @@ static void idsync_data_respond (struct idsync_ctx *isctx,

void idsync_check_waiting_id (struct idsync_ctx *isctx, struct job *job)
{
zlistx_t *list_isd;
struct idsync_wait_list *iwl;

if ((list_isd = zhashx_lookup (isctx->waits, &job->id))) {
if ((iwl = zhashx_lookup (isctx->waits, &job->id))) {
struct idsync_data *isd;
isd = zlistx_first (list_isd);
isd = zlistx_first (iwl->l);
while (isd) {
idsync_data_respond (isctx, isd, job);
isd = zlistx_next (list_isd);
/* Some job states can be missed. For example a job that
* is canceled before it runs will never reach the
* FLUX_JOB_STATE_RUN state. To ensure jobs waiting on
* states that are missed will eventually get a response, always
* respond once the job has reached the inactive state.
*/
if (!isd->state
|| (isd->state & job->states_mask)
|| (isd->state && job->state == FLUX_JOB_STATE_INACTIVE)) {
struct idsync_data *tmp;
idsync_data_respond (isctx, isd, job);
tmp = zlistx_detach_cur (iwl->l);
idsync_data_destroy (tmp);
}
isd = zlistx_next (iwl->l);
}
zhashx_delete (isctx->waits, &job->id);
if (!zlistx_size (iwl->l))
zhashx_delete (isctx->waits, &job->id);
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/modules/job-list/idsync.h
Expand Up @@ -28,6 +28,7 @@ struct idsync_data {
flux_jobid_t id;
flux_msg_t *msg;
json_t *attrs;
flux_job_state_t state;

flux_future_t *f_lookup;
};
Expand All @@ -45,7 +46,8 @@ void idsync_data_destroy (void *data);
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs);
json_t *attrs,
flux_job_state_t state);


/* free / cleanup 'struct idsync_data' after
Expand All @@ -65,7 +67,8 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd);
int idsync_wait_valid_id (struct idsync_ctx *isctx,
flux_jobid_t id,
const flux_msg_t *msg,
json_t *attrs);
json_t *attrs,
flux_job_state_t state);

/* check if 'job' is in waits list, if so respond to original
* message */
Expand Down
3 changes: 2 additions & 1 deletion src/modules/job-list/job_state.c
Expand Up @@ -267,6 +267,8 @@ static void update_job_state_and_list (struct job_state_ctx *jsctx,
zlistx_reorder (jsctx->pending,
job->list_handle,
search_direction (job));

idsync_check_waiting_id (jsctx->isctx, job);
}

static void state_depend_lookup_continuation (flux_future_t *f, void *arg)
Expand All @@ -291,7 +293,6 @@ static void state_depend_lookup_continuation (flux_future_t *f, void *arg)
st = zlist_head (job->next_states);
assert (st);
update_job_state_and_list (jsctx, job, st->state, st->timestamp);
idsync_check_waiting_id (jsctx->isctx, job);
zlist_remove (job->next_states, st);
process_next_state (jsctx, job);

Expand Down
35 changes: 27 additions & 8 deletions src/modules/job-list/list.c
Expand Up @@ -32,6 +32,7 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
const flux_msg_t *msg,
flux_jobid_t id,
json_t *attrs,
flux_job_state_t state,
bool *stall);

/* Filter test to determine if job desired by caller */
Expand Down Expand Up @@ -294,7 +295,7 @@ void check_id_valid_continuation (flux_future_t *f, void *arg)
else {
json_t *o;
if (!(o = get_job_by_id (jsctx, NULL, isd->msg,
isd->id, isd->attrs, NULL))) {
isd->id, isd->attrs, isd->state, NULL))) {
flux_log_error (jsctx->h, "%s: get_job_by_id", __FUNCTION__);
goto cleanup;
}
Expand All @@ -314,14 +315,16 @@ void check_id_valid_continuation (flux_future_t *f, void *arg)
int check_id_valid (struct job_state_ctx *jsctx,
const flux_msg_t *msg,
flux_jobid_t id,
json_t *attrs)
json_t *attrs,
flux_job_state_t state)
{
struct idsync_data *isd = NULL;

if (!(isd = idsync_check_id_valid (jsctx->isctx,
id,
msg,
attrs))
attrs,
state))
|| flux_future_aux_set (isd->f_lookup,
"job_state_ctx",
jsctx,
Expand Down Expand Up @@ -349,13 +352,14 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
const flux_msg_t *msg,
flux_jobid_t id,
json_t *attrs,
flux_job_state_t state,
bool *stall)
{
struct job *job;

if (!(job = zhashx_lookup (jsctx->index, &id))) {
if (stall) {
if (check_id_valid (jsctx, msg, id, attrs) < 0) {
if (check_id_valid (jsctx, msg, id, attrs, state) < 0) {
flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__);
return NULL;
}
Expand All @@ -367,7 +371,7 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
if (job->state == FLUX_JOB_STATE_NEW) {
if (stall) {
/* Must wait for job-list to see state change */
if (idsync_wait_valid_id (jsctx->isctx, id, msg, attrs) < 0) {
if (idsync_wait_valid_id (jsctx->isctx, id, msg, attrs, state) < 0) {
flux_log_error (jsctx->h, "%s: idsync_wait_valid_id",
__FUNCTION__);
return NULL;
Expand All @@ -388,11 +392,14 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
json_t *job;
flux_jobid_t id;
json_t *attrs;
int state = 0;
int valid_states = FLUX_JOB_STATE_ACTIVE | FLUX_JOB_STATE_INACTIVE;
bool stall = false;

if (flux_request_unpack (msg, NULL, "{s:I s:o}",
if (flux_request_unpack (msg, NULL, "{s:I s:o s?i}",
"id", &id,
"attrs", &attrs) < 0) {
"attrs", &attrs,
"state", &state) < 0) {
seterror (&err, "invalid payload: %s", flux_msg_last_error (msg));
errno = EPROTO;
goto error;
Expand All @@ -404,7 +411,19 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(job = get_job_by_id (ctx->jsctx, &err, msg, id, attrs, &stall))) {
if (state && (state & ~valid_states)) {
seterror (&err, "invalid payload: invalid state specified");
errno = EPROTO;
goto error;
}

if (!(job = get_job_by_id (ctx->jsctx,
&err,
msg,
id,
attrs,
state,
&stall))) {
/* response handled after KVS lookup complete */
if (stall)
goto stall;
Expand Down