Merge pull request #3207 from garlick/system_inst
resource: mark nodes down when they are stopped
mergify[bot] committed Sep 16, 2020
2 parents 09998a3 + 4470ed8 commit 0f18d03
Showing 21 changed files with 592 additions and 261 deletions.
1 change: 1 addition & 0 deletions etc/flux.service.in
@@ -6,6 +6,7 @@ TimeoutStopSec=90
KillMode=mixed
ExecStart=@X_BINDIR@/flux broker \
--config-path=@X_SYSCONFDIR@/flux/system/conf.d \
+ --k-ary=256 \
-Srundir=@X_RUNSTATEDIR@/flux \
-Slocal-uri=local://@X_RUNSTATEDIR@/flux/local \
-Slog-stderr-level=6 \
2 changes: 1 addition & 1 deletion etc/rc1
@@ -22,7 +22,7 @@ modload all aggregator
modload all kvs
modload all kvs-watch

- modload 0 resource
+ modload all resource
modload 0 job-info
modload 0 cron sync=hb
modload 0 job-manager
2 changes: 1 addition & 1 deletion etc/rc3
@@ -23,7 +23,7 @@ done
shopt -u nullglob

modrm 0 sched-simple
- modrm 0 resource
+ modrm all resource
modrm 0 job-exec
modrm 0 job-manager
modrm all job-ingest
128 changes: 34 additions & 94 deletions src/broker/state_machine.c
@@ -30,8 +30,6 @@
static const double quorum_batch_timeout = 0.1;

struct quorum {
- zlist_t *requests;
-
struct idset *want;
struct idset *have; // cumulative on rank 0, batch buffer on rank > 0
flux_watcher_t *batch_timer;
@@ -92,10 +90,6 @@ static void monitor_update (flux_t *h, zlist_t *requests, broker_state_t state);
static void join_check_parent (struct state_machine *s);
static int quorum_add_self (struct state_machine *s);
static void quorum_check_parent (struct state_machine *s);
- static bool test_idset_subset (struct idset *idset1, struct idset *idset2);
- static void quorum_monitor_update (flux_t *h,
- zlist_t *requests,
- struct idset *idset);
static void run_check_parent (struct state_machine *s);

static struct state statetab[] = {
@@ -196,9 +190,8 @@ static void action_quorum (struct state_machine *s)
return;
}
if (s->ctx->rank == 0) {
- if (test_idset_subset (s->quorum.want, s->quorum.have))
+ if (idset_equal (s->quorum.want, s->quorum.have))
state_machine_post (s, "quorum-full");
- quorum_monitor_update (s->ctx->h, s->quorum.requests, s->quorum.have);
}
else
quorum_check_parent (s);
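
The quorum test on rank 0 is tightened from a subset check to strict equality: "quorum-full" is now posted only when the set of ranks that have checked in exactly matches the configured 'want' set. Below is a minimal standalone sketch of that comparison using flux-core's libidset (the same library whose idset_equal()/idset_set() calls appear above); idset_decode() and the flux/idset.h header path are assumptions here, and the rank sets are hypothetical.

    #include <flux/idset.h>
    #include <stdio.h>

    int main (void)
    {
        /* Hypothetical sets: quorum wants ranks 0-3; ranks 0-2 have checked in. */
        struct idset *want = idset_decode ("0-3");
        struct idset *have = idset_decode ("0-2");

        if (!want || !have)
            return 1;

        /* idset_equal() is true only when both sets contain exactly the same ids. */
        printf ("quorum-full: %s\n", idset_equal (want, have) ? "yes" : "no");

        if (idset_set (have, 3) == 0)   /* rank 3 checks in */
            printf ("quorum-full: %s\n", idset_equal (want, have) ? "yes" : "no");

        idset_destroy (want);
        idset_destroy (have);
        return 0;
    }
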
@@ -507,20 +500,6 @@ static int merge_idset (struct idset *idset1, const char *s)
return rc;
}

- /* Return true if idset2 is a subset of idset1.
- */
- static bool test_idset_subset (struct idset *idset1, struct idset *idset2)
- {
- unsigned int id;
- id = idset_first (idset1);
- while (id != IDSET_INVALID_ID) {
- if (!idset_test (idset2, id))
- return false;
- id = idset_next (idset1, id);
- }
- return true;
- }

/* Assumes local state is STATE_QUORUM.
* If parent has left STATE_QUORUM, post quorum-full or quorum-fail.
*/
@@ -561,9 +540,8 @@ static void quorum_batch (flux_reactor_t *r,

if (s->ctx->rank == 0) {
if (s->state == STATE_QUORUM
- && test_idset_subset (s->quorum.want, s->quorum.have))
+ && idset_equal (s->quorum.want, s->quorum.have))
state_machine_post (s, "quorum-full");
- quorum_monitor_update (s->ctx->h, s->quorum.requests, s->quorum.have);
}
else {
flux_future_t *f;
@@ -624,31 +602,33 @@ static void quorum_cb (flux_t *h,
flux_log_error (s->ctx->h, "error handling quorum request");
}

- /* Add this broker rank to quorum.
- * On rank == 0, add my rank directly to the 'got' idset.
+ /* Add this broker rank to quorum, if its rank is in the 'want' set.
+ * On rank == 0, add my rank directly to the 'have' idset.
* On rank > 0, send a fire-and-forget RPC to parent containing my rank.
*/
static int quorum_add_self (struct state_machine *s)
{
- if (s->ctx->rank == 0) {
- if (idset_set (s->quorum.have, s->ctx->rank) < 0)
- return -1;
- }
- else {
- flux_future_t *f;
- flux_log (s->ctx->h,
- LOG_DEBUG,
- "quorum send %lu",
- (long unsigned)s->ctx->rank);
- if (!(f = flux_rpc_pack (s->ctx->h,
- "state-machine.quorum",
- FLUX_NODEID_UPSTREAM,
- FLUX_RPC_NORESPONSE,
- "{s:i}",
- "rank",
- s->ctx->rank)))
- return -1;
- flux_future_destroy (f);
- }
+ if (idset_test (s->quorum.want, s->ctx->rank)) {
+ if (s->ctx->rank == 0) {
+ if (idset_set (s->quorum.have, s->ctx->rank) < 0)
+ return -1;
+ }
+ else {
+ flux_future_t *f;
+ flux_log (s->ctx->h,
+ LOG_DEBUG,
+ "quorum send %lu",
+ (long unsigned)s->ctx->rank);
+ if (!(f = flux_rpc_pack (s->ctx->h,
+ "state-machine.quorum",
+ FLUX_NODEID_UPSTREAM,
+ FLUX_RPC_NORESPONSE,
+ "{s:i}",
+ "rank",
+ s->ctx->rank)))
+ return -1;
+ flux_future_destroy (f);
+ }
+ }
return 0;
}
@@ -708,34 +688,10 @@ static flux_watcher_t *quorum_create_batch_timer (struct state_machine *s)
s);
}

- /* (rank 0 only) Update any queued 'state-machine.quorum-monitor' requests.
- *
- */
- static void quorum_monitor_update (flux_t *h,
- zlist_t *requests,
- struct idset *idset)
- {
- const flux_msg_t *msg;
- char *tmp;
-
- if (!(tmp = idset_encode (idset,
- IDSET_FLAG_RANGE | IDSET_FLAG_BRACKETS))) {
- flux_log_error (h, "error responding to quorum-monitor requests");
- return;
- }
- msg = zlist_first (requests);
- while (msg) {
- if (flux_respond_pack (h, msg, "{s:s}", "idset", tmp) < 0)
- flux_log_error (h, "error responding to quorum-monitor request");
- msg = zlist_next (requests);
- }
- free (tmp);
- }

- static void quorum_monitor_cb (flux_t *h,
- flux_msg_handler_t *mh,
- const flux_msg_t *msg,
- void *arg)
+ static void quorum_get_cb (flux_t *h,
+ flux_msg_handler_t *mh,
+ const flux_msg_t *msg,
+ void *arg)
{
struct state_machine *s = arg;
const char *errstr = NULL;
@@ -744,7 +700,7 @@ static void quorum_monitor_cb (flux_t *h,
if (flux_request_decode (msg, NULL, NULL) < 0)
goto error;
if (s->ctx->rank != 0) {
- errstr = "quorum-monitor RPC is only available on rank 0";
+ errstr = "quorum-get RPC is only available on rank 0";
errno = EPROTO;
goto error;
}
@@ -753,19 +709,11 @@ static void quorum_monitor_cb (flux_t *h,
goto error;
if (flux_respond_pack (h, msg, "{s:s}", "idset", tmp) < 0)
goto error;
- if (flux_msg_is_streaming (msg)) {
- if (zlist_append (s->quorum.requests,
- (flux_msg_t *)flux_msg_incref (msg)) < 0) {
- flux_msg_decref (msg);
- errno = ENOMEM;
- goto error;
- }
- }
free (tmp);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
- flux_log_error (h, "error responding to quorum-monitor request");
+ flux_log_error (h, "error responding to quorum-get request");
free (tmp);
}
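
The streaming state-machine.quorum-monitor request is replaced by a one-shot state-machine.quorum-get request that returns the current quorum idset. A client-side sketch of querying it follows, assuming a handle with access to rank 0 of the instance; the topic string and the "idset" response key come from quorum_get_cb() above, and the rest is ordinary flux-core RPC usage.

    #include <flux/core.h>
    #include <stdio.h>

    int main (void)
    {
        flux_t *h;
        flux_future_t *f;
        const char *ranks;

        if (!(h = flux_open (NULL, 0)))     /* connect via FLUX_URI */
            return 1;
        /* quorum-get is only serviced on rank 0, per the handler above. */
        if (!(f = flux_rpc (h, "state-machine.quorum-get", NULL, 0, 0))
            || flux_rpc_get_unpack (f, "{s:s}", "idset", &ranks) < 0) {
            fprintf (stderr, "quorum-get request failed\n");
            return 1;
        }
        printf ("ranks in quorum: %s\n", ranks);
        flux_future_destroy (f);
        flux_close (h);
        return 0;
    }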

@@ -903,10 +851,8 @@ static bool msglist_drop_sender (zlist_t *l, const char *sender)
return false;
}

- /* If a disconnect is received for streaming monitor or quorum-monitor
- * requests, drop the request. Example use cases:
- * - monitor module unloads while still watching quorum-monitor
- * - tests (potentially)
+ /* If a disconnect is received for streaming monitor request,
+ * drop the request.
*/
static void disconnect_cb (flux_t *h,
flux_msg_handler_t *mh,
@@ -918,8 +864,6 @@ static void disconnect_cb (flux_t *h,
int count = 0;

if (flux_msg_get_route_first (msg, &sender) == 0) {
- while (msglist_drop_sender (s->quorum.requests, sender))
- count++;
while (msglist_drop_sender (s->monitor.requests, sender))
count++;
free (sender);
@@ -935,8 +879,7 @@ static const struct flux_msg_handler_spec htab[] = {
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "state-machine.monitor", monitor_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "state-machine.quorum", quorum_cb, 0 },
- { FLUX_MSGTYPE_REQUEST, "state-machine.quorum-monitor",
- quorum_monitor_cb, 0 },
+ { FLUX_MSGTYPE_REQUEST, "state-machine.quorum-get", quorum_get_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "state-machine.disconnect", disconnect_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};
@@ -962,7 +905,6 @@ void state_machine_destroy (struct state_machine *s)
flux_msg_handler_delvec (s->handlers);
flux_future_destroy (s->monitor.f);
msglist_destroy (s->monitor.requests);
- msglist_destroy (s->quorum.requests);
idset_destroy (s->quorum.want);
idset_destroy (s->quorum.have);
flux_watcher_destroy (s->quorum.batch_timer);
@@ -999,8 +941,6 @@ struct state_machine *state_machine_create (struct broker *ctx)
if (!(s->monitor.f = monitor_parent (ctx->h, s)))
goto error;
}
- if (!(s->quorum.requests = zlist_new ()))
- goto nomem;
if (!(s->quorum.have = idset_create (ctx->size, 0)))
goto error;
if (quorum_configure (s) < 0)
5 changes: 3 additions & 2 deletions src/modules/resource/acquire.c
@@ -369,8 +369,8 @@ static void reslog_cb (struct reslog *reslog, const char *name, void *arg)
}

static const struct flux_msg_handler_spec htab[] = {
- { FLUX_MSGTYPE_REQUEST, MODULE_NAME ".acquire", acquire_cb, 0 },
- { FLUX_MSGTYPE_REQUEST, MODULE_NAME ".acquire-cancel", cancel_cb, 0 },
+ { FLUX_MSGTYPE_REQUEST, "resource.acquire", acquire_cb, 0 },
+ { FLUX_MSGTYPE_REQUEST, "resource.acquire-cancel", cancel_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

@@ -379,6 +379,7 @@ void acquire_destroy (struct acquire *acquire)
if (acquire) {
int saved_errno = errno;
flux_msg_handler_delvec (acquire->handlers);
+ reslog_set_callback (acquire->ctx->reslog, NULL, NULL);
if (acquire->request) {
if (flux_respond_error (acquire->ctx->h,
acquire->request->msg,
1 change: 1 addition & 0 deletions src/modules/resource/discover.c
@@ -278,6 +278,7 @@ void discover_destroy (struct discover *discover)
{
if (discover) {
int saved_errno = errno;
+ monitor_set_callback (discover->ctx->monitor, NULL, NULL);
flux_subprocess_destroy (discover->p);
flux_future_destroy (discover->f);
flux_msg_handler_delvec (discover->handlers);
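
Both destroy paths above now clear the callback they registered (reslog_set_callback() in acquire_destroy(), monitor_set_callback() in discover_destroy()) during teardown, presumably so a late event cannot fire into state that is about to be freed. A generic sketch of that pattern, using hypothetical names rather than the resource module's actual types:

    #include <stdlib.h>

    /* Hypothetical event source that invokes one registered callback. */
    typedef void (*notify_f)(void *arg);

    struct source {
        notify_f cb;
        void *arg;
    };

    void source_set_callback (struct source *s, notify_f cb, void *arg)
    {
        s->cb = cb;
        s->arg = arg;
    }

    void source_notify (struct source *s)
    {
        if (s->cb)          /* a cleared callback is simply skipped */
            s->cb (s->arg);
    }

    struct consumer {
        struct source *src;     /* borrowed; may outlive the consumer */
        int state;
    };

    void consumer_destroy (struct consumer *c)
    {
        if (c) {
            /* Unregister first so the source cannot call back into
             * memory that is about to be freed.
             */
            source_set_callback (c->src, NULL, NULL);
            free (c);
        }
    }
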
4 changes: 2 additions & 2 deletions src/modules/resource/drain.c
@@ -260,8 +260,8 @@ static int replay_eventlog (struct drain *drain, const json_t *eventlog)
}

static const struct flux_msg_handler_spec htab[] = {
- { FLUX_MSGTYPE_REQUEST, MODULE_NAME ".drain", drain_cb, 0 },
- { FLUX_MSGTYPE_REQUEST, MODULE_NAME ".undrain", undrain_cb, 0 },
+ { FLUX_MSGTYPE_REQUEST, "resource.drain", drain_cb, 0 },
+ { FLUX_MSGTYPE_REQUEST, "resource.undrain", undrain_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

