Skip to content

Commit

Permalink
Merge pull request #5837 from garlick/inactive_job_journal
Browse files Browse the repository at this point in the history
eliminate duplicate KVS restart in job-list and job-manager
  • Loading branch information
mergify[bot] committed Mar 30, 2024
2 parents 69711aa + abdb9b6 commit 57ac53a
Show file tree
Hide file tree
Showing 19 changed files with 432 additions and 1,188 deletions.
5 changes: 0 additions & 5 deletions doc/man5/flux-config-job-manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ table, which may contain the following keys:
KEYS
====

journal-size-limit
(optional) Integer value that determines the maximum number of job events to
be retained in the in-memory journal used to answer queries. The default
is 1000.

inactive-age-limit
(optional) String (in RFC 23 Flux Standard Duration format) that specifies
the maximum age of inactive jobs retained in the KVS. The age is computed
Expand Down
36 changes: 25 additions & 11 deletions src/modules/job-list/job-list.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct list_ctx *ctx = arg;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}

int pending = zlistx_size (ctx->jsctx->pending);
int running = zlistx_size (ctx->jsctx->running);
int inactive = zlistx_size (ctx->jsctx->inactive);
Expand All @@ -57,15 +64,12 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
"idsync",
"lookups", idsync_lookups,
"waits", idsync_waits,
"stats_watchers", stats_watchers) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}

"stats_watchers", stats_watchers) < 0)
flux_log_error (h, "error responding to stats-get request");
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
flux_log_error (h, "error responding to stats-get request");
}

static void purge_cb (flux_t *h,
Expand Down Expand Up @@ -98,6 +102,17 @@ static void purge_cb (flux_t *h,
flux_log (h, LOG_DEBUG, "purged %d inactive jobs", count);
}

void requeue_deferred_requests (struct list_ctx *ctx)
{
const flux_msg_t *msg;

while ((msg = flux_msglist_pop (ctx->deferred_requests))) {
if (flux_requeue (ctx->h, msg, FLUX_RQ_TAIL) < 0)
flux_log_error (ctx->h, "error requeuing deferred request");
flux_msg_decref (msg);
}
}

static void disconnect_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -186,6 +201,7 @@ static void list_ctx_destroy (struct list_ctx *ctx)
if (ctx) {
int saved_errno = errno;
flux_msg_handler_delvec (ctx->handlers);
flux_msglist_destroy (ctx->deferred_requests);
if (ctx->jsctx)
job_state_destroy (ctx->jsctx);
if (ctx->isctx)
Expand All @@ -207,7 +223,9 @@ static struct list_ctx *list_ctx_create (flux_t *h)
goto error;
if (!(ctx->isctx = idsync_ctx_create (ctx->h)))
goto error;
if (!(ctx->jsctx = job_state_create (ctx->isctx)))
if (!(ctx->jsctx = job_state_create (ctx)))
goto error;
if (!(ctx->deferred_requests = flux_msglist_create ()))
goto error;
return ctx;
error:
Expand All @@ -224,10 +242,6 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "initialization error");
goto done;
}
if (job_state_init_from_kvs (ctx->jsctx) < 0) {
flux_log_error (h, "initialization from kvs error");
goto done;
}
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
goto done;
rc = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/modules/job-list/job-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ struct list_ctx {
flux_msg_handler_t **handlers;
struct job_state_ctx *jsctx;
struct idsync_ctx *isctx;
struct flux_msglist *deferred_requests;
};

const char **job_attrs (void);

void requeue_deferred_requests (struct list_ctx *ctx);

#endif /* _FLUX_JOB_LIST_H */

/*
Expand Down
35 changes: 22 additions & 13 deletions src/modules/job-list/job_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void job_destroy (void *data)
json_decref (job->exception_context);
json_decref (job->jobspec_updates);
json_decref (job->R_updates);
zlist_destroy (&job->updates);
free (job);
errno = save_errno;
}
Expand All @@ -69,16 +68,8 @@ struct job *job_create (flux_t *h, flux_jobid_t id)
job->expiration = -1.0;
job->wait_status = -1;
job->result = FLUX_JOB_RESULT_FAILED;

if (!(job->updates = zlist_new ())) {
errno = ENOMEM;
job_destroy (job);
return NULL;
}

job->states_mask = FLUX_JOB_STATE_NEW;
job->states_events_mask = FLUX_JOB_STATE_NEW;
job->eventlog_seq = -1;
return job;
}

Expand Down Expand Up @@ -368,15 +359,24 @@ static int parse_jobspec (struct job *job, bool allow_nonfatal)
return allow_nonfatal ? 0 : -1;
}

int job_parse_jobspec (struct job *job, const char *s, json_t *updates)
int job_parse_jobspec_cached (struct job *job, json_t *updates)
{
if (load_jobspec (job, s, true) < 0)
if (!job->jobspec) {
errno = EINVAL;
return -1;
}
if (parse_jobspec (job, true) < 0)
return -1;
return job_jobspec_update (job, updates);
}

int job_parse_jobspec (struct job *job, const char *s, json_t *updates)
{
if (load_jobspec (job, s, true) < 0)
return -1;
return job_parse_jobspec_cached (job, updates);
}

int job_parse_jobspec_fatal (struct job *job, const char *s, json_t *updates)
{
if (load_jobspec (job, s, false) < 0)
Expand Down Expand Up @@ -466,15 +466,24 @@ static int parse_R (struct job *job, bool allow_nonfatal)
return rc;
}

int job_parse_R (struct job *job, const char *s, json_t *updates)
int job_parse_R_cached (struct job *job, json_t *updates)
{
if (load_R (job, s, true) < 0)
if (!job->R) {
errno = EINVAL;
return -1;
}
if (parse_R (job, true) < 0)
return -1;
return job_R_update (job, updates);
}

int job_parse_R (struct job *job, const char *s, json_t *updates)
{
if (load_R (job, s, true) < 0)
return -1;
return job_parse_R_cached (job, updates);
}

int job_parse_R_fatal (struct job *job, const char *s, json_t *updates)
{
if (load_R (job, s, false) < 0)
Expand Down
16 changes: 4 additions & 12 deletions src/modules/job-list/job_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,17 @@ struct job {
json_t *R;
json_t *exception_context;

/* All internal changes (most notably job state transitions) are
* placed on the updates list. We do not immediately update to
* the new state and place onto a new list until we have retrieved
* any necessary data associated to that state. For example, when
* the 'depend' state has been seen, we don't immediately place it
* on the `pending` list. We wait until we've retrieved data such
* as userid, urgency, etc.
*
* Track which states we have seen and have completed transition
/* Track which states we have seen and have completed transition
* to. States we've processed via the states_mask and states seen
* via events stream in states_events_mask.
*/
zlist_t *updates;
unsigned int states_mask;
unsigned int states_events_mask;
void *list_handle;
/* if updates in eventlog before jobspec / R read from KVS */
/* store updates that were received before jobspec/R objects */
json_t *jobspec_updates;
json_t *R_updates;

int eventlog_seq; /* last event seq read */
int submit_version; /* version number in submit context */
};

Expand All @@ -110,6 +100,7 @@ struct job *job_create (flux_t *h, flux_jobid_t id);
* the jobspec.
*/
int job_parse_jobspec (struct job *job, const char *s, json_t *updates);
int job_parse_jobspec_cached (struct job *job, json_t *updates);

/* identical to above, but all nonfatal errors will return error.
* Primarily used for testing.
Expand All @@ -129,6 +120,7 @@ int job_jobspec_update (struct job *job, json_t *updates);
* - ntasks (if necessary)
*/
int job_parse_R (struct job *job, const char *s, json_t *updates);
int job_parse_R_cached (struct job *job, json_t *updates);

/* identical to above, but all nonfatal errors will return error.
* Primarily used for testing.
Expand Down

0 comments on commit 57ac53a

Please sign in to comment.