Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding graceful shutdown to server #1977

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ cfg-if = "1"
clap = { version = "4.0", default-features = false }
console = "0.15.0"
data-encoding = "2.2.0"
drain = "0.1.1"
enum-as-inner = "0.6"
idna = "0.4.0"
ipconfig = "0.3.0"
Expand Down
3 changes: 2 additions & 1 deletion crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ path = "src/lib.rs"
async-trait.workspace = true
bytes.workspace = true
cfg-if.workspace = true
drain.workspace = true
enum-as-inner.workspace = true
futures-executor = { workspace = true, default-features = false, features = ["std"] }
futures-util = { workspace = true, default-features = false, features = ["std"] }
Expand All @@ -86,7 +87,7 @@ serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
time.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["net", "sync"] }
tokio = { workspace = true, features = ["macros", "net", "sync"] }
tokio-openssl = { workspace = true, optional = true }
tokio-rustls = { workspace = true, optional = true }
toml.workspace = true
Expand Down
27 changes: 19 additions & 8 deletions crates/server/src/server/https_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::{io, net::SocketAddr, sync::Arc};

use bytes::{Bytes, BytesMut};
use drain::Watch;
use futures_util::lock::Mutex;
use h2::server;
use tokio::io::{AsyncRead, AsyncWrite};
Expand All @@ -28,6 +29,7 @@ pub(crate) async fn h2_handler<T, I>(
io: I,
src_addr: SocketAddr,
dns_hostname: Option<Arc<str>>,
shutdown: Watch,
) where
T: RequestHandler,
I: AsyncRead + AsyncWrite + Unpin,
Expand All @@ -45,13 +47,22 @@ pub(crate) async fn h2_handler<T, I>(

// Accept all inbound HTTP/2.0 streams sent over the
// connection.
while let Some(next_request) = h2.accept().await {
let (request, respond) = match next_request {
Ok(next_request) => next_request,
Err(err) => {
warn!("error accepting request {}: {}", src_addr, err);
return;
}
loop {
let (request, respond) = tokio::select! {
result = h2.accept() => match result {
Some(Ok(next_request)) => next_request,
Some(Err(err)) => {
warn!("error accepting request {}: {}", src_addr, err);
return;
}
None => {
return;
}
},
_ = shutdown.clone().signaled() => {
// A graceful shutdown was initiated.
return
},
};

debug!("Received request: {:#?}", request);
Expand Down Expand Up @@ -80,7 +91,7 @@ async fn handle_request<T>(
}

#[derive(Clone)]
struct HttpsResponseHandle(Arc<Mutex<::h2::server::SendResponse<Bytes>>>);
struct HttpsResponseHandle(Arc<Mutex<server::SendResponse<Bytes>>>);

#[async_trait::async_trait]
impl ResponseHandler for HttpsResponseHandle {
Expand Down
25 changes: 18 additions & 7 deletions crates/server/src/server/quic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::{io, net::SocketAddr, sync::Arc};

use bytes::{Bytes, BytesMut};
use drain::Watch;
use futures_util::lock::Mutex;
use tracing::{debug, warn};
use trust_dns_proto::{
Expand All @@ -30,6 +31,7 @@ pub(crate) async fn quic_handler<T>(
mut quic_streams: QuicStreams,
src_addr: SocketAddr,
_dns_hostname: Option<Arc<str>>,
shutdown: Watch,
) -> Result<(), ProtoError>
where
T: RequestHandler,
Expand All @@ -38,13 +40,22 @@ where
let mut max_requests = 100u32;

// Accept all inbound quic streams sent over the connection.
while let Some(next_request) = quic_streams.next().await {
let mut request_stream = match next_request {
Ok(next_request) => next_request,
Err(err) => {
warn!("error accepting request {}: {}", src_addr, err);
return Err(err);
}
loop {
let mut request_stream = tokio::select! {
result = quic_streams.next() => match result {
Some(Ok(next_request)) => next_request,
Some(Err(err)) => {
warn!("error accepting request {}: {}", src_addr, err);
return Err(err);
}
None => {
break;
}
},
_ = shutdown.clone().signaled() => {
// A graceful shutdown was initiated.
break;
},
};

let request = request_stream.receive_bytes().await?;
Expand Down
Loading
Loading