Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ default = ["json"]
json = ["dep:serde", "dep:serde_json"]

[dependencies]
futures-core.workspace = true
async-task.workspace = true
http.workspace = true
itoa.workspace = true
pin-project-lite.workspace = true
Expand Down Expand Up @@ -63,6 +63,7 @@ authors = [

[workspace.dependencies]
anyhow = "1"
async-task = "4.7"
cargo_metadata = "0.22"
clap = { version = "4.5.26", features = ["derive"] }
futures-core = "0.3.19"
Expand All @@ -83,9 +84,6 @@ test-programs = { path = "test-programs" }
test-programs-artifacts = { path = "test-programs/artifacts" }
ureq = { version = "2.12.1", default-features = false }
wasi = "0.14.0"
wasmtime = "26"
wasmtime-wasi = "26"
wasmtime-wasi-http = "26"
wstd = { path = "." }
wstd-macro = { path = "macro", version = "=0.5.4" }

Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ $ cargo add wstd
```

## Safety
This crate uses ``#![forbid(unsafe_code)]`` to ensure everything is implemented in
100% Safe Rust.
This crate uses ``#![deny(unsafe_code)]``, and in the very small number of
exceptional cases where ``#[allow(unsafe_code)]`` is required, documentation
is provided justifying its use.

## Contributing
Want to join us? Check out our ["Contributing" guide][contributing] and take a
Expand Down
6 changes: 5 additions & 1 deletion examples/tcp_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ async fn main() -> io::Result<()> {
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepted from: {}", stream.peer_addr()?);
io::copy(&stream, &stream).await?;
wstd::runtime::spawn(async move {
// If echo copy fails, we can ignore it.
let _ = io::copy(&stream, &stream).await;
})
.detach();
}
Ok(())
}
3 changes: 1 addition & 2 deletions src/future/delay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use futures_core::ready;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

use pin_project_lite::pin_project;

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(async_fn_in_trait)]
#![warn(future_incompatible, unreachable_pub)]
#![forbid(unsafe_code)]
#![deny(unsafe_code)]
//#![deny(missing_debug_implementations)]
//#![warn(missing_docs)]
//#![forbid(rustdoc::missing_doc_code_examples)]
Expand Down
68 changes: 55 additions & 13 deletions src/net/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,8 @@ impl TcpListener {
wasi::sockets::tcp_create_socket::create_tcp_socket(family).map_err(to_io_err)?;
let network = wasi::sockets::instance_network::instance_network();

let local_address = match addr {
SocketAddr::V4(addr) => {
let ip = addr.ip().octets();
let address = (ip[0], ip[1], ip[2], ip[3]);
let port = addr.port();
IpSocketAddress::Ipv4(Ipv4SocketAddress { port, address })
}
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
};
let local_address = sockaddr_to_wasi(addr);

socket
.start_bind(&network, local_address)
.map_err(to_io_err)?;
Expand All @@ -56,10 +49,11 @@ impl TcpListener {
}

/// Returns the local socket address of this listener.
// TODO: make this return an actual socket addr
pub fn local_addr(&self) -> io::Result<String> {
let addr = self.socket.local_address().map_err(to_io_err)?;
Ok(format!("{addr:?}"))
pub fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
self.socket
.local_address()
.map_err(to_io_err)
.map(sockaddr_from_wasi)
}

/// Returns an iterator over the connections being received on this listener.
Expand Down Expand Up @@ -105,3 +99,51 @@ pub(super) fn to_io_err(err: ErrorCode) -> io::Error {
_ => ErrorKind::Other.into(),
}
}

fn sockaddr_from_wasi(addr: IpSocketAddress) -> std::net::SocketAddr {
use wasi::sockets::network::Ipv6SocketAddress;
match addr {
IpSocketAddress::Ipv4(Ipv4SocketAddress { address, port }) => {
std::net::SocketAddr::V4(std::net::SocketAddrV4::new(
std::net::Ipv4Addr::new(address.0, address.1, address.2, address.3),
port,
))
}
IpSocketAddress::Ipv6(Ipv6SocketAddress {
address,
port,
flow_info,
scope_id,
}) => std::net::SocketAddr::V6(std::net::SocketAddrV6::new(
std::net::Ipv6Addr::new(
address.0, address.1, address.2, address.3, address.4, address.5, address.6,
address.7,
),
port,
flow_info,
scope_id,
)),
}
}

fn sockaddr_to_wasi(addr: std::net::SocketAddr) -> IpSocketAddress {
use wasi::sockets::network::Ipv6SocketAddress;
match addr {
std::net::SocketAddr::V4(addr) => {
let ip = addr.ip().octets();
IpSocketAddress::Ipv4(Ipv4SocketAddress {
address: (ip[0], ip[1], ip[2], ip[3]),
port: addr.port(),
})
}
std::net::SocketAddr::V6(addr) => {
let ip = addr.ip().segments();
IpSocketAddress::Ipv6(Ipv6SocketAddress {
address: (ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]),
port: addr.port(),
flow_info: addr.flowinfo(),
scope_id: addr.scope_id(),
})
}
}
}
94 changes: 35 additions & 59 deletions src/runtime/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use super::{Reactor, REACTOR};

use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::task::{Context, Poll, Waker};

/// Start the event loop
pub fn block_on<Fut>(fut: Fut) -> Fut::Output
/// Start the event loop. Blocks until the future
pub fn block_on<F>(fut: F) -> F::Output
where
Fut: Future,
F: Future + 'static,
F::Output: 'static,
{
// Construct the reactor
let reactor = Reactor::new();
Expand All @@ -19,67 +18,44 @@ where
panic!("cannot wstd::runtime::block_on inside an existing block_on!")
}

// Pin the future so it can be polled
let mut fut = pin!(fut);
// Spawn the task onto the reactor.
let root_task = reactor.spawn(fut);

// Create a new context to be passed to the future.
let root = Arc::new(RootWaker::new());
let waker = Waker::from(root.clone());
let mut cx = Context::from_waker(&waker);
loop {
match reactor.pop_ready_list() {
// No more work is possible - only a pending pollable could
// possibly create a runnable, and there are none.
None if reactor.pending_pollables_is_empty() => break,
// Block until a pending pollable puts something on the ready
// list.
None => reactor.block_on_pollables(),
Some(runnable) => {
// Run the task popped from the head of the ready list. If the
// task re-inserts itself onto the runlist during execution,
// last_run_awake is a hint that guarantees us the runlist is
// nonempty.
let last_run_awake = runnable.run();

// Either the future completes and we return, or some IO is happening
// and we wait.
let res = loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(res) => break res,
Poll::Pending => {
// If some non-pollable based future has marked the root task
// as awake, reset and poll again. otherwise, block until a
// pollable wakes a future.
if root.is_awake() {
// If any task is ready for running, we perform a nonblocking
// check of pollables, giving any tasks waiting on a pollable
// a chance to wake.
if last_run_awake || !reactor.ready_list_is_empty() {
reactor.nonblock_check_pollables();
root.reset()
} else {
// If there are no futures awake or waiting on a WASI
// pollable, its impossible for the reactor to make
// progress, and the only valid behaviors are to sleep
// forever or panic. This should only be reachable if the
// user's Futures are implemented incorrectly.
if !reactor.nonempty_pending_pollables() {
panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready")
}
reactor.block_on_pollables()
}
}
}
};
}
// Clear the singleton
REACTOR.replace(None);
res
}

/// This waker is used in the Context of block_on. If a Future executing in
/// the block_on calls context.wake(), it sets this boolean state so that
/// block_on's Future is polled again immediately, rather than waiting for
/// an external (WASI pollable) event before polling again.
struct RootWaker {
wake: AtomicBool,
}
impl RootWaker {
fn new() -> Self {
Self {
wake: AtomicBool::new(false),
// Get the result out of the root task
let mut root_task = pin!(root_task);
let mut noop_context = Context::from_waker(Waker::noop());
match root_task.as_mut().poll(&mut noop_context) {
Poll::Ready(res) => res,
Poll::Pending => {
unreachable!(
"ready list empty, therefore root task should be ready. malformed root task?"
)
}
}
fn is_awake(&self) -> bool {
self.wake.load(Ordering::Relaxed)
}
fn reset(&self) {
self.wake.store(false, Ordering::Relaxed);
}
}
impl Wake for RootWaker {
fn wake(self: Arc<Self>) {
self.wake.store(true, Ordering::Relaxed);
}
}
12 changes: 12 additions & 0 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
mod block_on;
mod reactor;

pub use ::async_task::Task;
pub use block_on::block_on;
pub use reactor::{AsyncPollable, Reactor, WaitFor};
use std::cell::RefCell;
Expand All @@ -22,3 +23,14 @@ use std::cell::RefCell;
std::thread_local! {
pub(crate) static REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
}

/// Spawn a `Future` as a `Task` on the current `Reactor`.
///
/// Panics if called from outside `block_on`.
pub fn spawn<F, T>(fut: F) -> Task<T>
where
F: std::future::Future<Output = T> + 'static,
T: 'static,
{
Reactor::current().spawn(fut)
}
Loading
Loading