Skip to content

Commit

Permalink
Merge pull request #5969 from garlick/issue#5964
Browse files Browse the repository at this point in the history
job-manager: misc cleanup in scheduler interface
  • Loading branch information
mergify[bot] committed May 15, 2024
2 parents 9f5ad97 + 819f0f1 commit be2fcf4
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 98 deletions.
148 changes: 50 additions & 98 deletions src/modules/job-manager/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <jansson.h>
#include <flux/core.h>
#include <flux/schedutil.h>
#include <assert.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libjob/idf58.h"
Expand All @@ -42,46 +41,41 @@ struct alloc {
struct job_manager *ctx;
flux_msg_handler_t **handlers;
zlistx_t *queue;
zlistx_t *pending_jobs;
bool ready;
bool stopped;
char *stopped_reason;
zlistx_t *sent; // track jobs w/ alloc reqs, mode=limited only
bool scheduler_is_online;
flux_watcher_t *prep;
flux_watcher_t *check;
flux_watcher_t *idle;
// e.g. for mode limited w/ limit=1, is 1, for mode unlimited set to 0
unsigned int alloc_limit;
// e.g. for mode limited w/ limit=1, max of 1
unsigned int alloc_pending_count;
char *sched_sender; // for disconnect
unsigned int alloc_limit; // will have a value of 0 in mode=unlimited
unsigned int sent_count; // track number of jobs w/ alloc reqs
char *sched_sender; // scheduler uuid for disconnect processing
};

static void requeue_pending (struct alloc *alloc, struct job *job)
{
struct job_manager *ctx = alloc->ctx;
bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2);

assert (job->alloc_pending);
if (job->handle) {
if (zlistx_delete (alloc->pending_jobs, job->handle) < 0)
if (!job->alloc_pending)
return;
if (alloc->alloc_limit) {
if (job_priority_queue_delete (alloc->sent, job) < 0)
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
job->handle = NULL;
}
job->alloc_pending = 0;
if (queue_started (alloc->ctx->queue, job)) {
if (!(job->handle = zlistx_insert (alloc->queue, job, fwd)))
if (job_priority_queue_insert (alloc->queue, job) < 0)
flux_log (ctx->h, LOG_ERR, "failed to enqueue job for scheduling");
job->alloc_queued = 1;
}
annotations_clear_and_publish (ctx, job, "sched");
}

/* Initiate teardown. Clear any alloc/free requests, and clear
* the alloc->ready flag to stop prep/check from allocating.
* the alloc->scheduler_is_online flag to stop prep/check from allocating.
*/
static void interface_teardown (struct alloc *alloc, char *s, int errnum)
{
if (alloc->ready) {
if (alloc->scheduler_is_online) {
struct job *job;
struct job_manager *ctx = alloc->ctx;

Expand All @@ -97,8 +91,8 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum)
requeue_pending (alloc, job);
job = zhashx_next (ctx->active_jobs);
}
alloc->ready = false;
alloc->alloc_pending_count = 0;
alloc->scheduler_is_online = false;
alloc->sent_count = 0;
free (alloc->sched_sender);
alloc->sched_sender = NULL;
drain_check (alloc->ctx->drain);
Expand Down Expand Up @@ -188,16 +182,15 @@ static void alloc_response_cb (flux_t *h,
goto teardown;
}
(void)json_object_del (R, "scheduling");
alloc->alloc_pending_count--;
alloc->sent_count--;

if (!job) {
(void)free_request (alloc, id, R);
break;
}
if (alloc->alloc_limit) {
if (zlistx_delete (alloc->pending_jobs, job->handle) < 0)
if (job_priority_queue_delete (alloc->sent, job) < 0)
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
job->handle = NULL;
}
if (job->has_resources || job->R_redacted) {
flux_log (h,
Expand Down Expand Up @@ -239,14 +232,13 @@ static void alloc_response_cb (flux_t *h,
flux_log_error (h, "annotations_update: id=%s", idf58 (id));
break;
case FLUX_SCHED_ALLOC_DENY: // error
alloc->alloc_pending_count--;
alloc->sent_count--;
if (!job)
break;
job->alloc_pending = 0;
if (alloc->alloc_limit) {
if (zlistx_delete (alloc->pending_jobs, job->handle) < 0)
if (job_priority_queue_delete (alloc->sent, job) < 0)
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
job->handle = NULL;
}
annotations_clear_and_publish (ctx, job, NULL);
if (raise_job_exception (ctx,
Expand All @@ -258,16 +250,15 @@ static void alloc_response_cb (flux_t *h,
goto teardown;
break;
case FLUX_SCHED_ALLOC_CANCEL:
alloc->alloc_pending_count--;
alloc->sent_count--;
if (!job)
break;
if (job->state == FLUX_JOB_STATE_SCHED)
requeue_pending (alloc, job);
else {
if (alloc->alloc_limit) {
if (zlistx_delete (alloc->pending_jobs, job->handle) < 0)
if (job_priority_queue_delete (alloc->sent, job) < 0)
flux_log (ctx->h, LOG_ERR, "failed to dequeue pending job");
job->handle = NULL;
}
annotations_clear_and_publish (ctx, job, NULL);
}
Expand Down Expand Up @@ -405,7 +396,7 @@ static void ready_cb (flux_t *h,
if (!(ctx->alloc->sched_sender = strdup (sender)))
goto error;
}
ctx->alloc->ready = true;
ctx->alloc->scheduler_is_online = true;
flux_log (h, LOG_DEBUG, "scheduler: ready %s", mode);
count = zlistx_size (ctx->alloc->queue);
if (flux_respond_pack (h, msg, "{s:i}", "count", count) < 0)
Expand Down Expand Up @@ -435,12 +426,12 @@ static bool alloc_work_available (struct job_manager *ctx)
{
struct job *job;

if (!ctx->alloc->ready) // scheduler protocol is not ready for alloc
if (!ctx->alloc->scheduler_is_online) // scheduler is not ready for alloc
return false;
if (!(job = zlistx_first (ctx->alloc->queue))) // queue is empty
return false;
if (ctx->alloc->alloc_limit > 0 // alloc limit reached
&& ctx->alloc->alloc_pending_count >= ctx->alloc->alloc_limit)
&& ctx->alloc->sent_count >= ctx->alloc->alloc_limit)
return false;
/* The alloc->queue is sorted from highest to lowest priority, so if the
* first job has priority=MIN (held), all other jobs must have the same
Expand Down Expand Up @@ -491,18 +482,16 @@ static void check_cb (flux_reactor_t *r,
flux_reactor_stop_error (flux_get_reactor (ctx->h));
return;
}
zlistx_delete (alloc->queue, job->handle);
job->handle = NULL;
job_priority_queue_delete (alloc->queue, job);
job->alloc_pending = 1;
job->alloc_queued = 0;
alloc->alloc_pending_count++;
/* Add job to alloc->pending_jobs if there is an alloc limit, so
alloc->sent_count++;
/* Add job to alloc->sent if there is an alloc limit, so
* that those requests can be canceled if the queue is reprioritized
* and higher priority requests need to preempt lower priority ones.
*/
if (alloc->alloc_limit) {
bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2);
if (!(job->handle = zlistx_insert (alloc->pending_jobs, job, fwd)))
if (job_priority_queue_insert (alloc->sent, job) < 0)
flux_log (ctx->h, LOG_ERR, "failed to enqueue pending job");
}
/* Post event for debugging if job was submitted FLUX_JOB_DEBUG flag.
Expand All @@ -518,8 +507,9 @@ static void check_cb (flux_reactor_t *r,
/* called from event_job_action() FLUX_JOB_STATE_CLEANUP */
int alloc_send_free_request (struct alloc *alloc, struct job *job)
{
assert (job->state == FLUX_JOB_STATE_CLEANUP);
if (alloc->ready) {
if (job->state != FLUX_JOB_STATE_CLEANUP)
return -1;
if (alloc->scheduler_is_online) {
if (free_request (alloc, job->id, job->R_redacted) < 0)
return -1;
if ((job->flags & FLUX_JOB_DEBUG))
Expand All @@ -535,15 +525,14 @@ int alloc_send_free_request (struct alloc *alloc, struct job *job)
/* called from event_job_action() FLUX_JOB_STATE_SCHED */
int alloc_enqueue_alloc_request (struct alloc *alloc, struct job *job)
{
assert (job->state == FLUX_JOB_STATE_SCHED);
if (job->state != FLUX_JOB_STATE_SCHED)
return -1;
if (!job->alloc_bypass
&& !job->alloc_queued
&& !job->alloc_pending
&& job->priority != FLUX_JOB_PRIORITY_MIN
&& queue_started (alloc->ctx->queue, job)) {
bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2);
assert (job->handle == NULL);
if (!(job->handle = zlistx_insert (alloc->queue, job, fwd)))
if (job_priority_queue_insert (alloc->queue, job) < 0)
return -1;
job->alloc_queued = 1;
}
Expand All @@ -556,8 +545,7 @@ int alloc_enqueue_alloc_request (struct alloc *alloc, struct job *job)
void alloc_dequeue_alloc_request (struct alloc *alloc, struct job *job)
{
if (job->alloc_queued) {
zlistx_delete (alloc->queue, job->handle);
job->handle = NULL;
job_priority_queue_delete (alloc->queue, job);
job->alloc_queued = 0;
}
}
Expand Down Expand Up @@ -587,10 +575,8 @@ int alloc_cancel_alloc_request (struct alloc *alloc,
return -1;
if (finalize) {
job->alloc_pending = 0;
if (alloc->alloc_limit) {
(void)zlistx_delete (alloc->pending_jobs, job->handle);
job->handle = NULL;
}
if (alloc->alloc_limit)
job_priority_queue_delete (alloc->sent, job);
annotations_clear_and_publish (alloc->ctx, job, NULL);
}
}
Expand All @@ -611,54 +597,30 @@ struct job *alloc_queue_next (struct alloc *alloc)
/* called from reprioritize_job() */
void alloc_queue_reorder (struct alloc *alloc, struct job *job)
{
bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2);

zlistx_reorder (alloc->queue, job->handle, fwd);
job_priority_queue_reorder (alloc->queue, job);
}

void alloc_pending_reorder (struct alloc *alloc, struct job *job)
{
if (alloc->alloc_limit) {
bool fwd = job->priority > (FLUX_JOB_PRIORITY_MAX / 2);
zlistx_reorder (alloc->pending_jobs, job->handle, fwd);
}
if (alloc->alloc_limit)
job_priority_queue_reorder (alloc->sent, job);
}

int alloc_queue_reprioritize (struct alloc *alloc)
{
struct job *job;
zlistx_sort (alloc->queue);

/* N.B.: zlistx_sort() invalidates all list handles since
* the sort swaps contents of nodes, not the nodes themselves.
* Therefore, job handles into the list must be re-acquired here:
*/

job = zlistx_first (alloc->queue);
while (job) {
job->handle = zlistx_cursor (alloc->queue);
job = zlistx_next (alloc->queue);
}

zlistx_sort (alloc->pending_jobs);

job = zlistx_first (alloc->pending_jobs);
while (job) {
job->handle = zlistx_cursor (alloc->pending_jobs);
job = zlistx_next (alloc->pending_jobs);
}
job_priority_queue_sort (alloc->queue);
job_priority_queue_sort (alloc->sent);

if (alloc->alloc_limit)
return alloc_queue_recalc_pending (alloc);
else
return 0;
return 0;
}

/* called if highest priority job may have changed */
int alloc_queue_recalc_pending (struct alloc *alloc)
{
struct job *head = zlistx_first (alloc->queue);
struct job *tail = zlistx_last (alloc->pending_jobs);
struct job *tail = zlistx_last (alloc->sent);
while (alloc->alloc_limit
&& head
&& tail) {
Expand All @@ -673,7 +635,7 @@ int alloc_queue_recalc_pending (struct alloc *alloc)
else
break;
head = zlistx_next (alloc->queue);
tail = zlistx_prev (alloc->pending_jobs);
tail = zlistx_prev (alloc->sent);
}
return 0;
}
Expand All @@ -685,12 +647,12 @@ int alloc_queue_count (struct alloc *alloc)

int alloc_pending_count (struct alloc *alloc)
{
return alloc->alloc_pending_count;
return alloc->sent_count;
}

bool alloc_sched_ready (struct alloc *alloc)
{
return alloc->ready;
return alloc->scheduler_is_online;
}

static void alloc_query_cb (flux_t *h,
Expand All @@ -705,7 +667,7 @@ static void alloc_query_cb (flux_t *h,
msg,
"{s:i s:i s:i}",
"queue_length", zlistx_size (alloc->queue),
"alloc_pending", alloc->alloc_pending_count,
"alloc_pending", alloc->sent_count,
"running", alloc->ctx->running_jobs) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
return;
Expand Down Expand Up @@ -790,8 +752,7 @@ void alloc_ctx_destroy (struct alloc *alloc)
flux_watcher_destroy (alloc->check);
flux_watcher_destroy (alloc->idle);
zlistx_destroy (&alloc->queue);
zlistx_destroy (&alloc->pending_jobs);
free (alloc->stopped_reason);
zlistx_destroy (&alloc->sent);
free (alloc->sched_sender);
free (alloc);
errno = saved_errno;
Expand Down Expand Up @@ -835,18 +796,9 @@ struct alloc *alloc_ctx_create (struct job_manager *ctx)
if (!(alloc = calloc (1, sizeof (*alloc))))
return NULL;
alloc->ctx = ctx;
if (!(alloc->queue = zlistx_new()))
if (!(alloc->queue = job_priority_queue_create ())
|| !(alloc->sent = job_priority_queue_create ()))
goto error;
zlistx_set_destructor (alloc->queue, job_destructor);
zlistx_set_comparator (alloc->queue, job_priority_comparator);
zlistx_set_duplicator (alloc->queue, job_duplicator);

if (!(alloc->pending_jobs = zlistx_new()))
goto error;
zlistx_set_destructor (alloc->pending_jobs, job_destructor);
zlistx_set_comparator (alloc->pending_jobs, job_priority_comparator);
zlistx_set_duplicator (alloc->pending_jobs, job_duplicator);

if (flux_msg_handler_addvec (ctx->h, htab, ctx, &alloc->handlers) < 0)
goto error;
alloc->prep = flux_prepare_watcher_create (r, prep_cb, ctx);
Expand Down

0 comments on commit be2fcf4

Please sign in to comment.