Skip to content

Commit

Permalink
Adding graceful shutdown to server.
Browse files Browse the repository at this point in the history
Fixes #1976.
  • Loading branch information
nmittler committed Jun 29, 2023
1 parent c4e92cf commit 80e6bcb
Show file tree
Hide file tree
Showing 3 changed files with 473 additions and 308 deletions.
30 changes: 20 additions & 10 deletions crates/server/src/server/https_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::{io, net::SocketAddr, sync::Arc};

use bytes::{Bytes, BytesMut};
use futures_util::lock::Mutex;
use h2::server;
use std::future::Future;
use std::{io, net::SocketAddr, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, warn};
use trust_dns_proto::rr::Record;
Expand All @@ -28,6 +28,7 @@ pub(crate) async fn h2_handler<T, I>(
io: I,
src_addr: SocketAddr,
dns_hostname: Option<Arc<str>>,
drain: impl Future<Output = ()>,
) where
T: RequestHandler,
I: AsyncRead + AsyncWrite + Unpin,
Expand All @@ -45,13 +46,22 @@ pub(crate) async fn h2_handler<T, I>(

// Accept all inbound HTTP/2.0 streams sent over the
// connection.
while let Some(next_request) = h2.accept().await {
let (request, respond) = match next_request {
Ok(next_request) => next_request,
Err(err) => {
warn!("error accepting request {}: {}", src_addr, err);
return;
}
loop {
let next_request = tokio::select! {
result = h2.accept() => match result {
Some(Ok(next_request)) => next_request,
Some(Err(err)) => {
warn!("error accepting request {}: {}", src_addr, err);
return;
}
None => {
return;
}
},
_ = drain => {
// A graceful shutdown was initiated.
return
},
};

debug!("Received request: {:#?}", request);
Expand Down Expand Up @@ -80,7 +90,7 @@ async fn handle_request<T>(
}

#[derive(Clone)]
struct HttpsResponseHandle(Arc<Mutex<::h2::server::SendResponse<Bytes>>>);
struct HttpsResponseHandle(Arc<Mutex<server::SendResponse<Bytes>>>);

#[async_trait::async_trait]
impl ResponseHandler for HttpsResponseHandle {
Expand Down
28 changes: 19 additions & 9 deletions crates/server/src/server/quic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::{io, net::SocketAddr, sync::Arc};

use bytes::{Bytes, BytesMut};
use futures_util::lock::Mutex;
use std::future::Future;
use std::{io, net::SocketAddr, sync::Arc};
use tracing::{debug, warn};
use trust_dns_proto::{
error::ProtoError,
Expand All @@ -30,6 +30,7 @@ pub(crate) async fn quic_handler<T>(
mut quic_streams: QuicStreams,
src_addr: SocketAddr,
_dns_hostname: Option<Arc<str>>,
drain: impl Future<Output = ()>,
) -> Result<(), ProtoError>
where
T: RequestHandler,
Expand All @@ -38,13 +39,22 @@ where
let mut max_requests = 100u32;

// Accept all inbound quic streams sent over the connection.
while let Some(next_request) = quic_streams.next().await {
let mut request_stream = match next_request {
Ok(next_request) => next_request,
Err(err) => {
warn!("error accepting request {}: {}", src_addr, err);
return Err(err);
}
loop {
let next_request = tokio::select! {
result = quic_streams.next() => match result {
Some(Ok(next_request)) => next_request,
Some(Err(err)) => {
warn!("error accepting request {}: {}", src_addr, err);
return Err(err);
}
None => {
break;
}
},
_ = drain => {
// A graceful shutdown was initiated.
break;
},
};

let request = request_stream.receive_bytes().await?;
Expand Down

0 comments on commit 80e6bcb

Please sign in to comment.