Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eliminate duplicate KVS restart in job-list and job-manager #5837

Merged
merged 22 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4f21fee
job-list: fix handling of respond failure
garlick Mar 26, 2024
3b26000
job-manager: keep in-memory copy of job eventlog
garlick Mar 26, 2024
1241e04
job-manager: allow job-manager.getattr eventlog
garlick Mar 26, 2024
2e9ad25
job-manager: send complete journal to consumers
garlick Mar 26, 2024
a999469
job-list: get jobs from journal not KVS
garlick Mar 26, 2024
830fab9
testsuite: udpate events_journal_stream test prog
garlick Mar 3, 2022
4203b2f
testsuite: drop seq specific test from t2210
garlick Mar 26, 2024
d55a940
testsuite: drop journal-size-limit test from t2210
garlick Mar 26, 2024
8f53d4e
testsuite: add full:true to journal test in t2210
garlick Mar 27, 2024
75b28b1
testsuite: fix test that modifies eventlog in kvs
garlick Mar 27, 2024
dab5030
testsuite: drop issue test for purge + journal
garlick Mar 27, 2024
6365e90
flux-config-job-manager(5): nix journal-size-limit
garlick Mar 27, 2024
930d9da
job-list: use a separate flag during init
garlick Mar 27, 2024
4973782
job-list: pass list_ctx into job_state_create()
garlick Mar 27, 2024
e588991
job-list: defer requests until after init
garlick Mar 27, 2024
e5e2b09
job-manager: include R, jobspec in journal stream
garlick Mar 28, 2024
689ab54
job-list: improve journal error handling
garlick Mar 28, 2024
5fb3a24
job-list: add job_parse_jobspec and _R variants
garlick Mar 29, 2024
3dd36bd
job-list: process R, jobspec from journal not KVS
garlick Mar 29, 2024
7c23756
job-list: drop futures list
garlick Mar 29, 2024
3724d77
testsuite: drop job-list tests for illegal KVS content
garlick Mar 29, 2024
abdb9b6
job-list: drop code for deferring job data updates
garlick Mar 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions doc/man5/flux-config-job-manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ table, which may contain the following keys:
KEYS
====

journal-size-limit
(optional) Integer value that determines the maximum number of job events to
be retained in the in-memory journal used to answer queries. The default
is 1000.

inactive-age-limit
(optional) String (in RFC 23 Flux Standard Duration format) that specifies
the maximum age of inactive jobs retained in the KVS. The age is computed
Expand Down
36 changes: 25 additions & 11 deletions src/modules/job-list/job-list.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@
const flux_msg_t *msg, void *arg)
{
struct list_ctx *ctx = arg;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;

Check warning on line 49 in src/modules/job-list/job-list.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job-list.c#L48-L49

Added lines #L48 - L49 were not covered by tests
return;
}

int pending = zlistx_size (ctx->jsctx->pending);
int running = zlistx_size (ctx->jsctx->running);
int inactive = zlistx_size (ctx->jsctx->inactive);
Expand All @@ -57,15 +64,12 @@
"idsync",
"lookups", idsync_lookups,
"waits", idsync_waits,
"stats_watchers", stats_watchers) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}

"stats_watchers", stats_watchers) < 0)
flux_log_error (h, "error responding to stats-get request");

Check warning on line 68 in src/modules/job-list/job-list.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job-list.c#L68

Added line #L68 was not covered by tests
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
flux_log_error (h, "error responding to stats-get request");

Check warning on line 72 in src/modules/job-list/job-list.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job-list.c#L72

Added line #L72 was not covered by tests
}

static void purge_cb (flux_t *h,
Expand Down Expand Up @@ -98,6 +102,17 @@
flux_log (h, LOG_DEBUG, "purged %d inactive jobs", count);
}

void requeue_deferred_requests (struct list_ctx *ctx)
{
const flux_msg_t *msg;

while ((msg = flux_msglist_pop (ctx->deferred_requests))) {
if (flux_requeue (ctx->h, msg, FLUX_RQ_TAIL) < 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever!

flux_log_error (ctx->h, "error requeuing deferred request");
flux_msg_decref (msg);

Check warning on line 112 in src/modules/job-list/job-list.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job-list.c#L110-L112

Added lines #L110 - L112 were not covered by tests
}
}

static void disconnect_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -186,6 +201,7 @@
if (ctx) {
int saved_errno = errno;
flux_msg_handler_delvec (ctx->handlers);
flux_msglist_destroy (ctx->deferred_requests);
if (ctx->jsctx)
job_state_destroy (ctx->jsctx);
if (ctx->isctx)
Expand All @@ -207,7 +223,9 @@
goto error;
if (!(ctx->isctx = idsync_ctx_create (ctx->h)))
goto error;
if (!(ctx->jsctx = job_state_create (ctx->isctx)))
if (!(ctx->jsctx = job_state_create (ctx)))
goto error;

Check warning on line 227 in src/modules/job-list/job-list.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job-list.c#L227

Added line #L227 was not covered by tests
if (!(ctx->deferred_requests = flux_msglist_create ()))
goto error;
return ctx;
error:
Expand All @@ -224,10 +242,6 @@
flux_log_error (h, "initialization error");
goto done;
}
if (job_state_init_from_kvs (ctx->jsctx) < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message typo: s/sentinal/sentinel/

flux_log_error (h, "initialization from kvs error");
goto done;
}
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
goto done;
rc = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/modules/job-list/job-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ struct list_ctx {
flux_msg_handler_t **handlers;
struct job_state_ctx *jsctx;
struct idsync_ctx *isctx;
struct flux_msglist *deferred_requests;
};

const char **job_attrs (void);

void requeue_deferred_requests (struct list_ctx *ctx);

#endif /* _FLUX_JOB_LIST_H */

/*
Expand Down
35 changes: 22 additions & 13 deletions src/modules/job-list/job_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
json_decref (job->exception_context);
json_decref (job->jobspec_updates);
json_decref (job->R_updates);
zlist_destroy (&job->updates);
free (job);
errno = save_errno;
}
Expand All @@ -69,16 +68,8 @@
job->expiration = -1.0;
job->wait_status = -1;
job->result = FLUX_JOB_RESULT_FAILED;

if (!(job->updates = zlist_new ())) {
errno = ENOMEM;
job_destroy (job);
return NULL;
}

job->states_mask = FLUX_JOB_STATE_NEW;
job->states_events_mask = FLUX_JOB_STATE_NEW;
job->eventlog_seq = -1;
return job;
}

Expand Down Expand Up @@ -368,15 +359,24 @@
return allow_nonfatal ? 0 : -1;
}

int job_parse_jobspec (struct job *job, const char *s, json_t *updates)
int job_parse_jobspec_cached (struct job *job, json_t *updates)
{
if (load_jobspec (job, s, true) < 0)
if (!job->jobspec) {
errno = EINVAL;

Check warning on line 365 in src/modules/job-list/job_data.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job_data.c#L365

Added line #L365 was not covered by tests
return -1;
}
if (parse_jobspec (job, true) < 0)
return -1;
return job_jobspec_update (job, updates);
}

int job_parse_jobspec (struct job *job, const char *s, json_t *updates)

Check warning on line 373 in src/modules/job-list/job_data.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job_data.c#L373

Added line #L373 was not covered by tests
{
if (load_jobspec (job, s, true) < 0)

Check warning on line 375 in src/modules/job-list/job_data.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job_data.c#L375

Added line #L375 was not covered by tests
return -1;
return job_parse_jobspec_cached (job, updates);

Check warning on line 377 in src/modules/job-list/job_data.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job_data.c#L377

Added line #L377 was not covered by tests
}

int job_parse_jobspec_fatal (struct job *job, const char *s, json_t *updates)
{
if (load_jobspec (job, s, false) < 0)
Expand Down Expand Up @@ -466,15 +466,24 @@
return rc;
}

int job_parse_R (struct job *job, const char *s, json_t *updates)
int job_parse_R_cached (struct job *job, json_t *updates)
{
if (load_R (job, s, true) < 0)
if (!job->R) {
errno = EINVAL;

Check warning on line 472 in src/modules/job-list/job_data.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job_data.c#L472

Added line #L472 was not covered by tests
return -1;
}
if (parse_R (job, true) < 0)
return -1;
return job_R_update (job, updates);
}

int job_parse_R (struct job *job, const char *s, json_t *updates)
{
if (load_R (job, s, true) < 0)
return -1;
return job_parse_R_cached (job, updates);
}

int job_parse_R_fatal (struct job *job, const char *s, json_t *updates)
{
if (load_R (job, s, false) < 0)
Expand Down
16 changes: 4 additions & 12 deletions src/modules/job-list/job_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,17 @@ struct job {
json_t *R;
json_t *exception_context;

/* All internal changes (most notably job state transitions) are
* placed on the updates list. We do not immediately update to
* the new state and place onto a new list until we have retrieved
* any necessary data associated to that state. For example, when
* the 'depend' state has been seen, we don't immediately place it
* on the `pending` list. We wait until we've retrieved data such
* as userid, urgency, etc.
*
* Track which states we have seen and have completed transition
/* Track which states we have seen and have completed transition
* to. States we've processed via the states_mask and states seen
* via events stream in states_events_mask.
*/
zlist_t *updates;
unsigned int states_mask;
unsigned int states_events_mask;
void *list_handle;
/* if updates in eventlog before jobspec / R read from KVS */
/* store updates that were received before jobspec/R objects */
json_t *jobspec_updates;
json_t *R_updates;

int eventlog_seq; /* last event seq read */
int submit_version; /* version number in submit context */
};

Expand All @@ -110,6 +100,7 @@ struct job *job_create (flux_t *h, flux_jobid_t id);
* the jobspec.
*/
int job_parse_jobspec (struct job *job, const char *s, json_t *updates);
int job_parse_jobspec_cached (struct job *job, json_t *updates);

/* identical to above, but all nonfatal errors will return error.
* Primarily used for testing.
Expand All @@ -129,6 +120,7 @@ int job_jobspec_update (struct job *job, json_t *updates);
* - ntasks (if necessary)
*/
int job_parse_R (struct job *job, const char *s, json_t *updates);
int job_parse_R_cached (struct job *job, json_t *updates);

/* identical to above, but all nonfatal errors will return error.
* Primarily used for testing.
Expand Down