Skip to content

Commit

Permalink
Merge pull request #5897 from garlick/issue#5889
Browse files Browse the repository at this point in the history
alloc-check: account for resources in scheduler hello failure
  • Loading branch information
mergify[bot] committed Apr 18, 2024
2 parents 21ce5c0 + 811eed4 commit 091fc1f
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 55 deletions.
96 changes: 43 additions & 53 deletions src/modules/job-manager/plugins/alloc-check.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
* A fatal exception is raised on jobs that are granted resources already
* granted to another.
*
* In order to be sure that the exception can be raised before a short job
* becomes inactive, R is looked up in the KVS synchronously, causing the
* job manager to be briefly unresponsive. Hence, this plugin is primarily
* suited for debug/test situations.
*
* N.B. This plugin does not account for any jobs that might already have
* allocations when the plugin is loaded.
*/
Expand All @@ -33,6 +28,7 @@
#include "ccan/str/str.h"
#include "src/common/librlist/rlist.h"
#include "src/common/libjob/idf58.h"
#include "src/common/libeventlog/eventlog.h"

#define PLUGIN_NAME "alloc-check"
static const char *auxname = PLUGIN_NAME "::resdb";
Expand Down Expand Up @@ -69,39 +65,27 @@ static struct resdb *resdb_create (void)
return resdb;
}

/* Generate the kvs path to R for a given job
/* When a job is presented to the scheduler via the RFC 27 'hello' handshake
* upon scheduler reload, the scheduler raises a fatal scheduler-restart
* exception if it cannot re-allocate the job's resources and the job manager
* marks resources free without posting a free event. This plugin must
* account for those resources. See flux-framework/flux-core#5889
*/
static int res_makekey (flux_jobid_t id, char *buf, size_t size)
static bool is_hello_failure (json_t *entry)
{
char dir[128];
if (flux_job_id_encode (id, "kvs", dir, sizeof (dir)) < 0)
return -1;
if (snprintf (buf, size, "%s.R", dir) >= size) {
errno = EOVERFLOW;
return -1;
}
return 0;
}

/* Synchronously look up R for a given job and convert it to an rlist object
* which the caller must destroy with rlist_destroy().
*/
static struct rlist *res_lookup (flux_t *h, flux_jobid_t id)
{
char key[128];
flux_future_t *f = NULL;
const char *R;
struct rlist *rlist;

if (res_makekey (id, key, sizeof (key)) < 0
|| !(f = flux_kvs_lookup (h, NULL, 0, key))
|| flux_kvs_lookup_get (f, &R) < 0
|| !(rlist = rlist_from_R (R))) {
flux_future_destroy (f);
return NULL;
}
flux_future_destroy (f);
return rlist;
const char *type;
int severity;
json_t *context;

if (eventlog_entry_parse (entry, NULL, NULL, &context) == 0
&& json_unpack (context,
"{s:i s:s}",
"severity", &severity,
"type", &type) == 0
&& severity == 0
&& streq (type, "scheduler-restart"))
return true;
return false;
}

static int jobtap_cb (flux_plugin_t *p,
Expand All @@ -112,11 +96,15 @@ static int jobtap_cb (flux_plugin_t *p,
struct resdb *resdb = flux_plugin_aux_get (p, auxname);
flux_t *h = flux_jobtap_get_flux (p);
flux_jobid_t id;
json_t *entry = NULL;
json_t *R = NULL;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:I}",
"id", &id) < 0) {
"{s:I s?o s?o}",
"id", &id,
"entry", &entry,
"R", &R) < 0) {
flux_log (h,
LOG_ERR,
"%s %s: unpack: %s",
Expand All @@ -137,29 +125,29 @@ static int jobtap_cb (flux_plugin_t *p,
topic);
}
}
/* Look up R that was just allocated to the job and attach it to the job
* aux container so we don't have to look it up again on free. Call
* rlist_append() to add the resources to resdb->allocated. If that
* fails, some resources are already allocated so raise a fatal exception
* on the job.
/* Attach R that was just allocated to the job to the job aux container
* so we don't have to parse it again on free. Call rlist_append() to add
* the resources to resdb->allocated. If that fails, some resources are
* already allocated so raise a fatal exception on the job.
*/
else if (streq (topic, "job.event.alloc")) {
struct rlist *R;
if (!(R = res_lookup (h, id))
struct rlist *rl = NULL;
if (!R
|| !(rl = rlist_from_json (R, NULL))
|| flux_jobtap_job_aux_set (p,
id,
PLUGIN_NAME "::R",
R,
rl,
(flux_free_f)rlist_destroy) < 0) {
flux_log_error (h,
"%s(%s) %s: failed to lookup or cache R",
"%s(%s) %s: failed to parse or cache R",
PLUGIN_NAME,
idf58 (id),
topic);
rlist_destroy (R);
rlist_destroy (rl);
return -1;
}
if (rlist_append (resdb->allocated, R) < 0) {
if (rlist_append (resdb->allocated, rl) < 0) {
flux_jobtap_raise_exception (p,
id,
"alloc-check",
Expand All @@ -171,11 +159,12 @@ static int jobtap_cb (flux_plugin_t *p,
* from resdb->allocated. Any jobs that had allocations before the module
* will not have the R aux item, so silently return success in that case.
*/
else if (streq (topic, "job.event.free")) {
struct rlist *R = flux_jobtap_job_aux_get (p, id, PLUGIN_NAME "::R");
if (R) {
else if (streq (topic, "job.event.free")
|| (streq (topic, "job.event.exception") && is_hello_failure (entry))) {
struct rlist *rl = flux_jobtap_job_aux_get (p, id, PLUGIN_NAME "::R");
if (rl) {
struct rlist *diff;
if (!(diff = rlist_diff (resdb->allocated, R))) {
if (!(diff = rlist_diff (resdb->allocated, rl))) {
flux_log_error (h,
"%s(%s) %s: rlist_diff",
PLUGIN_NAME,
Expand All @@ -193,6 +182,7 @@ static int jobtap_cb (flux_plugin_t *p,
static const struct flux_plugin_handler tab[] = {
{ "job.event.alloc", jobtap_cb, NULL },
{ "job.event.free", jobtap_cb, NULL },
{ "job.event.exception", jobtap_cb, NULL },
{ "job.new", jobtap_cb, NULL },
{ 0 }
};
Expand Down
57 changes: 55 additions & 2 deletions t/t2303-sched-hello.t
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ receives a fatal exception.
test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
. $(dirname $0)/sharness.sh

test_under_flux 1 job
test_under_flux 1

test_expect_success 'start a long-running job' '
jobid=$(flux submit -n1 -t1h sleep 3600)
jobid=$(flux submit -N1 sleep inf)
'
test_expect_success 'unload scheduler' '
flux module remove sched-simple &&
Expand All @@ -38,6 +38,59 @@ test_expect_success 'job receives exception' '
test_expect_success 'job receives clean event' '
flux job wait-event -v -t 30 ${jobid} clean
'
test_expect_success 'unload scheduler' '
flux module remove sched-simple &&
flux module remove resource
'
test_expect_success 'restore the empty config' '
flux config load </dev/null
'
test_expect_success 'load scheduler' '
flux module load resource &&
flux module load sched-simple
'

#
# The following ensures that the alloc-check jobtap plugin properly accounts
# for resources "freed" by hello failure.
#
test_expect_success 'load the alloc-check plugin' '
flux jobtap load alloc-check.so
'
test_expect_success 'start a long-running job' '
jobid=$(flux submit -N1 sleep inf)
'
test_expect_success 'unload scheduler' '
flux module remove sched-simple &&
flux module remove resource
'
test_expect_success 'exclude the job node from configuration' '
flux config load <<-EOT
[resource]
exclude = "0"
EOT
'
test_expect_success 'load scheduler' '
flux module load resource &&
flux module load sched-simple
'
test_expect_success 'job receives exception' '
flux job wait-event -t 30 ${jobid} exception
'
test_expect_success 'unload scheduler' '
flux module remove sched-simple &&
flux module remove resource
'
test_expect_success 'restore the empty config' '
flux config load </dev/null
'
test_expect_success 'load scheduler' '
flux module load resource &&
flux module load sched-simple
'
test_expect_success 'run another job that uses the same resources' '
flux run -vv -N1 true
'
test_expect_success 'decrease broker stderr log level' '
flux setattr log-stderr-level 5
'
Expand Down

0 comments on commit 091fc1f

Please sign in to comment.