Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown never stops the server when there is an open sse connection #2787

Open
SpyrosRoum opened this issue Mar 20, 2022 · 4 comments
Labels
C-bug Category: bug. Something is wrong. This is bad!

Comments

@SpyrosRoum
Copy link

Version
hyper v0.14.17
axum v0.4.8

Platform
Linux generation 5.16.15-zen1-1-zen #1 ZEN SMP PREEMPT Thu, 17 Mar 2022 00:30:11 +0000 x86_64 GNU/Linux

Description
When the future passed to Server::with_graceful_shutdown completes, the server won't stop while there is an active sse connection.
I stumbled onto this using axum, so here is hwo to reproduce (essentially axum's sse example + the graceful shutdown example)

use std::{convert::Infallible, time::Duration};

use {
    axum::{
        response::sse::{Event, KeepAlive, Sse},
        routing::get,
        Router,
    },
    futures::stream::{self, Stream},
    tokio::signal,
    tokio_stream::StreamExt as _,
};

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));

    axum::Server::bind(&"127.0.0.1:3300".parse().unwrap())
        .serve(app.into_make_service())
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::repeat_with(|| Event::default().data("Hi!"))
        .map(Ok)
        .throttle(Duration::from_secs(1));

    Sse::new(stream).keep_alive(KeepAlive::default())
}

async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c().await.unwrap();
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    println!("Shutting down");
}

Run the server, connect with an sse client (I used curl -N localhost:3300/sse), try pressing Ctrl+c on the terminal running the server. Shutting down gets printed but the server doesn't shutdown until the sse client disconnects by it self. (new connections are not allowed, which is expected)

From my understanding the server keeps running until all connections finish, which is normally fine, but a little problematic when we are talking about sse, since an sse connection might not end at all.
Note that with websocket that issue doesn't exist, the server shuts down normally closing the ws connections.

@SpyrosRoum SpyrosRoum added the C-bug Category: bug. Something is wrong. This is bad! label Mar 20, 2022
@davidpdrsn
Copy link
Member

From my understanding the server keeps running until all connections finish, which is normally fine, but a little problematic when we are talking about sse, since an sse connection might not end at all.

Yeah thats right. Graceful shutdown will stop accepting new connections and wait for all existing connections to close. Which is problematic for infinite streams.

One solution is to wrap your SSE stream such that it ends when it receives a shutdown signal:

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::repeat_with(|| Event::default().data("hi!"))
        .map(Ok)
        .throttle(Duration::from_secs(1));
    let stream = or_until_shutdown(stream);

    Sse::new(stream)
}

/// Run a stream until it completes or we receive the shutdown signal.
///
/// Uses the `async-stream` to make things easier to write.
fn or_until_shutdown<S>(stream: S) -> impl Stream<Item = S::Item>
where
    S: Stream,
{
    async_stream::stream! {
        futures::pin_mut!(stream);

        let shutdown_signal = shutdown_signal();
        futures::pin_mut!(shutdown_signal);

        loop {
            tokio::select! {
                Some(item) = stream.next() => {
                    yield item
                }
                _ = &mut shutdown_signal => {
                    break;
                }
            }
        }
    }
}

Note that with websocket that issue doesn't exist, the server shuts down normally closing the ws connections.

Thats because when a regular HTTP connection is upgraded to a WebSocket connection it leaves hyper's connection pool and is handed over to whatever WebSocket library your using (axum uses tungstenite). So that means from hyper's perspective the connection is already gone with graceful shutdown is triggered.

@SpyrosRoum
Copy link
Author

I understand why it happens, and the wrapper work around is not to bad, but I'm wondering if this should be handled by hyper (not sure if it's even possible?), or at least make it explicit that it's expected behavior

@davidpdrsn
Copy link
Member

not sure if it's even possible?

Not sure that it is. Hyper doesn't know anything about SSE as it's just a long running response body.

@DoumanAsh
Copy link
Contributor

DoumanAsh commented May 10, 2022

I'm not sure it is all that simple, we have case where we do not use any sort of weird shit like SSE, but occasionally stumble into such problem if server is or was under heavy load.
The problem is that it is not clear what exactly is wrong because we close all clients connected to the server, hence my suspicion is that there is some sort of bug in how hyper determines if it can finally shutdowns (I suspect there might be issue because we also use hyper's Client and connection pooling is buggy)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
C-bug Category: bug. Something is wrong. This is bad!
Projects
None yet
Development

No branches or pull requests

3 participants