Skip to content

Commit

Permalink
Created a moduleinfo struct to shorten execute signature/params
Browse files Browse the repository at this point in the history
Closes #80

Signed-off-by: Nitish Malhotra <nitish.malhotra@gmail.com>
  • Loading branch information
nitishm committed Aug 12, 2021
1 parent f97c804 commit 65bd275
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 48 deletions.
20 changes: 12 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,22 @@ impl Router {
"/healthz" => Ok(Response::new(Body::from("OK"))),
_ => match self.module_store.handler_for_path(uri_path).await {
Ok(h) => {
let info = ModuleInfo::new(
h.entrypoint.as_str(),
client_addr,
self.cache_config_path.clone(),
self.module_cache.clone(),
self.base_log_dir.clone(),
self.default_host.to_owned(),
self.use_tls,
self.global_env_vars.clone(),
);

let res = h
.module
.execute(
h.entrypoint.as_str(),
req,
client_addr,
&self.cache_config_path,
&self.module_cache,
&self.base_log_dir,
self.default_host.to_owned(),
self.use_tls,
self.global_env_vars.clone(),
info,
)
.await;
Ok(res)
Expand Down
99 changes: 59 additions & 40 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,42 @@ pub const DEFAULT_BINDLE_SERVER: &str = "http://localhost:8080/v1";
const WASM_LAYER_CONTENT_TYPE: &str = "application/vnd.wasm.content.layer.v1+wasm";
const STDERR_FILE: &str = "module.stderr";

pub struct ModuleInfo {
pub entrypoint: String,
pub client_addr: SocketAddr,
pub cache_config_path: PathBuf,
pub module_cache_dir: PathBuf,
pub base_log_dir: PathBuf,
pub default_host: String,
pub use_tls: bool,
pub env_vars: HashMap<String, String>,
}

impl ModuleInfo {
pub fn new(
entrypoint: &str,
client_addr: SocketAddr,
cache_config_path: PathBuf,
module_cache_dir: PathBuf,
base_log_dir: PathBuf,
default_host: String,
use_tls: bool,
env_vars: HashMap<String, String>,
) -> Self {

ModuleInfo {
entrypoint: String::from(entrypoint),
client_addr,
cache_config_path,
module_cache_dir,
base_log_dir,
default_host,
use_tls,
env_vars,
}
}
}

/// An internal representation of a mapping from a URI fragment to a function in a module.
#[derive(Clone)]
pub struct RouteEntry {
Expand Down Expand Up @@ -141,43 +177,21 @@ impl Module {
/// [`id` method](Module::id)) for its name. The log will be placed in that directory at
/// `module.stderr`
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", skip(self, entrypoint, req, client_addr, cache_config_path, module_cache_dir, base_log_dir, default_host), fields(route = %self.route, module = %self.module))]
pub async fn execute(
&self,
entrypoint: &str,
req: Request<Body>,
client_addr: SocketAddr,
cache_config_path: &Path,
module_cache_dir: &Path,
base_log_dir: &Path,
default_host: String,
use_tls: bool,
env_vars: HashMap<String, String>,
) -> Response<Body> {
#[instrument(level = "trace", skip(self, req, info), fields(route = %self.route, module = %self.module))]
pub async fn execute(&self, req: Request<Body>, info: ModuleInfo) -> Response<Body> {
// Read the parts in here
let (parts, body) = req.into_parts();
let data = hyper::body::to_bytes(body)
.await
.unwrap_or_default()
.to_vec();
let ep = entrypoint.to_owned();
let me = self.clone();
// Get owned copies of the various paths to pass into the thread
let cccp = cache_config_path.to_owned();
let mcd = module_cache_dir.to_owned();
let bld = base_log_dir.to_owned();
let res = match tokio::task::spawn_blocking(move || {
me.run_wasm(
&ep,
&parts,
data,
client_addr,
&cccp,
&mcd,
&bld,
default_host.as_str(),
use_tls,
env_vars,
info,
)
})
.await
Expand Down Expand Up @@ -534,31 +548,35 @@ impl Module {
// the status code on its own.
//
// TODO: Waaaay too many args
/*
pub entrypoint: String,
pub client_addr: SocketAddr,
pub cache_config_path: PathBuf,
pub module_cache_dir: PathBuf,
pub base_log_dir: PathBuf,
pub default_host: String,
pub use_tls: bool,
pub env_vars: HashMap<String, String>,
*/

#[allow(clippy::too_many_arguments)]
#[instrument(level = "info", skip(self, req, body, client_addr, cache_config_path, cache_dir, base_log_dir, default_host), fields(uri = %req.uri, module = %self.module))]
#[instrument(level = "info", skip(self, req, body, info), fields(uri = %req.uri, module = %self.module))]
fn run_wasm(
&self,
entrypoint: &str,
req: &Parts,
body: Vec<u8>,
client_addr: SocketAddr,
cache_config_path: &Path,
cache_dir: &Path,
base_log_dir: &Path,
default_host: &str,
use_tls: bool,
env: HashMap<String, String>,
info: ModuleInfo,
) -> Result<Response<Body>, anyhow::Error> {
let startup_span = tracing::info_span!("module instantiation").entered();
let uri_path = req.uri.path();
let headers = self.build_headers(req, body.len(), client_addr, default_host, use_tls, env);
let headers = self.build_headers(req, body.len(), info.client_addr, info.default_host.as_str(), info.use_tls, info.env_vars);
let stdin = ReadPipe::from(body);
let stdout_buf: Vec<u8> = vec![];
let stdout_mutex = Arc::new(RwLock::new(stdout_buf));
let stdout = WritePipe::from_shared(stdout_mutex.clone());

// Make sure the directory exists
let log_dir = base_log_dir.join(self.id());
let log_dir = info.base_log_dir.join(self.id());
tracing::info!(log_dir = %log_dir.display(), "Using log dir");
std::fs::create_dir_all(&log_dir)?;
// Open a file for appending. Right now this will just keep appending as there is no log
Expand Down Expand Up @@ -610,22 +628,23 @@ impl Module {

let ctx = builder.build();

let (mut store, engine) = self.new_store_and_engine(cache_config_path, ctx)?;
let (mut store, engine) = self.new_store_and_engine(&info.cache_config_path, ctx)?;
let mut linker = Linker::new(&engine);
wasmtime_wasi::add_to_linker(&mut linker, |cx| cx)?;

let http = wasi_experimental_http_wasmtime::HttpCtx::new(self.allowed_hosts.clone(), None)?;
http.add_to_linker(&mut linker)?;

let module = self.load_cached_module(&store, cache_dir)?;
let module = self.load_cached_module(&store, &info.module_cache_dir)?;
let instance = linker.instantiate(&mut store, &module)?;

// Manually drop the span so we get instantiation time
drop(startup_span);

let ep = info.entrypoint.to_owned();
// This shouldn't error out, because we already know there is a match.
let start = instance.get_func(&mut store, entrypoint).ok_or_else(|| {
anyhow::anyhow!("No such function '{}' in {}", entrypoint, self.module)
let start = instance.get_func(&mut store, info.entrypoint.as_str()).ok_or_else(|| {
anyhow::anyhow!("No such function '{}' in {}", ep, self.module)
})?;

tracing::trace!("Calling Wasm entry point");
Expand Down

0 comments on commit 65bd275

Please sign in to comment.