Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: Setup Rust
uses: dtolnay/rust-toolchain@stable
with:
target: wasm32-wasi
target: wasm32-wasip1
components: rustfmt, clippy

- name: Run cargo-audit binary crate
Expand Down
2 changes: 1 addition & 1 deletion crates/http-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ clickhouse = { version = "0.13", optional = true }
chrono = "0.4"
async-trait = "0.1"
wasmtime-wasi-http = "20.0.2"
hyper-util = "0.1"
hyper-util = { version = "0.1", features = ["server", "server-graceful"] }
http-body-util = "0.1"
shellflip = {workspace = true}
bytes = "1.6"
Expand Down
104 changes: 53 additions & 51 deletions crates/http-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ use runtime::{
WasmEngine, WasmEngineBuilder,
};
use secret::SecretStrategy;
use shellflip::ShutdownHandle;
use shellflip::{ShutdownHandle, ShutdownSignal};
use smol_str::SmolStr;
use state::HttpState;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
time::error::Elapsed,
};
use tokio::{net::TcpListener, time::error::Elapsed};
pub use wasmtime_wasi_http::body::HyperOutgoingBody;

pub mod executor;
Expand Down Expand Up @@ -107,30 +103,62 @@ where
tracing::info!("Listening on http://{}", listen_addr);
let mut backoff = 1;
let self_ = Arc::new(self);
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let mut signal = config
.cancel
.upgrade()
.map(|s| ShutdownSignal::from(s.as_ref()))
.unwrap_or_default();

loop {
match listener.accept().await {
Ok((stream, _)) => {
tracing::debug!(remote=?stream.peer_addr(), "new http connection");
let connection = self_.clone();
if let Some(cancel) = config.cancel.upgrade() {
tokio::spawn(connection.serve(stream, cancel));
backoff = 1;
} else {
tracing::trace!("weak cancel handler");
backoff *= 2;
}
}
Err(error) => {
tracing::warn!(cause=?error, "http accept error");
tokio::time::sleep(Duration::from_millis(backoff * 100)).await;
if backoff > config.backoff {
backoff = 1;
} else {
backoff *= 2;
tokio::select! {
conn = listener.accept() => {
match conn {
Ok((stream, _)) => {
tracing::debug!(remote=?stream.peer_addr(), "new http connection");
let connection = self_.clone();
use tracing::Instrument;
let io = TokioIo::new(stream);

let service = service_fn(move |req| {
let self_ = connection.clone();
let request_id = remote_traceparent(&req);
async move {
self_
.handle_request(&request_id, req)
.instrument(tracing::debug_span!("http", request_id))
.await
}
});

let connection = http1::Builder::new().keep_alive(true).serve_connection(io, service);
let connection = graceful.watch(connection);
tokio::spawn(async move {
if let Err(error) = connection.await {
tracing::warn!(cause=?error, "Error serving connection");
}
});
}
Err(error) => {
tracing::warn!(cause=?error, "http accept error");
tokio::time::sleep(Duration::from_millis(backoff * 100)).await;
if backoff > config.backoff {
backoff = 1;
} else {
backoff *= 2;
}
}
}
},
_ = signal.on_shutdown() => {
tracing::info!("Shutting down http service");
break;
}
}
}

graceful.shutdown().await;
Ok(())
}

fn configure_engine(builder: &mut WasmEngineBuilder<Self::State>) -> Result<()> {
Expand Down Expand Up @@ -170,32 +198,6 @@ where
T::Executor: HttpExecutor + Send + Sync,
U: SecretStrategy,
{
async fn serve<S>(self: Arc<Self>, stream: S, _cancel: Arc<ShutdownHandle>)
where
S: AsyncRead + AsyncWrite + Unpin,
{
use tracing::Instrument;
let io = TokioIo::new(stream);

let service = service_fn(move |req| {
let self_ = self.clone();
let request_id = remote_traceparent(&req);
async move {
self_
.handle_request(&request_id, req)
.instrument(tracing::debug_span!("http", request_id))
.await
}
});
if let Err(error) = http1::Builder::new()
.keep_alive(true)
.serve_connection(io, service)
.await
{
tracing::warn!(cause=?error, "Error serving connection");
}
}

/// handle HTTP request.
async fn handle_request<B>(
&self,
Expand Down