Server: Provide a closed connection notification #707

Closed
gyscos opened this issue Dec 22, 2015 · 13 comments

@gyscos

gyscos commented Dec 22, 2015

Similar to what golang provides with CloseNotifier (https://golang.org/pkg/net/http/#CloseNotifier), when a request takes a while to process (or is just waiting/long-polling), it is often convenient to detect when the client closed the connection.
I did not find a way to get this information with hyper (maybe I just missed it?).

@gyscos gyscos changed the title Provide a closed connection notification Server: Provide a closed connection notification Dec 22, 2015
@seanmonstar
Member

While hyper uses std::net, blocking IO is used. Without select, the only way to know a connection was closed is to try to write to it (or read, which will block until keep-alive notices it's dead).

So you can only read or write on it to determine if it's closed.
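
To illustrate the constraint, a minimal sketch against plain std::net (not hyper's API; is_closed is a hypothetical helper):

use std::io::Read;
use std::net::TcpStream;

// With blocking IO, the only way to learn the peer hung up is to attempt
// IO and inspect the result. Note: this call blocks until data arrives
// or the peer disconnects.
fn is_closed(stream: &mut TcpStream) -> std::io::Result<bool> {
    let mut buf = [0u8; 1];
    match stream.read(&mut buf) {
        Ok(0) => Ok(true),   // Ok(0) = EOF: the peer closed the connection
        Ok(_) => Ok(false),  // still alive (and we just consumed a byte!)
        Err(e) => Err(e),
    }
}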

@gyscos
Author

gyscos commented Dec 23, 2015

I see. That's actually the way golang implements it: by spawning a thread (well, a goroutine) that tries to read from the body, and notifying when the call completes.

I guess channels in Rust are not as widespread as in Go, making an idiomatic API for this trickier (though perhaps using std::sync::Condvar?...), and it'd also probably require an Arc in there... :-/

@gyscos
Author

gyscos commented Jan 1, 2016

Also, I tried this simple program:

extern crate hyper;

use std::io::Read;

use hyper::Server;
use hyper::server::Request;
use hyper::server::Response;

fn main() {
    Server::http("127.0.0.1:3000").unwrap().handle(|mut req: Request, _: Response| {
        let mut b = [0];
        // Attempt to block until the client disconnects
        println!("{}", req.read(&mut b).unwrap_or(42));
        println!("{:?}", b);
    });
}

And then I simply ran curl http://localhost:3000

Unfortunately, the read command returns immediately here (it returns 0 and writes nothing to b), and does not wait for the client to cancel the connection.

Trying to write is not always an option; it would be really nice to have this working (golang also uses a read attempt, so it should be possible?). Maybe hyper does something on the connection?

Edit: indeed, a Request's body is a SizedReader or an EmptyReader (request.rs), and those don't forward reads to the underlying connection most of the time (h1.rs).
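
To see why that read returns immediately, a hedged illustration (not hyper's actual code): for a body-less GET, the body reader knows its remaining length is zero, so it can report EOF without ever touching the socket.

use std::io::Read;

// Illustrative stand-in for hyper's EmptyReader: it never consults the
// underlying connection, it just reports EOF right away.
struct EmptyBody;

impl Read for EmptyBody {
    fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
        Ok(0) // declared body length is zero, so: immediate EOF
    }
}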

@seanmonstar
Member

If the request has no body, then a read will return EOF.

@gyscos
Author

gyscos commented Jan 2, 2016

Yes, so as things stand it doesn't look possible to detect cancellation of a simple GET request?
The fact that it works in golang makes me believe it would work if one could read on the underlying socket, but hyper does not expose that.

@seanmonstar
Member

That presents a different problem though, since std::net uses blocking IO. If hyper let you try to read on the socket after having read all the declared request body, the read would block until 1) the client sends more data, or 2) the client disconnects. If it's a GET request, the client does not expect to send any more data, so you would block yourself until the client timed out. No further code would execute in that thread.

You might be able to get something if you duplicated the socket, put it in another thread, and let it block on a read there...

Either way, the move to async IO should help, since epoll gives notifications when the client hangs up.

@gyscos
Author

gyscos commented Jan 2, 2016

Exactly, the blocking read in another thread is what golang's standard library does to detect client disconnection. It also uses blocking IO, just like here.

Async IO is another solution to this problem, but I'm not sure it's ready just yet.

@seanmonstar
Member

The problem is you would need to be sure that no other data would be coming from the connection. Otherwise, the other thread would get it, instead of your main thread.

You could do this yourself, for now, anyways:

use std::io::Read;
use std::sync::mpsc::{self, TryRecvError};
use std::thread;

let mut tcp = req.downcast_ref::<hyper::net::HttpStream>().unwrap().0.try_clone().unwrap();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
    // This read blocks until the client sends data or disconnects.
    tx.send(tcp.read(&mut [0u8]))
});

// elsewhere (eof() etc. are placeholders for your own handling)
match rx.try_recv() {
    Ok(Ok(0)) => eof(),
    Ok(Ok(_)) => data_received(),
    Ok(Err(io_error)) => error(),
    Err(TryRecvError::Disconnected) => thread_is_dead(),
    Err(TryRecvError::Empty) => still_connected(),
}
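
(Note that try_recv is non-blocking, so the handler can poll for disconnection between units of work without stalling; a blocking recv here would reintroduce the original problem.)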

@gyscos
Author

gyscos commented Jan 2, 2016

Golang implements it by actually copying from the original socket to the one visible to the user (they use an io.Pipe for that); it also means they keep reading until EOF is found. That way, the user still has access to the data in the body, and the library can detect when the socket is closed.
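
A rough Rust equivalent of that pipe pattern (a sketch only, not golang's actual code; watch is a hypothetical helper, and the cloned TcpStream is assumed to come from something like the earlier downcast snippet):

use std::io::Read;
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;

// Tee the socket: forward body bytes to the user over a channel, and keep
// reading until EOF or an error. The sender being dropped doubles as the
// "connection closed" notification on the receiving side.
fn watch(mut tcp: TcpStream) -> mpsc::Receiver<Vec<u8>> {
    let (body_tx, body_rx) = mpsc::channel();
    thread::spawn(move || {
        let mut buf = [0u8; 4096];
        loop {
            match tcp.read(&mut buf) {
                Ok(0) | Err(_) => break, // EOF or error: client is gone
                Ok(n) => {
                    if body_tx.send(buf[..n].to_vec()).is_err() {
                        break; // user dropped the receiver
                    }
                }
            }
        }
        // body_tx is dropped here; recv() now returns Err = close notification
    });
    body_rx
}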

It should be doable using downcast_ref indeed, thanks. Now onto exposing this from iron...

@nwtgck

nwtgck commented Sep 2, 2020

@seanmonstar Hi! Can we detect a client close on the server side in the current version 0.13.x?

@DCjanus

DCjanus commented Apr 22, 2021

ping

It's been 6 years, and with tokio::select and async/await, has anything changed? If someone wants to help, I wish there were some guide.

@sharksforarms

Older issue, but I found a solution which works for me, so I figured I'd share.

This solution involves implementing Accept ourselves, so that we can return our own ClientConnection, on which we implement Drop. The Drop implementation contains the signaling (over a CancellationToken in this case) to notify the spawned task when the client connection is dropped.

# Cargo.toml dependencies
tokio = { version = "1.28.2", features = ["full"] }
hyper = { version = "0.14.26", features = ["full"] }
tokio-util = "0.7.8"
futures-util = "0.3.28"

use hyper::{
    body::Bytes,
    server::accept::Accept,
    service::{make_service_fn, service_fn},
    Body, Request, Response, StatusCode,
};
use std::{
    convert::Infallible,
    net::SocketAddr,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::{
    io::{AsyncRead, AsyncWrite, ReadBuf},
    net::{TcpListener, TcpStream},
};
use tokio_util::sync::CancellationToken;

async fn handle(
    _req: Request<Body>,
    client_connection_cancel: CancellationToken,
) -> Result<Response<Body>, hyper::http::Error> {
    let (mut tx, rx) = Body::channel();

    // spawn background task, end when client connection is dropped
    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            tokio::select! {
                _ = client_connection_cancel.cancelled() => {
                    println!("client connection is dropped, exiting loop");
                    break;
                },
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    tx.send_data(Bytes::from(format!("{counter}\n"))).await.unwrap();
                    counter += 1;
                }
            }
        }
    });

    Response::builder().status(StatusCode::OK).body(rx)
}

// Wraps the TcpListener so poll_accept can hand hyper our own connection type.
struct ServerListener(TcpListener);

// A TcpStream paired with a token that fires when the connection is dropped.
struct ClientConnection {
    conn: TcpStream,
    cancel: CancellationToken,
}

impl Drop for ClientConnection {
    fn drop(&mut self) {
        // hyper drops the connection when the client disconnects (or the server
        // shuts it down); signal every task holding a clone of the token.
        self.cancel.cancel()
    }
}

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 3000)))
        .await
        .unwrap();

    let make_service = make_service_fn(|conn: &ClientConnection| {
        let client_connection_cancel = conn.cancel.clone();
        async move {
            Ok::<_, Infallible>(service_fn(move |req| {
                handle(req, client_connection_cancel.clone())
            }))
        }
    });

    let server = hyper::server::Server::builder(ServerListener(listener)).serve(make_service);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

impl AsyncRead for ClientConnection {
    fn poll_read(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_read(context, buf)
    }
}

impl AsyncWrite for ClientConnection {
    fn poll_write(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, tokio::io::Error>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_write(context, buf)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_flush(context)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_shutdown(context)
    }
}

// Accept incoming TCP connections, wrapping each one in a ClientConnection
// with a fresh CancellationToken.
impl Accept for ServerListener {
    type Conn = ClientConnection;

    type Error = std::io::Error;

    fn poll_accept(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<Self::Conn, Self::Error>>> {
        let (conn, _addr) = futures_util::ready!(self.0.poll_accept(cx))?;
        Poll::Ready(Some(Ok(ClientConnection {
            conn,
            cancel: CancellationToken::new(),
        })))
    }
}
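
(To see the notification with the sketch above: run the server, then curl http://127.0.0.1:3000 and hit Ctrl-C after a few lines of streamed output; the spawned task should print "client connection is dropped, exiting loop" and exit.)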

@nwtgck

nwtgck commented Jun 7, 2023

@sharksforarms Thank you so much for sharing! I'll try it.

edit: It worked!!:

(video attachment: out.mp4)
