Skip to content

Commit

Permalink
Merge pull request #5796 from garlick/issue#5776
Browse files Browse the repository at this point in the history
add a faster way to get resource allocation status than sched.resource-status RPC
  • Loading branch information
mergify[bot] committed Mar 25, 2024
2 parents d115427 + 98dc6b5 commit fea7f25
Show file tree
Hide file tree
Showing 16 changed files with 670 additions and 95 deletions.
10 changes: 5 additions & 5 deletions doc/man1/flux-resource.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ acquires a subset of resources from the resource service to allocate to jobs,
and relies on the resource service to inform it of status changes that affect
the usability of resources by jobs as described in RFC 27.

The :program:`flux resource list` subcommand queries the scheduler for its view
of resources, including allocated/free status.
The :program:`flux resource list` subcommand queries the resource module
for the scheduler view of resources, including allocated/free status.

The other :program:`flux resource` subcommands operate on the resource service
and are primarily of interest to system administrators of a Flux system
Expand Down Expand Up @@ -182,9 +182,9 @@ online
resources that share a state and online/offline state.

.. note::
:program:`flux resource status` queries both the resource service and
the scheduler to identify resources that are available, excluded by
configuration, or administratively drained or draining.
:program:`flux resource status` queries both the administrative and
scheduler view of resources to identify resources that are available,
excluded by configuration, or administratively drained or draining.

.. option:: -s, --states=STATE,...

Expand Down
6 changes: 6 additions & 0 deletions doc/man7/flux-environment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ components or writing tests.
commands to resolve URIs to local form. This is useful in test environments
where the remote connector does not work.

.. envvar:: FLUX_RESOURCE_LIST_RPC

If set, :man1:`flux-resource` uses the specified RPC topic string instead
of ``resource.sched-status``. This is used in tests to verify that the
``sched.resource-status`` RPC used in earlier releases still works for
backwards compatibility.

MISCELLANEOUS
=============
Expand Down
2 changes: 1 addition & 1 deletion src/bindings/python/flux/job/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def __init__(self, uri=None):
if not uri or SchedResourceList is None:
raise ValueError
handle = flux.Flux(str(uri))
future = handle.rpc("sched.resource-status")
future = handle.rpc("resource.sched-status", nodeid=0)
self.stats = StatsInfo(handle).update_sync()
self.resources = SchedResourceList(future.get())
self.initialized = True
Expand Down
7 changes: 5 additions & 2 deletions src/bindings/python/flux/resource/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import os

from flux.idset import IDset
from flux.memoized_property import memoized_property
from flux.resource import ResourceSet
Expand All @@ -16,7 +18,7 @@

class SchedResourceList:
"""
Encapsulate response from sched.resource-status query.
Encapsulate response from resource.sched-status query.
The response will contain 3 Rv1 resource sets:
:ivar all: all resources known to scheduler
Expand Down Expand Up @@ -98,4 +100,5 @@ def resource_list(flux_handle):
Returns:
ResourceListRPC: a future representing the request.
"""
return ResourceListRPC(flux_handle, "sched.resource-status")
topic = os.getenv("FLUX_RESOURCE_LIST_RPC") or "resource.sched-status"
return ResourceListRPC(flux_handle, topic, nodeid=0)
6 changes: 3 additions & 3 deletions src/cmd/top/summary_pane.c
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ static void resource_continuation (flux_future_t *f, void *arg)

if (flux_rpc_get_unpack (f, "o", &o) < 0) {
if (errno != ENOSYS) /* Instance may not be up yet */
fatal (errno, "sched.resource-status RPC failed");
fatal (errno, "resource.sched-status RPC failed");
}
else {
json_t *queue_constraint;
Expand All @@ -432,7 +432,7 @@ static void resource_continuation (flux_future_t *f, void *arg)
&sum->core.down,
&sum->gpu.down,
queue_constraint) < 0)
fatal (0, "error decoding sched.resource-status RPC response");
fatal (0, "error decoding resource.sched-status RPC response");
}
flux_future_destroy (f);
sum->f_resource = NULL;
Expand Down Expand Up @@ -539,7 +539,7 @@ void summary_pane_query (struct summary_pane *sum)
{
if (!sum->f_resource) {
if (!(sum->f_resource = flux_rpc (sum->top->h,
"sched.resource-status",
"resource.sched-status",
NULL,
0,
0))
Expand Down
4 changes: 3 additions & 1 deletion src/modules/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ job_manager_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libflux-optparse.la \
$(JANSSON_LIBS)
$(top_builddir)/src/common/librlist/librlist.la \
$(JANSSON_LIBS) \
$(HWLOC_LIBS)
job_manager_la_LDFLAGS = \
$(fluxlib_ldflags) \
-avoid-version \
Expand Down
5 changes: 4 additions & 1 deletion src/modules/job-manager/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,13 @@ TESTS = \
test_ldadd = \
libjob-manager.la \
$(top_builddir)/src/common/libtap/libtap.la \
$(top_builddir)/src/common/librlist/librlist.la \
$(top_builddir)/src/common/libjob/libjob.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libflux-internal.la \
$(LIBPTHREAD) $(JANSSON_LIBS)
$(LIBPTHREAD) \
$(JANSSON_LIBS) \
$(HWLOC_LIBS)

test_cppflags = \
$(AM_CPPFLAGS)
Expand Down
96 changes: 84 additions & 12 deletions src/modules/job-manager/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libjob/idf58.h"
#include "src/common/librlist/rlist.h"
#include "src/common/libutil/errprintf.h"
#include "ccan/str/str.h"

#include "job.h"
Expand Down Expand Up @@ -161,8 +163,10 @@ int cancel_request (struct alloc *alloc, struct job *job)
/* Handle a sched.alloc response.
* Update flags.
*/
static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void alloc_response_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct job_manager *ctx = arg;
struct alloc *alloc = ctx->alloc;
Expand Down Expand Up @@ -216,8 +220,8 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
errno = EPROTO;
goto teardown;
}
(void)json_object_del (R, "scheduling");
job->R_redacted = json_incref (R);
(void)json_object_del (job->R_redacted, "scheduling");
if (annotations_update_and_publish (ctx, job, annotations) < 0)
flux_log_error (h, "annotations_update: id=%s", idf58 (id));

Expand Down Expand Up @@ -349,8 +353,10 @@ int alloc_request (struct alloc *alloc, struct job *job)
/* sched-hello:
* Scheduler obtains jobs that have resources allocated.
*/
static void hello_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void hello_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct job_manager *ctx = arg;
struct job *job;
Expand Down Expand Up @@ -392,8 +398,10 @@ static void hello_cb (flux_t *h, flux_msg_handler_t *mh,
* and tells job-manager to start allocations. job-manager tells
* scheduler how many jobs are in the queue.
*/
static void ready_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void ready_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct job_manager *ctx = arg;
const char *mode;
Expand Down Expand Up @@ -480,8 +488,10 @@ static bool alloc_work_available (struct job_manager *ctx)
* Runs right before reactor calls poll(2).
* If a job can be scheduled, start idle watcher.
*/
static void prep_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct job_manager *ctx = arg;

Expand All @@ -493,8 +503,10 @@ static void prep_cb (flux_reactor_t *r, flux_watcher_t *w,
* Runs right after reactor calls poll(2).
* Stop idle watcher, and send next alloc request, if available.
*/
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct job_manager *ctx = arg;
struct alloc *alloc = ctx->alloc;
Expand Down Expand Up @@ -662,7 +674,8 @@ int alloc_queue_recalc_pending (struct alloc *alloc)
&& tail) {
if (job_priority_comparator (head, tail) < 0) {
if (alloc_cancel_alloc_request (alloc, tail) < 0) {
flux_log_error (alloc->ctx->h, "%s: alloc_cancel_alloc_request",
flux_log_error (alloc->ctx->h,
"%s: alloc_cancel_alloc_request",
__FUNCTION__);
return -1;
}
Expand Down Expand Up @@ -708,6 +721,60 @@ static void alloc_query_cb (flux_t *h,
return;
}

/* Handle a job-manager.resource-status request.
 * Respond with {"allocated": R}, where R is the union of the redacted
 * resource set (R_redacted) of every active job that currently holds
 * resources.  Jobs with alloc_bypass set are skipped — their resources
 * are presumably outside the scheduler's view (NOTE(review): confirm).
 * On any failure, respond with EINVAL and a descriptive error string.
 */
static void resource_status_cb (flux_t *h,
                                flux_msg_handler_t *mh,
                                const flux_msg_t *msg,
                                void *arg)
{
    struct job_manager *ctx = arg;
    struct alloc *alloc = ctx->alloc;
    struct rlist *allocated;
    json_t *R = NULL;
    flux_error_t error;
    struct job *job;

    if (!(allocated = rlist_create ())) {
        errprintf (&error, "error creating rlist object");
        goto error;
    }
    /* Accumulate each qualifying job's resources into 'allocated'.
     */
    for (job = zhashx_first (alloc->ctx->active_jobs);
         job != NULL;
         job = zhashx_next (alloc->ctx->active_jobs)) {
        struct rlist *rl;
        json_error_t jerror;

        if (!job->has_resources || !job->R_redacted || job->alloc_bypass)
            continue;
        if (!(rl = rlist_from_json (job->R_redacted, &jerror))) {
            errprintf (&error,
                       "%s: error converting JSON to rlist: %s",
                       idf58 (job->id),
                       jerror.text);
            goto error;
        }
        /* rlist_append fails if 'rl' overlaps resources already in
         * 'allocated', which would indicate a duplicate allocation.
         */
        if (rlist_append (allocated, rl) < 0) {
            errprintf (&error, "%s: duplicate allocation", idf58 (job->id));
            rlist_destroy (rl);
            goto error;
        }
        rlist_destroy (rl);
    }
    if (!(R = rlist_to_R (allocated))) {
        errprintf (&error, "error converting rlist to JSON");
        goto error;
    }
    if (flux_respond_pack (h, msg, "{s:O}", "allocated", R) < 0)
        flux_log_error (h, "error responding to resource-status request");
    json_decref (R);
    rlist_destroy (allocated);
    return;
error:
    if (flux_respond_error (h, msg, EINVAL, error.text) < 0)
        flux_log_error (h, "error responding to resource-status request");
    json_decref (R);
    rlist_destroy (allocated);
}

void alloc_disconnect_rpc (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -757,6 +824,11 @@ static const struct flux_msg_handler_spec htab[] = {
alloc_query_cb,
FLUX_ROLE_USER,
},
{ FLUX_MSGTYPE_REQUEST,
"job-manager.resource-status",
resource_status_cb,
0
},
{ FLUX_MSGTYPE_RESPONSE,
"sched.alloc",
alloc_response_cb,
Expand Down
4 changes: 3 additions & 1 deletion src/modules/resource/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ libresource_la_SOURCES = \
inventory.c \
inventory.h \
rutil.c \
rutil.h
rutil.h \
status.c \
status.h

TESTS = test_rutil.t

Expand Down
16 changes: 9 additions & 7 deletions src/modules/resource/acquire.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,15 @@ static void cancel_cb (flux_t *h,
*/
void acquire_disconnect (struct acquire *acquire, const flux_msg_t *msg)
{
flux_t *h = acquire->ctx->h;
int count;

if ((count = flux_msglist_disconnect (acquire->requests, msg)) < 0)
flux_log_error (h, "error handling discnonect request");
if (count > 0)
flux_log (h, LOG_DEBUG, "aborted %d resource.acquire(s)", count);
if (acquire) { // acquire is NULL on rank > 0
flux_t *h = acquire->ctx->h;
int count;

if ((count = flux_msglist_disconnect (acquire->requests, msg)) < 0)
flux_log_error (h, "error handling discnonect request");
if (count > 0)
flux_log (h, LOG_DEBUG, "aborted %d resource.acquire(s)", count);
}
}

/* An event was committed to resource.eventlog.
Expand Down
4 changes: 1 addition & 3 deletions src/modules/resource/monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
* ranks in the initial program for the same reason as above.
* - the 'resource.monitor-waitup' RPC allows a test to wait for some number
* of ranks to be up, where "up" is defined as having had an online event
* posted. Thus, after waiting, resource.status (flux resource status)
* should show those ranks up, while sched.resource-status
* (flux resource list command) may still show them down.
* posted.
*/

#if HAVE_CONFIG_H
Expand Down

0 comments on commit fea7f25

Please sign in to comment.