Skip to content

Commit

Permalink
Adding graceful shutdown to server.
Browse files Browse the repository at this point in the history
Fixes #1976.
  • Loading branch information
nmittler committed Jul 10, 2023
1 parent 1482e65 commit e7a7604
Show file tree
Hide file tree
Showing 8 changed files with 606 additions and 294 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ cfg-if = "1"
clap = { version = "4.0", default-features = false }
console = "0.15.0"
data-encoding = "2.2.0"
drain = "0.1.1"
enum-as-inner = "0.6"
idna = "0.4.0"
ipconfig = "0.3.0"
Expand Down
3 changes: 2 additions & 1 deletion crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ path = "src/lib.rs"
async-trait.workspace = true
bytes.workspace = true
cfg-if.workspace = true
drain.workspace = true
enum-as-inner.workspace = true
futures-executor = { workspace = true, default-features = false, features = ["std"] }
futures-util = { workspace = true, default-features = false, features = ["std"] }
Expand All @@ -86,7 +87,7 @@ serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
time.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["net", "sync"] }
tokio = { workspace = true, features = ["macros", "net", "sync"] }
tokio-openssl = { workspace = true, optional = true }
tokio-rustls = { workspace = true, optional = true }
toml.workspace = true
Expand Down
27 changes: 19 additions & 8 deletions crates/server/src/server/https_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::{io, net::SocketAddr, sync::Arc};

use bytes::{Bytes, BytesMut};
use drain::Watch;
use futures_util::lock::Mutex;
use h2::server;
use tokio::io::{AsyncRead, AsyncWrite};
Expand All @@ -28,6 +29,7 @@ pub(crate) async fn h2_handler<T, I>(
io: I,
src_addr: SocketAddr,
dns_hostname: Option<Arc<str>>,
drain: Watch,
) where
T: RequestHandler,
I: AsyncRead + AsyncWrite + Unpin,
Expand All @@ -45,13 +47,22 @@ pub(crate) async fn h2_handler<T, I>(

// Accept all inbound HTTP/2.0 streams sent over the
// connection.
while let Some(next_request) = h2.accept().await {
let (request, respond) = match next_request {
Ok(next_request) => next_request,
Err(err) => {
warn!("error accepting request {}: {}", src_addr, err);
return;
}
loop {
let (request, respond) = tokio::select! {
result = h2.accept() => match result {
Some(Ok(next_request)) => next_request,
Some(Err(err)) => {
warn!("error accepting request {}: {}", src_addr, err);
return;
}
None => {
return;
}
},
_ = drain.clone().signaled() => {
// A graceful shutdown was initiated.
return
},
};

debug!("Received request: {:#?}", request);
Expand Down Expand Up @@ -80,7 +91,7 @@ async fn handle_request<T>(
}

#[derive(Clone)]
struct HttpsResponseHandle(Arc<Mutex<::h2::server::SendResponse<Bytes>>>);
struct HttpsResponseHandle(Arc<Mutex<server::SendResponse<Bytes>>>);

#[async_trait::async_trait]
impl ResponseHandler for HttpsResponseHandle {
Expand Down
26 changes: 19 additions & 7 deletions crates/server/src/server/quic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::{io, net::SocketAddr, sync::Arc};

use bytes::{Bytes, BytesMut};
use drain::Watch;
use futures_util::lock::Mutex;
use tracing::{debug, warn};
use trust_dns_proto::{
Expand All @@ -30,6 +31,7 @@ pub(crate) async fn quic_handler<T>(
mut quic_streams: QuicStreams,
src_addr: SocketAddr,
_dns_hostname: Option<Arc<str>>,
drain: Watch,
) -> Result<(), ProtoError>
where
T: RequestHandler,
Expand All @@ -38,13 +40,23 @@ where
let mut max_requests = 100u32;

// Accept all inbound quic streams sent over the connection.
while let Some(next_request) = quic_streams.next().await {
let mut request_stream = match next_request {
Ok(next_request) => next_request,
Err(err) => {
warn!("error accepting request {}: {}", src_addr, err);
return Err(err);
}
let drain = drain.clone();
loop {
let mut request_stream = tokio::select! {
result = quic_streams.next() => match result {
Some(Ok(next_request)) => next_request,
Some(Err(err)) => {
warn!("error accepting request {}: {}", src_addr, err);
return Err(err);
}
None => {
break;
}
},
_ = drain.clone().signaled() => {
// A graceful shutdown was initiated.
break;
},
};

let request = request_stream.receive_bytes().await?;
Expand Down

0 comments on commit e7a7604

Please sign in to comment.