From a14c2db56d30ee5a53d701a399887f40957ab485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylwester=20R=C4=85pa=C5=82a?= Date: Sun, 27 Feb 2022 04:30:29 +0100 Subject: [PATCH 1/4] Add example for streaming with handling connection drops from client --- examples/Cargo.toml | 33 +++++---- examples/src/streaming/client.rs | 74 ++++++++++++++++--- examples/src/streaming/server.rs | 117 +++++++++++++++++++++++++------ 3 files changed, 183 insertions(+), 41 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3857d6952..76be24e7c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -180,37 +180,46 @@ path = "src/streaming/server.rs" [dependencies] async-stream = "0.3" -futures = {version = "0.3", default-features = false, features = ["alloc"]} +futures = { version = "0.3", default-features = false, features = ["alloc"] } prost = "0.9" -tokio = {version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"]} -tokio-stream = {version = "0.1", features = ["net"]} -tonic = {path = "../tonic", features = ["tls", "compression"]} -tower = {version = "0.4"} +tokio = { version = "1.0", features = [ + "rt-multi-thread", + "time", + "fs", + "macros", + "net", +] } +tokio-stream = { version = "0.1", features = ["net"] } +tonic = { path = "../tonic", features = ["tls", "compression"] } +tower = { version = "0.4" } # Required for routeguide rand = "0.8" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" # Tracing tracing = "0.1.16" tracing-attributes = "0.1" tracing-futures = "0.2" -tracing-subscriber = {version = "0.3", features = ["tracing-log"]} +tracing-subscriber = { version = "0.3", features = ["tracing-log"] } # Required for wellknown types prost-types = "0.9" # Hyper example http = "0.2" http-body = "0.4.2" -hyper = {version = "0.14", features = ["full"]} +hyper = { version = "0.14", features = ["full"] } pin-project = "1.0" warp = "0.3" # Health example -tonic-health = {path = "../tonic-health"} +tonic-health = { path = "../tonic-health" } # Reflection example listenfd = "0.3" -tonic-reflection = {path = "../tonic-reflection"} +tonic-reflection = { path = "../tonic-reflection" } # grpc-web example bytes = "1" -tonic-web = {path = "../tonic-web"} +tonic-web = { path = "../tonic-web" } +# streaming example +h2 = "0.3" + [build-dependencies] -tonic-build = {path = "../tonic-build", features = ["prost", "compression"]} +tonic-build = { path = "../tonic-build", features = ["prost", "compression"] } diff --git a/examples/src/streaming/client.rs b/examples/src/streaming/client.rs index 67a876a63..52e64f8c9 100644 --- a/examples/src/streaming/client.rs +++ b/examples/src/streaming/client.rs @@ -2,28 +2,84 @@ pub mod pb { tonic::include_proto!("grpc.examples.echo"); } +use futures::stream::Stream; use pb::{echo_client::EchoClient, EchoRequest}; +use std::time::Duration; +use tokio_stream::StreamExt; +use tonic::transport::Channel; -#[tokio::main] -async fn main() -> Result<(), Box> { - let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap(); +fn echo_requests_iter() -> impl Stream { + tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { + message: format!("msg {:02}", i), + }) +} +async fn streaming_echo(client: &mut EchoClient, num: usize) { let stream = client .server_streaming_echo(EchoRequest { message: "foo".into(), }) .await + .unwrap() + .into_inner(); + + let rev_task = tokio::spawn(async move { + // stream is infinite - take just 5 elements and then disconnect + let mut stream = stream.take(num); + while let Some(item) = stream.next().await { + println!("\trecived: {}", item.unwrap().message); + } + // stream is droped here and the disconnect info is send to server + }); + + rev_task.await.unwrap(); +} + +async fn bidirectional_streaming_echo(client: &mut EchoClient, num: usize) { + let in_stream = echo_requests_iter().take(num); + + let response = client + .bidirectional_streaming_echo(in_stream) + .await .unwrap(); - println!("Connected...now sleeping for 2 seconds..."); + let mut resp_stream = response.into_inner(); + + while let Some(recived) = resp_stream.next().await { + let recived = recived.unwrap(); + println!("\trecived message: `{}`", recived.message); + } +} + +async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient, dur: Duration) { + let in_stream = echo_requests_iter().throttle(dur); + + let response = client + .bidirectional_streaming_echo(in_stream) + .await + .unwrap(); + + let mut resp_stream = response.into_inner(); + + while let Some(recived) = resp_stream.next().await { + let recived = recived.unwrap(); + println!("\trecived message: `{}`", recived.message); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + println!("Streaming echo:"); + streaming_echo(&mut client, 5).await; + tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions - // Disconnect - drop(stream); - drop(client); + println!("\r\nBidirectional stream echo:"); + bidirectional_streaming_echo(&mut client, 17).await; - println!("Disconnected..."); + println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); + bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; Ok(()) } diff --git a/examples/src/streaming/server.rs b/examples/src/streaming/server.rs index 9c0738704..5ff6764f4 100644 --- a/examples/src/streaming/server.rs +++ b/examples/src/streaming/server.rs @@ -3,10 +3,14 @@ pub mod pb { } use futures::Stream; +use std::error::Error; +use std::io::ErrorKind; use std::net::ToSocketAddrs; use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::sync::oneshot; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt; use tonic::{transport::Server, Request, Response, Status, Streaming}; use pb::{EchoRequest, EchoResponse}; @@ -14,6 +18,29 @@ use pb::{EchoRequest, EchoResponse}; type EchoResult = Result, Status>; type ResponseStream = Pin> + Send>>; +fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { + let mut err: &(dyn Error + 'static) = err_status; + + loop { + if let Some(io_err) = err.downcast_ref::() { + return Some(io_err); + } + + // h2::Error do not expose std::io::Error with `source()` + // https://github.com/hyperium/h2/pull/462 + if let Some(h2_err) = err.downcast_ref::() { + if let Some(io_err) = h2_err.get_io() { + return Some(io_err); + } + } + + err = match err.source() { + Some(err) => err, + None => return None, + }; + } +} + #[derive(Debug)] pub struct EchoServer {} @@ -29,28 +56,37 @@ impl pb::echo_server::Echo for EchoServer { &self, req: Request, ) -> EchoResult { - println!("Client connected from: {:?}", req.remote_addr()); + println!("EchoServer::server_streaming_echo"); + println!("\tclient connected from: {:?}", req.remote_addr()); - let (tx, rx) = oneshot::channel::<()>(); - - tokio::spawn(async move { - let _ = rx.await; - println!("The rx resolved therefore the client disconnected!"); + // creating infinite stream with requested message + let repeat = std::iter::repeat(EchoResponse { + message: req.into_inner().message, }); + let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200))); - struct ClientDisconnect(oneshot::Sender<()>); - - impl Stream for ClientDisconnect { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - // A stream that never resolves to anything.... - Poll::Pending + // spawn and channel is required if you want handle "disconnected" functionality + let (tx, rx) = mpsc::channel(1); + tokio::spawn(async move { + while let Some(item) = stream.next().await { + match tx.send(Result::<_, Status>::Ok(item)).await { + Ok(_) => { + // the item was send to channel but there is no guarntee it was send to client! + // If you need such guarntee the solution can be "rendezvous channel" + // (channel with 0 capacity), tokio::sync::mpsc do not support this special case + // but you can use `flume::bounded` https://docs.rs/flume/latest/flume/fn.bounded.html + } + Err(_item) => { + println!("\tclient disconnected"); //out_stream (build on rx) was droped + break; + } + } } - } + }); + let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(ClientDisconnect(tx)) as Self::ServerStreamingEchoStream + Box::pin(output_stream) as Self::ServerStreamingEchoStream )) } @@ -65,9 +101,50 @@ impl pb::echo_server::Echo for EchoServer { async fn bidirectional_streaming_echo( &self, - _: Request>, + req: Request>, ) -> EchoResult { - Err(Status::unimplemented("not implemented")) + println!("EchoServer::Bidirectional_streaming_echo"); + + let mut in_stream = req.into_inner(); + let (tx, rx) = mpsc::channel(2); + + // this spawn here is required if you want to handle connection error. + // If we just map `in_stream` and write it back as `out_stream` the `out_stream` + // will be droped when connection error occure and would be never propagated + // to `in_stream`. + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(v) => tx + .send(Ok(EchoResponse { message: v.message })) + .await + .expect("working rx"), + Err(err) => { + if let Some(io_err) = match_for_io_error(&err) { + if io_err.kind() == ErrorKind::BrokenPipe { + // here you can handle special case when client + // disconnected in unexpected way + eprintln!("\tclient disconnected: broken pipe"); + break; + } + } + + match tx.send(Err(err)).await { + Ok(_) => (), + Err(_err) => break, // response was droped + } + } + } + } + println!("\tstream ended"); + }); + + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new( + Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream + )) } } From 5c3ddb4fd1e5373089dc08a692ced9936a5fb80a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylwester=20R=C4=85pa=C5=82a?= Date: Wed, 2 Mar 2022 00:49:29 +0100 Subject: [PATCH 2/4] revert some of autoformat changes --- examples/Cargo.toml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 76be24e7c..6a61e1007 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -182,13 +182,7 @@ path = "src/streaming/server.rs" async-stream = "0.3" futures = { version = "0.3", default-features = false, features = ["alloc"] } prost = "0.9" -tokio = { version = "1.0", features = [ - "rt-multi-thread", - "time", - "fs", - "macros", - "net", -] } +tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net",] } tokio-stream = { version = "0.1", features = ["net"] } tonic = { path = "../tonic", features = ["tls", "compression"] } tower = { version = "0.4" } From af6765d0f5fc8dcf898ac96f3966aa8900f1d947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylwester=20R=C4=85pa=C5=82a?= Date: Wed, 2 Mar 2022 00:50:20 +0100 Subject: [PATCH 3/4] apply changes according to review --- examples/src/streaming/client.rs | 23 ++++++++++++----------- examples/src/streaming/server.rs | 26 ++++++++++---------------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/examples/src/streaming/client.rs b/examples/src/streaming/client.rs index 52e64f8c9..20e711729 100644 --- a/examples/src/streaming/client.rs +++ b/examples/src/streaming/client.rs @@ -3,11 +3,12 @@ pub mod pb { } use futures::stream::Stream; -use pb::{echo_client::EchoClient, EchoRequest}; use std::time::Duration; use tokio_stream::StreamExt; use tonic::transport::Channel; +use pb::{echo_client::EchoClient, EchoRequest}; + fn echo_requests_iter() -> impl Stream { tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { message: format!("msg {:02}", i), @@ -23,16 +24,12 @@ async fn streaming_echo(client: &mut EchoClient, num: usize) { .unwrap() .into_inner(); - let rev_task = tokio::spawn(async move { - // stream is infinite - take just 5 elements and then disconnect - let mut stream = stream.take(num); - while let Some(item) = stream.next().await { - println!("\trecived: {}", item.unwrap().message); - } - // stream is droped here and the disconnect info is send to server - }); - - rev_task.await.unwrap(); + // stream is infinite - take just 5 elements and then disconnect + let mut stream = stream.take(num); + while let Some(item) = stream.next().await { + println!("\trecived: {}", item.unwrap().message); + } + // stream is droped here and the disconnect info is send to server } async fn bidirectional_streaming_echo(client: &mut EchoClient, num: usize) { @@ -75,9 +72,13 @@ async fn main() -> Result<(), Box> { streaming_echo(&mut client, 5).await; tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions + // Echo stream that sends 17 requests then gracefull end that conection println!("\r\nBidirectional stream echo:"); bidirectional_streaming_echo(&mut client, 17).await; + // Echo stream that sends up to `usize::MAX` requets. One request each 2s. + // Exiting client with CTRL+C demostrate how to distinguise broken pipe from + //gracefull client disconnection (above example) on the server side. println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; diff --git a/examples/src/streaming/server.rs b/examples/src/streaming/server.rs index 5ff6764f4..a0c37147b 100644 --- a/examples/src/streaming/server.rs +++ b/examples/src/streaming/server.rs @@ -3,14 +3,9 @@ pub mod pb { } use futures::Stream; -use std::error::Error; -use std::io::ErrorKind; -use std::net::ToSocketAddrs; -use std::pin::Pin; -use std::time::Duration; +use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration}; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::StreamExt; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; use pb::{EchoRequest, EchoResponse}; @@ -65,23 +60,22 @@ impl pb::echo_server::Echo for EchoServer { }); let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200))); - // spawn and channel is required if you want handle "disconnected" functionality + // spawn and channel are required if you want handle "disconnect" functionality + // the `out_stream` will not be polled after client disconnect let (tx, rx) = mpsc::channel(1); tokio::spawn(async move { while let Some(item) = stream.next().await { match tx.send(Result::<_, Status>::Ok(item)).await { Ok(_) => { - // the item was send to channel but there is no guarntee it was send to client! - // If you need such guarntee the solution can be "rendezvous channel" - // (channel with 0 capacity), tokio::sync::mpsc do not support this special case - // but you can use `flume::bounded` https://docs.rs/flume/latest/flume/fn.bounded.html + // item (server response) was queued to be send to client } Err(_item) => { - println!("\tclient disconnected"); //out_stream (build on rx) was droped + // output_stream was build from rx and both are dropped break; } } } + println!("\tclient disconnected"); }); let output_stream = ReceiverStream::new(rx); @@ -103,15 +97,15 @@ impl pb::echo_server::Echo for EchoServer { &self, req: Request>, ) -> EchoResult { - println!("EchoServer::Bidirectional_streaming_echo"); + println!("EchoServer::bidirectional_streaming_echo"); let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(2); // this spawn here is required if you want to handle connection error. // If we just map `in_stream` and write it back as `out_stream` the `out_stream` - // will be droped when connection error occure and would be never propagated - // to `in_stream`. + // will be drooped when connection error occurs and error will never be propagated + // to mapped version of `in_stream`. tokio::spawn(async move { while let Some(result) = in_stream.next().await { match result { From 6ec28c7a2c2398a5be1ce563bd24abaa81b20767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylwester=20R=C4=85pa=C5=82a?= Date: Wed, 2 Mar 2022 00:56:13 +0100 Subject: [PATCH 4/4] use more correct size for channels --- examples/src/streaming/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/streaming/server.rs b/examples/src/streaming/server.rs index a0c37147b..864626072 100644 --- a/examples/src/streaming/server.rs +++ b/examples/src/streaming/server.rs @@ -62,7 +62,7 @@ impl pb::echo_server::Echo for EchoServer { // spawn and channel are required if you want handle "disconnect" functionality // the `out_stream` will not be polled after client disconnect - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(item) = stream.next().await { match tx.send(Result::<_, Status>::Ok(item)).await { @@ -100,7 +100,7 @@ impl pb::echo_server::Echo for EchoServer { println!("EchoServer::bidirectional_streaming_echo"); let mut in_stream = req.into_inner(); - let (tx, rx) = mpsc::channel(2); + let (tx, rx) = mpsc::channel(128); // this spawn here is required if you want to handle connection error. // If we just map `in_stream` and write it back as `out_stream` the `out_stream`