New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
job-list: allow list-id to wait for job state #5213
Changes from all commits
66db8b7
6336369
01f18eb
c123d54
526a2d3
0d912ab
b658aa9
75a64be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,12 @@ | |
#include "idsync.h" | ||
#include "job_util.h" | ||
|
||
/* Used in waits hash, need to store job id within data structure for lookup */ | ||
struct idsync_wait_list { | ||
zlistx_t *l; | ||
flux_jobid_t id; | ||
}; | ||
|
||
void idsync_data_destroy (void *data) | ||
{ | ||
if (data) { | ||
|
@@ -48,6 +54,7 @@ static struct idsync_data *idsync_data_create (flux_t *h, | |
flux_jobid_t id, | ||
const flux_msg_t *msg, | ||
json_t *attrs, | ||
flux_job_state_t state, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Commit message: it might be fine, but I was stumbling over reading of:
Is it supposed to be either "Support an optional job state in the..." or maybe "Support a job state option in the..."? |
||
flux_future_t *f_lookup) | ||
{ | ||
struct idsync_data *isd = NULL; | ||
|
@@ -60,6 +67,7 @@ static struct idsync_data *idsync_data_create (flux_t *h, | |
if (!(isd->msg = flux_msg_copy (msg, false))) | ||
goto error; | ||
isd->attrs = json_incref (attrs); | ||
isd->state = state; | ||
isd->f_lookup = f_lookup; | ||
return isd; | ||
|
||
|
@@ -70,10 +78,15 @@ static struct idsync_data *idsync_data_create (flux_t *h, | |
return NULL; | ||
} | ||
|
||
static void idsync_waits_list_destroy (void **data) | ||
static void idsync_wait_list_destroy (void **data) | ||
{ | ||
if (data) | ||
zlistx_destroy ((zlistx_t **) data); | ||
if (data) { | ||
struct idsync_wait_list *iwl = *data; | ||
if (iwl) { | ||
zlistx_destroy (&iwl->l); | ||
free (iwl); | ||
} | ||
} | ||
} | ||
|
||
struct idsync_ctx *idsync_ctx_create (flux_t *h) | ||
|
@@ -93,7 +106,7 @@ struct idsync_ctx *idsync_ctx_create (flux_t *h) | |
if (!(isctx->waits = job_hash_create ())) | ||
goto error; | ||
|
||
zhashx_set_destructor (isctx->waits, idsync_waits_list_destroy); | ||
zhashx_set_destructor (isctx->waits, idsync_wait_list_destroy); | ||
|
||
return isctx; | ||
|
||
|
@@ -126,7 +139,8 @@ void idsync_ctx_destroy (struct idsync_ctx *isctx) | |
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx, | ||
flux_jobid_t id, | ||
const flux_msg_t *msg, | ||
json_t *attrs) | ||
json_t *attrs, | ||
flux_job_state_t state) | ||
{ | ||
flux_future_t *f = NULL; | ||
struct idsync_data *isd = NULL; | ||
|
@@ -142,7 +156,7 @@ struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx, | |
goto error; | ||
} | ||
|
||
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, f))) | ||
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, f))) | ||
goto error; | ||
|
||
/* future now owned by struct idsync_data */ | ||
|
@@ -174,26 +188,31 @@ void idsync_check_id_valid_cleanup (struct idsync_ctx *isctx, | |
static int idsync_add_waiter (struct idsync_ctx *isctx, | ||
struct idsync_data *isd) | ||
{ | ||
zlistx_t *list_isd; | ||
struct idsync_wait_list *iwl = NULL; | ||
|
||
/* isctx->waits holds lists of ids waiting on, b/c multiple callers | ||
* could wait on same id */ | ||
if (!(list_isd = zhashx_lookup (isctx->waits, &isd->id))) { | ||
if (!(list_isd = zlistx_new ())) | ||
if (!(iwl = zhashx_lookup (isctx->waits, &isd->id))) { | ||
iwl = calloc (1, sizeof (*iwl)); | ||
if (!iwl) | ||
goto enomem; | ||
zlistx_set_destructor (list_isd, idsync_data_destroy_wrapper); | ||
|
||
if (zhashx_insert (isctx->waits, &isd->id, list_isd) < 0) | ||
if (!(iwl->l = zlistx_new ())) | ||
goto enomem; | ||
zlistx_set_destructor (iwl->l, idsync_data_destroy_wrapper); | ||
iwl->id = isd->id; | ||
|
||
if (zhashx_insert (isctx->waits, &iwl->id, iwl) < 0) | ||
goto enomem; | ||
} | ||
|
||
if (!zlistx_add_end (list_isd, isd)) | ||
if (!zlistx_add_end (iwl->l, isd)) | ||
goto enomem; | ||
|
||
return 0; | ||
|
||
enomem: | ||
idsync_data_destroy (isd); | ||
idsync_wait_list_destroy ((void **)&iwl); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cast to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i get a error/warning with it :-( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, ok, my bad then! |
||
errno = ENOMEM; | ||
return -1; | ||
} | ||
|
@@ -215,11 +234,12 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd) | |
int idsync_wait_valid_id (struct idsync_ctx *isctx, | ||
flux_jobid_t id, | ||
const flux_msg_t *msg, | ||
json_t *attrs) | ||
json_t *attrs, | ||
flux_job_state_t state) | ||
{ | ||
struct idsync_data *isd = NULL; | ||
|
||
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, NULL))) | ||
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, NULL))) | ||
return -1; | ||
|
||
return idsync_add_waiter (isctx, isd); | ||
|
@@ -248,16 +268,30 @@ static void idsync_data_respond (struct idsync_ctx *isctx, | |
|
||
void idsync_check_waiting_id (struct idsync_ctx *isctx, struct job *job) | ||
{ | ||
zlistx_t *list_isd; | ||
struct idsync_wait_list *iwl; | ||
|
||
if ((list_isd = zhashx_lookup (isctx->waits, &job->id))) { | ||
if ((iwl = zhashx_lookup (isctx->waits, &job->id))) { | ||
struct idsync_data *isd; | ||
isd = zlistx_first (list_isd); | ||
isd = zlistx_first (iwl->l); | ||
while (isd) { | ||
idsync_data_respond (isctx, isd, job); | ||
isd = zlistx_next (list_isd); | ||
/* Some job states can be missed. For example a job that | ||
* is canceled before it runs will never reach the | ||
* FLUX_JOB_STATE_RUN state. To ensure jobs waiting on | ||
* states that are missed will eventually get a response, always | ||
* respond once the job has reached the inactive state. | ||
*/ | ||
if (!isd->state | ||
|| (isd->state & job->states_mask) | ||
|| (isd->state && job->state == FLUX_JOB_STATE_INACTIVE)) { | ||
struct idsync_data *tmp; | ||
idsync_data_respond (isctx, isd, job); | ||
tmp = zlistx_detach_cur (iwl->l); | ||
idsync_data_destroy (tmp); | ||
} | ||
isd = zlistx_next (iwl->l); | ||
} | ||
zhashx_delete (isctx->waits, &job->id); | ||
if (!zlistx_size (iwl->l)) | ||
zhashx_delete (isctx->waits, &job->id); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commit message:
Suggestion: "the flux job list-ids command does not support any way to use it" ?