Skip to content

Commit

Permalink
testsuite: add jobmanager/sched-dummy
Browse files Browse the repository at this point in the history
  • Loading branch information
garlick committed Feb 22, 2019
1 parent 702293b commit 2ad4abf
Show file tree
Hide file tree
Showing 2 changed files with 373 additions and 1 deletion.
10 changes: 9 additions & 1 deletion t/Makefile.am
Expand Up @@ -234,7 +234,8 @@ check_LTLIBRARIES = \
module/parent.la \
module/child.la \
request/req.la \
ingest/job-manager-dummy.la
ingest/job-manager-dummy.la \
job-manager/sched-dummy.la

if HAVE_MPI
check_PROGRAMS += \
Expand Down Expand Up @@ -431,3 +432,10 @@ ingest_submitbench_SOURCES = ingest/submitbench.c
ingest_submitbench_CPPFLAGS = $(test_cppflags)
ingest_submitbench_LDADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL)

job_manager_sched_dummy_la_SOURCES = job-manager/sched-dummy.c
job_manager_sched_dummy_la_CPPFLAGS = $(test_cppflags)
job_manager_sched_dummy_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowhere
job_manager_sched_dummy_la_LIBADD = \
$(top_builddir)/src/common/libschedutil/libschedutil.la \
$(test_ldadd) $(LIBDL) $(LIBUTIL)
364 changes: 364 additions & 0 deletions t/job-manager/sched-dummy.c
@@ -0,0 +1,364 @@
/************************************************************\
* Copyright 2018 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* Simple scheduler for testing:
* - don't fetch jobspec from KVS when receiving alloc request
* - presume that each job is requesting one core
* - track core counts, not specific core id's
* - don't write R to KVS before responding to alloc request
*
* Command line usage:
* flux module load sched-dummy [--cores=N] [--mode=single|unlimited]
* Options
* --cores=N specifies the total number of cores available (default 16)
* --mode=MODE see job-manager/scheduler.c (default single)
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>
#include <jansson.h>
#include <czmq.h>
#include "src/common/liboptparse/optparse.h"
#include "src/common/libschedutil/schedutil.h"

struct job {
flux_msg_t *msg;
flux_jobid_t id;
int priority;
uint32_t userid;
double t_submit;
char *jobspec;
void *list_handle;
};

struct sched_ctx {
flux_t *h;
struct ops_context *sched_ops;
optparse_t *opt;
zlistx_t *queue;
int cores_total;
int cores_free;
flux_watcher_t *prep;
flux_watcher_t *check;
flux_watcher_t *idle;
};

static void job_destroy (struct job *job);

/* Keep scheduler's internal queue in priority, then submit time order.
*/
#define NUMCMP(a,b) ((a)==(b)?0:((a)<(b)?-1:1))
static int queue_item_cmp (const void *a1, const void *a2)
{
const struct job *j1 = a1;
const struct job *j2 = a2;
int rc;

if ((rc = (-1)*NUMCMP (j1->priority, j2->priority)) == 0)
rc = NUMCMP (j1->t_submit, j2->t_submit);
return rc;
}

static void queue_item_destroy (void **item)
{
job_destroy (*(struct job **)item);
*item = NULL;
}

static void job_destroy (struct job *job)
{
if (job) {
int saved_errno = errno;
free (job->jobspec);
flux_msg_destroy (job->msg);
free (job);
errno = saved_errno;
}
}

/* Create job struct from sched.alloc request.
*/
static struct job *job_create (const flux_msg_t *msg, const char *jobspec)
{
struct job *job;

if (!(job = calloc (1, sizeof (*job))))
return NULL;
if (schedutil_alloc_request_decode (msg, &job->id, &job->priority,
&job->userid, &job->t_submit) < 0)
goto error;
if (!(job->jobspec = strdup (jobspec)))
goto error;
if (!(job->msg = flux_msg_copy (msg, true)))
goto error;
return job;
error:
job_destroy (job);
return NULL;
}


struct job *find_job (struct sched_ctx *sc, flux_jobid_t id)
{
struct job *job;

job = zlistx_first (sc->queue);
while (job) {
if (id == job->id)
return job;
job = zlistx_next (sc->queue);
}
return NULL;
}

void exception_cb (flux_t *h, flux_jobid_t id,
const char *type, int severity, void *arg)
{
struct sched_ctx *sc = arg;
char note[80];
struct job *job;

if (severity > 0 || !(job = find_job (sc, id)))
return;
(void)snprintf (note, sizeof(note),
"alloc aborted due to exception type=%s", type);
if (schedutil_alloc_respond_denied (h, job->msg, note) < 0)
flux_log_error (h, "%s: alloc_respond_denied", __FUNCTION__);
zlistx_delete (sc->queue, job->list_handle);
}

void free_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg)
{
struct sched_ctx *sc = arg;
flux_jobid_t id;

if (schedutil_free_request_decode (msg, &id) < 0)
goto error;
flux_log (h, LOG_DEBUG, "free: id=%llu R=%s",
(unsigned long long)id, R);
sc->cores_free++;
if (schedutil_free_respond (h, msg) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

/* sched.alloc
* Enqueue alloc request for processing by check_cb().
*/
void alloc_cb (flux_t *h, const flux_msg_t *msg,
const char *jobspec, void *arg)
{
struct sched_ctx *sc = arg;
struct job *job;

if (!(job = job_create (msg, jobspec))) {
flux_log_error (h, "%s: job_create", __FUNCTION__);
goto error;
}
if (!(job->list_handle = zlistx_insert (sc->queue, job, false))) {
job_destroy (job);
errno = ENOMEM;
flux_log_error (h, "%s: zlistx_insert", __FUNCTION__);
goto error;
}
flux_log (h, LOG_DEBUG, "alloc: id=%llu jobspec=%s",
(unsigned long long)job->id, job->jobspec);
return;
error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

/* Send error responses to any pending alloc requests.
* This is called when module is unloading, and will cause the job-manager
* to quiesce its scheduling interface without affecting the jobs.
*/
void sched_respond_all (struct sched_ctx *sc, int errnum, const char *errstr)
{
struct job *job;

while ((job = zlistx_first (sc->queue))) {
if (flux_respond_error (sc->h, job->msg, errnum, "%s", errstr) < 0)
flux_log_error (sc->h, "%s: flux_respond_error", __FUNCTION__);
zlistx_delete (sc->queue, job->list_handle);
}
}

/* prep - runs right before reactor calls poll(2).
* If a job can be scheduled, start idle watcher to avoid blocking.
*/
void prep_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
struct sched_ctx *sc = arg;
struct job *job = zlistx_first (sc->queue);

if (sc->cores_free > 0 && job != NULL)
flux_watcher_start (sc->idle);
}

/* check - runs right after reactor's call to poll(2) returns.
* Send alloc responses to one job, then allow reactor to continue.
*/
void check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
struct sched_ctx *sc = arg;
struct job *job = zlistx_first (sc->queue);

flux_watcher_stop (sc->idle);

if (sc->cores_free > 0 && job != NULL) {
if (schedutil_alloc_respond_R (sc->h, job->msg, "1core", NULL) < 0)
flux_log_error (sc->h, "schedutil_alloc_respond_R");
zlistx_delete (sc->queue, job->list_handle);
sc->cores_free--;
}
}

/* Called for each resource chunk (R) that is still held by jobs
* during scheduler initialization.
*/
int hello_cb (flux_t *h, const char *R, void *arg)
{
flux_log (h, LOG_DEBUG, "%s: R=%s", __FUNCTION__, R);
return 0;
}

static struct optparse_option dummy_opts[] = {
{ .name = "cores",
.has_arg = 1,
.flags = 0,
.arginfo = "COUNT",
.usage = "Core count (default 16)",
},
{ .name = "mode",
.has_arg = 1,
.flags = 0,
.arginfo = "MODE",
.usage = "Set scheduler interface mode (default single)",
},
OPTPARSE_TABLE_END,
};

/* N.B. module argv[0] is first argument, not module name.
*/
optparse_t *options_parse (int argc, char **argv)
{
optparse_t *opt;
if (!(opt = optparse_create ("sched-dummy"))) {
errno = ENOMEM;
return NULL;
}
if (optparse_add_option_table (opt, dummy_opts) != OPTPARSE_SUCCESS)
goto error;
if (optparse_parse_args (opt, argc + 1, argv - 1) < 0)
goto error;
return opt;
error:
optparse_destroy (opt);
errno = EINVAL;
return NULL;
}

void sched_destroy (struct sched_ctx *sc)
{
if (sc) {
int saved_errno = errno;
zlistx_destroy (&sc->queue); // sends ENOSYS to any queued requests
flux_watcher_destroy (sc->prep);
flux_watcher_destroy (sc->check);
flux_watcher_destroy (sc->idle);
schedutil_ops_unregister (sc->sched_ops);
optparse_destroy (sc->opt);
free (sc);
errno = saved_errno;
}
}

struct sched_ctx *sched_create (flux_t *h, int argc, char **argv)
{
struct sched_ctx *sc;
flux_reactor_t *r = flux_get_reactor (h);

if (!(sc = calloc (1, sizeof (*sc))))
return NULL;
sc->h = h;
if (!(sc->queue = zlistx_new ())) {
errno = ENOMEM;
goto error;
}
if (!(sc->sched_ops = schedutil_ops_register (h,
alloc_cb,
free_cb,
exception_cb,
sc))) {
flux_log_error (h, "schedutil_ops_register");
goto error;
}
zlistx_set_comparator (sc->queue, queue_item_cmp);
zlistx_set_destructor (sc->queue, queue_item_destroy);
if (!(sc->opt = options_parse (argc, argv))) {
errno = EINVAL;
goto error;
}
sc->cores_total = sc->cores_free = optparse_get_int (sc->opt, "cores", 16);
if (!(sc->prep = flux_prepare_watcher_create (r, prep_cb, sc)))
goto error;
if (!(sc->check = flux_check_watcher_create (r, check_cb, sc)))
goto error;
if (!(sc->idle = flux_idle_watcher_create (r, NULL, NULL)))
goto error;
flux_watcher_start (sc->prep);
flux_watcher_start (sc->check);
return sc;
error:
sched_destroy (sc);
return NULL;
}

int mod_main (flux_t *h, int argc, char *argv[])
{
int rc = -1;
struct sched_ctx *sc;
const char *mode;
int count;

if (!(sc = sched_create (h, argc, argv)))
return -1;
flux_log (h, LOG_DEBUG, "res pool is %d cores", sc->cores_total);
if (schedutil_hello (h, hello_cb, sc) < 0) {
flux_log_error (h, "schedutil_hello");
goto done;
}
mode = optparse_get_str (sc->opt, "mode", "single");
if (schedutil_ready (h, mode, &count) < 0) {
flux_log_error (h, "schedutil_ready");
goto done;
}
flux_log (sc->h, LOG_DEBUG, "ready: count=%d", count);

if ((rc = flux_reactor_run (flux_get_reactor (h), 0)) < 0)
flux_log_error (h, "flux_reactor_run");
done:
sched_respond_all (sc, ENOSYS, "sched unloading");
sched_destroy (sc);
return rc;
}
MOD_NAME ("sched-dummy");

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/

0 comments on commit 2ad4abf

Please sign in to comment.