Merge pull request #5877 from garlick/issue#5876
job-manager: canceled job need not wait for sched
mergify[bot] committed Apr 11, 2024
2 parents cba28f0 + c053cab commit 4bd465d
Showing 13 changed files with 291 additions and 91 deletions.
38 changes: 38 additions & 0 deletions src/broker/broker.c
@@ -1434,6 +1434,38 @@ static void broker_lsmod_cb (flux_t *h,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void broker_module_debug_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
broker_ctx_t *ctx = arg;
const char *name;
int defer = -1;
module_t *p;

if (flux_request_unpack (msg,
NULL,
"{s:s s?b}",
"name", &name,
"defer", &defer) < 0)
goto error;
if (!(p = modhash_lookup_byname (ctx->modhash, name))) {
errno = ENOENT;
goto error;
}
if (defer != -1) {
if (module_set_defer (p, defer) < 0)
goto error;
}
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to module-debug request");
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "error responding to module-debug request");
}
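
The handler above only unpacks a module "name" and an optional boolean "defer", then calls module_set_defer(). A minimal client-side sketch of driving this RPC, using only those payload fields; the helper function and the module name are illustrative, not taken from this PR's tests:

```c
/* Hedged sketch: ask the broker to start or stop deferring messages to a
 * module, using only the "name"/"defer" payload fields decoded by
 * broker_module_debug_cb() above.  The module name passed by callers
 * (e.g. "sched-simple") is illustrative.
 */
#include <stdbool.h>
#include <flux/core.h>

static int set_module_defer (flux_t *h, const char *name, bool defer)
{
    flux_future_t *f;

    if (!(f = flux_rpc_pack (h,
                             "broker.module-debug",
                             FLUX_NODEID_ANY,
                             0,
                             "{s:s s:b}",
                             "name", name,
                             "defer", defer ? 1 : 0)))
        return -1;
    if (flux_rpc_get (f, NULL) < 0) {   /* wait for success/error response */
        flux_future_destroy (f);
        return -1;
    }
    flux_future_destroy (f);
    return 0;
}
```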

/* This is a message handler for status messages from modules, not to be
* confused with module_status_cb().
*/
@@ -1632,6 +1664,12 @@ static const struct flux_msg_handler_spec htab[] = {
broker_lsmod_cb,
FLUX_ROLE_USER,
},
{
FLUX_MSGTYPE_REQUEST,
"broker.module-debug",
broker_module_debug_cb,
0,
},
{
FLUX_MSGTYPE_REQUEST,
"broker.module-status",
29 changes: 29 additions & 0 deletions src/broker/module.c
@@ -77,6 +77,7 @@ struct broker_module {

struct flux_msglist *rmmod_requests;
struct flux_msglist *insmod_requests;
struct flux_msglist *deferred_messages;

flux_t *h; /* module's handle */
struct subhash *sub;
@@ -439,6 +440,13 @@ int module_sendmsg_new (module_t *p, flux_msg_t **msg)
return -1;
}
}
if (p->deferred_messages) {
if (flux_msglist_append (p->deferred_messages, *msg) < 0)
return -1;
flux_msg_decref (*msg);
*msg = NULL;
return 0;
}
return flux_send_new (p->h_broker, msg, 0);
}

@@ -498,6 +506,7 @@ void module_destroy (module_t *p)
json_decref (p->attr_cache);
flux_msglist_destroy (p->rmmod_requests);
flux_msglist_destroy (p->insmod_requests);
flux_msglist_destroy (p->deferred_messages);
subhash_destroy (p->sub);
free (p);
errno = saved_errno;
@@ -531,6 +540,26 @@ void module_mute (module_t *p)
p->muted = true;
}

int module_set_defer (module_t *p, bool flag)
{
if (flag && !p->deferred_messages) {
if (!(p->deferred_messages = flux_msglist_create ()))
return -1;
}
if (!flag && p->deferred_messages) {
const flux_msg_t *msg;
while ((msg = flux_msglist_pop (p->deferred_messages))) {
if (flux_send_new (p->h_broker, (flux_msg_t **)&msg, 0) < 0) {
flux_msg_decref (msg);
return -1;
}
}
flux_msglist_destroy (p->deferred_messages);
p->deferred_messages = NULL;
}
return 0;
}

int module_start (module_t *p)
{
int errnum;
5 changes: 5 additions & 0 deletions src/broker/module.h
@@ -77,6 +77,11 @@ int module_start (module_t *p);
*/
int module_stop (module_t *p, flux_t *h);

/* Defer all messages that would be sent to the module if flag=true.
 * Stop deferring them and send the backlog if flag=false.
*/
int module_set_defer (module_t *p, bool flag);

/* Mute module. Do not send any more messages.
*/
void module_mute (module_t *p);
124 changes: 57 additions & 67 deletions src/modules/job-manager/alloc.c
@@ -60,7 +60,6 @@ static void requeue_pending (struct alloc *alloc, struct job *job)
{
struct job_manager *ctx = alloc->ctx;
bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2);
bool cleared = false;

assert (job->alloc_pending);
if (job->handle) {
@@ -74,18 +73,7 @@ static void requeue_pending (struct alloc *alloc, struct job *job)
flux_log (ctx->h, LOG_ERR, "failed to enqueue job for scheduling");
job->alloc_queued = 1;
}
annotations_sched_clear (job, &cleared);
if (cleared) {
if (event_job_post_pack (ctx->event,
job,
"annotations",
EVENT_NO_COMMIT,
"{s:n}",
"annotations") < 0)
flux_log_error (ctx->h,
"%s: event_job_post_pack",
__FUNCTION__);
}
annotations_clear_and_publish (ctx, job, "sched");
}
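
Several hunks in this file replace the old annotations_clear()/annotations_sched_clear() plus event_job_post_pack() boilerplate with a single annotations_clear_and_publish() call, whose definition lives in one of the other changed files not shown here. Inferred only from the call sites ((ctx, job, "sched") and (ctx, job, NULL)) and from the deleted code, it plausibly looks like the sketch below; the body and the "_sketch" name are assumptions, not the actual implementation:

```c
/* Assumed sketch of the new helper -- the real annotations_clear_and_publish()
 * is defined outside this diff.  'key' selects the sched annotations
 * ("sched") or all annotations (NULL), mirroring the old
 * annotations_sched_clear()/annotations_clear() calls it replaces.
 * Presumes the job-manager internal headers (job.h, event.h, annotate.h).
 */
static int annotations_clear_and_publish_sketch (struct job_manager *ctx,
                                                 struct job *job,
                                                 const char *key)
{
    bool cleared = false;

    if (key)
        annotations_sched_clear (job, &cleared);
    else
        annotations_clear (job, &cleared);
    if (cleared) {
        if (event_job_post_pack (ctx->event,
                                 job,
                                 "annotations",
                                 EVENT_NO_COMMIT,
                                 "{s:n}",
                                 "annotations") < 0)
            return -1;
    }
    return 0;
}
```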

/* Initiate teardown. Clear any alloc/free requests, and clear
@@ -120,16 +108,16 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum)
/* Send sched.free request for job.
* Update flags.
*/
int free_request (struct alloc *alloc, struct job *job)
int free_request (struct alloc *alloc, flux_jobid_t id, json_t *R)
{
flux_msg_t *msg;

if (!(msg = flux_request_encode ("sched.free", NULL)))
return -1;
if (flux_msg_pack (msg,
"{s:I s:O}",
"id", job->id,
"R", job->R_redacted) < 0)
"id", id,
"R", R) < 0)
goto error;
if (flux_send (alloc->ctx->h, msg, 0) < 0)
goto error;
@@ -176,7 +164,6 @@ static void alloc_response_cb (flux_t *h,
json_t *annotations = NULL;
json_t *R = NULL;
struct job *job;
bool cleared = false;

if (flux_response_decode (msg, NULL, NULL) < 0)
goto teardown; // ENOSYS here if scheduler not loaded/shutting down
@@ -188,20 +175,25 @@ static void alloc_response_cb (flux_t *h,
"annotations", &annotations,
"R", &R) < 0)
goto teardown;
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not active",
idf58 (id));
errno = EINVAL;
goto teardown;
}
if (!job->alloc_pending) {
flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not requested",
idf58 (id));
errno = EINVAL;
goto teardown;
}

job = zhashx_lookup (ctx->active_jobs, &id);
if (job && !job->alloc_pending)
job = NULL;

switch (type) {
case FLUX_SCHED_ALLOC_SUCCESS:
if (!R) {
flux_log (h, LOG_ERR, "sched.alloc-response: protocol error");
errno = EPROTO;
goto teardown;
}
(void)json_object_del (R, "scheduling");
alloc->alloc_pending_count--;

if (!job) {
(void)free_request (alloc, id, R);
break;
}
if (alloc->alloc_limit) {
if (zlistx_delete (alloc->pending_jobs, job->handle) < 0)
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
@@ -215,19 +207,12 @@ static void alloc_response_cb (flux_t *h,
errno = EEXIST;
goto teardown;
}
if (!R) {
flux_log (h, LOG_ERR, "sched.alloc-response: protocol error");
errno = EPROTO;
goto teardown;
}
(void)json_object_del (R, "scheduling");
job->R_redacted = json_incref (R);
if (annotations_update_and_publish (ctx, job, annotations) < 0)
flux_log_error (h, "annotations_update: id=%s", idf58 (id));

/* Only modify job state after annotation event is published
*/
alloc->alloc_pending_count--;
job->alloc_pending = 0;
if (job->annotations) {
if (event_job_post_pack (ctx->event,
@@ -248,30 +233,22 @@ static void alloc_response_cb (flux_t *h,
errno = EPROTO;
goto teardown;
}
if (!job)
break;
if (annotations_update_and_publish (ctx, job, annotations) < 0)
flux_log_error (h, "annotations_update: id=%s", idf58 (id));
break;
case FLUX_SCHED_ALLOC_DENY: // error
alloc->alloc_pending_count--;
if (!job)
break;
job->alloc_pending = 0;
if (alloc->alloc_limit) {
if (zlistx_delete (alloc->pending_jobs, job->handle) < 0)
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
job->handle = NULL;
}
annotations_clear (job, &cleared);
if (cleared) {
if (event_job_post_pack (ctx->event,
job,
"annotations",
EVENT_NO_COMMIT,
"{s:n}",
"annotations") < 0)
flux_log_error (ctx->h,
"%s: event_job_post_pack: id=%s",
__FUNCTION__,
idf58 (id));
}
annotations_clear_and_publish (ctx, job, NULL);
if (raise_job_exception (ctx,
job,
"alloc",
@@ -282,6 +259,8 @@ static void alloc_response_cb (flux_t *h,
break;
case FLUX_SCHED_ALLOC_CANCEL:
alloc->alloc_pending_count--;
if (!job)
break;
if (job->state == FLUX_JOB_STATE_SCHED)
requeue_pending (alloc, job);
else {
@@ -290,21 +269,9 @@ static void alloc_response_cb (flux_t *h,
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
job->handle = NULL;
}
annotations_clear (job, &cleared);
annotations_clear_and_publish (ctx, job, NULL);
}
job->alloc_pending = 0;
if (cleared) {
if (event_job_post_pack (ctx->event,
job,
"annotations",
EVENT_NO_COMMIT,
"{s:n}",
"annotations") < 0)
flux_log_error (ctx->h,
"%s: event_job_post_pack: id=%s",
__FUNCTION__,
idf58 (id));
}
if (queue_started (alloc->ctx->queue, job)) {
if (event_job_action (ctx->event, job) < 0) {
flux_log_error (h,
@@ -553,7 +520,7 @@ int alloc_send_free_request (struct alloc *alloc, struct job *job)
{
assert (job->state == FLUX_JOB_STATE_CLEANUP);
if (alloc->ready) {
if (free_request (alloc, job) < 0)
if (free_request (alloc, job->id, job->R_redacted) < 0)
return -1;
if ((job->flags & FLUX_JOB_DEBUG))
(void)event_job_post_pack (alloc->ctx->event,
@@ -595,14 +562,37 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job)
}
}

/* called from event_job_action() FLUX_JOB_STATE_CLEANUP
* or alloc_queue_recalc_pending() if queue order has changed.
/* Send a sched.cancel request for job. This RPC receives no direct response.
* Instead, the sched.alloc request receives a FLUX_SCHED_ALLOC_CANCEL or a
* FLUX_SCHED_ALLOC_SUCCESS response.
*
* As described in RFC 27, sched.alloc requests are canceled when:
* 1) a job in SCHED state is canceled
* 2) a queue is administratively disabled
 * 3) when reprioritizing jobs in limited mode
*
* The finalize flag is for the first case. It allows the job to continue
* through CLEANUP without waiting for the scheduler to respond to the cancel.
* The sched.alloc response handler must handle the case where the job is
 * no longer active or its alloc_pending flag is clear. Essentially, 'finalize'
 * causes the job-related finalization to happen here rather than in the
 * sched.alloc response handler.
*/
int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job)
int alloc_cancel_alloc_request (struct alloc *alloc,
struct job *job,
bool finalize)
{
if (job->alloc_pending) {
if (cancel_request (alloc, job) < 0)
return -1;
if (finalize) {
job->alloc_pending = 0;
if (alloc->alloc_limit) {
(void)zlistx_delete (alloc->pending_jobs, job->handle);
job->handle = NULL;
}
annotations_clear_and_publish (alloc->ctx, job, NULL);
}
}
return 0;
}
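
With finalize=true, the caller (in the job manager's CLEANUP handling, which is outside the files shown here) cancels the outstanding sched.alloc request and immediately clears the job's alloc state, so cleanup does not block on the scheduler; if the scheduler's ALLOC_SUCCESS arrives late, the response handler above simply releases the resources with free_request(alloc, id, R). A hedged sketch of such a call site; the wrapper function, its name, and its error handling are illustrative:

```c
/* Hypothetical caller sketch: when a job reaches CLEANUP with an alloc
 * request still outstanding, cancel it and finalize immediately so the
 * job does not wait for the scheduler's response.  The ctx->alloc member
 * and this wrapper are assumptions; the real call site is not in this diff.
 */
static void cleanup_pending_alloc (struct job_manager *ctx, struct job *job)
{
    if (job->alloc_pending) {
        if (alloc_cancel_alloc_request (ctx->alloc, job, true) < 0)
            flux_log_error (ctx->h,
                            "alloc_cancel_alloc_request id=%s",
                            idf58 (job->id));
    }
}
```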
Expand Down Expand Up @@ -673,7 +663,7 @@ int alloc_queue_recalc_pending (struct alloc *alloc)
&& head
&& tail) {
if (job_priority_comparator (head, tail) < 0) {
if (alloc_cancel_alloc_request (alloc, tail) < 0) {
if (alloc_cancel_alloc_request (alloc, tail, false) < 0) {
flux_log_error (alloc->ctx->h,
"%s: alloc_cancel_alloc_request",
__FUNCTION__);
7 changes: 6 additions & 1 deletion src/modules/job-manager/alloc.h
@@ -31,8 +31,13 @@ void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job);

/* Send a request to cancel a pending alloc request.
* This function is a no-op if job->alloc_pending is not set.
* If finalize is true, update the job as though the cancelation
* request has already been handled, so the job can progress through
* CLEANUP without waiting for the scheduler response.
*/
int alloc_cancel_alloc_request (struct alloc *alloc, struct job *job);
int alloc_cancel_alloc_request (struct alloc *alloc,
struct job *job,
bool finalize);

/* Accessor for the count of queued alloc requests.
*/
