Skip to content

Commit

Permalink
Merge pull request #5883 from garlick/issue#5881
Browse files Browse the repository at this point in the history
broker: improve handling of overlay network during shutdown
  • Loading branch information
mergify[bot] committed Apr 13, 2024
2 parents 4bd465d + 61bb419 commit 5186f08
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion etc/flux.service.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ExecStart=/bin/bash -c '\
SyslogIdentifier=flux
ExecReload=@X_BINDIR@/flux config reload
Restart=always
RestartSec=5s
RestartSec=30s
RestartPreventExitStatus=42
SuccessExitStatus=42
User=flux
Expand Down
32 changes: 21 additions & 11 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ struct overlay {

struct parent parent;

bool shutdown_in_progress; // no new downstream connections permitted
void *bind_zsock; // NULL if no downstream peers
char *bind_uri;
flux_watcher_t *bind_w;
Expand Down Expand Up @@ -776,9 +777,12 @@ static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg)
&& (child = child_lookup_online (ov, uuid))) {
flux_log (ov->h,
LOG_ERR,
"%s (rank %d) transitioning to LOST due to %s",
"%s (rank %d) transitioning to %s->LOST"
" after %ds due to %s",
flux_get_hostbyrank (ov->h, child->rank),
(int)child->rank,
subtree_status_str (child->status),
(int)(monotime_since (child->status_timestamp) / 1000.0),
"EHOSTUNREACH error on send");
overlay_child_status_update (ov, child, SUBTREE_STATUS_LOST);
}
Expand Down Expand Up @@ -900,7 +904,8 @@ static void child_cb (flux_reactor_t *r,
*/
if (type == FLUX_MSGTYPE_REQUEST
&& flux_msg_get_topic (msg, &topic) == 0
&& streq (topic, "overlay.hello")) {
&& streq (topic, "overlay.hello")
&& !ov->shutdown_in_progress) {
hello_request_handler (ov, msg);
}
/* Or one of the following cases occurred that requires (or at least
Expand All @@ -912,6 +917,7 @@ static void child_cb (flux_reactor_t *r,
* the DISCONNECT and connectivity has been restored.
* 3) This is a new-to-us peer because *we* restarted without getting
* a message through (e.g. crash)
* 4) A peer said hello while shutdown is in progress
* Control send failures may occur, see flux-framework/flux-core#4464.
* Don't log here, see flux-framework/flux-core#4180.
*/
Expand Down Expand Up @@ -993,12 +999,13 @@ static void parent_cb (flux_reactor_t *r,
&& flux_msg_get_topic (msg, &topic) == 0
&& streq (topic, "overlay.hello")) {
hello_response_handler (ov, msg);
goto done;
}
else {
else if (type != FLUX_MSGTYPE_CONTROL) {
logdrop (ov, OVERLAY_UPSTREAM, msg,
"message received before hello handshake completed");
goto done;
}
goto done;
}
switch (type) {
case FLUX_MSGTYPE_RESPONSE:
Expand All @@ -1013,11 +1020,11 @@ static void parent_cb (flux_reactor_t *r,
flux_msg_route_disable (msg);
break;
case FLUX_MSGTYPE_CONTROL: {
int type, reason;
if (flux_control_decode (msg, &type, &reason) < 0) {
int ctrl_type, reason;
if (flux_control_decode (msg, &ctrl_type, &reason) < 0) {
logdrop (ov, OVERLAY_UPSTREAM, msg, "malformed control");
}
else if (type == CONTROL_DISCONNECT) {
else if (ctrl_type == CONTROL_DISCONNECT) {
flux_log (ov->h, LOG_CRIT,
"%s (rank %lu) sent disconnect control message",
flux_get_hostbyrank (ov->h, ov->parent.rank),
Expand Down Expand Up @@ -1409,11 +1416,14 @@ int overlay_bind (struct overlay *ov, const char *uri)

/* Don't allow downstream peers to reconnect while we are shutting down.
*/
void overlay_shutdown (struct overlay *overlay)
void overlay_shutdown (struct overlay *overlay, bool unbind)
{
if (overlay->bind_zsock && overlay->bind_uri)
if (zmq_unbind (overlay->bind_zsock, overlay->bind_uri) < 0)
flux_log (overlay->h, LOG_ERR, "zmq_unbind failed");
overlay->shutdown_in_progress = true;
if (unbind) {
if (overlay->bind_zsock && overlay->bind_uri)
if (zmq_unbind (overlay->bind_zsock, overlay->bind_uri) < 0)
flux_log (overlay->h, LOG_ERR, "zmq_unbind failed");
}
}

/* Call after overlay bootstrap (bind/connect),
Expand Down
5 changes: 4 additions & 1 deletion src/broker/overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ int overlay_set_monitor_cb (struct overlay *ov,
int overlay_register_attrs (struct overlay *overlay);

/* Stop allowing new connections from downstream peers.
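* If unbind is true, unbind the socket to finalize all downstream
* communication.  Otherwise leave the socket bound so that existing
* peers can continue to communicate, and arrange to send a disconnect
* control message in response to hello requests from new peers.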
*/
void overlay_shutdown (struct overlay *overlay);
void overlay_shutdown (struct overlay *overlay, bool unbind);

#endif /* !_BROKER_OVERLAY_H */

Expand Down
12 changes: 11 additions & 1 deletion src/broker/state_machine.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ static void action_run (struct state_machine *s)

static void action_cleanup (struct state_machine *s)
{
/* Prevent new downstream clients from saying hello, but
* let existing ones continue to communicate so they can
* shut down and disconnect.
*/
overlay_shutdown (s->ctx->overlay, false);

if (runat_is_defined (s->ctx->runat, "cleanup")) {
if (runat_start (s->ctx->runat, "cleanup", runat_completion_cb, s) < 0) {
flux_log_error (s->ctx->h, "runat_start cleanup");
Expand All @@ -372,7 +378,11 @@ static void action_cleanup (struct state_machine *s)

static void action_finalize (struct state_machine *s)
{
overlay_shutdown (s->ctx->overlay);
/* Now that all clients have disconnected, finalize all
* downstream communication.
*/
overlay_shutdown (s->ctx->overlay, true);

if (runat_is_defined (s->ctx->runat, "rc3")) {
if (runat_start (s->ctx->runat, "rc3", runat_completion_cb, s) < 0) {
flux_log_error (s->ctx->h, "runat_start rc3");
Expand Down

0 comments on commit 5186f08

Please sign in to comment.