Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker: improve handling of overlay network during shutdown #5883

Merged
merged 5 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion etc/flux.service.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ExecStart=/bin/bash -c '\
SyslogIdentifier=flux
ExecReload=@X_BINDIR@/flux config reload
Restart=always
RestartSec=5s
RestartSec=30s
RestartPreventExitStatus=42
SuccessExitStatus=42
User=flux
Expand Down
32 changes: 21 additions & 11 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@

struct parent parent;

bool shutdown_in_progress; // no new downstream connections permitted
void *bind_zsock; // NULL if no downstream peers
char *bind_uri;
flux_watcher_t *bind_w;
Expand Down Expand Up @@ -776,9 +777,12 @@
&& (child = child_lookup_online (ov, uuid))) {
flux_log (ov->h,
LOG_ERR,
"%s (rank %d) transitioning to LOST due to %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit message: till ➡️ still?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed that and another typo. Doing some manual testing now.

"%s (rank %d) transitioning to %s->LOST"
" after %ds due to %s",
flux_get_hostbyrank (ov->h, child->rank),
(int)child->rank,
subtree_status_str (child->status),
(int)(monotime_since (child->status_timestamp) / 1000.0),
"EHOSTUNREACH error on send");
overlay_child_status_update (ov, child, SUBTREE_STATUS_LOST);
}
Expand Down Expand Up @@ -900,7 +904,8 @@
*/
if (type == FLUX_MSGTYPE_REQUEST
&& flux_msg_get_topic (msg, &topic) == 0
&& streq (topic, "overlay.hello")) {
&& streq (topic, "overlay.hello")
&& !ov->shutdown_in_progress) {
hello_request_handler (ov, msg);
}
/* Or one of the following cases occurred that requires (or at least
Expand All @@ -912,6 +917,7 @@
* the DISCONNECT and connectivity has been restored.
* 3) This is a new-to-us peer because *we* restarted without getting
* a message through (e.g. crash)
* 4) A peer said hello while shutdown is in progress
* Control send failures may occur, see flux-framework/flux-core#4464.
* Don't log here, see flux-framework/flux-core#4180.
*/
Expand Down Expand Up @@ -993,12 +999,13 @@
&& flux_msg_get_topic (msg, &topic) == 0
&& streq (topic, "overlay.hello")) {
hello_response_handler (ov, msg);
goto done;
}
else {
else if (type != FLUX_MSGTYPE_CONTROL) {

Check warning on line 1004 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L1004

Added line #L1004 was not covered by tests
logdrop (ov, OVERLAY_UPSTREAM, msg,
"message received before hello handshake completed");
goto done;

Check warning on line 1007 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L1007

Added line #L1007 was not covered by tests
}
goto done;
}
switch (type) {
case FLUX_MSGTYPE_RESPONSE:
Expand All @@ -1013,11 +1020,11 @@
flux_msg_route_disable (msg);
break;
case FLUX_MSGTYPE_CONTROL: {
int type, reason;
if (flux_control_decode (msg, &type, &reason) < 0) {
int ctrl_type, reason;
if (flux_control_decode (msg, &ctrl_type, &reason) < 0) {
logdrop (ov, OVERLAY_UPSTREAM, msg, "malformed control");
}
else if (type == CONTROL_DISCONNECT) {
else if (ctrl_type == CONTROL_DISCONNECT) {
flux_log (ov->h, LOG_CRIT,
"%s (rank %lu) sent disconnect control message",
flux_get_hostbyrank (ov->h, ov->parent.rank),
Expand Down Expand Up @@ -1409,11 +1416,14 @@

/* Don't allow downstream peers to reconnect while we are shutting down.
*/
void overlay_shutdown (struct overlay *overlay)
void overlay_shutdown (struct overlay *overlay, bool unbind)
{
if (overlay->bind_zsock && overlay->bind_uri)
if (zmq_unbind (overlay->bind_zsock, overlay->bind_uri) < 0)
flux_log (overlay->h, LOG_ERR, "zmq_unbind failed");
overlay->shutdown_in_progress = true;
if (unbind) {
if (overlay->bind_zsock && overlay->bind_uri)
if (zmq_unbind (overlay->bind_zsock, overlay->bind_uri) < 0)
flux_log (overlay->h, LOG_ERR, "zmq_unbind failed");

Check warning on line 1425 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L1425

Added line #L1425 was not covered by tests
}
}

/* Call after overlay bootstrap (bind/connect),
Expand Down
5 changes: 4 additions & 1 deletion src/broker/overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ int overlay_set_monitor_cb (struct overlay *ov,
int overlay_register_attrs (struct overlay *overlay);

/* Stop allowing new connections from downstream peers.
* If unbind is false, stop all communication on the socket.
* Otherwise arrange to send a disconnect control message in response
* to all messages.
*/
void overlay_shutdown (struct overlay *overlay);
void overlay_shutdown (struct overlay *overlay, bool unbind);

#endif /* !_BROKER_OVERLAY_H */

Expand Down
12 changes: 11 additions & 1 deletion src/broker/state_machine.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ static void action_run (struct state_machine *s)

static void action_cleanup (struct state_machine *s)
{
/* Prevent new downstream clients from saying hello, but
* let existing ones continue to communiate so they can
* shut down and disconnect.
*/
overlay_shutdown (s->ctx->overlay, false);

if (runat_is_defined (s->ctx->runat, "cleanup")) {
if (runat_start (s->ctx->runat, "cleanup", runat_completion_cb, s) < 0) {
flux_log_error (s->ctx->h, "runat_start cleanup");
Expand All @@ -372,7 +378,11 @@ static void action_cleanup (struct state_machine *s)

static void action_finalize (struct state_machine *s)
{
overlay_shutdown (s->ctx->overlay);
/* Now that all clients have disconnected, finalize all
* downstream communication.
*/
overlay_shutdown (s->ctx->overlay, true);

if (runat_is_defined (s->ctx->runat, "rc3")) {
if (runat_start (s->ctx->runat, "rc3", runat_completion_cb, s) < 0) {
flux_log_error (s->ctx->h, "runat_start rc3");
Expand Down