Skip to content

Commit

Permalink
restart a HTTP server on schema updates (#5)
Browse files Browse the repository at this point in the history
Fixes #41

In a previous PR, the schema that was shared between the state machine
and the HTTP server behind a lock was removed, so schema update was
broken.
We want to fix it without reintroducing a lock in the middle of the hot
path for queries. Our solution here is to launch a new HTTP server with
the new schema configuration on schema updates, as launching a server is
very cheap.

We need to replace warp's HTTP server with our own loop though, to get
the ability to reuse the TCP listener socket from one server to the next
and avoid losing TCP connections.

## graceful shutdown

Since we do not have access to the private hyper structs and traits used
to implement it (Graceful, Watch, Exec, ConnStreamExec...), it is
challenging to make a struct wrapper for a Connection, especially if we
want to satisfy the bounds of
https://docs.rs/hyper/0.14.13/hyper/server/conn/struct.Connection.html#method.graceful_shutdown

What we can do, though, is to select over the shutdown watcher and the
connection:
- if the connection finishes first, exit there
- if the shutdown watcher exits first, call graceful_shutdown() on the
  connection then await on the connection

Additional fixes:
* make server handle member as private (this helps in isolating the TcpListener dance)
* the graph fetcher must regenerate on config update (Example case: the new configuration sets up override addresses for backend services, so the HttpServiceRegistry used by the graph fetcher must be recreated)
  • Loading branch information
Geal committed Nov 5, 2021
1 parent 61b263d commit e0f7070
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ directories = "4.0.1"
displaydoc = "0.2"
futures = { version = "0.3.17", features = ["thread-pool"] }
hotwatch = "0.4.6"
hyper = { version = "0.14.13", features = ["server"] }
once_cell = "1.8.0"
opentelemetry = { version = "0.16.0", features = ["rt-tokio", "serialize"] }
opentelemetry-jaeger = { version = "0.15.0", features = [
Expand Down
88 changes: 76 additions & 12 deletions crates/apollo-router/src/http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use mockall::{automock, predicate::*};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::net::TcpListener;

/// Factory for creating the http server component.
///
Expand All @@ -20,7 +21,8 @@ pub(crate) trait HttpServerFactory {
&self,
graph: Arc<F>,
configuration: Arc<Configuration>,
) -> Pin<Box<dyn Future<Output = HttpServerHandle> + Send>>
listener: Option<TcpListener>,
) -> Pin<Box<dyn Future<Output = Result<HttpServerHandle, FederatedServerError>> + Send>>
where
F: graphql::Fetcher + 'static;
}
Expand All @@ -33,48 +35,110 @@ pub(crate) trait HttpServerFactory {
#[derivative(Debug)]
pub(crate) struct HttpServerHandle {
/// Sender to use to notify of shutdown
pub(crate) shutdown_sender: oneshot::Sender<()>,
shutdown_sender: oneshot::Sender<()>,

/// Future to wait on for graceful shutdown
#[derivative(Debug = "ignore")]
pub(crate) server_future:
Pin<Box<dyn Future<Output = Result<(), FederatedServerError>> + Send>>,
server_future: Pin<Box<dyn Future<Output = Result<TcpListener, FederatedServerError>> + Send>>,

/// The listen address that the server is actually listening on.
/// If the socket address specified port zero the OS will assign a random free port.
#[allow(dead_code)]
pub(crate) listen_address: SocketAddr,
listen_address: SocketAddr,
}

impl HttpServerHandle {
pub(crate) fn new(
shutdown_sender: oneshot::Sender<()>,
server_future: Pin<
Box<dyn Future<Output = Result<TcpListener, FederatedServerError>> + Send>,
>,
listen_address: SocketAddr,
) -> Self {
Self {
shutdown_sender,
server_future,
listen_address,
}
}

pub(crate) async fn shutdown(self) -> Result<(), FederatedServerError> {
if let Err(_err) = self.shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};
self.server_future.await
self.server_future.await.map(|_| ())
}

pub(crate) async fn restart<Fetcher, ServerFactory>(
self,
factory: &ServerFactory,
graph: Arc<Fetcher>,
configuration: Arc<Configuration>,
) -> Result<Self, FederatedServerError>
where
Fetcher: graphql::Fetcher + 'static,
ServerFactory: HttpServerFactory,
{
// we tell the currently running server to stop
if let Err(_err) = self.shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};

// when the server receives the shutdown signal, it stops accepting new
// connections, and returns the TCP listener, to reuse it in the next server
// it is necessary to keep the queue of new TCP sockets associated with
// the listener instead of dropping them
let listener = self.server_future.await;
tracing::info!("previous server is closed");

// we keep the TCP listener if it is compatible with the new configuration
let listener = if self.listen_address != configuration.server.listen {
None
} else {
match listener {
Ok(listener) => Some(listener),
Err(e) => {
tracing::error!("the previous listen socket failed: {}", e);
None
}
}
};

let handle = factory
.create(Arc::clone(&graph), Arc::clone(&configuration), listener)
.await?;
tracing::debug!("Restarted on {}", handle.listen_address());

Ok(handle)
}

pub(crate) fn listen_address(&self) -> SocketAddr {
self.listen_address
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::channel::oneshot;
use std::str::FromStr;
use test_env_log::test;

#[test(tokio::test)]
async fn sanity() {
let (shutdown_sender, shutdown_receiver) = futures::channel::oneshot::channel();
HttpServerHandle {
listen_address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

HttpServerHandle::new(
shutdown_sender,
server_future: futures::future::ready(Ok(())).boxed(),
}
futures::future::ready(Ok(listener)).boxed(),
SocketAddr::from_str("127.0.0.1:0").unwrap(),
)
.shutdown()
.await
.expect("Should have waited for shutdown");

shutdown_receiver
.into_future()
.await
.expect("Should have been send notification to shutdown");
}
Expand Down
3 changes: 3 additions & 0 deletions crates/apollo-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub enum FederatedServerError {

/// Could not read schema: {0}
ReadSchemaError(graphql::SchemaError),

/// Could not create the HTTP server: {0}
ServerCreationError(std::io::Error),
}

/// The user supplied schema. Either a static instance or a stream for hot reloading.
Expand Down
121 changes: 76 additions & 45 deletions crates/apollo-router/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
schema,
..
} => State::Running {
address: server_handle.listen_address,
address: server_handle.listen_address(),
schema: schema.as_str().to_string(),
},
Stopped => State::Stopped,
Expand Down Expand Up @@ -173,6 +173,7 @@ where
}
}
Ok(()) => {
tracing::info!("Reloading schema");
let configuration = Arc::new(new_configuration);

let schema = Arc::new(new_schema);
Expand All @@ -182,11 +183,21 @@ where
.await,
);

Running {
configuration,
schema,
graph,
server_handle,
match server_handle
.restart(
&self.http_server_factory,
Arc::clone(&graph),
Arc::clone(&configuration),
)
.await
{
Ok(server_handle) => Running {
configuration,
schema,
graph,
server_handle,
},
Err(err) => Errored(err),
}
}
}
Expand All @@ -202,7 +213,7 @@ where
},
UpdateConfiguration(mut new_configuration),
) => {
tracing::debug!("Reloading configuration");
tracing::info!("Reloading configuration");

match new_configuration.load_subgraphs(schema.as_ref()) {
Err(e) => {
Expand All @@ -220,27 +231,27 @@ where
}
Ok(()) => {
let configuration = Arc::new(new_configuration);
let server_handle =
if server_handle.listen_address != configuration.server.listen {
tracing::debug!("Restarting http");
if let Err(_err) = server_handle.shutdown().await {
tracing::error!("Failed to notify shutdown")
}
let new_handle = self
.http_server_factory
.create(Arc::clone(&graph), Arc::clone(&configuration))
.await;
tracing::debug!("Restarted on {}", new_handle.listen_address);
new_handle
} else {
server_handle
};
let graph = Arc::new(
self.graph_factory
.create(&configuration, Arc::clone(&schema))
.await,
);

Running {
configuration,
schema,
graph,
server_handle,
match server_handle
.restart(
&self.http_server_factory,
Arc::clone(&graph),
Arc::clone(&configuration),
)
.await
{
Ok(server_handle) => Running {
configuration,
schema,
graph,
server_handle,
},
Err(err) => Errored(err),
}
}
}
Expand Down Expand Up @@ -320,16 +331,25 @@ where
);
let configuration = Arc::new(configuration);

let server_handle = self
match self
.http_server_factory
.create(Arc::clone(&graph), Arc::clone(&configuration))
.await;
tracing::debug!("Started on {}", server_handle.listen_address);
Running {
configuration,
schema,
graph,
server_handle,
.create(Arc::clone(&graph), Arc::clone(&configuration), None)
.await
{
Ok(server_handle) => {
tracing::debug!("Started on {}", server_handle.listen_address());
Running {
configuration,
schema,
graph,
server_handle,
}
}

Err(err) => {
tracing::error!("Cannot start the router: {}", err);
Errored(err)
}
}
}
}
Expand All @@ -353,6 +373,7 @@ mod tests {
use std::pin::Pin;
use std::str::FromStr;
use test_env_log::test;
use tokio::net::TcpListener;

#[test(tokio::test)]
async fn no_configuration() {
Expand Down Expand Up @@ -438,7 +459,7 @@ mod tests {
#[test(tokio::test)]
async fn startup_reload_schema() {
let graph_factory = create_mock_graph_factory(2);
let (server_factory, shutdown_receivers) = create_mock_server_factory(1);
let (server_factory, shutdown_receivers) = create_mock_server_factory(2);
let schema = include_str!("testdata/supergraph.graphql");

assert!(matches!(
Expand Down Expand Up @@ -471,12 +492,12 @@ mod tests {
.await,
Ok(()),
));
assert_eq!(shutdown_receivers.lock().len(), 1);
assert_eq!(shutdown_receivers.lock().len(), 2);
}

#[test(tokio::test)]
async fn startup_reload_configuration() {
let graph_factory = create_mock_graph_factory(1);
let graph_factory = create_mock_graph_factory(2);
let (server_factory, shutdown_receivers) = create_mock_server_factory(2);

assert!(matches!(
Expand Down Expand Up @@ -666,16 +687,26 @@ mod tests {
.expect_create()
.times(expect_times_called)
.returning(
move |_: Arc<MockMyFetcher>, configuration: Arc<Configuration>| {
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
move |_: Arc<MockMyFetcher>,
configuration: Arc<Configuration>,
listener: Option<TcpListener>| {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
shutdown_receivers_clone.lock().push(shutdown_receiver);

let server = async move {
Ok(if let Some(l) = listener {
l
} else {
tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap()
})
};

Box::pin(async move {
HttpServerHandle {
Ok(HttpServerHandle::new(
shutdown_sender,
server_future: future::ready(Ok(())).boxed(),
listen_address: configuration.server.listen,
}
Box::pin(server),
configuration.server.listen,
))
})
},
);
Expand Down

0 comments on commit e0f7070

Please sign in to comment.