-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathhttp_server.rs
147 lines (133 loc) · 4.74 KB
/
http_server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! # Serving
//! Restate services run as an HTTP endpoint.
//!
//! ## Creating an HTTP endpoint
//! 1. Create the endpoint
//! 2. Bind one or multiple services to it.
//! 3. Listen on the specified port (default `9080`) for connections and requests.
//!
//! ```rust,no_run
//! # #[path = "../examples/services/mod.rs"]
//! # mod services;
//! # use services::my_service::{MyService, MyServiceImpl};
//! # use services::my_virtual_object::{MyVirtualObject, MyVirtualObjectImpl};
//! # use services::my_workflow::{MyWorkflow, MyWorkflowImpl};
//! use restate_sdk::endpoint::Endpoint;
//! use restate_sdk::http_server::HttpServer;
//!
//! #[tokio::main]
//! async fn main() {
//!     HttpServer::new(
//!         Endpoint::builder()
//!             .bind(MyServiceImpl.serve())
//!             .bind(MyVirtualObjectImpl.serve())
//!             .bind(MyWorkflowImpl.serve())
//!             .build(),
//!     )
//!     .listen_and_serve("0.0.0.0:9080".parse().unwrap())
//!     .await;
//! }
//! ```
//!
//!
//! ## Validating request identity
//!
//! SDKs can validate that incoming requests come from a particular Restate
//! instance. You can find out more about request identity in the [Security docs](https://docs.restate.dev/operate/security#locking-down-service-access).
//! Add the identity key to your endpoint as follows:
//!
//! ```rust,no_run
//! # #[path = "../examples/services/mod.rs"]
//! # mod services;
//! # use services::my_service::{MyService, MyServiceImpl};
//! # use restate_sdk::endpoint::Endpoint;
//! # use restate_sdk::http_server::HttpServer;
//! #
//! # #[tokio::main]
//! # async fn main() {
//! HttpServer::new(
//!     Endpoint::builder()
//!         .bind(MyServiceImpl.serve())
//!         .identity_key("publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f")
//!         .unwrap()
//!         .build(),
//! )
//! .listen_and_serve("0.0.0.0:9080".parse().unwrap())
//! .await;
//! # }
//! ```
use crate::endpoint::Endpoint;
use crate::hyper::HyperEndpoint;
use futures::FutureExt;
use hyper::server::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tracing::{info, warn};
/// HTTP server that exposes the Restate services bound to an [`Endpoint`].
///
/// Build one with [`HttpServer::new`] (or via `From<Endpoint>`) and then call
/// one of the `serve*` methods to start accepting connections.
pub struct HttpServer {
    /// The endpoint handed to the connection handler when serving starts.
    endpoint: Endpoint,
}
/// Allows `HttpServer::from(endpoint)` / `endpoint.into()` as an alternative
/// to calling [`HttpServer::new`] directly.
impl From<Endpoint> for HttpServer {
    /// Wrap the given [`Endpoint`] in an [`HttpServer`].
    fn from(endpoint: Endpoint) -> Self {
        Self::new(endpoint)
    }
}
impl HttpServer {
/// Create new [`HttpServer`] from an [`Endpoint`].
pub fn new(endpoint: Endpoint) -> Self {
Self { endpoint }
}
/// Listen on the given address and serve.
///
/// The future will be completed once `SIGTERM` is sent to the process.
pub async fn listen_and_serve(self, addr: SocketAddr) {
let listener = TcpListener::bind(addr).await.expect("listener can bind");
self.serve(listener).await;
}
/// Serve on the given listener.
///
/// The future will be completed once `SIGTERM` is sent to the process.
pub async fn serve(self, listener: TcpListener) {
self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
.await;
}
/// Serve on the given listener, and cancel the execution with the given future.
pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
let endpoint = HyperEndpoint::new(self.endpoint);
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
// when this signal completes, start shutdown
let mut signal = std::pin::pin!(cancel_signal_future);
info!("Starting listening on {}", listener.local_addr().unwrap());
// Our server accept loop
loop {
tokio::select! {
Ok((stream, remote)) = listener.accept() => {
let endpoint = endpoint.clone();
let conn = http2::Builder::new(TokioExecutor::default())
.serve_connection(TokioIo::new(stream), endpoint);
let fut = graceful.watch(conn);
tokio::spawn(async move {
if let Err(e) = fut.await {
warn!("Error serving connection {remote}: {:?}", e);
}
});
},
_ = &mut signal => {
info!("Shutting down");
// stop the accept loop
break;
}
}
}
// Wait graceful shutdown
tokio::select! {
_ = graceful.shutdown() => {},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("Timed out waiting for all connections to close");
}
}
}
}