Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qmanager: defer jobs with unmatchable constraints #1188

Merged
merged 8 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
"patch": 0
trws marked this conversation as resolved.
Show resolved Hide resolved
},
"include": [
"otherThings.json",
"moreThings.json"
],
"configurePresets": [
{
Expand All @@ -16,7 +14,9 @@
"description": "Default build using Ninja generator",
trws marked this conversation as resolved.
Show resolved Hide resolved
"generator": "Ninja",
"binaryDir": "${sourceDir}/build/default",
"cacheVariables": {},
"cacheVariables": {
"CMAKE_BUILD_TYPE": "RelWithDebInfo"
},
"environment": {},
"vendor": {}
},
Expand Down
7 changes: 5 additions & 2 deletions qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,11 @@ static void update_on_resource_response (flux_future_t *f, void *arg)
flux_reactor_stop (flux_get_reactor (ctx->h));
trws marked this conversation as resolved.
Show resolved Hide resolved
goto out;
}
for (auto &kv : ctx->queues)
kv.second->set_schedulability (true);
for (auto &[_, queue] : ctx->queues) {
queue->set_schedulability (true);
// constraints must be reconsidered if node status changes
queue->reconsider_blocked_jobs ();
}

out:
flux_future_reset (f);
Expand Down
14 changes: 13 additions & 1 deletion qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ extern "C" {
#include <flux/core.h>
}

#include <cassert>
#include <map>
#include <algorithm>
#include <vector>
Expand Down Expand Up @@ -154,6 +155,12 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &ret_R) = 0;

/// @brief move any jobs in blocked state to pending
void reconsider_blocked_jobs () {
m_pending.merge (m_blocked);
assert (m_blocked.size () == 0);
}

/*! Set queue parameters. Can be called multiple times.
*
* \param params comma-delimited key-value pairs string
Expand Down Expand Up @@ -465,7 +472,10 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
default:
break;
}

// blocked jobs must be reconsidered after a job completes
// this covers cases where jobs that couldn't run because of an
// existing job's reservation can when it completes early
reconsider_blocked_jobs ();
rc = 0;
out:
return rc;
Expand Down Expand Up @@ -960,6 +970,8 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
uint64_t m_reprio_cnt = 0;
unsigned int m_queue_depth = DEFAULT_QUEUE_DEPTH;
unsigned int m_max_queue_depth = MAX_QUEUE_DEPTH;
/// jobs that need to wait for resource state updates
std::map<std::vector<double>, flux_jobid_t> m_blocked;
std::map<std::vector<double>, flux_jobid_t> m_pending;
std::map<std::vector<double>, flux_jobid_t> m_pending_provisional;
std::map<uint64_t, flux_jobid_t> m_pending_cancel_provisional;
Expand Down
18 changes: 11 additions & 7 deletions qmanager/policies/queue_policy_bf_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,15 @@ queue_policy_bf_base_t<reapi_type>::allocate_orelse_reserve (void *h,
: "match error");
} else {
// This can happen if there are "down" resources.
// The semantics of our backfill policies is to skip this job
iter++;
// The semantics of our backfill policies is to skip this job,
// add it to the blocked list, and re-consider when resource
// status changes

// copy and advance iterator before extract invalidates it
auto next = iter;
++next;
m_blocked.insert (m_pending.extract (iter));
iter = next;
}
}
return iter;
Expand Down Expand Up @@ -129,11 +136,8 @@ int queue_policy_bf_base_t<reapi_type>::allocate_orelse_reserve_jobs (void *h,
std::shared_ptr<job_t> job;

// move jobs in m_pending_provisional queue into
// m_pending. Note that c++11 doesn't have a clean way
// to "move" elements between two std::map objects so
// we use copy for the time being.
m_pending.insert (m_pending_provisional.begin (),
m_pending_provisional.end ());
// m_pending.
m_pending.merge (m_pending_provisional);
m_pending_provisional.clear ();

set_sched_loop_active (true);
Expand Down
53 changes: 29 additions & 24 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,

static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
const char *cmd, const std::string &jstr, int64_t *now,
int64_t *at, double *ov, std::stringstream &o,
int64_t *at, double *overhead, std::stringstream &o,
flux_error_t *errp)
{
int rc = 0;
Expand All @@ -1781,8 +1781,8 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
*at = *now = epoch.count ();
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov, false);
*overhead = elapsed.count ();
update_match_perf (ctx, *overhead, false);
goto done;
}
if ( (rc = ctx->writers->emit (o)) < 0) {
Expand All @@ -1792,12 +1792,12 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,

rsv = (*now != *at)? true : false;
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov, true);
*overhead = elapsed.count ();
update_match_perf (ctx, *overhead, true);

if (cmd != std::string ("satisfiability")) {
if ( (rc = track_schedule_info (ctx, jobid,
rsv, *at, jstr, o, *ov)) != 0) {
rsv, *at, jstr, o, *overhead)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
goto done;
Expand All @@ -1809,7 +1809,7 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}

static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
const char *R, int64_t &at, double &ov,
const char *R, int64_t &at, double &overhead,
std::stringstream &o)
{
int rc = 0;
Expand All @@ -1826,8 +1826,8 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov, false);
overhead = elapsed.count ();
update_match_perf (ctx, overhead, false);
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
goto done;
}
Expand All @@ -1836,9 +1836,9 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
goto done;
}
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov, true);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
overhead = elapsed.count ();
update_match_perf (ctx, overhead, true);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, overhead)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
goto done;
Expand All @@ -1853,7 +1853,7 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
{
char *R = NULL;
int64_t at = 0;
double ov = 0.0f;
double overhead = 0.0f;
int64_t jobid = 0;
uint64_t duration = 0;
std::string status = "";
Expand Down Expand Up @@ -1883,13 +1883,13 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
}
elapsed = std::chrono::system_clock::now () - start;
// If a jobid with matching R exists, no need to update
ov = elapsed.count ();
overhead = elapsed.count ();
get_jobstate_str (ctx->jobs[jobid]->state, status);
o << ctx->jobs[jobid]->R;
at = ctx->jobs[jobid]->scheduled_at;
flux_log (ctx->h, LOG_DEBUG, "%s: jobid (%jd) with matching R exists",
__FUNCTION__, static_cast<intmax_t> (jobid));
} else if (run_update (ctx, jobid, R, at, ov, o) < 0) {
} else if (run_update (ctx, jobid, R, at, overhead, o) < 0) {
flux_log_error (ctx->h,
"%s: update failed (id=%jd)",
__FUNCTION__, static_cast<intmax_t> (jobid));
Expand All @@ -1902,7 +1902,7 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
if (flux_respond_pack (h, msg, "{s:I s:s s:f s:s s:I}",
"jobid", jobid,
"status", status.c_str (),
"overhead", ov,
"overhead", overhead,
"R", o.str ().c_str (),
"at", at) < 0)
flux_log_error (h, "%s", __FUNCTION__);
Expand Down Expand Up @@ -1947,7 +1947,7 @@ static void match_request_cb (flux_t *h, flux_msg_handler_t *w,
int64_t at = 0;
int64_t now = 0;
int64_t jobid = -1;
double ov = 0.0f;
double overhead = 0.0f;
std::string status = "";
const char *cmd = NULL;
const char *js_str = NULL;
Expand All @@ -1963,19 +1963,24 @@ static void match_request_cb (flux_t *h, flux_msg_handler_t *w,
__FUNCTION__, (intmax_t)jobid);
goto error;
}
if (run_match (ctx, jobid, cmd, js_str, &now, &at, &ov, R, NULL) < 0) {
if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) {
if (errno != EBUSY && errno != ENODEV)
flux_log_error (ctx->h,
"%s: match failed due to match error (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
// The resources couldn't be allocated *or reserved*
// Kicking back to qmanager, remove from tracking
if (errno == EBUSY) {
ctx->jobs.erase (jobid);
}
goto error;
}

status = get_status_string (now, at);
if (flux_respond_pack (h, msg, "{s:I s:s s:f s:s s:I}",
"jobid", jobid,
"status", status.c_str (),
"overhead", ov,
"overhead", overhead,
"R", R.str ().c_str (),
"at", at) < 0)
flux_log_error (h, "%s", __FUNCTION__);
Expand Down Expand Up @@ -2018,7 +2023,7 @@ static void match_multi_request_cb (flux_t *h, flux_msg_handler_t *w,
const char *js_str;
int64_t at = 0;
int64_t now = 0;
double ov = 0.0f;
double overhead = 0.0f;
std::string status = "";
std::stringstream R;

Expand All @@ -2032,7 +2037,7 @@ static void match_multi_request_cb (flux_t *h, flux_msg_handler_t *w,
__FUNCTION__, static_cast<intmax_t> (jobid));
goto error;
}
if (run_match (ctx, jobid, cmd, js_str, &now, &at, &ov, R, NULL) < 0) {
if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) {
if (errno != EBUSY && errno != ENODEV)
flux_log_error (ctx->h,
"%s: match failed due to match error (id=%jd)",
Expand All @@ -2044,7 +2049,7 @@ static void match_multi_request_cb (flux_t *h, flux_msg_handler_t *w,
if (flux_respond_pack (h, msg, "{s:I s:s s:f s:s s:I}",
"jobid", jobid,
"status", status.c_str (),
"overhead", ov,
"overhead", overhead,
"R", R.str ().c_str (),
"at", at) < 0) {
flux_log_error (h, "%s", __FUNCTION__);
Expand Down Expand Up @@ -2704,7 +2709,7 @@ static void satisfiability_request_cb (flux_t *h, flux_msg_handler_t *w,
{
int64_t at = 0;
int64_t now = 0;
double ov = 0.0f;
double overhead = 0.0f;
int saved_errno = 0;
std::stringstream R;
json_t *jobspec = nullptr;
Expand All @@ -2726,7 +2731,7 @@ static void satisfiability_request_cb (flux_t *h, flux_msg_handler_t *w,
js_str,
&now,
&at,
&ov,
&overhead,
R,
&error) < 0) {
if (errno == ENODEV)
Expand Down
9 changes: 8 additions & 1 deletion resource/traversers/dfu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,14 @@ int dfu_traverser_t::run (Jobspec::Jobspec &jobspec,
detail::dfu_impl_t::update ();
} else if ( (rc = schedule (jobspec, meta, x, op, root, dfv)) == 0) {
*at = meta.at;
if (*at < 0 or *at >= graph_end) {
if (*at == graph_end) {
detail::dfu_impl_t::reset_exclusive_resource_types
(exclusive_types);
// no schedulable point found even at the end of the time, return EBUSY
errno = EBUSY;
return -1;
}
if (*at < 0 or *at > graph_end) {
detail::dfu_impl_t::reset_exclusive_resource_types
(exclusive_types);
errno = EINVAL;
Expand Down
5 changes: 3 additions & 2 deletions resource/traversers/dfu_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ struct jobmeta_t {
errno = EINVAL;
return -1;
}
if (jobspec.attributes.system.duration == 0.0f)
if (jobspec.attributes.system.duration == 0.0f) {
duration = g_duration;
else
} else {
duration = (int64_t)jobspec.attributes.system.duration;
}
if (jobspec.attributes.system.queue != "") {
m_queue = jobspec.attributes.system.queue;
m_queue_set = true;
Expand Down
1 change: 1 addition & 0 deletions t/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ set(ALL_TESTS
t4010-match-conf.t
t4011-match-duration.t
t4012-set-status.t
t4013-unreservable.sh
t5000-valgrind.t
t5100-issues-test-driver.t
t6000-graph-size.t
Expand Down
2 changes: 1 addition & 1 deletion t/scripts/maybe-installtest
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ if test -n "$FLUX_SCHED_TEST_INSTALLED" -o -n "$FLUX_TEST_INSTALLED_PATH"; then
export FLUX_MODULE_PATH_PREPEND="${FLUX_SCHED_MOD_DIR}"
export PYTHONPATH="${FLUX_SCHED_PYTHON_SITELIB}${PYTHONPATH:+:${PYTHONPATH}}"
fi
exec $@
exec flux env $@