diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b286e1f..fc1c663 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -23,7 +23,7 @@ jobs: - name: Setup Rust uses: dtolnay/rust-toolchain@stable with: - target: wasm32-wasi + target: wasm32-wasip1 components: rustfmt, clippy - name: Run cargo-audit binary crate diff --git a/crates/http-service/Cargo.toml b/crates/http-service/Cargo.toml index 91103ef..f6e77a1 100644 --- a/crates/http-service/Cargo.toml +++ b/crates/http-service/Cargo.toml @@ -37,7 +37,7 @@ clickhouse = { version = "0.13", optional = true } chrono = "0.4" async-trait = "0.1" wasmtime-wasi-http = "20.0.2" -hyper-util = "0.1" +hyper-util = { version = "0.1", features = ["server", "server-graceful"] } http-body-util = "0.1" shellflip = {workspace = true} bytes = "1.6" diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index fb71bce..5d99078 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -24,14 +24,10 @@ use runtime::{ WasmEngine, WasmEngineBuilder, }; use secret::SecretStrategy; -use shellflip::ShutdownHandle; +use shellflip::{ShutdownHandle, ShutdownSignal}; use smol_str::SmolStr; use state::HttpState; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::TcpListener, - time::error::Elapsed, -}; +use tokio::{net::TcpListener, time::error::Elapsed}; pub use wasmtime_wasi_http::body::HyperOutgoingBody; pub mod executor; @@ -107,30 +103,62 @@ where tracing::info!("Listening on http://{}", listen_addr); let mut backoff = 1; let self_ = Arc::new(self); + let graceful = hyper_util::server::graceful::GracefulShutdown::new(); + let mut signal = config + .cancel + .upgrade() + .map(|s| ShutdownSignal::from(s.as_ref())) + .unwrap_or_default(); + loop { - match listener.accept().await { - Ok((stream, _)) => { - tracing::debug!(remote=?stream.peer_addr(), "new http connection"); - let connection = self_.clone(); - if let Some(cancel) = config.cancel.upgrade() { - tokio::spawn(connection.serve(stream, cancel)); - backoff = 1; - } else { - tracing::trace!("weak cancel handler"); - backoff *= 2; - } - } - Err(error) => { - tracing::warn!(cause=?error, "http accept error"); - tokio::time::sleep(Duration::from_millis(backoff * 100)).await; - if backoff > config.backoff { - backoff = 1; - } else { - backoff *= 2; + tokio::select! { + conn = listener.accept() => { + match conn { + Ok((stream, _)) => { + tracing::debug!(remote=?stream.peer_addr(), "new http connection"); + let connection = self_.clone(); + use tracing::Instrument; + let io = TokioIo::new(stream); + + let service = service_fn(move |req| { + let self_ = connection.clone(); + let request_id = remote_traceparent(&req); + async move { + self_ + .handle_request(&request_id, req) + .instrument(tracing::debug_span!("http", request_id)) + .await + } + }); + + let connection = http1::Builder::new().keep_alive(true).serve_connection(io, service); + let connection = graceful.watch(connection); + tokio::spawn(async move { + if let Err(error) = connection.await { + tracing::warn!(cause=?error, "Error serving connection"); + } + }); + } + Err(error) => { + tracing::warn!(cause=?error, "http accept error"); + tokio::time::sleep(Duration::from_millis(backoff * 100)).await; + if backoff > config.backoff { + backoff = 1; + } else { + backoff *= 2; + } + } } + }, + _ = signal.on_shutdown() => { + tracing::info!("Shutting down http service"); + break; } } } + + graceful.shutdown().await; + Ok(()) } fn configure_engine(builder: &mut WasmEngineBuilder) -> Result<()> { @@ -170,32 +198,6 @@ where T::Executor: HttpExecutor + Send + Sync, U: SecretStrategy, { - async fn serve(self: Arc, stream: S, _cancel: Arc) - where - S: AsyncRead + AsyncWrite + Unpin, - { - use tracing::Instrument; - let io = TokioIo::new(stream); - - let service = service_fn(move |req| { - let self_ = self.clone(); - let request_id = remote_traceparent(&req); - async move { - self_ - .handle_request(&request_id, req) - .instrument(tracing::debug_span!("http", request_id)) - .await - } - }); - if let Err(error) = http1::Builder::new() - .keep_alive(true) - .serve_connection(io, service) - .await - { - tracing::warn!(cause=?error, "Error serving connection"); - } - } - /// handle HTTP request. async fn handle_request( &self,