From e0f70706a5c1c3308b7e7860bd35662cc41b6b92 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 5 Nov 2021 15:55:38 +0100 Subject: [PATCH] restart a HTTP server on schema updates (#5) Fixes #41 In a previous PR, the schema that was shared between the state machine and the HTTP server behind a lock was removed, so schema update was broken. We want to fix it without reintroducing a lock in the middle of the hot path for queries. Our solution here is to launch a new HTTP server with the new schema configuration on schema updates, as launching a server is very cheap. We need to replace warp's HTTP server with our own loop though, to get the ability to reuse the TCP listener socket from one server to the next and avoid losing TCP connections. ## graceful shutdown Since we do not have access to the private hyper structs and traits used to implement it (Graceful, Watch, Exec, ConnStreamExec...), it is challenging to make a struct wrapper for a Connection, especially if we want to satisfy the bounds of https://docs.rs/hyper/0.14.13/hyper/server/conn/struct.Connection.html#method.graceful_shutdown What we can do, though, is to select over the shutdown watcher and the connection: - if the connection finishes first, exit there - if the shutdown watcher exits first, call graceful_shutdown() on the connection then await on the connection Additional fixes: * make server handle member as private (this helps in isolating the TcpListener dance) * the graph fetcher must regenerate on config update (Example case: the new configuration sets up override addresses for backend services, so the HttpServiceRegistry used by the graph fetcher must be recreated) --- Cargo.lock | 1 + crates/apollo-router/Cargo.toml | 1 + .../apollo-router/src/http_server_factory.rs | 88 ++++++++++-- crates/apollo-router/src/lib.rs | 3 + crates/apollo-router/src/state_machine.rs | 121 ++++++++++------ .../src/warp_http_server_factory.rs | 131 ++++++++++++++---- 6 files changed, 264 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbddb2208e..4d9d10e63f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ dependencies = [ "futures", "hotwatch", "httpmock", + "hyper", "insta", "maplit", "mockall", diff --git a/crates/apollo-router/Cargo.toml b/crates/apollo-router/Cargo.toml index 48886eb766..1a9e44bd53 100644 --- a/crates/apollo-router/Cargo.toml +++ b/crates/apollo-router/Cargo.toml @@ -32,6 +32,7 @@ directories = "4.0.1" displaydoc = "0.2" futures = { version = "0.3.17", features = ["thread-pool"] } hotwatch = "0.4.6" +hyper = { version = "0.14.13", features = ["server"] } once_cell = "1.8.0" opentelemetry = { version = "0.16.0", features = ["rt-tokio", "serialize"] } opentelemetry-jaeger = { version = "0.15.0", features = [ diff --git a/crates/apollo-router/src/http_server_factory.rs b/crates/apollo-router/src/http_server_factory.rs index b57590a192..26aa6760ec 100644 --- a/crates/apollo-router/src/http_server_factory.rs +++ b/crates/apollo-router/src/http_server_factory.rs @@ -9,6 +9,7 @@ use mockall::{automock, predicate::*}; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; +use tokio::net::TcpListener; /// Factory for creating the http server component. /// @@ -20,7 +21,8 @@ pub(crate) trait HttpServerFactory { &self, graph: Arc, configuration: Arc, - ) -> Pin + Send>> + listener: Option, + ) -> Pin> + Send>> where F: graphql::Fetcher + 'static; } @@ -33,48 +35,110 @@ pub(crate) trait HttpServerFactory { #[derivative(Debug)] pub(crate) struct HttpServerHandle { /// Sender to use to notify of shutdown - pub(crate) shutdown_sender: oneshot::Sender<()>, + shutdown_sender: oneshot::Sender<()>, /// Future to wait on for graceful shutdown #[derivative(Debug = "ignore")] - pub(crate) server_future: - Pin> + Send>>, + server_future: Pin> + Send>>, /// The listen address that the server is actually listening on. /// If the socket address specified port zero the OS will assign a random free port. #[allow(dead_code)] - pub(crate) listen_address: SocketAddr, + listen_address: SocketAddr, } impl HttpServerHandle { + pub(crate) fn new( + shutdown_sender: oneshot::Sender<()>, + server_future: Pin< + Box> + Send>, + >, + listen_address: SocketAddr, + ) -> Self { + Self { + shutdown_sender, + server_future, + listen_address, + } + } + pub(crate) async fn shutdown(self) -> Result<(), FederatedServerError> { if let Err(_err) = self.shutdown_sender.send(()) { tracing::error!("Failed to notify http thread of shutdown") }; - self.server_future.await + self.server_future.await.map(|_| ()) + } + + pub(crate) async fn restart( + self, + factory: &ServerFactory, + graph: Arc, + configuration: Arc, + ) -> Result + where + Fetcher: graphql::Fetcher + 'static, + ServerFactory: HttpServerFactory, + { + // we tell the currently running server to stop + if let Err(_err) = self.shutdown_sender.send(()) { + tracing::error!("Failed to notify http thread of shutdown") + }; + + // when the server receives the shutdown signal, it stops accepting new + // connections, and returns the TCP listener, to reuse it in the next server + // it is necessary to keep the queue of new TCP sockets associated with + // the listener instead of dropping them + let listener = self.server_future.await; + tracing::info!("previous server is closed"); + + // we keep the TCP listener if it is compatible with the new configuration + let listener = if self.listen_address != configuration.server.listen { + None + } else { + match listener { + Ok(listener) => Some(listener), + Err(e) => { + tracing::error!("the previous listen socket failed: {}", e); + None + } + } + }; + + let handle = factory + .create(Arc::clone(&graph), Arc::clone(&configuration), listener) + .await?; + tracing::debug!("Restarted on {}", handle.listen_address()); + + Ok(handle) + } + + pub(crate) fn listen_address(&self) -> SocketAddr { + self.listen_address } } #[cfg(test)] mod tests { use super::*; + use futures::channel::oneshot; use std::str::FromStr; use test_env_log::test; #[test(tokio::test)] async fn sanity() { - let (shutdown_sender, shutdown_receiver) = futures::channel::oneshot::channel(); - HttpServerHandle { - listen_address: SocketAddr::from_str("127.0.0.1:0").unwrap(), + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + HttpServerHandle::new( shutdown_sender, - server_future: futures::future::ready(Ok(())).boxed(), - } + futures::future::ready(Ok(listener)).boxed(), + SocketAddr::from_str("127.0.0.1:0").unwrap(), + ) .shutdown() .await .expect("Should have waited for shutdown"); shutdown_receiver - .into_future() .await .expect("Should have been send notification to shutdown"); } diff --git a/crates/apollo-router/src/lib.rs b/crates/apollo-router/src/lib.rs index 3b9f0532f3..43123dacf1 100644 --- a/crates/apollo-router/src/lib.rs +++ b/crates/apollo-router/src/lib.rs @@ -66,6 +66,9 @@ pub enum FederatedServerError { /// Could not read schema: {0} ReadSchemaError(graphql::SchemaError), + + /// Could not create the HTTP server: {0} + ServerCreationError(std::io::Error), } /// The user supplied schema. Either a static instance or a stream for hot reloading. diff --git a/crates/apollo-router/src/state_machine.rs b/crates/apollo-router/src/state_machine.rs index 00e0ac9c49..3df09c83ee 100644 --- a/crates/apollo-router/src/state_machine.rs +++ b/crates/apollo-router/src/state_machine.rs @@ -62,7 +62,7 @@ where schema, .. } => State::Running { - address: server_handle.listen_address, + address: server_handle.listen_address(), schema: schema.as_str().to_string(), }, Stopped => State::Stopped, @@ -173,6 +173,7 @@ where } } Ok(()) => { + tracing::info!("Reloading schema"); let configuration = Arc::new(new_configuration); let schema = Arc::new(new_schema); @@ -182,11 +183,21 @@ where .await, ); - Running { - configuration, - schema, - graph, - server_handle, + match server_handle + .restart( + &self.http_server_factory, + Arc::clone(&graph), + Arc::clone(&configuration), + ) + .await + { + Ok(server_handle) => Running { + configuration, + schema, + graph, + server_handle, + }, + Err(err) => Errored(err), } } } @@ -202,7 +213,7 @@ where }, UpdateConfiguration(mut new_configuration), ) => { - tracing::debug!("Reloading configuration"); + tracing::info!("Reloading configuration"); match new_configuration.load_subgraphs(schema.as_ref()) { Err(e) => { @@ -220,27 +231,27 @@ where } Ok(()) => { let configuration = Arc::new(new_configuration); - let server_handle = - if server_handle.listen_address != configuration.server.listen { - tracing::debug!("Restarting http"); - if let Err(_err) = server_handle.shutdown().await { - tracing::error!("Failed to notify shutdown") - } - let new_handle = self - .http_server_factory - .create(Arc::clone(&graph), Arc::clone(&configuration)) - .await; - tracing::debug!("Restarted on {}", new_handle.listen_address); - new_handle - } else { - server_handle - }; + let graph = Arc::new( + self.graph_factory + .create(&configuration, Arc::clone(&schema)) + .await, + ); - Running { - configuration, - schema, - graph, - server_handle, + match server_handle + .restart( + &self.http_server_factory, + Arc::clone(&graph), + Arc::clone(&configuration), + ) + .await + { + Ok(server_handle) => Running { + configuration, + schema, + graph, + server_handle, + }, + Err(err) => Errored(err), } } } @@ -320,16 +331,25 @@ where ); let configuration = Arc::new(configuration); - let server_handle = self + match self .http_server_factory - .create(Arc::clone(&graph), Arc::clone(&configuration)) - .await; - tracing::debug!("Started on {}", server_handle.listen_address); - Running { - configuration, - schema, - graph, - server_handle, + .create(Arc::clone(&graph), Arc::clone(&configuration), None) + .await + { + Ok(server_handle) => { + tracing::debug!("Started on {}", server_handle.listen_address()); + Running { + configuration, + schema, + graph, + server_handle, + } + } + + Err(err) => { + tracing::error!("Cannot start the router: {}", err); + Errored(err) + } } } } @@ -353,6 +373,7 @@ mod tests { use std::pin::Pin; use std::str::FromStr; use test_env_log::test; + use tokio::net::TcpListener; #[test(tokio::test)] async fn no_configuration() { @@ -438,7 +459,7 @@ mod tests { #[test(tokio::test)] async fn startup_reload_schema() { let graph_factory = create_mock_graph_factory(2); - let (server_factory, shutdown_receivers) = create_mock_server_factory(1); + let (server_factory, shutdown_receivers) = create_mock_server_factory(2); let schema = include_str!("testdata/supergraph.graphql"); assert!(matches!( @@ -471,12 +492,12 @@ mod tests { .await, Ok(()), )); - assert_eq!(shutdown_receivers.lock().len(), 1); + assert_eq!(shutdown_receivers.lock().len(), 2); } #[test(tokio::test)] async fn startup_reload_configuration() { - let graph_factory = create_mock_graph_factory(1); + let graph_factory = create_mock_graph_factory(2); let (server_factory, shutdown_receivers) = create_mock_server_factory(2); assert!(matches!( @@ -666,16 +687,26 @@ mod tests { .expect_create() .times(expect_times_called) .returning( - move |_: Arc, configuration: Arc| { - let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); + move |_: Arc, + configuration: Arc, + listener: Option| { + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); shutdown_receivers_clone.lock().push(shutdown_receiver); + let server = async move { + Ok(if let Some(l) = listener { + l + } else { + tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap() + }) + }; + Box::pin(async move { - HttpServerHandle { + Ok(HttpServerHandle::new( shutdown_sender, - server_future: future::ready(Ok(())).boxed(), - listen_address: configuration.server.listen, - } + Box::pin(server), + configuration.server.listen, + )) }) }, ); diff --git a/crates/apollo-router/src/warp_http_server_factory.rs b/crates/apollo-router/src/warp_http_server_factory.rs index e9a1f9cc24..c6aa75d39f 100644 --- a/crates/apollo-router/src/warp_http_server_factory.rs +++ b/crates/apollo-router/src/warp_http_server_factory.rs @@ -3,11 +3,13 @@ use crate::http_server_factory::{HttpServerFactory, HttpServerHandle}; use crate::FederatedServerError; use apollo_router_core::prelude::*; use bytes::Bytes; -use futures::channel::oneshot; -use futures::prelude::*; +use futures::{channel::oneshot, prelude::*}; +use hyper::server::conn::Http; use opentelemetry::propagation::Extractor; use std::pin::Pin; use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::sync::Notify; use tracing::instrument::WithSubscriber; use tracing::{Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -36,12 +38,13 @@ impl HttpServerFactory for WarpHttpServerFactory { &self, graph: Arc, configuration: Arc, - ) -> Pin + Send>> + listener: Option, + ) -> Pin> + Send>> where F: graphql::Fetcher + 'static, { - let f = async { - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + Box::pin(async { + let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); let listen_address = configuration.server.listen; let cors = configuration @@ -56,24 +59,103 @@ impl HttpServerFactory for WarpHttpServerFactory { .or(post_graphql_request(graph, configuration)) .with(cors); - let (actual_listen_address, server) = - warp::serve(routes).bind_with_graceful_shutdown(listen_address, async { - shutdown_receiver.await.ok(); - }); + // generate a hyper service from warp routes + let svc = warp::service(routes); + + // if we received a TCP listener, reuse it, otherwise create a new one + let tcp_listener = if let Some(listener) = listener { + listener + } else { + TcpListener::bind(listen_address) + .await + .map_err(FederatedServerError::ServerCreationError)? + }; + let actual_listen_address = tcp_listener + .local_addr() + .map_err(FederatedServerError::ServerCreationError)?; + + // this server reproduces most of hyper::server::Server's behaviour + // we select over the stop_listen_receiver channel and the listener's + // accept future. If the channel received something or the sender + // was dropped, we stop using the listener and send it back through + // listener_receiver + let server = async move { + tokio::pin!(shutdown_receiver); + + let connection_shutdown = Arc::new(Notify::new()); + + loop { + tokio::select! { + _ = &mut shutdown_receiver => { + break; + } + res = tcp_listener.accept() => { + let svc = svc.clone(); + let connection_shutdown = connection_shutdown.clone(); + + tokio::task::spawn(async move { + // we unwrap the result of accept() here to avoid stopping + // the entire server on an issue with that socket + // Unfortunately, the error here could also be linked + // to the listen socket (no RAM for kernel buffers, no + // more file descriptors, network interface is down...) + // ideally we'd want to handle the errors in the server task + // with varying behaviours + let (tcp_stream, _) = res.unwrap(); + + let connection = Http::new() + .http1_keep_alive(true) + .serve_connection(tcp_stream, svc); + + tokio::pin!(connection); + tokio::select! { + // the connection finished first + _res = &mut connection => { + /*if let Err(http_err) = res { + tracing::error!( + "Error while serving HTTP connection: {}", + http_err + ); + }*/ + } + // the shutdown receiver was triggered first, + // so we tell the connection to do a graceful shutdown + // on the next request, then we wait for it to finish + _ = connection_shutdown.notified() => { + let c = connection.as_mut(); + c.graceful_shutdown(); + + if let Err(_http_err) = connection.await { + /*tracing::error!( + "Error while serving HTTP connection: {}", + http_err + );*/ + } + } + } + }); + } + } + } + + // the shutdown receiver was triggered so we break out of + // the server loop, tell the currently active connections to stop + // then return the TCP listen socket + connection_shutdown.notify_waiters(); + tcp_listener + }; // Spawn the server into a runtime let server_future = tokio::task::spawn(server) .map_err(|_| FederatedServerError::HttpServerLifecycleError) .boxed(); - HttpServerHandle { + Ok(HttpServerHandle::new( shutdown_sender, server_future, - listen_address: actual_listen_address, - } - }; - - Box::pin(f) + actual_listen_address, + )) + }) } } @@ -339,8 +421,9 @@ mod tests { .subgraphs(Default::default()) .build(), ), + None, ) - .await; + .await?; let client = reqwest::Client::builder() .redirect(Policy::none()) .build() @@ -354,8 +437,8 @@ mod tests { let (server, client) = init!("127.0.0.1:0", fetcher => {}); for url in vec![ - format!("http://{}/", server.listen_address), - format!("http://{}/graphql", server.listen_address), + format!("http://{}/", server.listen_address()), + format!("http://{}/graphql", server.listen_address()), ] { // Regular studio redirect let response = client @@ -375,7 +458,7 @@ mod tests { LOCATION, vec![format!( "https://studio.apollographql.com/sandbox?endpoint=http://{}", - server.listen_address + server.listen_address() )], "Incorrect redirect url" ); @@ -403,7 +486,7 @@ mod tests { let (server, client) = init!("127.0.0.1:0", fetcher => {}); let response = client - .post(format!("http://{}/graphql", server.listen_address)) + .post(format!("http://{}/graphql", server.listen_address())) .body("Garbage") .send() .await @@ -427,7 +510,7 @@ mod tests { future::ready(futures::stream::iter(vec![actual_response]).boxed()).boxed() }) }); - let url = format!("http://{}/graphql", server.listen_address); + let url = format!("http://{}/graphql", server.listen_address()); // Post query let response = client .post(url.as_str()) @@ -477,7 +560,7 @@ mod tests { }) }); let response = client - .post(format!("http://{}/graphql", server.listen_address)) + .post(format!("http://{}/graphql", server.listen_address())) .body( json!( { @@ -509,8 +592,8 @@ mod tests { let (server, client) = init!("127.0.0.1:0", fetcher => {}); for url in vec![ - format!("http://{}/", server.listen_address), - format!("http://{}/graphql", server.listen_address), + format!("http://{}/", server.listen_address()), + format!("http://{}/graphql", server.listen_address()), ] { let response = client .request(Method::OPTIONS, &url)