From 6f5390c011935ca45c3767636b2d868fcf068dec Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Tue, 2 Sep 2025 15:43:09 -0700 Subject: [PATCH 1/6] chore: drop dep on futures-core, ready! macro is in std --- Cargo.toml | 1 - src/future/delay.rs | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9edfd87..8dc1637 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ default = ["json"] json = ["dep:serde", "dep:serde_json"] [dependencies] -futures-core.workspace = true http.workspace = true itoa.workspace = true pin-project-lite.workspace = true diff --git a/src/future/delay.rs b/src/future/delay.rs index 48d3b70..d8fcdbf 100644 --- a/src/future/delay.rs +++ b/src/future/delay.rs @@ -1,7 +1,6 @@ -use futures_core::ready; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use pin_project_lite::pin_project; From 08cb00f810e7d84e51cacfe833a9ef567799e376 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Tue, 2 Sep 2025 16:42:56 -0700 Subject: [PATCH 2/6] feat: event loop is now based on async-task --- Cargo.toml | 2 + src/lib.rs | 14 +++++- src/runtime/block_on.rs | 100 +++++++++++++++++----------------------- src/runtime/reactor.rs | 89 ++++++++++++++++++++++++----------- 4 files changed, 118 insertions(+), 87 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8dc1637..b4538ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ default = ["json"] json = ["dep:serde", "dep:serde_json"] [dependencies] +async-task.workspace = true http.workspace = true itoa.workspace = true pin-project-lite.workspace = true @@ -62,6 +63,7 @@ authors = [ [workspace.dependencies] anyhow = "1" +async-task = "4.7" cargo_metadata = "0.22" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" diff --git a/src/lib.rs b/src/lib.rs index bb34042..9fdf435 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,5 @@ #![allow(async_fn_in_trait)] #![warn(future_incompatible, unreachable_pub)] -#![forbid(unsafe_code)] //#![deny(missing_debug_implementations)] //#![warn(missing_docs)] //#![forbid(rustdoc::missing_doc_code_examples)] @@ -55,15 +54,26 @@ //! These are unique capabilities provided by WASI 0.2, and because this library //! is specific to that are exposed from here. +// We need unsafe code in the runtime. +pub mod runtime; + +// All other mods do not require unsafe. +#[forbid(unsafe_code)] pub mod future; +#[forbid(unsafe_code)] #[macro_use] pub mod http; +#[forbid(unsafe_code)] pub mod io; +#[forbid(unsafe_code)] pub mod iter; +#[forbid(unsafe_code)] pub mod net; +#[forbid(unsafe_code)] pub mod rand; -pub mod runtime; +#[forbid(unsafe_code)] pub mod task; +#[forbid(unsafe_code)] pub mod time; pub use wstd_macro::attr_macro_http_server as http_server; diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index d38a9de..77645a9 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -2,14 +2,13 @@ use super::{Reactor, REACTOR}; use std::future::Future; use std::pin::pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll, Wake, Waker}; +use std::task::{Context, Poll, Waker}; -/// Start the event loop -pub fn block_on(fut: Fut) -> Fut::Output +/// Start the event loop. Blocks until the future +pub fn block_on(fut: F) -> F::Output where - Fut: Future, + F: Future + 'static, + F::Output: 'static, { // Construct the reactor let reactor = Reactor::new(); @@ -19,67 +18,52 @@ where panic!("cannot wstd::runtime::block_on inside an existing block_on!") } - // Pin the future so it can be polled - let mut fut = pin!(fut); + // Spawn the task onto the reactor. + let root_task = reactor.spawn(fut); - // Create a new context to be passed to the future. - let root = Arc::new(RootWaker::new()); - let waker = Waker::from(root.clone()); - let mut cx = Context::from_waker(&waker); + loop { + match reactor.pop_ready_list() { + None => { + if reactor.pending_pollables_is_empty() { + break; + } else { + reactor.block_on_pollables() + } + } + Some(runnable) => { + // Run the task popped from the head of the runlist. If the + // task re-inserts itself onto the runlist during execution, + // last_run_awake is a hint that guarantees us the runlist is + // nonempty. + let last_run_awake = runnable.run(); - // Either the future completes and we return, or some IO is happening - // and we wait. - let res = loop { - match fut.as_mut().poll(&mut cx) { - Poll::Ready(res) => break res, - Poll::Pending => { - // If some non-pollable based future has marked the root task - // as awake, reset and poll again. otherwise, block until a - // pollable wakes a future. - if root.is_awake() { + // If any task is ready for running, we perform a nonblocking + // check of pollables, giving any tasks waiting on a pollable + // a chance to wake. + if last_run_awake || !reactor.ready_list_is_empty() { reactor.nonblock_check_pollables(); - root.reset() - } else { - // If there are no futures awake or waiting on a WASI - // pollable, its impossible for the reactor to make - // progress, and the only valid behaviors are to sleep - // forever or panic. This should only be reachable if the - // user's Futures are implemented incorrectly. - if !reactor.nonempty_pending_pollables() { - panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready") - } + } else if !reactor.pending_pollables_is_empty() { + // If the runlist is empty, block until any of the pending + // pollables have woken a task, putting it back on the + // ready list reactor.block_on_pollables() + } else { + break; } } } - }; + } // Clear the singleton REACTOR.replace(None); - res -} - -/// This waker is used in the Context of block_on. If a Future executing in -/// the block_on calls context.wake(), it sets this boolean state so that -/// block_on's Future is polled again immediately, rather than waiting for -/// an external (WASI pollable) event before polling again. -struct RootWaker { - wake: AtomicBool, -} -impl RootWaker { - fn new() -> Self { - Self { - wake: AtomicBool::new(false), + // Get the result out of the root task + let mut root_task = pin!(root_task); + let mut noop_context = Context::from_waker(Waker::noop()); + match root_task.as_mut().poll(&mut noop_context) { + Poll::Ready(res) => res, + Poll::Pending => { + unreachable!( + "ready list empty, therefore root task should be ready. malformed root task?" + ) } } - fn is_awake(&self) -> bool { - self.wake.load(Ordering::Relaxed) - } - fn reset(&self) { - self.wake.store(false, Ordering::Relaxed); - } -} -impl Wake for RootWaker { - fn wake(self: Arc) { - self.wake.store(true, Ordering::Relaxed); - } } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index efa1d80..e6e75fc 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -1,11 +1,12 @@ use super::REACTOR; +use async_task::{Runnable, Task}; use core::cell::RefCell; -use core::future; +use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll, Waker}; use slab::Slab; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::rc::Rc; use wasi::io::poll::Pollable; @@ -68,7 +69,7 @@ pub struct WaitFor { waitee: Waitee, needs_deregistration: bool, } -impl future::Future for WaitFor { +impl Future for WaitFor { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let reactor = Reactor::current(); @@ -91,15 +92,16 @@ impl Drop for WaitFor { /// Manage async system resources for WASI 0.2 #[derive(Debug, Clone)] pub struct Reactor { - inner: Rc>, + inner: Rc, } /// The private, internal `Reactor` implementation - factored out so we can take /// a lock of the whole. #[derive(Debug)] struct InnerReactor { - pollables: Slab, - wakers: HashMap, + pollables: RefCell>, + wakers: RefCell>, + ready_list: RefCell>, } impl Reactor { @@ -119,18 +121,19 @@ impl Reactor { /// Create a new instance of `Reactor` pub(crate) fn new() -> Self { Self { - inner: Rc::new(RefCell::new(InnerReactor { - pollables: Slab::new(), - wakers: HashMap::new(), - })), + inner: Rc::new(InnerReactor { + pollables: RefCell::new(Slab::new()), + wakers: RefCell::new(HashMap::new()), + ready_list: RefCell::new(VecDeque::new()), + }), } } /// The reactor tracks the set of WASI pollables which have an associated /// Future pending on their readiness. This function returns indicating /// that set of pollables is not empty. - pub(crate) fn nonempty_pending_pollables(&self) -> bool { - !self.inner.borrow().wakers.is_empty() + pub(crate) fn pending_pollables_is_empty(&self) -> bool { + self.inner.wakers.borrow().is_empty() } /// Block until at least one pending pollable is ready, waking a pending future. @@ -152,7 +155,7 @@ impl Reactor { pub(crate) fn nonblock_check_pollables(&self) { // If there are no pollables with associated pending futures, there is // no work to do here, so return immediately. - if !self.nonempty_pending_pollables() { + if self.pending_pollables_is_empty() { return; } // Lazily create a pollable which always resolves to ready. @@ -186,7 +189,8 @@ impl Reactor { where F: FnOnce(&[&Pollable]) -> Vec, { - let reactor = self.inner.borrow(); + let wakers = self.inner.wakers.borrow(); + let pollables = self.inner.pollables.borrow(); // We're about to wait for a number of pollables. When they wake we get // the *indexes* back for the pollables whose events were available - so @@ -194,12 +198,12 @@ impl Reactor { // We start by iterating over the pollables, and keeping note of which // pollable belongs to which waker - let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len()); - let mut targets = Vec::with_capacity(reactor.wakers.len()); - for (waitee, waker) in reactor.wakers.iter() { + let mut indexed_wakers = Vec::with_capacity(wakers.len()); + let mut targets = Vec::with_capacity(wakers.len()); + for (waitee, waker) in wakers.iter() { let pollable_index = waitee.pollable.0.key; indexed_wakers.push(waker); - targets.push(&reactor.pollables[pollable_index.0]); + targets.push(&pollables[pollable_index.0]); } // Now that we have that association, we're ready to check our targets for readiness. @@ -221,33 +225,64 @@ impl Reactor { /// Turn a Wasi [`Pollable`] into an [`AsyncPollable`] pub fn schedule(&self, pollable: Pollable) -> AsyncPollable { - let mut reactor = self.inner.borrow_mut(); - let key = EventKey(reactor.pollables.insert(pollable)); + let mut pollables = self.inner.pollables.borrow_mut(); + let key = EventKey(pollables.insert(pollable)); AsyncPollable(Rc::new(Registration { key })) } fn deregister_event(&self, key: EventKey) { - let mut reactor = self.inner.borrow_mut(); - reactor.pollables.remove(key.0); + let mut pollables = self.inner.pollables.borrow_mut(); + pollables.remove(key.0); } fn deregister_waitee(&self, waitee: &Waitee) { - let mut reactor = self.inner.borrow_mut(); - reactor.wakers.remove(waitee); + let mut wakers = self.inner.wakers.borrow_mut(); + wakers.remove(waitee); } fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool { - let mut reactor = self.inner.borrow_mut(); - let ready = reactor + let ready = self + .inner .pollables + .borrow() .get(waitee.pollable.0.key.0) .expect("only live EventKey can be checked for readiness") .ready(); if !ready { - reactor.wakers.insert(waitee.clone(), waker.clone()); + self.inner + .wakers + .borrow_mut() + .insert(waitee.clone(), waker.clone()); } ready } + + /// Spawn a `Task` on the `Reactor`. + pub fn spawn(&self, fut: F) -> Task + where + F: Future + 'static, + T: 'static, + { + let this = self.clone(); + let schedule = move |runnable| this.inner.ready_list.borrow_mut().push_back(runnable); + + // SAFETY: + // we're using this exactly like async_task::spawn_local, except that + // the schedule function is not Send or Sync, because Runnable is not + // Send or Sync. This is safe because wasm32-wasip2 is always + // single-threaded. + let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) }; + self.inner.ready_list.borrow_mut().push_back(runnable); + task + } + + pub(super) fn pop_ready_list(&self) -> Option { + self.inner.ready_list.borrow_mut().pop_front() + } + + pub(super) fn ready_list_is_empty(&self) -> bool { + self.inner.ready_list.borrow().is_empty() + } } #[cfg(test)] From f312da6616a954a8a6ed3e17eef05094dc6717e3 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Thu, 4 Sep 2025 15:05:39 -0700 Subject: [PATCH 3/6] feat: tcp-echo-server uses tasks, test runner uses wasmtime cli, tcp-listener supports v6 The tcp-echo-server example has been rewritten to spawn the echo part of each accept into a task, which means it can accept new connections while other echos are in flight. The tcp-echo-server test has been rewritten to test that connections can be accepted while other echoes are in flight. The tcp-echo-server test has been rewritten to use wasmtime cli as a process, rather than use wasmtime as a crate. This drops wasmtime from the dev-dependencies of the workspace, which is good because it was running a quite out-of-date wasmtime. Fix up the missing conversions to/from std::net::SocketAddr in wstd::net::tcp_listener, so that we can send a Display impl of the listening address from the guest to host, and parse it out in the host (see get_listening_address) --- Cargo.toml | 3 - examples/tcp_echo_server.rs | 6 +- src/net/tcp_listener.rs | 68 ++++-- src/runtime/mod.rs | 12 + test-programs/artifacts/Cargo.toml | 3 - .../artifacts/tests/tcp_echo_server.rs | 208 +++++++++--------- 6 files changed, 175 insertions(+), 125 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b4538ab..772aff3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,9 +84,6 @@ test-programs = { path = "test-programs" } test-programs-artifacts = { path = "test-programs/artifacts" } ureq = { version = "2.12.1", default-features = false } wasi = "0.14.0" -wasmtime = "26" -wasmtime-wasi = "26" -wasmtime-wasi-http = "26" wstd = { path = "." } wstd-macro = { path = "macro", version = "=0.5.4" } diff --git a/examples/tcp_echo_server.rs b/examples/tcp_echo_server.rs index fdd21e9..f1dd895 100644 --- a/examples/tcp_echo_server.rs +++ b/examples/tcp_echo_server.rs @@ -12,7 +12,11 @@ async fn main() -> io::Result<()> { while let Some(stream) = incoming.next().await { let stream = stream?; println!("Accepted from: {}", stream.peer_addr()?); - io::copy(&stream, &stream).await?; + wstd::runtime::spawn(async move { + // If echo copy fails, we can ignore it. + let _ = io::copy(&stream, &stream).await; + }) + .detach(); } Ok(()) } diff --git a/src/net/tcp_listener.rs b/src/net/tcp_listener.rs index 7aedc71..99cdeb4 100644 --- a/src/net/tcp_listener.rs +++ b/src/net/tcp_listener.rs @@ -33,15 +33,8 @@ impl TcpListener { wasi::sockets::tcp_create_socket::create_tcp_socket(family).map_err(to_io_err)?; let network = wasi::sockets::instance_network::instance_network(); - let local_address = match addr { - SocketAddr::V4(addr) => { - let ip = addr.ip().octets(); - let address = (ip[0], ip[1], ip[2], ip[3]); - let port = addr.port(); - IpSocketAddress::Ipv4(Ipv4SocketAddress { port, address }) - } - SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"), - }; + let local_address = sockaddr_to_wasi(addr); + socket .start_bind(&network, local_address) .map_err(to_io_err)?; @@ -56,10 +49,11 @@ impl TcpListener { } /// Returns the local socket address of this listener. - // TODO: make this return an actual socket addr - pub fn local_addr(&self) -> io::Result { - let addr = self.socket.local_address().map_err(to_io_err)?; - Ok(format!("{addr:?}")) + pub fn local_addr(&self) -> io::Result { + self.socket + .local_address() + .map_err(to_io_err) + .map(sockaddr_from_wasi) } /// Returns an iterator over the connections being received on this listener. @@ -105,3 +99,51 @@ pub(super) fn to_io_err(err: ErrorCode) -> io::Error { _ => ErrorKind::Other.into(), } } + +fn sockaddr_from_wasi(addr: IpSocketAddress) -> std::net::SocketAddr { + use wasi::sockets::network::Ipv6SocketAddress; + match addr { + IpSocketAddress::Ipv4(Ipv4SocketAddress { address, port }) => { + std::net::SocketAddr::V4(std::net::SocketAddrV4::new( + std::net::Ipv4Addr::new(address.0, address.1, address.2, address.3), + port, + )) + } + IpSocketAddress::Ipv6(Ipv6SocketAddress { + address, + port, + flow_info, + scope_id, + }) => std::net::SocketAddr::V6(std::net::SocketAddrV6::new( + std::net::Ipv6Addr::new( + address.0, address.1, address.2, address.3, address.4, address.5, address.6, + address.7, + ), + port, + flow_info, + scope_id, + )), + } +} + +fn sockaddr_to_wasi(addr: std::net::SocketAddr) -> IpSocketAddress { + use wasi::sockets::network::Ipv6SocketAddress; + match addr { + std::net::SocketAddr::V4(addr) => { + let ip = addr.ip().octets(); + IpSocketAddress::Ipv4(Ipv4SocketAddress { + address: (ip[0], ip[1], ip[2], ip[3]), + port: addr.port(), + }) + } + std::net::SocketAddr::V6(addr) => { + let ip = addr.ip().segments(); + IpSocketAddress::Ipv6(Ipv6SocketAddress { + address: (ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), + port: addr.port(), + flow_info: addr.flowinfo(), + scope_id: addr.scope_id(), + }) + } + } +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 6b01a1f..e5d4b17 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -13,6 +13,7 @@ mod block_on; mod reactor; +pub use ::async_task::Task; pub use block_on::block_on; pub use reactor::{AsyncPollable, Reactor, WaitFor}; use std::cell::RefCell; @@ -22,3 +23,14 @@ use std::cell::RefCell; std::thread_local! { pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; } + +/// Spawn a `Future` as a `Task` on the current `Reactor`. +/// +/// Panics if called from outside `block_on`. +pub fn spawn(fut: F) -> Task +where + F: std::future::Future + 'static, + T: 'static, +{ + Reactor::current().spawn(fut) +} diff --git a/test-programs/artifacts/Cargo.toml b/test-programs/artifacts/Cargo.toml index 696edda..7c69049 100644 --- a/test-programs/artifacts/Cargo.toml +++ b/test-programs/artifacts/Cargo.toml @@ -13,9 +13,6 @@ anyhow.workspace = true test-log.workspace = true test-programs-artifacts.workspace = true ureq.workspace = true -wasmtime.workspace = true -wasmtime-wasi.workspace = true -wasmtime-wasi-http.workspace = true [build-dependencies] cargo_metadata.workspace = true diff --git a/test-programs/artifacts/tests/tcp_echo_server.rs b/test-programs/artifacts/tests/tcp_echo_server.rs index 6ca2ea5..3cf7752 100644 --- a/test-programs/artifacts/tests/tcp_echo_server.rs +++ b/test-programs/artifacts/tests/tcp_echo_server.rs @@ -1,123 +1,121 @@ -use anyhow::{anyhow, Context, Result}; -use wasmtime::{ - component::{Component, Linker, ResourceTable}, - Config, Engine, Store, -}; -use wasmtime_wasi::{pipe::MemoryOutputPipe, WasiCtx, WasiView}; -use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; - -struct Ctx { - table: ResourceTable, - wasi: WasiCtx, - http: WasiHttpCtx, -} - -impl WasiView for Ctx { - fn table(&mut self) -> &mut ResourceTable { - &mut self.table - } - fn ctx(&mut self) -> &mut WasiCtx { - &mut self.wasi - } -} - -impl WasiHttpView for Ctx { - fn table(&mut self) -> &mut ResourceTable { - &mut self.table - } - fn ctx(&mut self) -> &mut WasiHttpCtx { - &mut self.http - } -} - -fn run_in_wasmtime(wasm: &[u8], stdout: Option) -> Result<()> { - let config = Config::default(); - let engine = Engine::new(&config).context("creating engine")?; - let component = Component::new(&engine, wasm).context("loading component")?; - - let mut linker: Linker = Linker::new(&engine); - wasmtime_wasi::add_to_linker_sync(&mut linker).context("add wasi to linker")?; - wasmtime_wasi_http::add_only_http_to_linker_sync(&mut linker) - .context("add wasi-http to linker")?; - - let mut builder = WasiCtx::builder(); - builder.inherit_stderr().inherit_network(); - let wasi = match stdout { - Some(stdout) => builder.stdout(stdout).build(), - None => builder.inherit_stdout().build(), - }; - let mut store = Store::new( - &engine, - Ctx { - table: ResourceTable::new(), - wasi, - http: WasiHttpCtx::new(), - }, - ); - - let instance = linker.instantiate(&mut store, &component)?; - let run_interface = instance - .get_export(&mut store, None, "wasi:cli/run@0.2.0") - .ok_or_else(|| anyhow!("wasi:cli/run missing?"))?; - let run_func_export = instance - .get_export(&mut store, Some(&run_interface), "run") - .ok_or_else(|| anyhow!("run export missing?"))?; - let run_func = instance - .get_typed_func::<(), (Result<(), ()>,)>(&mut store, &run_func_export) - .context("run as typed func")?; - - println!("entering wasm..."); - let (runtime_result,) = run_func.call(&mut store, ())?; - runtime_result.map_err(|()| anyhow!("run returned an error"))?; - println!("done"); - - Ok(()) -} +use anyhow::{Context, Result}; +use std::process::Command; #[test_log::test] fn tcp_echo_server() -> Result<()> { use std::io::{Read, Write}; use std::net::{Shutdown, TcpStream}; - use std::thread::sleep; - use std::time::Duration; println!("testing {}", test_programs_artifacts::TCP_ECHO_SERVER); - let wasm = std::fs::read(test_programs_artifacts::TCP_ECHO_SERVER).context("read wasm")?; - let pipe = wasmtime_wasi::pipe::MemoryOutputPipe::new(1024 * 1024); - let write_end = pipe.clone(); - let wasmtime_thread = std::thread::spawn(move || run_in_wasmtime(&wasm, Some(write_end))); + // Run the component in wasmtime + // -Sinherit-network required for sockets to work + let mut wasmtime_process = Command::new("wasmtime") + .arg("run") + .arg("-Sinherit-network") + .arg(test_programs_artifacts::TCP_ECHO_SERVER) + .stdout(std::process::Stdio::piped()) + .spawn()?; - 'wait: loop { - sleep(Duration::from_millis(100)); - for line in pipe.contents().split(|c| *c == b'\n') { - if line.starts_with(b"Listening on") { - break 'wait; - } - } - } + let addr = get_listening_address(wasmtime_process.stdout.take().expect("stdout is piped"))?; - let mut tcpstream = - TcpStream::connect("127.0.0.1:8080").context("connect to wasm echo server")?; - println!("connected to wasm echo server"); + println!("tcp echo server is listening on {addr:?}"); - const MESSAGE: &[u8] = b"hello, echoserver!\n"; + let mut sock1 = TcpStream::connect(addr).context("connect sock1")?; + println!("sock1 connected"); - tcpstream.write_all(MESSAGE).context("write to socket")?; - println!("wrote to echo server"); + let mut sock2 = TcpStream::connect(addr).context("connect sock2")?; + println!("sock2 connected"); - tcpstream.shutdown(Shutdown::Write)?; + const MESSAGE1: &[u8] = b"hello, echoserver!\n"; - let mut readback = Vec::new(); - tcpstream - .read_to_end(&mut readback) - .context("read from socket")?; + sock1.write_all(MESSAGE1).context("write to sock1")?; + println!("sock1 wrote to echo server"); - println!("read from wasm server"); - assert_eq!(MESSAGE, readback); + let mut sock3 = TcpStream::connect(addr).context("connect sock3")?; + println!("sock3 connected"); + + const MESSAGE2: &[u8] = b"hello, gussie!\n"; + sock2.write_all(MESSAGE2).context("write to sock1")?; + println!("sock2 wrote to echo server"); + + sock1.shutdown(Shutdown::Write)?; + sock2.shutdown(Shutdown::Write)?; + + let mut readback2 = Vec::new(); + sock2 + .read_to_end(&mut readback2) + .context("read from sock2")?; + println!("read from sock2"); + + let mut readback1 = Vec::new(); + sock1 + .read_to_end(&mut readback1) + .context("read from sock1")?; + println!("read from sock1"); + + assert_eq!(MESSAGE1, readback1, "readback of sock1"); + assert_eq!(MESSAGE2, readback2, "readback of sock2"); + + let mut sock4 = TcpStream::connect(addr).context("connect sock4")?; + println!("sock4 connected"); + const MESSAGE4: &[u8] = b"hello, sparky!\n"; + sock4.write_all(MESSAGE4).context("write to sock4")?; + // Hang up - demonstrate that a failure on this connection doesn't affect + // others. + drop(sock4); + println!("sock4 hung up"); + + const MESSAGE3: &[u8] = b"hello, willa!\n"; + sock3.write_all(MESSAGE3).context("write to sock3")?; + println!("sock3 wrote to echo server"); + sock3.shutdown(Shutdown::Write)?; + + let mut readback3 = Vec::new(); + sock3 + .read_to_end(&mut readback3) + .context("read from sock3")?; + println!("read from sock3"); + assert_eq!(MESSAGE3, readback3, "readback of sock3"); + + wasmtime_process.kill()?; - if wasmtime_thread.is_finished() { - wasmtime_thread.join().expect("wasmtime panicked")?; - } Ok(()) } + +fn get_listening_address( + mut wasmtime_stdout: std::process::ChildStdout, +) -> Result { + use std::io::Read; + use std::thread::sleep; + use std::time::Duration; + + // Gather complete contents of stdout here + let mut stdout_contents = String::new(); + loop { + // Wait for process to print + sleep(Duration::from_millis(100)); + + // Read more that the process printed, append to contents + let mut buf = vec![0; 4096]; + let len = wasmtime_stdout + .read(&mut buf) + .context("reading wasmtime stdout")?; + buf.truncate(len); + stdout_contents + .push_str(std::str::from_utf8(&buf).context("wasmtime stdout should be string")?); + + // Parse out the line where guest program says where it is listening + for line in stdout_contents.lines() { + if let Some(rest) = line.strip_prefix("Listening on ") { + // Forget wasmtime_stdout, rather than drop it, so that any + // subsequent stdout from wasmtime doesn't panic on a broken + // pipe. + std::mem::forget(wasmtime_stdout); + return rest + .parse() + .with_context(|| format!("parsing socket addr from line: {line:?}")); + } + } + } +} From ddde5448cb5d73551dac944add39973ddc647d49 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Thu, 4 Sep 2025 15:54:43 -0700 Subject: [PATCH 4/6] README: no longer 100% safe rust --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index b196626..686b131 100644 --- a/README.md +++ b/README.md @@ -85,8 +85,7 @@ $ cargo add wstd ``` ## Safety -This crate uses ``#![forbid(unsafe_code)]`` to ensure everything is implemented in -100% Safe Rust. +This crate uses ``#![forbid(unsafe_code)]`` everywhere possible. ## Contributing Want to join us? Check out our ["Contributing" guide][contributing] and take a From d0cc49d9aceb9d5965da26e6086bc278b3bae929 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Thu, 11 Sep 2025 10:38:39 -0700 Subject: [PATCH 5/6] switch to #![deny(unsafe_code)] at lib level --- README.md | 4 +++- src/lib.rs | 14 ++------------ src/runtime/reactor.rs | 1 + 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 686b131..0f72510 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,9 @@ $ cargo add wstd ``` ## Safety -This crate uses ``#![forbid(unsafe_code)]`` everywhere possible. +This crate uses ``#![deny(unsafe_code)]``, and in the very small number of +exceptional cases where ``#[allow(unsafe_code)]`` is required, documentation +is provided justifying its use. ## Contributing Want to join us? Check out our ["Contributing" guide][contributing] and take a diff --git a/src/lib.rs b/src/lib.rs index 9fdf435..5502555 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![allow(async_fn_in_trait)] #![warn(future_incompatible, unreachable_pub)] +#![deny(unsafe_code)] //#![deny(missing_debug_implementations)] //#![warn(missing_docs)] //#![forbid(rustdoc::missing_doc_code_examples)] @@ -54,26 +55,15 @@ //! These are unique capabilities provided by WASI 0.2, and because this library //! is specific to that are exposed from here. -// We need unsafe code in the runtime. -pub mod runtime; - -// All other mods do not require unsafe. -#[forbid(unsafe_code)] pub mod future; -#[forbid(unsafe_code)] #[macro_use] pub mod http; -#[forbid(unsafe_code)] pub mod io; -#[forbid(unsafe_code)] pub mod iter; -#[forbid(unsafe_code)] pub mod net; -#[forbid(unsafe_code)] pub mod rand; -#[forbid(unsafe_code)] +pub mod runtime; pub mod task; -#[forbid(unsafe_code)] pub mod time; pub use wstd_macro::attr_macro_http_server as http_server; diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index e6e75fc..a4a34e9 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -271,6 +271,7 @@ impl Reactor { // the schedule function is not Send or Sync, because Runnable is not // Send or Sync. This is safe because wasm32-wasip2 is always // single-threaded. + #[allow(unsafe_code)] let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) }; self.inner.ready_list.borrow_mut().push_back(runnable); task From 9ad46fdfa3dcabb46d2e317610efccfbe49205e0 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Thu, 11 Sep 2025 10:50:29 -0700 Subject: [PATCH 6/6] refactor block on loop --- src/runtime/block_on.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 77645a9..5c951ce 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -23,15 +23,14 @@ where loop { match reactor.pop_ready_list() { - None => { - if reactor.pending_pollables_is_empty() { - break; - } else { - reactor.block_on_pollables() - } - } + // No more work is possible - only a pending pollable could + // possibly create a runnable, and there are none. + None if reactor.pending_pollables_is_empty() => break, + // Block until a pending pollable puts something on the ready + // list. + None => reactor.block_on_pollables(), Some(runnable) => { - // Run the task popped from the head of the runlist. If the + // Run the task popped from the head of the ready list. If the // task re-inserts itself onto the runlist during execution, // last_run_awake is a hint that guarantees us the runlist is // nonempty. @@ -42,13 +41,6 @@ where // a chance to wake. if last_run_awake || !reactor.ready_list_is_empty() { reactor.nonblock_check_pollables(); - } else if !reactor.pending_pollables_is_empty() { - // If the runlist is empty, block until any of the pending - // pollables have woken a task, putting it back on the - // ready list - reactor.block_on_pollables() - } else { - break; } } }