diff --git a/crates/http-service/src/executor/mod.rs b/crates/http-service/src/executor/mod.rs index 9af50dc..32e3d76 100644 --- a/crates/http-service/src/executor/mod.rs +++ b/crates/http-service/src/executor/mod.rs @@ -162,7 +162,9 @@ where .instance("gcore:fastedge/http-handler") .ok_or_else(|| anyhow!("gcore:fastedge/http-handler instance not found"))? .typed_func::<(fastedge::http::Request,), (fastedge::http::Response,)>("process")?; - let (resp,) = match func.call_async(&mut store, (request,)).await { + let duration = Duration::from_millis(store.data().timeout); + let func = tokio::time::timeout(duration, func.call_async(&mut store, (request,))); + let (resp,) = match func.await? { Ok(res) => res, Err(error) => { // log to application logger error diff --git a/crates/http-service/src/executor/wasi_http.rs b/crates/http-service/src/executor/wasi_http.rs index 7adc65d..05c6ae1 100644 --- a/crates/http-service/src/executor/wasi_http.rs +++ b/crates/http-service/src/executor/wasi_http.rs @@ -157,10 +157,14 @@ where ) .await?; - if let Err(e) = proxy - .wasi_http_incoming_handler() - .call_handle(&mut store, req, out) - .await + let duration = Duration::from_millis(store.data().timeout); + if let Err(e) = tokio::time::timeout( + duration, + proxy + .wasi_http_incoming_handler() + .call_handle(&mut store, req, out), + ) + .await? { error!(cause=?e, "incoming handler"); return Err(e); diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index 7c90aed..ea55b7c 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -29,6 +29,7 @@ use smol_str::SmolStr; use state::HttpState; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; +use tokio::time::error::Elapsed; use tracing::Instrument; use wasi_common::I32Exit; use wasmtime::Trap; @@ -359,6 +360,22 @@ where .boxed(), ), } + } else if let Some(_elapsed) = root_cause.downcast_ref::() { + ( + FASTEDGE_EXECUTION_TIMEOUT, + AppResult::TIMEOUT, + Full::new(Bytes::from("fastedge: Execution timeout")) + .map_err(|never| match never {}) + .boxed(), + ) + } else if root_cause.to_string().ends_with("deadline has elapsed") { + ( + FASTEDGE_EXECUTION_TIMEOUT, + AppResult::TIMEOUT, + Full::new(Bytes::from("fastedge: Execution timeout")) + .map_err(|never| match never {}) + .boxed(), + ) } else { ( FASTEDGE_INTERNAL_ERROR, diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index d5592a2..ad65131 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -31,14 +31,14 @@ use http::request::Parts; use http::Request; use smol_str::SmolStr; use std::borrow::Cow; -use std::sync::mpsc::TryRecvError; -use std::thread; use wasmtime_environ::wasmparser::{Encoding, Parser, Payload}; use wasmtime_wasi_http::bindings::http::types::ErrorCode; use wasmtime_wasi_http::types::{ default_send_request, HostFutureIncomingResponse, OutgoingRequest, }; +pub const DEFAULT_EPOCH_TICK_INTERVAL: u64 = 10; + const PREVIEW1_ADAPTER: &[u8] = include_bytes!("adapters/wasi_snapshot_preview1.reactor.wasm"); #[derive(PartialEq, Copy, Clone, Debug)] @@ -78,6 +78,7 @@ pub struct Data { wasi: Wasi, // memory usage limiter store_limits: ProxyLimiter, + pub timeout: u64, table: ResourceTable, pub logger: Option, http: WasiHttpCtx, @@ -235,8 +236,6 @@ pub type ComponentLinker = wasmtime::component::Linker>; /// An alias for [`wasmtime::Linker`] pub type ModuleLinker = wasmtime::Linker>; -pub const DEFAULT_EPOCH_TICK_INTERVAL: Duration = Duration::from_millis(10); - /// An `WasmEngine` is a global context for the initialization and execution of WASM application. pub struct WasmEngine { inner: Engine, @@ -245,8 +244,6 @@ pub struct WasmEngine { // WASI-NN global Graph Registry graph_registry: CachedGraphRegistry, - // Matching receiver closes on drop - _epoch_tick_handler: std::sync::mpsc::Sender<()>, } /// A builder interface for configuring a new [`WasmEngine`]. @@ -256,7 +253,6 @@ pub struct WasmEngineBuilder { engine: Engine, component_linker: ComponentLinker, module_linker: ModuleLinker, - epoch_tick_interval: Duration, } impl WasmEngine { @@ -292,7 +288,6 @@ impl WasmEngineBuilder { engine: Engine::clone(engine), component_linker, module_linker, - epoch_tick_interval: DEFAULT_EPOCH_TICK_INTERVAL, }) } @@ -304,32 +299,13 @@ impl WasmEngineBuilder { &mut self.module_linker } - fn spawn_epoch_ticker(&self) -> std::sync::mpsc::Sender<()> { - let engine = self.engine.clone(); - let interval = self.epoch_tick_interval; - let (send, recv) = std::sync::mpsc::channel(); - thread::spawn(move || loop { - thread::sleep(interval); - match recv.try_recv() { - Ok(_) | Err(TryRecvError::Disconnected) => { - break; - } - Err(TryRecvError::Empty) => {} - } - engine.increment_epoch(); - }); - send - } - /// Builds an [`WasmEngine`] from this builder. pub fn build(self) -> WasmEngine { - let handler = self.spawn_epoch_ticker(); WasmEngine { inner: self.engine, component_linker: self.component_linker, module_linker: self.module_linker, graph_registry: CachedGraphRegistry::new(), - _epoch_tick_handler: handler, } } } diff --git a/crates/runtime/src/store.rs b/crates/runtime/src/store.rs index 7a54a81..f954ae6 100644 --- a/crates/runtime/src/store.rs +++ b/crates/runtime/src/store.rs @@ -9,7 +9,7 @@ use wasmtime_wasi::WasiCtxBuilder; use wasmtime_wasi_http::WasiHttpCtx; use wasmtime_wasi_nn::WasiNnCtx; -use crate::{Data, Wasi, WasiVersion}; +use crate::{Data, Wasi, WasiVersion, DEFAULT_EPOCH_TICK_INTERVAL}; use crate::limiter::ProxyLimiter; use crate::logger::Logger; @@ -225,6 +225,7 @@ impl StoreBuilder { inner, wasi, store_limits: self.store_limits, + timeout: (self.max_duration + 1) * DEFAULT_EPOCH_TICK_INTERVAL, table, logger, http: WasiHttpCtx,