-
Notifications
You must be signed in to change notification settings - Fork 933
/
server.rs
154 lines (130 loc) · 5.33 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/// Generated message types and service traits for the `grpc.examples.echo`
/// protobuf package, pulled in through `tonic::include_proto!`.
///
/// This file uses `pb::EchoRequest`, `pb::EchoResponse` and the
/// `pb::echo_server` module from it.
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
use pb::{EchoRequest, EchoResponse};
/// Return type shared by every RPC handler in this file: a tonic `Response`
/// wrapping `T` on success, or a `Status` on failure.
type EchoResult<T> = Result<Response<T>, Status>;
/// Pinned, boxed, `Send` stream of `Result<EchoResponse, Status>` items; used as
/// the associated stream type of both streaming-response RPCs below.
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}
/// Handler type for the echo gRPC service. It carries no state; the
/// `pb::echo_server::Echo` trait is implemented for it in this file.
#[derive(Debug)]
pub struct EchoServer {}
/// `Echo` service implementation.
///
/// Only the two RPCs that stream responses are implemented; the unary and
/// client-streaming RPCs answer with `Status::unimplemented`.
#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
    /// Unary echo: not supported, always fails with `Status::unimplemented`.
    async fn unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse> {
        Err(Status::unimplemented("not implemented"))
    }

    /// Stream type returned by [`Self::server_streaming_echo`].
    type ServerStreamingEchoStream = ResponseStream;

    /// Replies with an endless stream that repeats the request message,
    /// throttled with a 200 ms interval, until the client goes away.
    ///
    /// The responses are produced by a spawned task and forwarded through a
    /// bounded channel (capacity 128); the receiving half is returned to the
    /// caller as the response stream.
    async fn server_streaming_echo(
        &self,
        req: Request<EchoRequest>,
    ) -> EchoResult<Self::ServerStreamingEchoStream> {
        println!("EchoServer::server_streaming_echo");
        println!("\tclient connected from: {:?}", req.remote_addr());

        // Create an infinite stream that repeats the requested message.
        let repeat = std::iter::repeat(EchoResponse {
            message: req.into_inner().message,
        });
        let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));

        // The spawn and the channel are required if you want to handle
        // "disconnect" functionality: the `output_stream` will not be polled
        // after the client disconnects.
        let (tx, rx) = mpsc::channel(128);
        tokio::spawn(async move {
            while let Some(item) = stream.next().await {
                match tx.send(Result::<_, Status>::Ok(item)).await {
                    Ok(_) => {
                        // The item (server response) was queued to be sent to the client.
                    }
                    Err(_item) => {
                        // `output_stream` was built from `rx` and both have been dropped.
                        break;
                    }
                }
            }
            println!("\tclient disconnected");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(
            Box::pin(output_stream) as Self::ServerStreamingEchoStream
        ))
    }

    /// Client-streaming echo: not supported, always fails with
    /// `Status::unimplemented`.
    async fn client_streaming_echo(
        &self,
        _: Request<Streaming<EchoRequest>>,
    ) -> EchoResult<EchoResponse> {
        Err(Status::unimplemented("not implemented"))
    }

    /// Stream type returned by [`Self::bidirectional_streaming_echo`].
    type BidirectionalStreamingEchoStream = ResponseStream;

    /// Echoes every message received on the inbound stream back to the client.
    ///
    /// Inbound errors are forwarded to the client as well, except for an
    /// underlying `BrokenPipe` I/O error, which is logged and ends the task.
    /// The forwarding task also stops, without panicking, as soon as the
    /// response stream has been dropped.
    async fn bidirectional_streaming_echo(
        &self,
        req: Request<Streaming<EchoRequest>>,
    ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
        println!("EchoServer::bidirectional_streaming_echo");

        let mut in_stream = req.into_inner();
        let (tx, rx) = mpsc::channel(128);

        // This spawn is required if you want to handle connection errors.
        // If we just mapped `in_stream` and wrote it back as `out_stream`, the
        // `out_stream` would be dropped when a connection error occurs and the
        // error would never be propagated to the mapped version of `in_stream`.
        tokio::spawn(async move {
            while let Some(result) = in_stream.next().await {
                match result {
                    Ok(v) => {
                        let reply = Ok(EchoResponse { message: v.message });
                        if tx.send(reply).await.is_err() {
                            // The response stream was dropped (e.g. the client
                            // went away); there is nobody left to echo to, so
                            // stop instead of panicking inside the task.
                            break;
                        }
                    }
                    Err(err) => {
                        if let Some(io_err) = match_for_io_error(&err) {
                            if io_err.kind() == ErrorKind::BrokenPipe {
                                // Here you can handle the special case where the
                                // client disconnected in an unexpected way.
                                eprintln!("\tclient disconnected: broken pipe");
                                break;
                            }
                        }

                        match tx.send(Err(err)).await {
                            Ok(_) => (),
                            Err(_err) => break, // response stream was dropped
                        }
                    }
                }
            }
            println!("\tstream ended");
        });

        // Echo: just write back the same data that was received.
        let out_stream = ReceiverStream::new(rx);
        Ok(Response::new(
            Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
        ))
    }
}
/// Starts the echo gRPC server on `[::1]:50051` and serves requests until the
/// server stops.
///
/// # Errors
///
/// Returns an error, instead of panicking, when the listen address cannot be
/// resolved or when the transport reports a failure while binding or serving
/// (for example because the port is already in use).
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051"
        .to_socket_addrs()?
        .next()
        .ok_or("no socket address resolved for [::1]:50051")?;

    let server = EchoServer {};
    Server::builder()
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve(addr)
        .await?;

    Ok(())
}