Skip to content

Commit

Permalink
Merge pull request #5914 from garlick/issue#4791
Browse files Browse the repository at this point in the history
cleanup resource.eventlog and handle remapped ranks
  • Loading branch information
mergify[bot] committed May 2, 2024
2 parents 1c299b9 + e3fc18f commit 86d905f
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 202 deletions.
270 changes: 189 additions & 81 deletions src/modules/resource/drain.c

Large diffs are not rendered by default.

5 changes: 0 additions & 5 deletions src/modules/resource/drain.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ json_t *drain_get_info (struct drain *drain);
*/
int drain_rank (struct drain *drain, uint32_t rank, const char *reason);

/* Undrain 'ranks'. Call this on rank 0 only, otherwise use resource.drain RPC
* It is not an error if any rank in 'ranks' is already drained.
*/
int undrain_ranks (struct drain *drain, const struct idset *ranks);

#endif /* !_FLUX_RESOURCE_DRAIN_H */

/*
Expand Down
2 changes: 2 additions & 0 deletions src/modules/resource/inventory.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ static int inventory_put_finalize (struct inventory *inv)
NULL,
0.,
"resource-define",
0,
"{s:s}",
"method",
method) < 0) {
Expand Down Expand Up @@ -421,6 +422,7 @@ static void inventory_put_update_cb (flux_future_t *f, void *arg)
NULL,
0.,
"resource-update",
0,
"{s:f}",
"expiration",
expiration) < 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/modules/resource/monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ static void broker_online_cb (flux_future_t *f, void *arg)
NULL,
0.,
"online",
EVENT_NO_COMMIT,
"{s:s}",
"idset",
online) < 0) {
Expand All @@ -124,6 +125,7 @@ static void broker_online_cb (flux_future_t *f, void *arg)
NULL,
0.,
"offline",
EVENT_NO_COMMIT,
"{s:s}",
"idset",
offline) < 0) {
Expand Down
50 changes: 33 additions & 17 deletions src/modules/resource/reslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ static const char *auxkey = "flux::event_info";
*/
static void notify_callback (struct reslog *reslog, json_t *event)
{
if (reslog->cb) {
const char *name;
json_t *context;
const char *name;
json_t *context;

if (reslog->cb) {
if (json_unpack (event,
"{s:s s:o}",
"name", &name,
Expand Down Expand Up @@ -78,7 +78,7 @@ int post_handler (struct reslog *reslog, flux_future_t *f)
struct event_info *info = flux_future_aux_get (f, auxkey);
int rc;

if ((rc = flux_rpc_get (f, NULL)) < 0) {
if ((rc = flux_future_get (f, NULL)) < 0) {
flux_log_error (reslog->h, "committing to %s", RESLOG_KEY);
if (info->msg) {
if (flux_respond_error (reslog->h, info->msg, errno, NULL) < 0)
Expand All @@ -96,6 +96,12 @@ int post_handler (struct reslog *reslog, flux_future_t *f)
done:
zlist_remove (reslog->pending, f);
flux_future_destroy (f);

if ((f = zlist_first (reslog->pending))
&& (info = flux_future_aux_get (f, auxkey))
&& info->msg == NULL)
flux_future_fulfill (f, NULL, NULL);

return rc;
}

Expand All @@ -120,6 +126,7 @@ int reslog_post_pack (struct reslog *reslog,
const flux_msg_t *request,
double timestamp,
const char *name,
int flags,
const char *fmt,
...)
{
Expand All @@ -136,14 +143,23 @@ int reslog_post_pack (struct reslog *reslog,

if (!event)
return -1;
if (!(val = eventlog_entry_encode (event)))
goto error;
if (!(txn = flux_kvs_txn_create ()))
goto error;
if (flux_kvs_txn_put (txn, FLUX_KVS_APPEND, RESLOG_KEY, val) < 0)
goto error;
if (!(f = flux_kvs_commit (reslog->h, NULL, 0, txn)))
goto error;
if ((flags & EVENT_NO_COMMIT)) {
if (!(f = flux_future_create (NULL, NULL)))
goto error;
flux_future_set_flux (f, reslog->h);
if (zlist_size (reslog->pending) == 0)
flux_future_fulfill (f, NULL, NULL);
}
else {
if (!(val = eventlog_entry_encode (event)))
goto error;
if (!(txn = flux_kvs_txn_create ()))
goto error;
if (flux_kvs_txn_put (txn, FLUX_KVS_APPEND, RESLOG_KEY, val) < 0)
goto error;
if (!(f = flux_kvs_commit (reslog->h, NULL, 0, txn)))
goto error;
}
if (!(info = event_info_create (event, request)))
goto error;
if (flux_future_aux_set (f,
Expand All @@ -155,14 +171,14 @@ int reslog_post_pack (struct reslog *reslog,
}
if (flux_future_then (f, -1, post_continuation, reslog) < 0)
goto error;
if (zlist_append (reslog->pending, f) < 0) {
errno = ENOMEM;
goto error;
}
flux_kvs_txn_destroy (txn);
if (zlist_append (reslog->pending, f) < 0)
goto nomem;
free (val);
flux_kvs_txn_destroy (txn);
json_decref (event);
return 0;
nomem:
errno = ENOMEM;
error:
flux_future_destroy (f);
flux_kvs_txn_destroy (txn);
Expand Down
5 changes: 5 additions & 0 deletions src/modules/resource/reslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

struct reslog;

enum reslog_flags {
EVENT_NO_COMMIT = 1,
};

typedef void (*reslog_cb_f)(struct reslog *reslog,
const char *name,
json_t *context,
Expand All @@ -30,6 +34,7 @@ int reslog_post_pack (struct reslog *reslog,
const flux_msg_t *request,
double timestamp,
const char *name,
int flags,
const char *fmt,
...);

Expand Down
72 changes: 1 addition & 71 deletions src/modules/resource/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,65 +253,6 @@ static const struct flux_msg_handler_spec htab[] = {
FLUX_MSGHANDLER_TABLE_END,
};

/* Post 'resource-init' event that summarizes the current monitor,
* drain, and exclude state. For replay purposes, all events prior to the
* most recent 'resource-init' can be ignored.
*/
int post_restart_event (struct resource_ctx *ctx, int restart)
{
json_t *o;
json_t *drain;

if (!(drain = drain_get_info (ctx->drain)))
return -1;
if (!(o = json_pack ("{s:b s:o}", "restart", restart, "drain", drain))) {
json_decref (drain);
goto nomem;
}
if (rutil_set_json_idset (o, "online", monitor_get_up (ctx->monitor)) < 0)
goto error;
if (rutil_set_json_idset (o, "exclude", exclude_get (ctx->exclude)) < 0)
goto error;
if (reslog_post_pack (ctx->reslog,
NULL,
0.,
"resource-init",
"O",
o) < 0)
goto error;
json_decref (o);
return 0;
nomem:
errno = ENOMEM;
error:
ERRNO_SAFE_WRAP (json_decref, o);
return -1;
}

/* Remove entries prior to the most recent 'resource-init' event from
* 'eventlog'. N.B. they remain in the KVS.
*/
static int prune_eventlog (json_t *eventlog)
{
size_t index;
json_t *entry;
size_t last_entry = json_array_size (eventlog);
const char *name;

json_array_foreach (eventlog, index, entry) {
if (eventlog_entry_parse (entry, NULL, &name, NULL) == 0
&& streq (name, "resource-init"))
last_entry = index;
}
if (last_entry < json_array_size (eventlog)) {
for (index = 0; index < last_entry; index++) {
if (json_array_remove (eventlog, 0) < 0)
return -1;
}
}
return 0;
}

/* Synchronously read resource.eventlog, and parse into
* a JSON array for replay by the various subsystems.
* 'eventlog' is set to NULL if it doesn't exist (no error).
Expand All @@ -333,12 +274,7 @@ static int reload_eventlog (flux_t *h, json_t **eventlog)
}
else {
if (!(o = eventlog_decode (s))) {
flux_log_error (h, "%s: decode error", RESLOG_KEY);
goto error;
}
if (prune_eventlog (o) < 0) {
flux_log (h, LOG_ERR, "%s: pruning error", RESLOG_KEY);
ERRNO_SAFE_WRAP (json_decref, o);
flux_log (h, LOG_ERR, "%s: decode error", RESLOG_KEY);
goto error;
}
}
Expand Down Expand Up @@ -458,12 +394,6 @@ int mod_main (flux_t *h, int argc, char **argv)
goto error;
if (!(ctx->status = status_create (ctx)))
goto error;
if (ctx->rank == 0) {
if (post_restart_event (ctx, eventlog ? 1 : 0) < 0)
goto error;
if (reslog_sync (ctx->reslog) < 0)
goto error;
}
if (flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0)
goto error;
if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
Expand Down
13 changes: 0 additions & 13 deletions t/t2310-resource-module.t
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,6 @@ test_expect_success 'resource.eventlog exists' '
flux kvs eventlog get -u resource.eventlog >eventlog.out
'

test_expect_success 'resource-init context says restart=false' '
test "$(grep_event resource-init <eventlog.out|jq .restart)" = "false"
'

test_expect_success 'resource-init context says online is empty set' '
test "$(grep_event resource-init <eventlog.out|jq .online)" = "\"\""
'

test_expect_success 'wait until resource-define event is posted' '
flux kvs eventlog wait-event -t 5 resource.eventlog resource-define
'
Expand All @@ -88,11 +80,6 @@ test_expect_success 'reload resource module and re-capture eventlog' '
tail -$(($post-$pre)) restart.out > post_restart.out
'

test_expect_success 'new resource-init context says restart=true' '
test "$(grep_event resource-init <post_restart.out \
|jq .restart)" = "true"
'

test_expect_success 'reconfig with extra key fails' '
cat >resource.toml <<-EOT &&
[resource]
Expand Down

0 comments on commit 86d905f

Please sign in to comment.