Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/http-service/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ where
.instance("gcore:fastedge/http-handler")
.ok_or_else(|| anyhow!("gcore:fastedge/http-handler instance not found"))?
.typed_func::<(fastedge::http::Request,), (fastedge::http::Response,)>("process")?;
let (resp,) = match func.call_async(&mut store, (request,)).await {
let duration = Duration::from_millis(store.data().timeout);
let func = tokio::time::timeout(duration, func.call_async(&mut store, (request,)));
let (resp,) = match func.await? {
Ok(res) => res,
Err(error) => {
// log to application logger error
Expand Down
12 changes: 8 additions & 4 deletions crates/http-service/src/executor/wasi_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,14 @@ where
)
.await?;

if let Err(e) = proxy
.wasi_http_incoming_handler()
.call_handle(&mut store, req, out)
.await
let duration = Duration::from_millis(store.data().timeout);
if let Err(e) = tokio::time::timeout(
duration,
proxy
.wasi_http_incoming_handler()
.call_handle(&mut store, req, out),
)
.await?
{
error!(cause=?e, "incoming handler");
return Err(e);
Expand Down
17 changes: 17 additions & 0 deletions crates/http-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use smol_str::SmolStr;
use state::HttpState;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio::time::error::Elapsed;
use tracing::Instrument;
use wasi_common::I32Exit;
use wasmtime::Trap;
Expand Down Expand Up @@ -359,6 +360,22 @@ where
.boxed(),
),
}
} else if let Some(_elapsed) = root_cause.downcast_ref::<Elapsed>() {
(
FASTEDGE_EXECUTION_TIMEOUT,
AppResult::TIMEOUT,
Full::new(Bytes::from("fastedge: Execution timeout"))
.map_err(|never| match never {})
.boxed(),
)
} else if root_cause.to_string().ends_with("deadline has elapsed") {
(
FASTEDGE_EXECUTION_TIMEOUT,
AppResult::TIMEOUT,
Full::new(Bytes::from("fastedge: Execution timeout"))
.map_err(|never| match never {})
.boxed(),
)
} else {
(
FASTEDGE_INTERNAL_ERROR,
Expand Down
30 changes: 3 additions & 27 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ use http::request::Parts;
use http::Request;
use smol_str::SmolStr;
use std::borrow::Cow;
use std::sync::mpsc::TryRecvError;
use std::thread;
use wasmtime_environ::wasmparser::{Encoding, Parser, Payload};
use wasmtime_wasi_http::bindings::http::types::ErrorCode;
use wasmtime_wasi_http::types::{
default_send_request, HostFutureIncomingResponse, OutgoingRequest,
};

pub const DEFAULT_EPOCH_TICK_INTERVAL: u64 = 10;

const PREVIEW1_ADAPTER: &[u8] = include_bytes!("adapters/wasi_snapshot_preview1.reactor.wasm");

#[derive(PartialEq, Copy, Clone, Debug)]
Expand Down Expand Up @@ -78,6 +78,7 @@ pub struct Data<T> {
wasi: Wasi,
// memory usage limiter
store_limits: ProxyLimiter,
pub timeout: u64,
table: ResourceTable,
pub logger: Option<Logger>,
http: WasiHttpCtx,
Expand Down Expand Up @@ -235,8 +236,6 @@ pub type ComponentLinker<T> = wasmtime::component::Linker<Data<T>>;
/// An alias for [`wasmtime::Linker`]
pub type ModuleLinker<T> = wasmtime::Linker<Data<T>>;

pub const DEFAULT_EPOCH_TICK_INTERVAL: Duration = Duration::from_millis(10);

/// An `WasmEngine` is a global context for the initialization and execution of WASM application.
pub struct WasmEngine<T> {
inner: Engine,
Expand All @@ -245,8 +244,6 @@ pub struct WasmEngine<T> {

// WASI-NN global Graph Registry
graph_registry: CachedGraphRegistry,
// Matching receiver closes on drop
_epoch_tick_handler: std::sync::mpsc::Sender<()>,
}

/// A builder interface for configuring a new [`WasmEngine`].
Expand All @@ -256,7 +253,6 @@ pub struct WasmEngineBuilder<T> {
engine: Engine,
component_linker: ComponentLinker<T>,
module_linker: ModuleLinker<T>,
epoch_tick_interval: Duration,
}

impl<T: Send + Sync> WasmEngine<T> {
Expand Down Expand Up @@ -292,7 +288,6 @@ impl<T: Send + Sync> WasmEngineBuilder<T> {
engine: Engine::clone(engine),
component_linker,
module_linker,
epoch_tick_interval: DEFAULT_EPOCH_TICK_INTERVAL,
})
}

Expand All @@ -304,32 +299,13 @@ impl<T: Send + Sync> WasmEngineBuilder<T> {
&mut self.module_linker
}

fn spawn_epoch_ticker(&self) -> std::sync::mpsc::Sender<()> {
let engine = self.engine.clone();
let interval = self.epoch_tick_interval;
let (send, recv) = std::sync::mpsc::channel();
thread::spawn(move || loop {
thread::sleep(interval);
match recv.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
break;
}
Err(TryRecvError::Empty) => {}
}
engine.increment_epoch();
});
send
}

/// Builds an [`WasmEngine`] from this builder.
pub fn build(self) -> WasmEngine<T> {
let handler = self.spawn_epoch_ticker();
WasmEngine {
inner: self.engine,
component_linker: self.component_linker,
module_linker: self.module_linker,
graph_registry: CachedGraphRegistry::new(),
_epoch_tick_handler: handler,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/runtime/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use wasmtime_wasi::WasiCtxBuilder;
use wasmtime_wasi_http::WasiHttpCtx;
use wasmtime_wasi_nn::WasiNnCtx;

use crate::{Data, Wasi, WasiVersion};
use crate::{Data, Wasi, WasiVersion, DEFAULT_EPOCH_TICK_INTERVAL};

use crate::limiter::ProxyLimiter;
use crate::logger::Logger;
Expand Down Expand Up @@ -225,6 +225,7 @@ impl StoreBuilder {
inner,
wasi,
store_limits: self.store_limits,
timeout: (self.max_duration + 1) * DEFAULT_EPOCH_TICK_INTERVAL,
table,
logger,
http: WasiHttpCtx,
Expand Down