Skip to content

Commit

Permalink
job-list: defer requests until after init
Browse files Browse the repository at this point in the history
Problem: job-list requests should not be processed until the
module is fully initialized.

While jsctx->initialized is false, requests are placed on a queue.
When jsctx->initialized is set true after the initial journal backlog
processing, messages on the queue are requeued in the flux_t handle
for reactive processing.
  • Loading branch information
garlick committed Mar 27, 2024
1 parent 36598b3 commit a873cdd
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
25 changes: 25 additions & 0 deletions src/modules/job-list/job-list.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct list_ctx *ctx = arg;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}

int pending = zlistx_size (ctx->jsctx->pending);
int running = zlistx_size (ctx->jsctx->running);
int inactive = zlistx_size (ctx->jsctx->inactive);
Expand All @@ -59,6 +66,10 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
"waits", idsync_waits,
"stats_watchers", stats_watchers) < 0)
flux_log_error (h, "error responding to stats-get request");
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "error responding to stats-get request");
}

static void purge_cb (flux_t *h,
Expand Down Expand Up @@ -91,6 +102,17 @@ static void purge_cb (flux_t *h,
flux_log (h, LOG_DEBUG, "purged %d inactive jobs", count);
}

void requeue_deferred_requests (struct list_ctx *ctx)
{
const flux_msg_t *msg;

while ((msg = flux_msglist_pop (ctx->deferred_requests))) {
if (flux_requeue (ctx->h, msg, FLUX_RQ_TAIL) < 0)
flux_log_error (ctx->h, "error requeuing deferred request");
flux_msg_decref (msg);
}
}

static void disconnect_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -179,6 +201,7 @@ static void list_ctx_destroy (struct list_ctx *ctx)
if (ctx) {
int saved_errno = errno;
flux_msg_handler_delvec (ctx->handlers);
flux_msglist_destroy (ctx->deferred_requests);
if (ctx->jsctx)
job_state_destroy (ctx->jsctx);
if (ctx->isctx)
Expand All @@ -202,6 +225,8 @@ static struct list_ctx *list_ctx_create (flux_t *h)
goto error;
if (!(ctx->jsctx = job_state_create (ctx)))
goto error;
if (!(ctx->deferred_requests = flux_msglist_create ()))
goto error;
return ctx;
error:
list_ctx_destroy (ctx);
Expand Down
3 changes: 3 additions & 0 deletions src/modules/job-list/job-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ struct list_ctx {
flux_msg_handler_t **handlers;
struct job_state_ctx *jsctx;
struct idsync_ctx *isctx;
struct flux_msglist *deferred_requests;
};

const char **job_attrs (void);

void requeue_deferred_requests (struct list_ctx *ctx);

#endif /* _FLUX_JOB_LIST_H */

/*
Expand Down
16 changes: 15 additions & 1 deletion src/modules/job-list/job_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,19 @@ void job_state_pause_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct list_ctx *ctx = arg;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}
ctx->jsctx->pause = true;

if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to pause request");
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "error responding to pause request");
}

void job_state_unpause_cb (flux_t *h, flux_msg_handler_t *mh,
Expand All @@ -679,6 +687,11 @@ void job_state_unpause_cb (flux_t *h, flux_msg_handler_t *mh,
struct list_ctx *ctx = arg;
const flux_msg_t *resp;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}
resp = flux_msglist_first (ctx->jsctx->backlog);
while (resp) {
if (journal_process_events (ctx->jsctx, resp) < 0)
Expand Down Expand Up @@ -1328,6 +1341,7 @@ static void job_events_journal_continuation (flux_future_t *f, void *arg)
}
}
jsctx->initialized = true;
requeue_deferred_requests (jsctx->ctx);
flux_future_reset (f);
return;
}
Expand Down
18 changes: 17 additions & 1 deletion src/modules/job-list/list.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ void list_cb (flux_t *h, flux_msg_handler_t *mh,
struct state_constraint *statec = NULL;
flux_error_t error;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}
if (flux_request_unpack (msg, NULL, "{s:i s:o s?F s?o}",
"max_entries", &max_entries,
"attrs", &attrs,
Expand Down Expand Up @@ -482,6 +487,11 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
int valid_states = FLUX_JOB_STATE_ACTIVE | FLUX_JOB_STATE_INACTIVE;
bool stall = false;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}
if (flux_request_unpack (msg, NULL, "{s:I s:o s?i}",
"id", &id,
"attrs", &attrs,
Expand Down Expand Up @@ -546,10 +556,16 @@ static int list_attrs_append (json_t *a, const char *attr)
void list_attrs_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct list_ctx *ctx = arg;
const char **attrs;
json_t *a;
json_t *a = NULL;
int i;

if (!ctx->jsctx->initialized) {
if (flux_msglist_append (ctx->deferred_requests, msg) < 0)
goto error;
return;
}
if (!(a = json_array ())) {
errno = ENOMEM;
goto error;
Expand Down

0 comments on commit a873cdd

Please sign in to comment.