Skip to content

Commit

Permalink
Merge pull request #5834 from grondo/resource-list-fallback
Browse files Browse the repository at this point in the history
support fallback to the old `sched.resource-status` RPC for tools that use the new `resource.sched-status` RPC
  • Loading branch information
mergify[bot] committed Mar 27, 2024
2 parents 93a6dfc + 9759161 commit 76d0fac
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 14 deletions.
2 changes: 1 addition & 1 deletion doc/man7/flux-environment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ components or writing tests.
commands to resolve URIs to local form. This is useful in test environments
where the remote connector does not work.

.. envvar:: FLUX_RESOURCE_STATUS_RPC
.. envvar:: FLUX_RESOURCE_LIST_RPC

If set, :man1:`flux-resource` uses the specified RPC topic string instead
of ``resource.sched-status``. This is used in test to verify that the
Expand Down
4 changes: 2 additions & 2 deletions src/bindings/python/flux/job/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,9 @@ def __init__(self, uri=None):
if not uri or SchedResourceList is None:
raise ValueError
handle = flux.Flux(str(uri))
future = handle.rpc("resource.sched-status", nodeid=0)
future = flux.resource.resource_list(handle)
self.stats = StatsInfo(handle).update_sync()
self.resources = SchedResourceList(future.get())
self.resources = future.get()
self.initialized = True
return
except (ValueError, OSError, FileNotFoundError):
Expand Down
32 changes: 28 additions & 4 deletions src/bindings/python/flux/resource/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import errno
import json
import os

from flux.future import FutureExt
from flux.idset import IDset
from flux.memoized_property import memoized_property
from flux.resource import ResourceSet
from flux.rpc import RPC


class SchedResourceList:
Expand Down Expand Up @@ -80,9 +82,31 @@ def free(self):
return res


class ResourceListRPC(RPC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class ResourceListRPC(FutureExt):
def __init__(self, flux_handle, topic, nodeid=0):
self.topic = topic
super().__init__(self._init_cb, flux_handle=flux_handle)

def _init_cb(self, future):
future.get_flux().rpc(self.topic, nodeid=0).then(self._list_cb)

def _list_cb(self, future):
try:
self.fulfill(future.get())
except Exception as exc:
if exc.errno == errno.ENOSYS:
# Fall back to sched.resource-status:
future.get_flux().rpc("sched.resource-status", nodeid=0).then(
self._fallback_cb
)
else:
self.fulfill_error(exc.errno, exc.strerror)

def _fallback_cb(self, future):
try:
self.fulfill(json.loads(future.get_str()))
except OSError as exc:
self.fulfill_error(exc.errno, exc.strerror)

def get(self):
"""Return a SchedResourceList corresponding to the request.
Expand Down
63 changes: 56 additions & 7 deletions src/cmd/top/summary_pane.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "top.h"

#define DEFAULT_RESOURCE_LIST_RPC "resource.sched-status"

static const struct dimension win_dim = { 0, 0, 80, 6 };
static const struct dimension level_dim = { 0, 0, 2, 1 };
static const struct dimension title_dim = { 6, 0, 73, 1 };
Expand Down Expand Up @@ -63,6 +65,7 @@ struct summary_pane {
uid_t owner;
bool show_details;
const char *instance_version;
const char *resource_rpc;
double expiration;
struct stats stats;
struct resource_count node;
Expand Down Expand Up @@ -408,7 +411,7 @@ static void resource_continuation (flux_future_t *f, void *arg)

if (flux_rpc_get_unpack (f, "o", &o) < 0) {
if (errno != ENOSYS) /* Instance may not be up yet */
fatal (errno, "resource.sched-status RPC failed");
fatal (errno, "%s RPC failed", sum->resource_rpc);
}
else {
json_t *queue_constraint;
Expand All @@ -432,7 +435,7 @@ static void resource_continuation (flux_future_t *f, void *arg)
&sum->core.down,
&sum->gpu.down,
queue_constraint) < 0)
fatal (0, "error decoding resource.sched-status RPC response");
fatal (0, "error decoding %s RPC response", sum->resource_rpc);
}
flux_future_destroy (f);
sum->f_resource = NULL;
Expand Down Expand Up @@ -532,17 +535,60 @@ void summary_pane_heartbeat (struct summary_pane *sum)
flux_watcher_start (sum->heartblink);
}

static void resource_retry_cb (flux_future_t *f, void *arg)
{
flux_future_t *result = arg;
flux_future_fulfill_with (result, f);
flux_future_destroy (f);
}

static void resource_enosys_check_cb (flux_future_t *f, void *arg)
{
flux_t *h = flux_future_get_flux (f);
flux_future_t *fretry = NULL;
flux_future_t *result = arg;

if (flux_future_get (f, NULL) == 0 || errno != ENOSYS) {
flux_future_fulfill_with (result, f);
return;
}
/* The RPC failed with ENOSYS. Retry with sched.resource-status:
*/
if (!(fretry = flux_rpc (h, "sched.resource-status", NULL, 0, 0))
|| flux_future_then (fretry, -1., resource_retry_cb, result) < 0) {
flux_future_fulfill_error (result, errno, NULL);
flux_future_destroy (fretry);
}
}

flux_future_t *resource_sched_status (struct summary_pane *sum)
{
flux_future_t *result = NULL;
flux_future_t *f = NULL;

/* Create empty future to contain result from either the default resource
* list topic string or the sched.resource-status RPC:
*/
if (!(result = flux_future_create (NULL, NULL))
|| !(f = flux_rpc (sum->top->h, sum->resource_rpc, NULL, 0, 0))
|| flux_future_then (f, -1., resource_enosys_check_cb, result) < 0)
goto error;

flux_future_set_flux (result, sum->top->h);
return result;
error:
flux_future_destroy (result);
flux_future_destroy (f);
return NULL;
}

/* Send a query.
* If there's already one pending, do nothing.
*/
void summary_pane_query (struct summary_pane *sum)
{
if (!sum->f_resource) {
if (!(sum->f_resource = flux_rpc (sum->top->h,
"resource.sched-status",
NULL,
0,
0))
if (!(sum->f_resource = resource_sched_status (sum))
|| flux_future_then (sum->f_resource,
-1,
resource_continuation,
Expand Down Expand Up @@ -607,6 +653,9 @@ struct summary_pane *summary_pane_create (struct top *top)
if (sum->owner == getuid ())
sum->show_details = true;

if (!(sum->resource_rpc = getenv ("FLUX_RESOURCE_LIST_RPC")))
sum->resource_rpc = DEFAULT_RESOURCE_LIST_RPC;

summary_pane_query (sum);
summary_pane_draw (sum);
summary_pane_refresh (sum);
Expand Down
11 changes: 11 additions & 0 deletions t/t2350-resource-list.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ test_expect_success 'FLUX_RESOURCE_LIST_RPC works' '
test_expect_success 'results are the same as before' '
test_cmp default.out sched.out
'
test_expect_success 'make the same query with fallback to sched.resource-status' '
FLUX_RESOURCE_LIST_RPC=foo.status \
FLUX_HANDLE_TRACE=1 \
flux resource list >fallback.out 2>fallback.err
'
test_expect_success 'fallback works' '
grep sched.resource-status fallback.err
'
test_expect_success 'results are the same as before' '
test_cmp default.out fallback.out
'
test_expect_success 'flux resource list works on follower ranks' '
flux exec -r 1 flux resource list >follower.out
'
Expand Down
7 changes: 7 additions & 0 deletions t/t2801-top-cmd.t
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ test_expect_success 'flux-top summary shows no jobs initially' '
grep "0 running" nojobs.out &&
grep "0 failed" nojobs.out
'
test_expect_success 'flux-top falls back to sched.resource-status' '
FLUX_RESOURCE_LIST_RPC=foo.status \
$runpty \
flux top --test-exit --test-exit-dump=fallback.out >/dev/null &&
grep "nodes 0/${nnodes}" nojobs.out &&
grep "cores 0/${ncores}" nojobs.out
'
# Note: jpXCZedGfVQ is the base58 representation of FLUX_JOBID_ANY. We
# grep for this value without f or ƒ in case build environment influences
# presence of one of the other.
Expand Down

0 comments on commit 76d0fac

Please sign in to comment.