diff --git a/Cargo.lock b/Cargo.lock index 924b8748..3a49089e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,9 +121,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "cap-fs-ext" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cd43a11b76b72fd4de1d0358cc3c0a11fed09c2d67caef7c6c0ca3338245d96" +checksum = "15ed685fe2949d035b080fbe7536b944efffb648af1d34630aa887ca2b132d2b" dependencies = [ "cap-primitives", "cap-std", @@ -133,9 +133,9 @@ dependencies = [ [[package]] name = "cap-primitives" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58ae40664c77c3dd4c0df2e5bc97743ede4b814f9970d81228e69d101702e03" +checksum = "0315442c0232cb9a1c2be55ee289a0e9bf5fd0b0f162be8e7f16673e095f5e09" dependencies = [ "ambient-authority", "fs-set-times", @@ -150,9 +150,9 @@ dependencies = [ [[package]] name = "cap-rand" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dcd5285cc063c837f10d80010a29eda2f22fe4ce507229a03a7886f074ee6fd" +checksum = "b78d30c0b3c656f6193bef0697cff6bd903d9b2b1437c7af3d35a6a9d1a7af2e" dependencies = [ "ambient-authority", "rand", @@ -160,9 +160,9 @@ dependencies = [ [[package]] name = "cap-std" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b515b8f641ddea066fc2ce25f2c27b60c1f7c2f50f7b8c8c4acfe70a1a51646" +checksum = "3e9256648eae510b29aae4d52ed71877239a61f9a2494d23ddad7fb6f50e5de8" dependencies = [ "cap-primitives", "io-extras", @@ -578,7 +578,9 @@ dependencies = [ "async-trait", "cap-rand", "cap-std", + "is-terminal", "tempfile", + "terminal_size", "test-log", "test-programs-macros", "thiserror", @@ -1154,6 +1156,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb20089a8ba2b69debd491f8d2d023761cbf196e999218c591fa1e7e15a21907" +dependencies = [ + "rustix", + "windows-sys", +] + [[package]] name = "test-log" version = "0.2.11" @@ -1422,6 +1434,7 @@ dependencies = [ "anyhow", "async-trait", "bitflags", + "cap-fs-ext", "cap-rand", "cap-std", "io-extras", diff --git a/Cargo.toml b/Cargo.toml index 0b38414e..240a6ef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,9 @@ edition = "2021" anyhow = "1.0.22" thiserror = "1.0.15" tracing = "0.1.26" -cap-std = "1.0.0" -cap-rand = "1.0.0" +cap-std = "1.0.2" +cap-rand = "1.0.2" +cap-fs-ext = "1.0.2" bitflags = "1.2" windows-sys = "0.42.0" rustix = "0.36.0" diff --git a/host/Cargo.toml b/host/Cargo.toml index b39c9113..78da4884 100644 --- a/host/Cargo.toml +++ b/host/Cargo.toml @@ -14,6 +14,8 @@ tracing = { workspace = true } wasmtime = { git = "https://github.com/bytecodealliance/wasmtime", features = ["component-model"] } wasi-common = { path = "../wasi-common" } wasi-cap-std-sync = { path = "../wasi-common/cap-std-sync" } +is-terminal = "0.4.1" +terminal_size = "0.2.3" [dev-dependencies] test-programs-macros = { path = "../test-programs/macros" } diff --git a/host/src/clocks.rs b/host/src/clocks.rs index ecad346a..0012d9fe 100644 --- a/host/src/clocks.rs +++ b/host/src/clocks.rs @@ -1,7 +1,6 @@ use crate::{wasi_clocks, wasi_default_clocks, WasiCtx}; -use anyhow::Context; use cap_std::time::SystemTime; -use wasi_common::clocks::{MonotonicClock, MonotonicTimer, WallClock}; +use wasi_common::clocks::{TableMonotonicClockExt, TableWallClockExt}; impl TryFrom for wasi_clocks::Datetime { type Error = anyhow::Error; @@ -19,102 +18,56 @@ impl TryFrom for wasi_clocks::Datetime { #[async_trait::async_trait] impl wasi_default_clocks::WasiDefaultClocks for WasiCtx { - async fn default_monotonic_clock(&mut self) -> anyhow::Result { - Ok(self.clocks.default_monotonic) + async fn default_wall_clock(&mut self) -> anyhow::Result { + // Create a new handle to the default wall clock. + let new = self.clocks.default_wall_clock.dup(); + Ok(self.table_mut().push(Box::new(new))?) } - async fn default_wall_clock(&mut self) -> anyhow::Result { - Ok(self.clocks.default_wall) + async fn default_monotonic_clock(&mut self) -> anyhow::Result { + // Create a new handle to the default monotonic clock. + let new = self.clocks.default_monotonic_clock.dup(); + Ok(self.table_mut().push(Box::new(new))?) } } #[async_trait::async_trait] impl wasi_clocks::WasiClocks for WasiCtx { - async fn subscribe_wall_clock( - &mut self, - when: wasi_clocks::Datetime, - absolute: bool, - ) -> anyhow::Result { - drop((when, absolute)); - todo!() - } - - async fn subscribe_monotonic_clock( - &mut self, - when: wasi_clocks::Instant, - absolute: bool, - ) -> anyhow::Result { - drop((when, absolute)); - todo!() - } - async fn monotonic_clock_now( &mut self, fd: wasi_clocks::MonotonicClock, ) -> anyhow::Result { - let clock = self.table.get::(fd)?; - let now = clock.now(self.clocks.monotonic.as_ref()); - Ok(now - .as_nanos() - .try_into() - .context("converting monotonic time to nanos u64")?) + Ok(self.table().get_monotonic_clock(fd)?.now()) } async fn monotonic_clock_resolution( &mut self, fd: wasi_clocks::MonotonicClock, ) -> anyhow::Result { - self.table.get::(fd)?; - let res = self.clocks.monotonic.resolution(); - Ok(res - .as_nanos() - .try_into() - .context("converting monotonic resolution to nanos u64")?) - } - - async fn monotonic_clock_new_timer( - &mut self, - fd: wasi_clocks::MonotonicClock, - initial: wasi_clocks::Instant, - ) -> anyhow::Result { - let clock = self.table.get::(fd)?; - let timer = clock.new_timer(std::time::Duration::from_micros(initial)); - drop(clock); - let timer_fd = self.table.push(Box::new(timer))?; - Ok(timer_fd) + Ok(self.table().get_monotonic_clock(fd)?.now()) } async fn wall_clock_now( &mut self, fd: wasi_clocks::WallClock, ) -> anyhow::Result { - let clock = self.table.get::(fd)?; - Ok(clock.now(self.clocks.system.as_ref()).try_into()?) + let clock = self.table().get_wall_clock(fd)?; + let now = clock.now(); + Ok(wasi_clocks::Datetime { + seconds: now.as_secs(), + nanoseconds: now.subsec_nanos(), + }) } async fn wall_clock_resolution( &mut self, fd: wasi_clocks::WallClock, ) -> anyhow::Result { - self.table.get::(fd)?; - let nanos = self.clocks.system.resolution().as_nanos(); + let clock = self.table().get_wall_clock(fd)?; + let res = clock.resolution(); Ok(wasi_clocks::Datetime { - seconds: (nanos / 1_000_000_000_u128) - .try_into() - .context("converting wall clock resolution to seconds u64")?, - nanoseconds: (nanos % 1_000_000_000_u128).try_into().unwrap(), + seconds: res.as_secs(), + nanoseconds: res.subsec_nanos(), }) } - - async fn monotonic_timer_current( - &mut self, - fd: wasi_clocks::MonotonicTimer, - ) -> anyhow::Result { - let timer = self.table.get::(fd)?; - Ok(timer - .current(self.clocks.monotonic.as_ref()) - .as_nanos() - .try_into() - .context("converting monotonic timer to nanos u64")?) - } } diff --git a/host/src/filesystem.rs b/host/src/filesystem.rs index 5e82a953..bb8fce05 100644 --- a/host/src/filesystem.rs +++ b/host/src/filesystem.rs @@ -1,12 +1,17 @@ #![allow(unused_variables)] +use crate::wasi_poll::WasiStream; use crate::{wasi_filesystem, HostResult, WasiCtx}; use std::{ io::{IoSlice, IoSliceMut}, ops::BitAnd, time::SystemTime, }; -use wasi_common::{dir::TableDirExt, file::TableFileExt, WasiDir, WasiFile}; +use wasi_common::{ + dir::TableDirExt, + file::{FileStream, TableFileExt}, + WasiDir, WasiFile, +}; fn contains + Eq + Copy>(flags: T, flag: T) -> bool { (flags & flag) == flag @@ -118,9 +123,6 @@ impl Into for wasi_filesystem::OFlags { impl Into for wasi_filesystem::DescriptorFlags { fn into(self) -> wasi_common::file::FdFlags { let mut flags = wasi_common::file::FdFlags::empty(); - if contains(self, wasi_filesystem::DescriptorFlags::APPEND) { - flags |= wasi_common::file::FdFlags::APPEND; - } if contains(self, wasi_filesystem::DescriptorFlags::DSYNC) { flags |= wasi_common::file::FdFlags::DSYNC; } @@ -242,19 +244,19 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { fd: wasi_filesystem::Descriptor, len: wasi_filesystem::Size, offset: wasi_filesystem::Filesize, - ) -> HostResult, wasi_filesystem::Errno> { - let f = self.table().get_file_mut(u32::from(fd)).map_err(convert)?; + ) -> HostResult<(Vec, bool), wasi_filesystem::Errno> { + let f = self.table_mut().get_file_mut(fd).map_err(convert)?; let mut buffer = vec![0; len.try_into().unwrap()]; - let bytes_read = f + let (bytes_read, end) = f .read_vectored_at(&mut [IoSliceMut::new(&mut buffer)], offset) .await .map_err(convert)?; buffer.truncate(bytes_read.try_into().unwrap()); - Ok(buffer) + Ok((buffer, end)) } async fn pwrite( @@ -263,7 +265,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { buf: Vec, offset: wasi_filesystem::Filesize, ) -> HostResult { - let f = self.table().get_file_mut(u32::from(fd)).map_err(convert)?; + let f = self.table_mut().get_file_mut(fd).map_err(convert)?; let bytes_written = f .write_vectored_at(&[IoSlice::new(&buf)], offset) @@ -294,28 +296,6 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { todo!() } - async fn seek( - &mut self, - fd: wasi_filesystem::Descriptor, - from: wasi_filesystem::SeekFrom, - ) -> HostResult { - use std::io::SeekFrom; - - let from = match from { - wasi_filesystem::SeekFrom::Cur(offset) => SeekFrom::Current(offset), - wasi_filesystem::SeekFrom::End(offset) => SeekFrom::End(offset.try_into().unwrap()), - wasi_filesystem::SeekFrom::Set(offset) => SeekFrom::Start(offset), - }; - - Ok(self - .table() - .get_file_mut(fd) - .map_err(convert)? - .seek(from) - .await - .map_err(convert)?) - } - async fn sync( &mut self, fd: wasi_filesystem::Descriptor, @@ -323,13 +303,6 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { todo!() } - async fn tell( - &mut self, - fd: wasi_filesystem::Descriptor, - ) -> HostResult { - todo!() - } - async fn create_directory_at( &mut self, fd: wasi_filesystem::Descriptor, @@ -345,7 +318,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { let table = self.table(); if table.is::>(fd) { Ok(table - .get_file_mut(fd) + .get_file(fd) .map_err(convert)? .get_filestat() .await @@ -405,7 +378,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { // TODO: How should this be used? _mode: wasi_filesystem::Mode, ) -> HostResult { - let table = self.table(); + let table = self.table_mut(); if !table.is::>(fd) { return Err(wasi_filesystem::Errno::Notdir.into()); } @@ -444,7 +417,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { } async fn close(&mut self, fd: wasi_filesystem::Descriptor) -> anyhow::Result<()> { - let table = self.table(); + let table = self.table_mut(); if table.is::>(fd) { let _ = table.delete(fd); } else if table.is::>(fd) { @@ -554,4 +527,69 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx { ) -> HostResult<(), wasi_filesystem::Errno> { todo!() } + + async fn read_via_stream( + &mut self, + fd: wasi_filesystem::Descriptor, + offset: u64, + ) -> HostResult { + let f = self.table_mut().get_file_mut(fd).map_err(convert)?; + + // Duplicate the file descriptor so that we get an indepenent lifetime. + let clone = f.try_clone().await.map_err(convert)?; + + // Create a stream view for it. + let reader = FileStream::new_reader(clone, offset); + + // Box it up. + let boxed: Box = Box::new(reader); + + // Insert the stream view into the table. + let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?; + + Ok(index) + } + + async fn write_via_stream( + &mut self, + fd: wasi_filesystem::Descriptor, + offset: u64, + ) -> HostResult { + let f = self.table_mut().get_file_mut(fd).map_err(convert)?; + + // Duplicate the file descriptor so that we get an indepenent lifetime. + let clone = f.try_clone().await.map_err(convert)?; + + // Create a stream view for it. + let writer = FileStream::new_writer(clone, offset); + + // Box it up. + let boxed: Box = Box::new(writer); + + // Insert the stream view into the table. + let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?; + + Ok(index) + } + + async fn append_via_stream( + &mut self, + fd: wasi_filesystem::Descriptor, + ) -> HostResult { + let f = self.table_mut().get_file_mut(fd).map_err(convert)?; + + // Duplicate the file descriptor so that we get an indepenent lifetime. + let clone = f.try_clone().await.map_err(convert)?; + + // Create a stream view for it. + let appender = FileStream::new_appender(clone); + + // Box it up. + let boxed: Box = Box::new(appender); + + // Insert the stream view into the table. + let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?; + + Ok(index) + } } diff --git a/host/src/lib.rs b/host/src/lib.rs index 12a8a012..84216abb 100644 --- a/host/src/lib.rs +++ b/host/src/lib.rs @@ -4,6 +4,7 @@ mod filesystem; mod logging; mod poll; mod random; +mod stderr; mod tcp; pub use wasi_common::{table::Table, WasiCtx}; @@ -23,6 +24,7 @@ pub fn add_to_linker( wasi_default_clocks::add_to_linker(l, f)?; wasi_filesystem::add_to_linker(l, f)?; wasi_logging::add_to_linker(l, f)?; + wasi_stderr::add_to_linker(l, f)?; wasi_poll::add_to_linker(l, f)?; wasi_random::add_to_linker(l, f)?; wasi_tcp::add_to_linker(l, f)?; diff --git a/host/src/poll.rs b/host/src/poll.rs index 6a032e45..cb1e48a5 100644 --- a/host/src/poll.rs +++ b/host/src/poll.rs @@ -1,18 +1,186 @@ -#![allow(unused_variables)] - use crate::{ - wasi_poll::{WasiFuture, WasiPoll}, - WasiCtx, + wasi_clocks, + wasi_poll::{self, Size, StreamError, WasiFuture, WasiPoll, WasiStream}, + HostResult, WasiCtx, }; -use anyhow::Result; +use wasi_common::clocks::TableMonotonicClockExt; +use wasi_common::stream::TableStreamExt; + +fn convert(error: wasi_common::Error) -> wasmtime::component::Error { + if let Some(_errno) = error.downcast_ref() { + wasmtime::component::Error::new(wasi_poll::StreamError {}) + } else { + error.into().into() + } +} + +/// A pseudo-future representation. +#[derive(Copy, Clone)] +enum Future { + /// Poll for read events. + Read(WasiStream), + /// Poll for write events. + Write(WasiStream), + /// Poll for a monotonic-clock timer. + MonotonicClock(wasi_clocks::MonotonicClock, wasi_clocks::Instant, bool), +} #[async_trait::async_trait] impl WasiPoll for WasiCtx { - async fn drop_future(&mut self, future: WasiFuture) -> Result<()> { - todo!() + async fn drop_future(&mut self, future: WasiFuture) -> anyhow::Result<()> { + self.table_mut().delete(future); + Ok(()) } - async fn poll_oneoff(&mut self, futures: Vec) -> Result> { + async fn poll_oneoff(&mut self, futures: Vec) -> anyhow::Result> { + use wasi_common::sched::{Poll, Userdata}; + + // Convert `futures` into `Poll` subscriptions. + let mut poll = Poll::new(); + let len = futures.len(); + for (index, future) in futures.into_iter().enumerate() { + match *self.table().get(future).map_err(convert)? { + Future::Read(stream) => { + let wasi_stream: &dyn wasi_common::WasiStream = + self.table().get_stream(stream).map_err(convert)?; + poll.subscribe_read(wasi_stream, Userdata::from(index as u64)); + } + Future::Write(stream) => { + let wasi_stream: &dyn wasi_common::WasiStream = + self.table().get_stream(stream).map_err(convert)?; + poll.subscribe_write(wasi_stream, Userdata::from(index as u64)); + } + Future::MonotonicClock(clock, when, absolute) => { + let wasi_clock = self.table().get_monotonic_clock(clock).map_err(convert)?; + poll.subscribe_monotonic_clock( + wasi_clock, + when, + absolute, + Userdata::from(index as u64), + )?; + } + } + } + + // Do the poll. + self.sched.poll_oneoff(&mut poll).await?; + + // Convert the results into a list of `u8` to return. + let mut results = vec![0_u8; len]; + for (_result, data) in poll.results() { + results[u64::from(data) as usize] = u8::from(true); + } + Ok(results) + } + + async fn drop_stream(&mut self, stream: WasiStream) -> anyhow::Result<()> { + self.table_mut().delete(stream); + Ok(()) + } + + async fn read_stream( + &mut self, + stream: WasiStream, + len: Size, + ) -> HostResult<(Vec, bool), StreamError> { + let s: &mut Box = + self.table_mut().get_stream_mut(stream).map_err(convert)?; + + let mut buffer = vec![0; len.try_into().unwrap()]; + + let (bytes_read, end) = s.read(&mut buffer).await.map_err(convert)?; + + buffer.truncate(bytes_read as usize); + + Ok((buffer, end)) + } + + async fn write_stream( + &mut self, + stream: WasiStream, + bytes: Vec, + ) -> HostResult { + let s: &mut Box = + self.table_mut().get_stream_mut(stream).map_err(convert)?; + + let bytes_written: u64 = s.write(&bytes).await.map_err(convert)?; + + Ok(Size::try_from(bytes_written).unwrap()) + } + + async fn skip_stream( + &mut self, + stream: WasiStream, + len: u64, + ) -> HostResult<(u64, bool), StreamError> { + let s: &mut Box = + self.table_mut().get_stream_mut(stream).map_err(convert)?; + + let (bytes_skipped, end) = s.skip(len).await.map_err(convert)?; + + Ok((bytes_skipped, end)) + } + + async fn write_repeated_stream( + &mut self, + stream: WasiStream, + byte: u8, + len: u64, + ) -> HostResult { + let s: &mut Box = + self.table_mut().get_stream_mut(stream).map_err(convert)?; + + let bytes_written: u64 = s.write_repeated(byte, len).await.map_err(convert)?; + + Ok(bytes_written) + } + + async fn splice_stream( + &mut self, + _src: WasiStream, + _dst: WasiStream, + _len: u64, + ) -> HostResult<(u64, bool), StreamError> { + // TODO: We can't get two streams at the same time because they both + // carry the exclusive lifetime of `self`. When [`get_many_mut`] is + // stabilized, that could allow us to add a `get_many_stream_mut` or + // so which lets us do this. + // + // [`get_many_mut`]: https://doc.rust-lang.org/stable/std/collections/hash_map/struct.HashMap.html#method.get_many_mut + /* + let s: &mut Box = self + .table_mut() + .get_stream_mut(src) + .map_err(convert)?; + let d: &mut Box = self + .table_mut() + .get_stream_mut(dst) + .map_err(convert)?; + + let bytes_spliced: u64 = s.splice(&mut **d, len).await.map_err(convert)?; + + Ok(bytes_spliced) + */ + todo!() } + + async fn subscribe_read(&mut self, stream: WasiStream) -> anyhow::Result { + Ok(self.table_mut().push(Box::new(Future::Read(stream)))?) + } + + async fn subscribe_write(&mut self, stream: WasiStream) -> anyhow::Result { + Ok(self.table_mut().push(Box::new(Future::Write(stream)))?) + } + + async fn subscribe_monotonic_clock( + &mut self, + clock: wasi_clocks::MonotonicClock, + when: wasi_clocks::Instant, + absolute: bool, + ) -> anyhow::Result { + Ok(self + .table_mut() + .push(Box::new(Future::MonotonicClock(clock, when, absolute)))?) + } } diff --git a/host/src/random.rs b/host/src/random.rs index 36c76aa8..55417db2 100644 --- a/host/src/random.rs +++ b/host/src/random.rs @@ -12,6 +12,6 @@ impl wasi_random::WasiRandom for WasiCtx { } async fn get_random_u64(&mut self) -> anyhow::Result { - Ok((&mut self.random).sample(Standard)) + Ok(self.random.sample(Standard)) } } diff --git a/host/src/stderr.rs b/host/src/stderr.rs new file mode 100644 index 00000000..255ea32e --- /dev/null +++ b/host/src/stderr.rs @@ -0,0 +1,36 @@ +use crate::{wasi_stderr, WasiCtx}; +use is_terminal::IsTerminal; +#[cfg(unix)] +use std::os::unix::io::AsRawFd; +#[cfg(windows)] +use std::os::windows::io::AsRawHandle; + +#[async_trait::async_trait] +impl wasi_stderr::WasiStderr for WasiCtx { + async fn print(&mut self, message: String) -> anyhow::Result<()> { + eprint!("{}", message); + Ok(()) + } + + async fn is_terminal(&mut self) -> anyhow::Result { + Ok(std::io::stderr().is_terminal()) + } + + async fn num_columns(&mut self) -> anyhow::Result> { + #[cfg(unix)] + { + Ok( + terminal_size::terminal_size_using_fd(std::io::stderr().as_raw_fd()) + .map(|(width, _height)| width.0), + ) + } + + #[cfg(windows)] + { + Ok( + terminal_size::terminal_size_using_handle(std::io::stderr().as_raw_handle()) + .map(|(width, _height)| width.0), + ) + } + } +} diff --git a/host/src/tcp.rs b/host/src/tcp.rs index 244df046..a009e077 100644 --- a/host/src/tcp.rs +++ b/host/src/tcp.rs @@ -1,8 +1,7 @@ use crate::{ - wasi_tcp::{self, BytesResult, Socket, WasiFuture, WasiTcp}, + wasi_tcp::{self, BytesResult, Socket, WasiTcp}, HostResult, WasiCtx, }; -use anyhow::Result; #[async_trait::async_trait] impl WasiTcp for WasiCtx { @@ -15,14 +14,4 @@ impl WasiTcp for WasiCtx { drop(socket); todo!() } - - async fn subscribe_read(&mut self, socket: Socket) -> Result { - drop(socket); - todo!() - } - - async fn subscribe_write(&mut self, socket: Socket) -> Result { - drop(socket); - todo!() - } } diff --git a/host/tests/runtime.rs b/host/tests/runtime.rs index dc9a0da1..8137df64 100644 --- a/host/tests/runtime.rs +++ b/host/tests/runtime.rs @@ -1,9 +1,6 @@ use anyhow::Result; use cap_rand::RngCore; -use cap_std::{ - fs::Dir, - time::{Duration, Instant, SystemTime}, -}; +use cap_std::{fs::Dir, time::Duration}; use host::{add_to_linker, Wasi, WasiCtx}; use std::{ io::{Cursor, Write}, @@ -11,7 +8,7 @@ use std::{ }; use wasi_cap_std_sync::WasiCtxBuilder; use wasi_common::{ - clocks::{WasiMonotonicClock, WasiSystemClock}, + clocks::{WasiMonotonicClock, WasiWallClock}, pipe::ReadPipe, }; use wasmtime::{ @@ -43,8 +40,8 @@ async fn instantiate(path: &str) -> Result<(Store, Wasi)> { async fn run_hello_stdout(mut store: Store, wasi: Wasi) -> Result<()> { wasi.command( &mut store, - 0 as host::Descriptor, - 1 as host::Descriptor, + 0 as host::WasiStream, + 1 as host::WasiStream, &["gussie", "sparky", "willa"], &[], &[], @@ -57,8 +54,8 @@ async fn run_panic(mut store: Store, wasi: Wasi) -> Result<()> { let r = wasi .command( &mut store, - 0 as host::Descriptor, - 1 as host::Descriptor, + 0 as host::WasiStream, + 1 as host::WasiStream, &[ "diesel", "the", @@ -81,8 +78,8 @@ async fn run_panic(mut store: Store, wasi: Wasi) -> Result<()> { async fn run_args(mut store: Store, wasi: Wasi) -> Result<()> { wasi.command( &mut store, - 0 as host::Descriptor, - 1 as host::Descriptor, + 0 as host::WasiStream, + 1 as host::WasiStream, &["hello", "this", "", "is an argument", "with 🚩 emoji"], &[], &[], @@ -116,8 +113,8 @@ async fn run_random(mut store: Store, wasi: Wasi) -> Result<()> { wasi.command( &mut store, - 0 as host::Descriptor, - 1 as host::Descriptor, + 0 as host::WasiStream, + 1 as host::WasiStream, &[], &[], &[], @@ -127,45 +124,54 @@ async fn run_random(mut store: Store, wasi: Wasi) -> Result<()> { } async fn run_time(mut store: Store, wasi: Wasi) -> Result<()> { - struct FakeSystemClock; + struct FakeWallClock; - impl WasiSystemClock for FakeSystemClock { + impl WasiWallClock for FakeWallClock { fn resolution(&self) -> Duration { Duration::from_secs(1) } - fn now(&self, _precision: Duration) -> SystemTime { - SystemTime::from_std(std::time::SystemTime::UNIX_EPOCH) - + Duration::from_secs(1431648000) + fn now(&self) -> Duration { + Duration::from_secs(1431648000) + } + + fn dup(&self) -> Box { + Box::new(Self) } } struct FakeMonotonicClock { - now: Mutex, + now: Mutex, } impl WasiMonotonicClock for FakeMonotonicClock { - fn resolution(&self) -> Duration { - Duration::from_secs(1) + fn resolution(&self) -> u64 { + 1_000_000_000 } - fn now(&self, _precision: Duration) -> Instant { + fn now(&self) -> u64 { let mut now = self.now.lock().unwrap(); let then = *now; - *now += Duration::from_secs(42); + *now += 42 * 1_000_000_000; then } + + fn dup(&self) -> Box { + let now = *self.now.lock().unwrap(); + Box::new(Self { + now: Mutex::new(now), + }) + } } - store.data_mut().clocks.system = Box::new(FakeSystemClock); - store.data_mut().clocks.monotonic = Box::new(FakeMonotonicClock { - now: Mutex::new(Instant::from_std(std::time::Instant::now())), - }); + store.data_mut().clocks.default_wall_clock = Box::new(FakeWallClock); + store.data_mut().clocks.default_monotonic_clock = + Box::new(FakeMonotonicClock { now: Mutex::new(0) }); wasi.command( &mut store, - 0 as host::Descriptor, - 1 as host::Descriptor, + 0 as host::WasiStream, + 1 as host::WasiStream, &[], &[], &[], @@ -183,8 +189,8 @@ async fn run_stdin(mut store: Store, wasi: Wasi) -> Result<()> { wasi.command( &mut store, - 0 as host::Descriptor, - 1 as host::Descriptor, + 0 as host::WasiStream, + 1 as host::WasiStream, &[], &[], &[], diff --git a/src/lib.rs b/src/lib.rs index 97c83c48..f8d99b45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ use core::mem::{self, forget, replace, size_of, ManuallyDrop, MaybeUninit}; use core::ptr::{self, copy_nonoverlapping, null_mut}; use core::slice; use wasi::*; -use wasi_poll::WasiFuture; +use wasi_poll::{WasiFuture, WasiStream}; mod bindings { wit_bindgen_guest_rust::generate!({ @@ -27,8 +27,8 @@ mod bindings { #[no_mangle] pub unsafe extern "C" fn command( - stdin: u32, - stdout: u32, + stdin: WasiStream, + stdout: WasiStream, args_ptr: *const WasmStr, args_len: usize, env_vars: StrTupleList, @@ -40,6 +40,7 @@ pub unsafe extern "C" fn command( if !cfg!(feature = "command") { unreachable(); } + State::with_mut(|state| { // Initialization of `State` automatically fills in some dummy // structures for fds 0, 1, and 2. Overwrite the stdin/stdout slots of 0 @@ -49,13 +50,15 @@ pub unsafe extern "C" fn command( if descriptors.len() < 3 { unreachable(); } - descriptors[0] = Descriptor::File(File { - fd: stdin, - position: Cell::new(0), + descriptors[0] = Descriptor::Streams(Streams { + input: Cell::new(Some(stdin)), + output: Cell::new(None), + type_: StreamType::Unknown, }); - descriptors[1] = Descriptor::File(File { - fd: stdout, - position: Cell::new(0), + descriptors[1] = Descriptor::Streams(Streams { + input: Cell::new(None), + output: Cell::new(Some(stdout)), + type_: StreamType::Unknown, }); } state.args = Some(slice::from_raw_parts(args_ptr, args_len)); @@ -65,9 +68,13 @@ pub unsafe extern "C" fn command( state.preopens = Some(preopens); for preopen in preopens { - unwrap_result(state.push_desc(Descriptor::File(File { - fd: preopen.descriptor, - position: Cell::new(0), + unwrap_result(state.push_desc(Descriptor::Streams(Streams { + input: Cell::new(None), + output: Cell::new(None), + type_: StreamType::File(File { + fd: preopen.descriptor, + position: Cell::new(0), + }), }))); } @@ -286,20 +293,23 @@ pub unsafe extern "C" fn environ_sizes_get( /// Note: This is similar to `clock_getres` in POSIX. #[no_mangle] pub extern "C" fn clock_res_get(id: Clockid, resolution: &mut Timestamp) -> Errno { - match id { - CLOCKID_MONOTONIC => { - let res = wasi_clocks::monotonic_clock_resolution( - wasi_default_clocks::default_monotonic_clock(), - ); - *resolution = res; - } - CLOCKID_REALTIME => { - let res = wasi_clocks::wall_clock_resolution(wasi_default_clocks::default_wall_clock()); - *resolution = u64::from(res.nanoseconds) + res.seconds * 1_000_000_000; + State::with(|state| { + match id { + CLOCKID_MONOTONIC => { + let res = wasi_clocks::monotonic_clock_resolution(state.default_monotonic_clock()); + *resolution = res; + } + CLOCKID_REALTIME => { + let res = wasi_clocks::wall_clock_resolution(state.default_wall_clock()); + *resolution = u64::from(res.nanoseconds) + .checked_add(res.seconds) + .and_then(|secs| secs.checked_mul(1_000_000_000)) + .ok_or(ERRNO_OVERFLOW)?; + } + _ => unreachable(), } - _ => unreachable(), - } - ERRNO_SUCCESS + Ok(()) + }) } /// Return the time value of a clock. @@ -310,18 +320,22 @@ pub unsafe extern "C" fn clock_time_get( _precision: Timestamp, time: &mut Timestamp, ) -> Errno { - match id { - CLOCKID_MONOTONIC => { - *time = - wasi_clocks::monotonic_clock_now(wasi_default_clocks::default_monotonic_clock()); - } - CLOCKID_REALTIME => { - let res = wasi_clocks::wall_clock_now(wasi_default_clocks::default_wall_clock()); - *time = u64::from(res.nanoseconds) + res.seconds * 1_000_000_000; + State::with(|state| { + match id { + CLOCKID_MONOTONIC => { + *time = wasi_clocks::monotonic_clock_now(state.default_monotonic_clock()); + } + CLOCKID_REALTIME => { + let res = wasi_clocks::wall_clock_now(state.default_wall_clock()); + *time = u64::from(res.nanoseconds) + .checked_add(res.seconds) + .and_then(|secs| secs.checked_mul(1_000_000_000)) + .ok_or(ERRNO_OVERFLOW)?; + } + _ => unreachable(), } - _ => unreachable(), - } - ERRNO_SUCCESS + Ok(()) + }) } /// Provide file advisory information on a file descriptor. @@ -392,7 +406,10 @@ pub unsafe extern "C" fn fd_datasync(fd: Fd) -> Errno { #[no_mangle] pub unsafe extern "C" fn fd_fdstat_get(fd: Fd, stat: *mut Fdstat) -> Errno { State::with(|state| match state.get(fd)? { - Descriptor::File(file) => { + Descriptor::Streams(Streams { + type_: StreamType::File(file), + .. + }) => { let flags = wasi_filesystem::flags(file.fd)?; let type_ = wasi_filesystem::todo_type(file.fd)?; @@ -406,9 +423,6 @@ pub unsafe extern "C" fn fd_fdstat_get(fd: Fd, stat: *mut Fdstat) -> Errno { if !flags.contains(wasi_filesystem::DescriptorFlags::WRITE) { fs_rights_base &= !RIGHTS_FD_WRITE; } - if flags.contains(wasi_filesystem::DescriptorFlags::APPEND) { - fs_flags |= FDFLAGS_APPEND; - } if flags.contains(wasi_filesystem::DescriptorFlags::DSYNC) { fs_flags |= FDFLAGS_DSYNC; } @@ -444,7 +458,39 @@ pub unsafe extern "C" fn fd_fdstat_get(fd: Fd, stat: *mut Fdstat) -> Errno { }); Ok(()) } - Descriptor::EmptyStdin => { + Descriptor::Streams(Streams { + input, + output, + type_: StreamType::Socket(_), + }) + | Descriptor::Streams(Streams { + input, + output, + type_: StreamType::Unknown, + }) => { + let fs_filetype = FILETYPE_UNKNOWN; + let fs_flags = 0; + let mut fs_rights_base = 0; + if input.get().is_some() { + fs_rights_base |= RIGHTS_FD_READ; + } + if output.get().is_some() { + fs_rights_base |= RIGHTS_FD_WRITE; + } + let fs_rights_inheriting = fs_rights_base; + stat.write(Fdstat { + fs_filetype, + fs_flags, + fs_rights_base, + fs_rights_inheriting, + }); + Ok(()) + } + Descriptor::Streams(Streams { + input, + output, + type_: StreamType::EmptyStdin, + }) => { let fs_filetype = FILETYPE_UNKNOWN; let fs_flags = 0; let fs_rights_base = RIGHTS_FD_READ; @@ -457,8 +503,6 @@ pub unsafe extern "C" fn fd_fdstat_get(fd: Fd, stat: *mut Fdstat) -> Errno { }); Ok(()) } - // TODO: Handle socket case here once `wasi-tcp` has been fleshed out - Descriptor::Socket(_) => unreachable(), Descriptor::Closed(_) => Err(ERRNO_BADF), }) } @@ -468,9 +512,6 @@ pub unsafe extern "C" fn fd_fdstat_get(fd: Fd, stat: *mut Fdstat) -> Errno { #[no_mangle] pub unsafe extern "C" fn fd_fdstat_set_flags(fd: Fd, flags: Fdflags) -> Errno { let mut new_flags = wasi_filesystem::DescriptorFlags::empty(); - if flags & FDFLAGS_APPEND == FDFLAGS_APPEND { - new_flags |= wasi_filesystem::DescriptorFlags::APPEND; - } if flags & FDFLAGS_DSYNC == FDFLAGS_DSYNC { new_flags |= wasi_filesystem::DescriptorFlags::DSYNC; } @@ -594,12 +635,18 @@ pub unsafe extern "C" fn fd_pread( let read_len = unwrap_result(u32::try_from(len)); let file = state.get_file(fd)?; - let data = wasi_filesystem::pread(file.fd, read_len, offset)?; + let (data, end) = wasi_filesystem::pread(file.fd, read_len, offset)?; assert_eq!(data.as_ptr(), ptr); assert!(data.len() <= len); - *nread = data.len(); + + let len = data.len(); forget(data); - Ok(()) + if !end && len == 0 { + Err(ERRNO_INTR) + } else { + *nread = len; + Ok(()) + } }) } @@ -695,32 +742,42 @@ pub unsafe extern "C" fn fd_read( return ERRNO_SUCCESS; } - State::with(|state| { - let ptr = (*iovs_ptr).buf; - let len = (*iovs_ptr).buf_len; + let ptr = (*iovs_ptr).buf; + let len = (*iovs_ptr).buf_len; + State::with(|state| { state.register_buffer(ptr, len); - let read_len = unwrap_result(u32::try_from(len)); - let file = match state.get(fd)? { - Descriptor::File(f) => f, - Descriptor::Closed(_) | Descriptor::StdoutLog | Descriptor::StderrLog => { - return Err(ERRNO_BADF) + match state.get(fd)? { + Descriptor::Streams(streams) => { + let wasi_stream = streams.get_read_stream()?; + + let read_len = unwrap_result(u32::try_from(len)); + let wasi_stream = streams.get_read_stream()?; + let (data, end) = + wasi_poll::read_stream(wasi_stream, read_len).map_err(|_| ERRNO_IO)?; + + assert_eq!(data.as_ptr(), ptr); + assert!(data.len() <= len); + + // If this is a file, keep the current-position pointer up to date. + if let StreamType::File(file) = &streams.type_ { + file.position.set(file.position.get() + data.len() as u64); + } + + let len = data.len(); + forget(data); + if !end && len == 0 { + Err(ERRNO_INTR) + } else { + *nread = len; + Ok(()) + } } - // TODO: Handle socket case here once `wasi-tcp` has been fleshed out - Descriptor::Socket(_) => unreachable(), - Descriptor::EmptyStdin => { - *nread = 0; - return Ok(()); + Descriptor::StdoutLog | Descriptor::StderrLog | Descriptor::Closed(_) => { + Err(ERRNO_BADF) } - }; - let data = wasi_filesystem::pread(file.fd, read_len, file.position.get())?; - assert_eq!(data.as_ptr(), ptr); - assert!(data.len() <= len); - *nread = data.len(); - file.position.set(file.position.get() + data.len() as u64); - forget(data); - Ok(()) + } }) } @@ -959,19 +1016,26 @@ pub unsafe extern "C" fn fd_seek( newoffset: *mut Filesize, ) -> Errno { State::with(|state| { - let file = state.get_seekable_file(fd)?; - // It's ok to cast these indices; the WASI API will fail if - // the resulting values are out of range. - let from = match whence { - WHENCE_SET => wasi_filesystem::SeekFrom::Set(offset as _), - WHENCE_CUR => wasi_filesystem::SeekFrom::Cur(offset), - WHENCE_END => wasi_filesystem::SeekFrom::End(offset as _), - _ => return Err(ERRNO_INVAL), - }; - let result = wasi_filesystem::seek(file.fd, from)?; - file.position.set(result); - *newoffset = result; - Ok(()) + let stream = state.get_seekable_stream(fd)?; + + // Seeking only works on files. + if let StreamType::File(file) = &stream.type_ { + // It's ok to cast these indices; the WASI API will fail if + // the resulting values are out of range. + let from = match whence { + WHENCE_SET => offset, + WHENCE_CUR => (file.position.get() as i64).wrapping_add(offset), + WHENCE_END => (wasi_filesystem::stat(file.fd)?.size as i64) + offset, + _ => return Err(ERRNO_INVAL), + }; + stream.input.set(None); + stream.output.set(None); + file.position.set(from as u64); + *newoffset = from as u64; + Ok(()) + } else { + Err(ERRNO_SPIPE) + } }) } @@ -992,7 +1056,7 @@ pub unsafe extern "C" fn fd_sync(fd: Fd) -> Errno { pub unsafe extern "C" fn fd_tell(fd: Fd, offset: *mut Filesize) -> Errno { State::with(|state| { let file = state.get_seekable_file(fd)?; - *offset = wasi_filesystem::tell(file.fd)?; + *offset = file.position.get() as Filesize; Ok(()) }) } @@ -1021,10 +1085,16 @@ pub unsafe extern "C" fn fd_write( let bytes = slice::from_raw_parts(ptr, len); State::with(|state| match state.get(fd)? { - Descriptor::File(file) => { - let bytes = wasi_filesystem::pwrite(file.fd, bytes, file.position.get())?; + Descriptor::Streams(streams) => { + let wasi_stream = streams.get_write_stream()?; + let bytes = wasi_poll::write_stream(wasi_stream, bytes).map_err(|_| ERRNO_IO)?; + + // If this is a file, keep the current-position pointer up to date. + if let StreamType::File(file) = &streams.type_ { + file.position.set(file.position.get() + u64::from(bytes)); + } + *nwritten = bytes as usize; - file.position.set(file.position.get() + u64::from(bytes)); Ok(()) } Descriptor::StderrLog | Descriptor::StdoutLog => { @@ -1033,9 +1103,6 @@ pub unsafe extern "C" fn fd_write( *nwritten = len; Ok(()) } - // TODO: Handle socket case here once `wasi-tcp` has been fleshed out - Descriptor::Socket(_) => unreachable(), - Descriptor::EmptyStdin => Err(ERRNO_INVAL), Descriptor::Closed(_) => Err(ERRNO_BADF), }) } @@ -1183,9 +1250,13 @@ pub unsafe extern "C" fn path_open( let file = state.get_dir(fd)?; let result = wasi_filesystem::open_at(file.fd, at_flags, path, o_flags, flags, mode)?; - let desc = Descriptor::File(File { - fd: result, - position: Cell::new(0), + let desc = Descriptor::Streams(Streams { + input: Cell::new(None), + output: Cell::new(None), + type_: StreamType::File(File { + fd: result, + position: Cell::new(0), + }), }); let fd = match state.closed { @@ -1410,35 +1481,66 @@ pub unsafe extern "C" fn poll_oneoff( }; for subscription in subscriptions { + const EVENTTYPE_CLOCK: u8 = wasi::EVENTTYPE_CLOCK.raw(); + const EVENTTYPE_FD_READ: u8 = wasi::EVENTTYPE_FD_READ.raw(); + const EVENTTYPE_FD_WRITE: u8 = wasi::EVENTTYPE_FD_WRITE.raw(); futures.push(match subscription.u.tag { - 0 => { + EVENTTYPE_CLOCK => { let clock = &subscription.u.u.clock; let absolute = (clock.flags & SUBCLOCKFLAGS_SUBSCRIPTION_CLOCK_ABSTIME) == 0; match clock.id { - CLOCKID_REALTIME => wasi_clocks::subscribe_wall_clock( - wasi_clocks::Datetime { - seconds: unwrap((clock.timeout / 1_000_000_000).try_into().ok()), - nanoseconds: unwrap( - (clock.timeout % 1_000_000_000).try_into().ok(), - ), - }, - absolute, - ), + CLOCKID_REALTIME => { + let timeout = if absolute { + // Convert `clock.timeout` to `Datetime`. + let mut datetime = wasi_clocks::Datetime { + seconds: clock.timeout / 1_000_000_000, + nanoseconds: (clock.timeout % 1_000_000_000) as _, + }; + + // Subtract `now`. + let now = wasi_clocks::wall_clock_now(state.default_wall_clock()); + datetime.seconds -= now.seconds; + if datetime.nanoseconds < now.nanoseconds { + datetime.seconds -= 1; + datetime.nanoseconds += 1_000_000_000; + } + datetime.nanoseconds -= now.nanoseconds; + + // Convert to nanoseconds. + let nanos = datetime + .seconds + .checked_mul(1_000_000_000) + .ok_or(ERRNO_OVERFLOW)?; + nanos + .checked_add(datetime.nanoseconds.into()) + .ok_or(ERRNO_OVERFLOW)? + } else { + clock.timeout + }; - CLOCKID_MONOTONIC => { - wasi_clocks::subscribe_monotonic_clock(clock.timeout, absolute) + wasi_poll::subscribe_monotonic_clock( + state.default_monotonic_clock(), + timeout, + false, + ) } + CLOCKID_MONOTONIC => wasi_poll::subscribe_monotonic_clock( + state.default_monotonic_clock(), + clock.timeout, + absolute, + ), + _ => return Err(ERRNO_INVAL), } } - 1 => wasi_tcp::subscribe_read( - state.get_socket(subscription.u.u.fd_read.file_descriptor)?, + EVENTTYPE_FD_READ => wasi_poll::subscribe_read( + state.get_read_stream(subscription.u.u.fd_read.file_descriptor)?, ), - 2 => wasi_tcp::subscribe_write( - state.get_socket(subscription.u.u.fd_write.file_descriptor)?, + EVENTTYPE_FD_WRITE => wasi_poll::subscribe_write( + state.get_write_stream(subscription.u.u.fd_write.file_descriptor)?, ), _ => return Err(ERRNO_INVAL), @@ -1663,9 +1765,6 @@ fn flags_from_descriptor_flags( if fdflags & wasi::FDFLAGS_RSYNC == wasi::FDFLAGS_RSYNC { flags |= wasi_filesystem::DescriptorFlags::RSYNC; } - if fdflags & wasi::FDFLAGS_APPEND == wasi::FDFLAGS_APPEND { - flags |= wasi_filesystem::DescriptorFlags::APPEND; - } if fdflags & wasi::FDFLAGS_NONBLOCK == wasi::FDFLAGS_NONBLOCK { flags |= wasi_filesystem::DescriptorFlags::NONBLOCK; } @@ -1775,15 +1874,13 @@ fn black_box(x: Errno) -> Errno { } #[repr(C)] -pub enum Descriptor { +enum Descriptor { + /// A closed descriptor, holding a reference to the previous closed + /// descriptor to support reusing them. Closed(Option), - File(File), - Socket(wasi_tcp::Socket), - /// Initial state of fd 0 when `State` is created, representing a standard - /// input that is empty as it hasn't been configured yet. This is the - /// permanent fd 0 marker if `command` is never called. - EmptyStdin, + /// Input and/or output wasi-streams, along with stream metadata. + Streams(Streams), /// Initial state of fd 1 when `State` is created, representing that writes /// to `fd_write` will go to a call to `log`. This is overwritten during @@ -1795,21 +1892,99 @@ pub enum Descriptor { StderrLog, } +/// Input and/or output wasi-streams, along with a stream type that +/// identifies what kind of stream they are and possibly supporting +/// type-specific operations like seeking. +struct Streams { + /// The output stream, if present. + input: Cell>, + + /// The input stream, if present. + output: Cell>, + + /// Information about the source of the stream. + type_: StreamType, +} + +impl Streams { + /// Return the input stream, initializing it on the fly if needed. + fn get_read_stream(&self) -> Result { + match &self.input.get() { + Some(wasi_stream) => Ok(*wasi_stream), + None => match &self.type_ { + // For files, we may have adjusted the position for seeking, so + // create a new stream. + StreamType::File(file) => { + let input = wasi_filesystem::read_via_stream(file.fd, file.position.get())?; + self.input.set(Some(input)); + Ok(input) + } + _ => Err(ERRNO_BADF), + }, + } + } + + /// Return the output stream, initializing it on the fly if needed. + fn get_write_stream(&self) -> Result { + match &self.output.get() { + Some(wasi_stream) => Ok(*wasi_stream), + None => match &self.type_ { + // For files, we may have adjusted the position for seeking, so + // create a new stream. + StreamType::File(file) => { + let output = wasi_filesystem::write_via_stream(file.fd, file.position.get())?; + self.output.set(Some(output)); + Ok(output) + } + _ => Err(ERRNO_BADF), + }, + } + } +} + +#[allow(dead_code)] // until Socket is implemented +enum StreamType { + /// It's a valid stream but we don't know where it comes from. + Unknown, + + /// A stdin source containing no bytes. + EmptyStdin, + + /// Streaming data with a file. + File(File), + + /// Streaming data with a socket. + Socket(wasi_tcp::Socket), +} + impl Drop for Descriptor { fn drop(&mut self) { match self { - Descriptor::File(file) => wasi_filesystem::close(file.fd), - Descriptor::StdoutLog | Descriptor::StderrLog | Descriptor::EmptyStdin => {} - Descriptor::Socket(_) => unreachable(), + Descriptor::Streams(stream) => { + if let Some(input) = stream.input.get() { + wasi_poll::drop_stream(input); + } + if let Some(output) = stream.output.get() { + wasi_poll::drop_stream(output); + } + match &stream.type_ { + StreamType::File(file) => wasi_filesystem::close(file.fd), + StreamType::Socket(_) => unreachable(), + StreamType::EmptyStdin | StreamType::Unknown => {} + } + } + Descriptor::StdoutLog | Descriptor::StderrLog => {} Descriptor::Closed(_) => {} } } } #[repr(C)] -pub struct File { +struct File { /// The handle to the preview2 descriptor that this file is referencing. fd: wasi_filesystem::Descriptor, + + /// The current-position pointer. position: Cell, } @@ -1861,6 +2036,12 @@ struct State { /// Cache for the `fd_readdir` call for a final `wasi::Dirent` plus path /// name that didn't fit into the caller's buffer. dirent_cache: DirentCache, + + /// The clock handle for `CLOCKID_MONOTONIC`. + default_monotonic_clock: Cell>, + + /// The clock handle for `CLOCKID_REALTIME`. + default_wall_clock: Cell>, } struct DirentCache { @@ -1921,7 +2102,7 @@ const fn command_data_size() -> usize { start -= size_of::(); // Remove miscellaneous metadata also stored in state. - start -= 14 * size_of::(); + start -= 17 * size_of::(); // Everything else is the `command_data` allocation. start @@ -2004,6 +2185,8 @@ impl State { }), path_data: UnsafeCell::new(MaybeUninit::uninit()), }, + default_monotonic_clock: Cell::new(None), + default_wall_clock: Cell::new(None), })); &*ret }; @@ -2014,7 +2197,11 @@ impl State { } fn init(&mut self) { - unwrap_result(self.push_desc(Descriptor::EmptyStdin)); + unwrap_result(self.push_desc(Descriptor::Streams(Streams { + input: Cell::new(None), + output: Cell::new(None), + type_: StreamType::Unknown, + }))); unwrap_result(self.push_desc(Descriptor::StdoutLog)); unwrap_result(self.push_desc(Descriptor::StderrLog)); } @@ -2062,17 +2249,32 @@ impl State { .ok_or(ERRNO_BADF) } + fn get_stream_with_error(&self, fd: Fd, error: Errno) -> Result<&Streams, Errno> { + match self.get(fd)? { + Descriptor::Streams(streams) => Ok(streams), + Descriptor::Closed(_) => Err(ERRNO_BADF), + _ => Err(error), + } + } + fn get_file_with_error(&self, fd: Fd, error: Errno) -> Result<&File, Errno> { match self.get(fd)? { - Descriptor::File(file) => Ok(file), + Descriptor::Streams(Streams { + type_: StreamType::File(file), + .. + }) => Ok(file), Descriptor::Closed(_) => Err(ERRNO_BADF), _ => Err(error), } } + #[allow(dead_code)] // until Socket is implemented fn get_socket(&self, fd: Fd) -> Result { match self.get(fd)? { - Descriptor::Socket(socket) => Ok(*socket), + Descriptor::Streams(Streams { + type_: StreamType::Socket(socket), + .. + }) => Ok(*socket), Descriptor::Closed(_) => Err(ERRNO_BADF), _ => Err(ERRNO_INVAL), } @@ -2090,10 +2292,62 @@ impl State { self.get_file_with_error(fd, ERRNO_SPIPE) } + fn get_seekable_stream(&self, fd: Fd) -> Result<&Streams, Errno> { + self.get_stream_with_error(fd, ERRNO_SPIPE) + } + + fn get_read_stream(&self, fd: Fd) -> Result { + match self.get(fd)? { + Descriptor::Streams(streams) => streams.get_read_stream(), + Descriptor::Closed(_) | Descriptor::StdoutLog | Descriptor::StderrLog => { + Err(ERRNO_BADF) + } + } + } + + fn get_write_stream(&self, fd: Fd) -> Result { + match self.get(fd)? { + Descriptor::Streams(streams) => streams.get_write_stream(), + Descriptor::Closed(_) | Descriptor::StdoutLog | Descriptor::StderrLog => { + Err(ERRNO_BADF) + } + } + } + /// Register `buf` and `buf_len` to be used by `cabi_realloc` to satisfy /// the next request. fn register_buffer(&self, buf: *mut u8, buf_len: usize) { self.buffer_ptr.set(buf); self.buffer_len.set(buf_len); } + + /// Return a handle to the default wall clock, creating one if we + /// don't already have one. + fn default_wall_clock(&self) -> Fd { + match self.default_wall_clock.get() { + Some(fd) => fd, + None => self.init_default_wall_clock(), + } + } + + fn init_default_wall_clock(&self) -> Fd { + let clock = wasi_default_clocks::default_wall_clock(); + self.default_wall_clock.set(Some(clock)); + clock + } + + /// Return a handle to the default monotonic clock, creating one if we + /// don't already have one. + fn default_monotonic_clock(&self) -> Fd { + match self.default_monotonic_clock.get() { + Some(fd) => fd, + None => self.init_default_monotonic_clock(), + } + } + + fn init_default_monotonic_clock(&self) -> Fd { + let clock = wasi_default_clocks::default_monotonic_clock(); + self.default_monotonic_clock.set(Some(clock)); + clock + } } diff --git a/wasi-common/Cargo.toml b/wasi-common/Cargo.toml index 519399d7..25982a63 100644 --- a/wasi-common/Cargo.toml +++ b/wasi-common/Cargo.toml @@ -24,6 +24,7 @@ thiserror = { workspace = true } tracing = { workspace = true } cap-std = { workspace = true } cap-rand = { workspace = true } +cap-fs-ext = { workspace = true } bitflags = { workspace = true } async-trait = { workspace = true } diff --git a/wasi-common/cap-std-sync/Cargo.toml b/wasi-common/cap-std-sync/Cargo.toml index c2508d94..ea4ac416 100644 --- a/wasi-common/cap-std-sync/Cargo.toml +++ b/wasi-common/cap-std-sync/Cargo.toml @@ -16,7 +16,7 @@ wasi-common = { workspace = true } async-trait = { workspace = true } anyhow = { workspace = true } cap-std = { workspace = true } -cap-fs-ext = "1.0.0" +cap-fs-ext = { workspace = true } cap-time-ext = "1.0.0" cap-rand = { workspace = true } fs-set-times = "0.18.0" diff --git a/wasi-common/cap-std-sync/src/clocks.rs b/wasi-common/cap-std-sync/src/clocks.rs index 96367d09..60792ee7 100644 --- a/wasi-common/cap-std-sync/src/clocks.rs +++ b/wasi-common/cap-std-sync/src/clocks.rs @@ -1,60 +1,106 @@ -use crate::Table; -use cap_std::time::{Duration, Instant, SystemTime}; +use cap_std::time::{Duration, Instant, SystemClock}; use cap_std::{ambient_authority, AmbientAuthority}; use cap_time_ext::{MonotonicClockExt, SystemClockExt}; -use wasi_common::clocks::{WallClock, WasiClocks, WasiMonotonicClock, WasiSystemClock}; +use wasi_common::clocks::{WasiClocks, WasiMonotonicClock, WasiWallClock}; -pub struct SystemClock(cap_std::time::SystemClock); +pub struct WallClock { + /// The underlying system clock. + clock: cap_std::time::SystemClock, -impl SystemClock { + /// The ambient authority used to create this `WallClock` and + /// which we use to create clones of it. + ambient_authority: AmbientAuthority, +} + +impl WallClock { pub fn new(ambient_authority: AmbientAuthority) -> Self { - SystemClock(cap_std::time::SystemClock::new(ambient_authority)) + Self { + clock: cap_std::time::SystemClock::new(ambient_authority), + ambient_authority, + } } } -impl WasiSystemClock for SystemClock { + +impl WasiWallClock for WallClock { fn resolution(&self) -> Duration { - self.0.resolution() + self.clock.resolution() + } + + fn now(&self) -> Duration { + // WASI defines wall clocks to return "Unix time". + self.clock + .now() + .duration_since(SystemClock::UNIX_EPOCH) + .unwrap() } - fn now(&self, precision: Duration) -> SystemTime { - self.0.now_with(precision) + + fn dup(&self) -> Box { + let clock = cap_std::time::SystemClock::new(self.ambient_authority); + Box::new(Self { + clock, + ambient_authority: self.ambient_authority, + }) } } -pub struct MonotonicClock(cap_std::time::MonotonicClock); +pub struct MonotonicClock { + /// The underlying system clock. + clock: cap_std::time::MonotonicClock, + + /// The `Instant` this clock was created. All returned times are + /// durations since that time. + initial: Instant, + + /// The ambient authority used to create this `MonotonicClock` and + /// which we use to create clones of it. + ambient_authority: AmbientAuthority, +} impl MonotonicClock { pub fn new(ambient_authority: AmbientAuthority) -> Self { - MonotonicClock(cap_std::time::MonotonicClock::new(ambient_authority)) + let clock = cap_std::time::MonotonicClock::new(ambient_authority); + let initial = clock.now(); + Self { + clock, + initial, + ambient_authority, + } } } + impl WasiMonotonicClock for MonotonicClock { - fn resolution(&self) -> Duration { - self.0.resolution() - } - fn now(&self, precision: Duration) -> Instant { - self.0.now_with(precision) + fn resolution(&self) -> u64 { + self.clock.resolution().as_nanos().try_into().unwrap() } -} -pub fn clocks_ctx(table: &mut Table) -> WasiClocks { - let system = Box::new(SystemClock::new(ambient_authority())); - let monotonic = cap_std::time::MonotonicClock::new(ambient_authority()); - let creation_time = monotonic.now(); - let monotonic = Box::new(MonotonicClock(monotonic)) as Box; + fn now(&self) -> u64 { + // Unwrap here and in `resolution` above; a `u64` is wide enough to + // hold over 584 years of nanoseconds. + self.clock + .now() + .duration_since(self.initial) + .as_nanos() + .try_into() + .unwrap() + } - let default_monotonic = table - .push(Box::new(wasi_common::clocks::MonotonicClock::from( - monotonic.as_ref(), - ))) - .unwrap(); + fn dup(&self) -> Box { + let clock = cap_std::time::MonotonicClock::new(self.ambient_authority); + Box::new(Self { + clock, + initial: self.initial, + ambient_authority: self.ambient_authority, + }) + } +} - let default_wall = table.push(Box::new(WallClock::default())).unwrap(); +pub fn clocks_ctx() -> WasiClocks { + // Create the default clock resources. + let default_monotonic_clock = Box::new(MonotonicClock::new(ambient_authority())); + let default_wall_clock = Box::new(WallClock::new(ambient_authority())); WasiClocks { - system, - monotonic, - creation_time, - default_monotonic, - default_wall, + default_monotonic_clock, + default_wall_clock, } } diff --git a/wasi-common/cap-std-sync/src/file.rs b/wasi-common/cap-std-sync/src/file.rs index 8fe395fd..e34aabe4 100644 --- a/wasi-common/cap-std-sync/src/file.rs +++ b/wasi-common/cap-std-sync/src/file.rs @@ -5,10 +5,8 @@ use is_terminal::IsTerminal; use std::any::Any; use std::convert::TryInto; use std::io; -use system_interface::{ - fs::{FileIoExt, GetSetFdFlags}, - io::{IoExt, ReadReady}, -}; +use system_interface::fs::{FileIoExt, GetSetFdFlags}; +use system_interface::io::IsReadWrite; use wasi_common::{ file::{Advice, FdFlags, FileType, Filestat, WasiFile}, Error, ErrorExt, @@ -36,6 +34,12 @@ impl WasiFile for File { fn pollable(&self) -> Option { Some(self.0.as_raw_handle_or_socket()) } + + async fn try_clone(&mut self) -> Result, Error> { + let clone = self.0.try_clone()?; + Ok(Box::new(Self(clone))) + } + async fn datasync(&mut self) -> Result<(), Error> { self.0.sync_data()?; Ok(()) @@ -44,11 +48,11 @@ impl WasiFile for File { self.0.sync_all()?; Ok(()) } - async fn get_filetype(&mut self) -> Result { + async fn get_filetype(&self) -> Result { let meta = self.0.metadata()?; Ok(filetype_from(&meta.file_type())) } - async fn get_fdflags(&mut self) -> Result { + async fn get_fdflags(&self) -> Result { let fdflags = get_fd_flags(&self.0)?; Ok(fdflags) } @@ -64,7 +68,7 @@ impl WasiFile for File { self.0.set_fd_flags(set_fd_flags)?; Ok(()) } - async fn get_filestat(&mut self) -> Result { + async fn get_filestat(&self) -> Result { let meta = self.0.metadata()?; Ok(Filestat { device_id: meta.dev(), @@ -98,20 +102,31 @@ impl WasiFile for File { .set_times(convert_systimespec(atime), convert_systimespec(mtime))?; Ok(()) } - async fn read_vectored<'a>(&mut self, bufs: &mut [io::IoSliceMut<'a>]) -> Result { - let n = self.0.read_vectored(bufs)?; - Ok(n.try_into()?) + async fn read_at<'a>(&mut self, buf: &mut [u8], offset: u64) -> Result<(u64, bool), Error> { + match self.0.read_at(buf, offset) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n as u64, false)), + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(err) => Err(err.into()), + } } async fn read_vectored_at<'a>( &mut self, bufs: &mut [io::IoSliceMut<'a>], offset: u64, - ) -> Result { - let n = self.0.read_vectored_at(bufs, offset)?; - Ok(n.try_into()?) + ) -> Result<(u64, bool), Error> { + match self.0.read_vectored_at(bufs, offset) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n as u64, false)), + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(err) => Err(err.into()), + } + } + fn is_read_vectored_at(&self) -> bool { + self.0.is_read_vectored_at() } - async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { - let n = self.0.write_vectored(bufs)?; + async fn write_at<'a>(&mut self, buf: &[u8], offset: u64) -> Result { + let n = self.0.write_at(buf, offset)?; Ok(n.try_into()?) } async fn write_vectored_at<'a>( @@ -122,19 +137,28 @@ impl WasiFile for File { let n = self.0.write_vectored_at(bufs, offset)?; Ok(n.try_into()?) } - async fn seek(&mut self, pos: std::io::SeekFrom) -> Result { - Ok(self.0.seek(pos)?) - } - async fn peek(&mut self, buf: &mut [u8]) -> Result { - let n = self.0.peek(buf)?; - Ok(n.try_into()?) - } - async fn num_ready_bytes(&self) -> Result { - Ok(self.0.num_ready_bytes()?) + fn is_write_vectored_at(&self) -> bool { + self.0.is_write_vectored_at() } fn isatty(&mut self) -> bool { self.0.is_terminal() } + + async fn readable(&self) -> Result<(), Error> { + if is_read_write(&self.0)?.0 { + Ok(()) + } else { + Err(Error::badf()) + } + } + + async fn writable(&self) -> Result<(), Error> { + if is_read_write(&self.0)?.1 { + Ok(()) + } else { + Err(Error::badf()) + } + } } pub fn filetype_from(ft: &cap_std::fs::FileType) -> FileType { @@ -245,6 +269,13 @@ pub fn get_fd_flags(f: Filelike) -> io::Result(f: Filelike) -> io::Result<(bool, bool)> { + f.is_read_write() +} + fn convert_advice(advice: Advice) -> system_interface::fs::Advice { match advice { Advice::Normal => system_interface::fs::Advice::Normal, diff --git a/wasi-common/cap-std-sync/src/lib.rs b/wasi-common/cap-std-sync/src/lib.rs index 21fb9e0d..739b8da0 100644 --- a/wasi-common/cap-std-sync/src/lib.rs +++ b/wasi-common/cap-std-sync/src/lib.rs @@ -46,31 +46,30 @@ pub use cap_std::net::TcpListener; pub use clocks::clocks_ctx; pub use sched::sched_ctx; -use crate::net::Socket; +use crate::net::Listener; use cap_rand::{Rng, RngCore, SeedableRng}; -use wasi_common::{table::Table, WasiCtx, WasiFile}; +use wasi_common::{listener::WasiListener, stream::WasiStream, table::Table, WasiCtx}; pub struct WasiCtxBuilder(WasiCtx); impl WasiCtxBuilder { pub fn new() -> Self { - let mut table = Table::new(); WasiCtxBuilder(WasiCtx::new( random_ctx(), - clocks_ctx(&mut table), + clocks_ctx(), sched_ctx(), - table, + Table::new(), )) } - pub fn stdin(mut self, f: Box) -> Self { + pub fn stdin(mut self, f: Box) -> Self { self.0.set_stdin(f); self } - pub fn stdout(mut self, f: Box) -> Self { + pub fn stdout(mut self, f: Box) -> Self { self.0.set_stdout(f); self } - pub fn stderr(mut self, f: Box) -> Self { + pub fn stderr(mut self, f: Box) -> Self { self.0.set_stderr(f); self } @@ -91,11 +90,11 @@ impl WasiCtxBuilder { self.0.insert_dir(fd, dir); self } - pub fn preopened_socket(mut self, fd: u32, socket: impl Into) -> Self { - let socket: Socket = socket.into(); - let file: Box = socket.into(); + pub fn preopened_listener(mut self, fd: u32, listener: impl Into) -> Self { + let listener: Listener = listener.into(); + let listener: Box = listener.into(); - self.0.insert_file(fd, file); + self.0.insert_listener(fd, listener); self } pub fn build(self) -> WasiCtx { diff --git a/wasi-common/cap-std-sync/src/net.rs b/wasi-common/cap-std-sync/src/net.rs index b20a7148..d009fd11 100644 --- a/wasi-common/cap-std-sync/src/net.rs +++ b/wasi-common/cap-std-sync/src/net.rs @@ -7,70 +7,90 @@ use io_lifetimes::{AsFd, BorrowedFd}; use io_lifetimes::{AsSocket, BorrowedSocket}; use std::any::Any; use std::convert::TryInto; -use std::io; -#[cfg(unix)] +use std::io::{self, Read, Write}; use system_interface::fs::GetSetFdFlags; use system_interface::io::IoExt; use system_interface::io::IsReadWrite; use system_interface::io::ReadReady; use wasi_common::{ - file::{FdFlags, FileType, RiFlags, RoFlags, SdFlags, SiFlags, WasiFile}, + connection::{RiFlags, RoFlags, SdFlags, SiFlags, WasiConnection}, + listener::WasiListener, + stream::WasiStream, Error, ErrorExt, }; -pub enum Socket { +pub enum Listener { TcpListener(cap_std::net::TcpListener), + #[cfg(unix)] + UnixListener(cap_std::os::unix::net::UnixListener), +} + +pub enum Connection { TcpStream(cap_std::net::TcpStream), #[cfg(unix)] UnixStream(cap_std::os::unix::net::UnixStream), - #[cfg(unix)] - UnixListener(cap_std::os::unix::net::UnixListener), } -impl From for Socket { +impl From for Listener { fn from(listener: cap_std::net::TcpListener) -> Self { Self::TcpListener(listener) } } -impl From for Socket { +impl From for Connection { fn from(stream: cap_std::net::TcpStream) -> Self { Self::TcpStream(stream) } } #[cfg(unix)] -impl From for Socket { +impl From for Listener { fn from(listener: cap_std::os::unix::net::UnixListener) -> Self { Self::UnixListener(listener) } } #[cfg(unix)] -impl From for Socket { +impl From for Connection { fn from(stream: cap_std::os::unix::net::UnixStream) -> Self { Self::UnixStream(stream) } } #[cfg(unix)] -impl From for Box { - fn from(listener: Socket) -> Self { +impl From for Box { + fn from(listener: Listener) -> Self { + match listener { + Listener::TcpListener(l) => Box::new(crate::net::TcpListener::from_cap_std(l)), + Listener::UnixListener(l) => Box::new(crate::net::UnixListener::from_cap_std(l)), + } + } +} + +#[cfg(windows)] +impl From for Box { + fn from(listener: Listener) -> Self { + match listener { + Listener::TcpListener(l) => Box::new(crate::net::TcpListener::from_cap_std(l)), + } + } +} + +#[cfg(unix)] +impl From for Box { + fn from(listener: Connection) -> Self { match listener { - Socket::TcpListener(l) => Box::new(crate::net::TcpListener::from_cap_std(l)), - Socket::UnixListener(l) => Box::new(crate::net::UnixListener::from_cap_std(l)), - Socket::TcpStream(l) => Box::new(crate::net::TcpStream::from_cap_std(l)), - Socket::UnixStream(l) => Box::new(crate::net::UnixStream::from_cap_std(l)), + Connection::TcpStream(l) => Box::new(crate::net::TcpStream::from_cap_std(l)), + Connection::UnixStream(l) => Box::new(crate::net::UnixStream::from_cap_std(l)), } } } #[cfg(windows)] -impl From for Box { - fn from(listener: Socket) -> Self { +impl From for Box { + fn from(listener: Connection) -> Self { match listener { - Socket::TcpListener(l) => Box::new(crate::net::TcpListener::from_cap_std(l)), - Socket::TcpStream(l) => Box::new(crate::net::TcpStream::from_cap_std(l)), + Connection::TcpStream(l) => Box::new(crate::net::TcpStream::from_cap_std(l)), } } } @@ -78,48 +98,30 @@ impl From for Box { macro_rules! wasi_listen_write_impl { ($ty:ty, $stream:ty) => { #[async_trait::async_trait] - impl WasiFile for $ty { + impl WasiListener for $ty { fn as_any(&self) -> &dyn Any { self } - #[cfg(unix)] - fn pollable(&self) -> Option { - Some(self.0.as_fd()) - } - #[cfg(windows)] - fn pollable(&self) -> Option { - Some(self.0.as_raw_handle_or_socket()) - } - async fn sock_accept(&mut self, fdflags: FdFlags) -> Result, Error> { + async fn sock_accept( + &mut self, + nonblocking: bool, + ) -> Result, Error> { let (stream, _) = self.0.accept()?; - let mut stream = <$stream>::from_cap_std(stream); - stream.set_fdflags(fdflags).await?; + stream.set_nonblocking(nonblocking)?; + let stream = <$stream>::from_cap_std(stream); Ok(Box::new(stream)) } - async fn get_filetype(&mut self) -> Result { - Ok(FileType::SocketStream) + + fn get_nonblocking(&mut self) -> Result { + let s = self.0.as_socketlike().get_fd_flags()?; + Ok(s.contains(system_interface::fs::FdFlags::NONBLOCK)) } - #[cfg(unix)] - async fn get_fdflags(&mut self) -> Result { - let fdflags = get_fd_flags(&self.0)?; - Ok(fdflags) - } - async fn set_fdflags(&mut self, fdflags: FdFlags) -> Result<(), Error> { - if fdflags == wasi_common::file::FdFlags::NONBLOCK { - self.0.set_nonblocking(true)?; - } else if fdflags.is_empty() { - self.0.set_nonblocking(false)?; - } else { - return Err( - Error::invalid_argument().context("cannot set anything else than NONBLOCK") - ); - } + + fn set_nonblocking(&mut self, flag: bool) -> Result<(), Error> { + self.0.set_nonblocking(flag)?; Ok(()) } - async fn num_ready_bytes(&self) -> Result { - Ok(1) - } } #[cfg(windows)] @@ -172,80 +174,14 @@ wasi_listen_write_impl!(UnixListener, UnixStream); macro_rules! wasi_stream_write_impl { ($ty:ty, $std_ty:ty) => { #[async_trait::async_trait] - impl WasiFile for $ty { + impl WasiConnection for $ty { fn as_any(&self) -> &dyn Any { self } - #[cfg(unix)] - fn pollable(&self) -> Option { - Some(self.0.as_fd()) - } - - #[cfg(windows)] - fn pollable(&self) -> Option { - Some(self.0.as_raw_handle_or_socket()) - } - async fn get_filetype(&mut self) -> Result { - Ok(FileType::SocketStream) - } - #[cfg(unix)] - async fn get_fdflags(&mut self) -> Result { - let fdflags = get_fd_flags(&self.0)?; - Ok(fdflags) - } - async fn set_fdflags(&mut self, fdflags: FdFlags) -> Result<(), Error> { - if fdflags == wasi_common::file::FdFlags::NONBLOCK { - self.0.set_nonblocking(true)?; - } else if fdflags.is_empty() { - self.0.set_nonblocking(false)?; - } else { - return Err( - Error::invalid_argument().context("cannot set anything else than NONBLOCK") - ); - } - Ok(()) - } - async fn read_vectored<'a>( - &mut self, - bufs: &mut [io::IoSliceMut<'a>], - ) -> Result { - use std::io::Read; - let n = Read::read_vectored(&mut &*self.as_socketlike_view::<$std_ty>(), bufs)?; - Ok(n.try_into()?) - } - async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { - use std::io::Write; - let n = Write::write_vectored(&mut &*self.as_socketlike_view::<$std_ty>(), bufs)?; - Ok(n.try_into()?) - } - async fn peek(&mut self, buf: &mut [u8]) -> Result { - let n = self.0.peek(buf)?; - Ok(n.try_into()?) - } - async fn num_ready_bytes(&self) -> Result { - let val = self.as_socketlike_view::<$std_ty>().num_ready_bytes()?; - Ok(val) - } - async fn readable(&self) -> Result<(), Error> { - let (readable, _writeable) = is_read_write(&self.0)?; - if readable { - Ok(()) - } else { - Err(Error::io()) - } - } - async fn writable(&self) -> Result<(), Error> { - let (_readable, writeable) = is_read_write(&self.0)?; - if writeable { - Ok(()) - } else { - Err(Error::io()) - } - } async fn sock_recv<'a>( &mut self, - ri_data: &mut [std::io::IoSliceMut<'a>], + ri_data: &mut [io::IoSliceMut<'a>], ri_flags: RiFlags, ) -> Result<(u64, RoFlags), Error> { if (ri_flags & !(RiFlags::RECV_PEEK | RiFlags::RECV_WAITALL)) != RiFlags::empty() { @@ -273,7 +209,7 @@ macro_rules! wasi_stream_write_impl { async fn sock_send<'a>( &mut self, - si_data: &[std::io::IoSlice<'a>], + si_data: &[io::IoSlice<'a>], si_flags: SiFlags, ) -> Result { if si_flags != SiFlags::empty() { @@ -297,6 +233,133 @@ macro_rules! wasi_stream_write_impl { self.0.shutdown(how)?; Ok(()) } + + fn get_nonblocking(&mut self) -> Result { + let s = self.0.as_socketlike().get_fd_flags()?; + Ok(s.contains(system_interface::fs::FdFlags::NONBLOCK)) + } + + fn set_nonblocking(&mut self, flag: bool) -> Result<(), Error> { + self.0.set_nonblocking(flag)?; + Ok(()) + } + + async fn readable(&self) -> Result<(), Error> { + if is_read_write(&self.0)?.0 { + Ok(()) + } else { + Err(Error::badf()) + } + } + + async fn writable(&self) -> Result<(), Error> { + if is_read_write(&self.0)?.1 { + Ok(()) + } else { + Err(Error::badf()) + } + } + } + + #[async_trait::async_trait] + impl WasiStream for $ty { + fn as_any(&self) -> &dyn Any { + self + } + #[cfg(unix)] + fn pollable_read(&self) -> Option { + Some(self.0.as_fd()) + } + #[cfg(unix)] + fn pollable_write(&self) -> Option { + Some(self.0.as_fd()) + } + + #[cfg(windows)] + fn pollable_read(&self) -> Option { + Some(self.0.as_raw_handle_or_socket()) + } + #[cfg(windows)] + fn pollable_write(&self) -> Option { + Some(self.0.as_raw_handle_or_socket()) + } + + async fn read(&mut self, buf: &mut [u8]) -> Result<(u64, bool), Error> { + match Read::read(&mut &*self.as_socketlike_view::<$std_ty>(), buf) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n as u64, false)), + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(err) => Err(err.into()), + } + } + async fn read_vectored<'a>( + &mut self, + bufs: &mut [io::IoSliceMut<'a>], + ) -> Result<(u64, bool), Error> { + match Read::read_vectored(&mut &*self.as_socketlike_view::<$std_ty>(), bufs) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n as u64, false)), + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(err) => Err(err.into()), + } + } + #[cfg(can_vector)] + fn is_read_vectored(&self) -> bool { + Read::is_read_vectored(&mut &*self.as_socketlike_view::<$std_ty>()) + } + async fn write(&mut self, buf: &[u8]) -> Result { + let n = Write::write(&mut &*self.as_socketlike_view::<$std_ty>(), buf)?; + Ok(n.try_into()?) + } + async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { + let n = Write::write_vectored(&mut &*self.as_socketlike_view::<$std_ty>(), bufs)?; + Ok(n.try_into()?) + } + #[cfg(can_vector)] + fn is_write_vectored(&self) -> bool { + Write::is_write_vectored(&mut &*self.as_socketlike_view::<$std_ty>()) + } + async fn splice( + &mut self, + dst: &mut dyn WasiStream, + nelem: u64, + ) -> Result<(u64, bool), Error> { + if let Some(handle) = dst.pollable_write() { + let num = io::copy( + &mut io::Read::take(&self.0, nelem), + &mut &*handle.as_socketlike_view::<$std_ty>(), + )?; + Ok((num, num < nelem)) + } else { + WasiStream::splice(self, dst, nelem).await + } + } + async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> { + let num = io::copy(&mut io::Read::take(&self.0, nelem), &mut io::sink())?; + Ok((num, num < nelem)) + } + async fn write_repeated(&mut self, byte: u8, nelem: u64) -> Result { + let num = io::copy(&mut io::Read::take(io::repeat(byte), nelem), &mut self.0)?; + Ok(num) + } + async fn num_ready_bytes(&self) -> Result { + let val = self.as_socketlike_view::<$std_ty>().num_ready_bytes()?; + Ok(val) + } + async fn readable(&self) -> Result<(), Error> { + if is_read_write(&self.0)?.0 { + Ok(()) + } else { + Err(Error::badf()) + } + } + async fn writable(&self) -> Result<(), Error> { + if is_read_write(&self.0)?.1 { + Ok(()) + } else { + Err(Error::badf()) + } + } } #[cfg(unix)] impl AsFd for $ty { @@ -346,45 +409,6 @@ impl UnixStream { #[cfg(unix)] wasi_stream_write_impl!(UnixStream, std::os::unix::net::UnixStream); -pub fn filetype_from(ft: &cap_std::fs::FileType) -> FileType { - use cap_fs_ext::FileTypeExt; - if ft.is_block_device() { - FileType::SocketDgram - } else { - FileType::SocketStream - } -} - -/// Return the file-descriptor flags for a given file-like object. -/// -/// This returns the flags needed to implement [`WasiFile::get_fdflags`]. -pub fn get_fd_flags( - f: Socketlike, -) -> io::Result { - // On Unix-family platforms, we can use the same system call that we'd use - // for files on sockets here. - #[cfg(not(windows))] - { - let mut out = wasi_common::file::FdFlags::empty(); - if f.get_fd_flags()? - .contains(system_interface::fs::FdFlags::NONBLOCK) - { - out |= wasi_common::file::FdFlags::NONBLOCK; - } - Ok(out) - } - - // On Windows, sockets are different, and there is no direct way to - // query for the non-blocking flag. We can get a sufficient approximation - // by testing whether a zero-length `recv` appears to block. - #[cfg(windows)] - match rustix::net::recv(f, &mut [], rustix::net::RecvFlags::empty()) { - Ok(_) => Ok(wasi_common::file::FdFlags::empty()), - Err(rustix::io::Errno::WOULDBLOCK) => Ok(wasi_common::file::FdFlags::NONBLOCK), - Err(e) => Err(e.into()), - } -} - /// Return the file-descriptor flags for a given file-like object. /// /// This returns the flags needed to implement [`wasi_common::WasiFile::get_fdflags`]. diff --git a/wasi-common/cap-std-sync/src/sched/unix.rs b/wasi-common/cap-std-sync/src/sched/unix.rs index c53acf1a..70ed04ac 100644 --- a/wasi-common/cap-std-sync/src/sched/unix.rs +++ b/wasi-common/cap-std-sync/src/sched/unix.rs @@ -1,4 +1,3 @@ -use cap_std::time::Duration; use rustix::io::{PollFd, PollFlags}; use std::convert::TryInto; use wasi_common::sched::subscription::{RwEventFlags, Subscription}; @@ -13,16 +12,16 @@ pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { match s { Subscription::Read(f) => { let fd = f - .file - .pollable() + .stream + .pollable_read() .ok_or(Error::invalid_argument().context("file is not pollable"))?; pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::IN)); } Subscription::Write(f) => { let fd = f - .file - .pollable() + .stream + .pollable_write() .ok_or(Error::invalid_argument().context("file is not pollable"))?; pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::OUT)); } @@ -32,8 +31,13 @@ pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { let ready = loop { let poll_timeout = if let Some(t) = poll.earliest_clock_deadline() { - let duration = t.duration_until().unwrap_or(Duration::from_secs(0)); - (duration.as_millis() + 1) // XXX try always rounding up? + let duration = t.duration_until().unwrap_or(0); + + // Convert the timeout to milliseconds for `poll`, rounding up. + // + // TODO: On Linux and FreeBSD, we could use `ppoll` instead + // which takes a `timespec.` + ((duration + 999) / 1000) .try_into() .map_err(|_| Error::overflow().context("poll timeout"))? } else { @@ -55,7 +59,7 @@ pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { let revents = pollfd.revents(); let (nbytes, rwsub) = match rwsub { Subscription::Read(sub) => { - let ready = sub.file.num_ready_bytes().await?; + let ready = sub.stream.num_ready_bytes().await?; (std::cmp::max(ready, 1), sub) } Subscription::Write(sub) => (0, sub), diff --git a/wasi-common/cap-std-sync/src/stdio.rs b/wasi-common/cap-std-sync/src/stdio.rs index 9d82348a..287c5abb 100644 --- a/wasi-common/cap-std-sync/src/stdio.rs +++ b/wasi-common/cap-std-sync/src/stdio.rs @@ -1,10 +1,5 @@ -use crate::file::convert_systimespec; -use fs_set_times::SetTimes; -use io_lifetimes::AsFilelike; -use is_terminal::IsTerminal; use std::any::Any; use std::convert::TryInto; -use std::fs::File; use std::io; use std::io::{Read, Write}; use system_interface::io::ReadReady; @@ -15,10 +10,7 @@ use io_extras::os::windows::{AsRawHandleOrSocket, RawHandleOrSocket}; use io_lifetimes::{AsFd, BorrowedFd}; #[cfg(windows)] use io_lifetimes::{AsHandle, BorrowedHandle}; -use wasi_common::{ - file::{FdFlags, FileType, WasiFile}, - Error, ErrorExt, -}; +use wasi_common::{stream::WasiStream, Error, ErrorExt}; pub struct Stdin(std::io::Stdin); @@ -27,57 +19,80 @@ pub fn stdin() -> Stdin { } #[async_trait::async_trait] -impl WasiFile for Stdin { +impl WasiStream for Stdin { fn as_any(&self) -> &dyn Any { self } #[cfg(unix)] - fn pollable(&self) -> Option { + fn pollable_read(&self) -> Option { Some(self.0.as_fd()) } #[cfg(windows)] - fn pollable(&self) -> Option { + fn pollable_read(&self) -> Option { Some(self.0.as_raw_handle_or_socket()) } - async fn get_filetype(&mut self) -> Result { - if self.isatty() { - Ok(FileType::CharacterDevice) - } else { - Ok(FileType::Unknown) + + async fn read(&mut self, buf: &mut [u8]) -> Result<(u64, bool), Error> { + match Read::read(&mut self.0, buf) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n as u64, false)), + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(err) => Err(err.into()), } } - async fn read_vectored<'a>(&mut self, bufs: &mut [io::IoSliceMut<'a>]) -> Result { - let n = (&*self.0.as_filelike_view::()).read_vectored(bufs)?; - Ok(n.try_into().map_err(|_| Error::range())?) - } - async fn read_vectored_at<'a>( + async fn read_vectored<'a>( &mut self, - _bufs: &mut [io::IoSliceMut<'a>], - _offset: u64, - ) -> Result { - Err(Error::seek_pipe()) + bufs: &mut [io::IoSliceMut<'a>], + ) -> Result<(u64, bool), Error> { + match Read::read_vectored(&mut self.0, bufs) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n as u64, false)), + Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(err) => Err(err.into()), + } } - async fn seek(&mut self, _pos: std::io::SeekFrom) -> Result { - Err(Error::seek_pipe()) + #[cfg(can_vector)] + fn is_read_vectored(&self) { + Read::is_read_vectored(&mut self.0) } - async fn peek(&mut self, _buf: &mut [u8]) -> Result { - Err(Error::seek_pipe()) + async fn write(&mut self, _buf: &[u8]) -> Result { + Err(Error::badf()) } - async fn set_times( + async fn write_vectored<'a>(&mut self, _bufs: &[io::IoSlice<'a>]) -> Result { + Err(Error::badf()) + } + #[cfg(can_vector)] + fn is_write_vectored(&self) { + false + } + + // TODO: Optimize for stdio streams. + /* + async fn splice( &mut self, - atime: Option, - mtime: Option, - ) -> Result<(), Error> { - self.0 - .set_times(convert_systimespec(atime), convert_systimespec(mtime))?; - Ok(()) + dst: &mut dyn WasiStream, + nelem: u64, + ) -> Result { + todo!() + } + */ + + async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> { + let num = io::copy(&mut io::Read::take(&mut self.0, nelem), &mut io::sink())?; + Ok((num, num < nelem)) } + async fn num_ready_bytes(&self) -> Result { Ok(self.0.num_ready_bytes()?) } - fn isatty(&mut self) -> bool { - self.0.is_terminal() + + async fn readable(&self) -> Result<(), Error> { + Err(Error::badf()) + } + + async fn writable(&self) -> Result<(), Error> { + Ok(()) } } #[cfg(windows)] @@ -103,56 +118,67 @@ impl AsFd for Stdin { macro_rules! wasi_file_write_impl { ($ty:ty, $ident:ident) => { #[async_trait::async_trait] - impl WasiFile for $ty { + impl WasiStream for $ty { fn as_any(&self) -> &dyn Any { self } + #[cfg(unix)] - fn pollable(&self) -> Option { + fn pollable_write(&self) -> Option { Some(self.0.as_fd()) } - #[cfg(windows)] - fn pollable(&self) -> Option { + fn pollable_write(&self) -> Option { Some(self.0.as_raw_handle_or_socket()) } - async fn get_filetype(&mut self) -> Result { - if self.isatty() { - Ok(FileType::CharacterDevice) - } else { - Ok(FileType::Unknown) - } + + async fn read(&mut self, _buf: &mut [u8]) -> Result<(u64, bool), Error> { + Err(Error::badf()) + } + async fn read_vectored<'a>( + &mut self, + _bufs: &mut [io::IoSliceMut<'a>], + ) -> Result<(u64, bool), Error> { + Err(Error::badf()) + } + #[cfg(can_vector)] + fn is_read_vectored(&self) { + false } - async fn get_fdflags(&mut self) -> Result { - Ok(FdFlags::APPEND) + async fn write(&mut self, buf: &[u8]) -> Result { + let n = Write::write(&mut self.0, buf)?; + Ok(n.try_into()?) } async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { - let n = (&*self.0.as_filelike_view::()).write_vectored(bufs)?; - Ok(n.try_into().map_err(|_| { - Error::range().context("converting write_vectored total length") - })?) + let n = Write::write_vectored(&mut self.0, bufs)?; + Ok(n.try_into()?) } - async fn write_vectored_at<'a>( + #[cfg(can_vector)] + fn is_write_vectored(&self) { + Write::is_write_vectored(&mut self.0) + } + // TODO: Optimize for stdio streams. + /* + async fn splice( &mut self, - _bufs: &[io::IoSlice<'a>], - _offset: u64, + dst: &mut dyn WasiStream, + nelem: u64, ) -> Result { - Err(Error::seek_pipe()) + todo!() } - async fn seek(&mut self, _pos: std::io::SeekFrom) -> Result { - Err(Error::seek_pipe()) + */ + + async fn write_repeated(&mut self, byte: u8, nelem: u64) -> Result { + let num = io::copy(&mut io::Read::take(io::repeat(byte), nelem), &mut self.0)?; + Ok(num) } - async fn set_times( - &mut self, - atime: Option, - mtime: Option, - ) -> Result<(), Error> { - self.0 - .set_times(convert_systimespec(atime), convert_systimespec(mtime))?; - Ok(()) + + async fn readable(&self) -> Result<(), Error> { + Err(Error::badf()) } - fn isatty(&mut self) -> bool { - self.0.is_terminal() + + async fn writable(&self) -> Result<(), Error> { + Ok(()) } } #[cfg(windows)] diff --git a/wasi-common/src/clocks.rs b/wasi-common/src/clocks.rs index 0d5cda6c..0015f67a 100644 --- a/wasi-common/src/clocks.rs +++ b/wasi-common/src/clocks.rs @@ -1,66 +1,65 @@ -use cap_std::time::{Duration, Instant, SystemTime}; +use crate::Error; +use cap_std::time::Duration; -pub enum SystemTimeSpec { - SymbolicNow, - Absolute(SystemTime), -} - -pub trait WasiSystemClock: Send + Sync { +pub trait WasiWallClock: Send + Sync { fn resolution(&self) -> Duration; - fn now(&self, precision: Duration) -> SystemTime; + fn now(&self) -> Duration; + fn dup(&self) -> Box; } pub trait WasiMonotonicClock: Send + Sync { - fn resolution(&self) -> Duration; - fn now(&self, precision: Duration) -> Instant; + fn resolution(&self) -> u64; + fn now(&self) -> u64; + fn dup(&self) -> Box; } pub struct WasiClocks { - pub system: Box, - pub monotonic: Box, - pub creation_time: cap_std::time::Instant, - pub default_monotonic: u32, - pub default_wall: u32, + pub default_wall_clock: Box, + pub default_monotonic_clock: Box, } -pub struct MonotonicClock { - start: Instant, +pub trait TableWallClockExt { + fn get_wall_clock(&self, fd: u32) -> Result<&(dyn WasiWallClock + Send + Sync), Error>; + fn get_wall_clock_mut( + &mut self, + fd: u32, + ) -> Result<&mut Box, Error>; } - -impl From<&dyn WasiMonotonicClock> for MonotonicClock { - fn from(clock: &dyn WasiMonotonicClock) -> MonotonicClock { - MonotonicClock { - start: clock.now(clock.resolution()), - } +impl TableWallClockExt for crate::table::Table { + fn get_wall_clock(&self, fd: u32) -> Result<&(dyn WasiWallClock + Send + Sync), Error> { + self.get::>(fd) + .map(|f| f.as_ref()) } -} - -impl MonotonicClock { - pub fn now(&self, clock: &dyn WasiMonotonicClock) -> Duration { - clock.now(clock.resolution()).duration_since(self.start) - } - pub fn new_timer(&self, initial: Duration) -> MonotonicTimer { - MonotonicTimer { - start: self.start + initial, - } + fn get_wall_clock_mut( + &mut self, + fd: u32, + ) -> Result<&mut Box, Error> { + self.get_mut::>(fd) } } -pub struct MonotonicTimer { - start: Instant, +pub trait TableMonotonicClockExt { + fn get_monotonic_clock( + &self, + fd: u32, + ) -> Result<&(dyn WasiMonotonicClock + Send + Sync), Error>; + fn get_monotonic_clock_mut( + &mut self, + fd: u32, + ) -> Result<&mut Box, Error>; } - -impl MonotonicTimer { - pub fn current(&self, clock: &dyn WasiMonotonicClock) -> Duration { - clock.now(clock.resolution()).duration_since(self.start) +impl TableMonotonicClockExt for crate::table::Table { + fn get_monotonic_clock( + &self, + fd: u32, + ) -> Result<&(dyn WasiMonotonicClock + Send + Sync), Error> { + self.get::>(fd) + .map(|f| f.as_ref()) } -} - -#[derive(Default)] -pub struct WallClock; - -impl WallClock { - pub fn now(&self, clock: &dyn WasiSystemClock) -> SystemTime { - clock.now(clock.resolution()) + fn get_monotonic_clock_mut( + &mut self, + fd: u32, + ) -> Result<&mut Box, Error> { + self.get_mut::>(fd) } } diff --git a/wasi-common/src/connection.rs b/wasi-common/src/connection.rs new file mode 100644 index 00000000..24536cd5 --- /dev/null +++ b/wasi-common/src/connection.rs @@ -0,0 +1,71 @@ +//! Socket connections. + +use crate::Error; +use bitflags::bitflags; +use std::any::Any; + +/// A socket connection. +#[async_trait::async_trait] +pub trait WasiConnection: Send + Sync { + fn as_any(&self) -> &dyn Any; + + async fn sock_recv<'a>( + &mut self, + _ri_data: &mut [std::io::IoSliceMut<'a>], + _ri_flags: RiFlags, + ) -> Result<(u64, RoFlags), Error>; + + async fn sock_send<'a>( + &mut self, + _si_data: &[std::io::IoSlice<'a>], + _si_flags: SiFlags, + ) -> Result; + + async fn sock_shutdown(&mut self, _how: SdFlags) -> Result<(), Error>; + + fn get_nonblocking(&mut self) -> Result; + + fn set_nonblocking(&mut self, _flag: bool) -> Result<(), Error>; + + async fn readable(&self) -> Result<(), Error>; + + async fn writable(&self) -> Result<(), Error>; +} + +bitflags! { + pub struct SdFlags: u32 { + const RD = 0b1; + const WR = 0b10; + } +} + +bitflags! { + pub struct SiFlags: u32 { + } +} + +bitflags! { + pub struct RiFlags: u32 { + const RECV_PEEK = 0b1; + const RECV_WAITALL = 0b10; + } +} + +bitflags! { + pub struct RoFlags: u32 { + const RECV_DATA_TRUNCATED = 0b1; + } +} + +pub trait TableConnectionExt { + fn get_connection(&self, fd: u32) -> Result<&dyn WasiConnection, Error>; + fn get_connection_mut(&mut self, fd: u32) -> Result<&mut Box, Error>; +} +impl TableConnectionExt for crate::table::Table { + fn get_connection(&self, fd: u32) -> Result<&dyn WasiConnection, Error> { + self.get::>(fd).map(|f| f.as_ref()) + } + fn get_connection_mut(&mut self, fd: u32) -> Result<&mut Box, Error> { + self.get_mut::>(fd) + } +} diff --git a/wasi-common/src/ctx.rs b/wasi-common/src/ctx.rs index bf60963e..869cb304 100644 --- a/wasi-common/src/ctx.rs +++ b/wasi-common/src/ctx.rs @@ -1,7 +1,9 @@ use crate::clocks::WasiClocks; use crate::dir::WasiDir; use crate::file::WasiFile; +use crate::listener::WasiListener; use crate::sched::WasiSched; +use crate::stream::WasiStream; use crate::table::Table; use crate::Error; use cap_rand::RngCore; @@ -33,34 +35,46 @@ impl WasiCtx { } pub fn insert_file(&mut self, fd: u32, file: Box) { - self.table().insert_at(fd, Box::new(file)); + self.table_mut().insert_at(fd, Box::new(file)); + } + + pub fn insert_stream(&mut self, fd: u32, stream: Box) { + self.table_mut().insert_at(fd, Box::new(stream)); + } + + pub fn insert_listener(&mut self, fd: u32, listener: Box) { + self.table_mut().insert_at(fd, Box::new(listener)); } pub fn push_file(&mut self, file: Box) -> Result { - self.table().push(Box::new(file)) + self.table_mut().push(Box::new(file)) } pub fn insert_dir(&mut self, fd: u32, dir: Box) { - self.table().insert_at(fd, Box::new(dir)) + self.table_mut().insert_at(fd, Box::new(dir)) } pub fn push_dir(&mut self, dir: Box) -> Result { - self.table().push(Box::new(dir)) + self.table_mut().push(Box::new(dir)) + } + + pub fn table(&self) -> &Table { + &self.table } - pub fn table(&mut self) -> &mut Table { + pub fn table_mut(&mut self) -> &mut Table { &mut self.table } - pub fn set_stdin(&mut self, f: Box) { - self.insert_file(0, f); + pub fn set_stdin(&mut self, s: Box) { + self.insert_stream(0, s); } - pub fn set_stdout(&mut self, f: Box) { - self.insert_file(1, f); + pub fn set_stdout(&mut self, s: Box) { + self.insert_stream(1, s); } - pub fn set_stderr(&mut self, f: Box) { - self.insert_file(2, f); + pub fn set_stderr(&mut self, s: Box) { + self.insert_stream(2, s); } } diff --git a/wasi-common/src/file.rs b/wasi-common/src/file.rs index a6b597f1..4b3ef9dc 100644 --- a/wasi-common/src/file.rs +++ b/wasi-common/src/file.rs @@ -1,11 +1,12 @@ -use crate::{Error, ErrorExt, SystemTimeSpec}; +use crate::{Error, ErrorExt, SystemTimeSpec, WasiStream}; use bitflags::bitflags; use std::any::Any; +use std::io; #[async_trait::async_trait] pub trait WasiFile: Send + Sync { fn as_any(&self) -> &dyn Any; - async fn get_filetype(&mut self) -> Result; + async fn get_filetype(&self) -> Result; #[cfg(unix)] fn pollable(&self) -> Option { @@ -21,27 +22,7 @@ pub trait WasiFile: Send + Sync { false } - async fn sock_accept(&mut self, _fdflags: FdFlags) -> Result, Error> { - Err(Error::badf()) - } - - async fn sock_recv<'a>( - &mut self, - _ri_data: &mut [std::io::IoSliceMut<'a>], - _ri_flags: RiFlags, - ) -> Result<(u64, RoFlags), Error> { - Err(Error::badf()) - } - - async fn sock_send<'a>( - &mut self, - _si_data: &[std::io::IoSlice<'a>], - _si_flags: SiFlags, - ) -> Result { - Err(Error::badf()) - } - - async fn sock_shutdown(&mut self, _how: SdFlags) -> Result<(), Error> { + async fn try_clone(&mut self) -> Result, Error> { Err(Error::badf()) } @@ -53,7 +34,7 @@ pub trait WasiFile: Send + Sync { Ok(()) } - async fn get_fdflags(&mut self) -> Result { + async fn get_fdflags(&self) -> Result { Ok(FdFlags::empty()) } @@ -61,7 +42,7 @@ pub trait WasiFile: Send + Sync { Err(Error::badf()) } - async fn get_filestat(&mut self) -> Result { + async fn get_filestat(&self) -> Result { Ok(Filestat { device_id: 0, inode: 0, @@ -94,10 +75,7 @@ pub trait WasiFile: Send + Sync { Err(Error::badf()) } - async fn read_vectored<'a>( - &mut self, - _bufs: &mut [std::io::IoSliceMut<'a>], - ) -> Result { + async fn read_at<'a>(&mut self, _buf: &mut [u8], _offset: u64) -> Result<(u64, bool), Error> { Err(Error::badf()) } @@ -105,11 +83,15 @@ pub trait WasiFile: Send + Sync { &mut self, _bufs: &mut [std::io::IoSliceMut<'a>], _offset: u64, - ) -> Result { + ) -> Result<(u64, bool), Error> { Err(Error::badf()) } - async fn write_vectored<'a>(&mut self, _bufs: &[std::io::IoSlice<'a>]) -> Result { + fn is_read_vectored_at(&self) -> bool { + false + } + + async fn write_at<'a>(&mut self, _bufs: &[u8], _offset: u64) -> Result { Err(Error::badf()) } @@ -121,25 +103,13 @@ pub trait WasiFile: Send + Sync { Err(Error::badf()) } - async fn seek(&mut self, _pos: std::io::SeekFrom) -> Result { - Err(Error::badf()) - } - - async fn peek(&mut self, _buf: &mut [u8]) -> Result { - Err(Error::badf()) - } - - async fn num_ready_bytes(&self) -> Result { - Ok(0) + fn is_write_vectored_at(&self) -> bool { + false } - async fn readable(&self) -> Result<(), Error> { - Err(Error::badf()) - } + async fn readable(&self) -> Result<(), Error>; - async fn writable(&self) -> Result<(), Error> { - Err(Error::badf()) - } + async fn writable(&self) -> Result<(), Error>; } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -165,31 +135,6 @@ bitflags! { } } -bitflags! { - pub struct SdFlags: u32 { - const RD = 0b1; - const WR = 0b10; - } -} - -bitflags! { - pub struct SiFlags: u32 { - } -} - -bitflags! { - pub struct RiFlags: u32 { - const RECV_PEEK = 0b1; - const RECV_WAITALL = 0b10; - } -} - -bitflags! { - pub struct RoFlags: u32 { - const RECV_DATA_TRUNCATED = 0b1; - } -} - bitflags! { pub struct OFlags: u32 { const CREATE = 0b1; @@ -239,3 +184,210 @@ pub enum Advice { DontNeed, NoReuse, } + +pub struct FileStream { + // Which file are we streaming? + file: Box, + + // Where in the file are we? + position: u64, + + // Reading or writing? + reading: bool, +} + +impl FileStream { + pub fn new_reader(file: Box, position: u64) -> Self { + Self { + file, + position, + reading: true, + } + } + + pub fn new_writer(file: Box, position: u64) -> Self { + Self { + file, + position, + reading: false, + } + } + + pub fn new_appender(_file: Box) -> Self { + todo!() + } + + pub async fn seek(&mut self, pos: std::io::SeekFrom) -> Result { + match pos { + std::io::SeekFrom::Start(pos) => self.position = pos, + std::io::SeekFrom::Current(pos) => { + self.position = self.position.wrapping_add(pos as i64 as u64) + } + std::io::SeekFrom::End(pos) => { + self.position = self + .file + .get_filestat() + .await? + .size + .wrapping_add(pos as i64 as u64) + } + } + Ok(self.position) + } +} + +#[async_trait::async_trait] +impl WasiStream for FileStream { + fn as_any(&self) -> &dyn Any { + self + } + #[cfg(unix)] + fn pollable_read(&self) -> Option { + if self.reading { + self.file.pollable() + } else { + None + } + } + #[cfg(unix)] + fn pollable_write(&self) -> Option { + if self.reading { + None + } else { + self.file.pollable() + } + } + + #[cfg(windows)] + fn pollable_read(&self) -> Option { + if self.reading { + self.file.pollable() + } else { + None + } + } + #[cfg(windows)] + fn pollable_write(&self) -> Option { + if self.reading { + None + } else { + self.file.pollable() + } + } + + async fn read(&mut self, buf: &mut [u8]) -> Result<(u64, bool), Error> { + if !self.reading { + return Err(Error::badf()); + } + let (n, end) = self.file.read_at(buf, self.position).await?; + self.position = self.position.wrapping_add(n); + Ok((n, end)) + } + async fn read_vectored<'a>( + &mut self, + bufs: &mut [io::IoSliceMut<'a>], + ) -> Result<(u64, bool), Error> { + if !self.reading { + return Err(Error::badf()); + } + let (n, end) = self.file.read_vectored_at(bufs, self.position).await?; + self.position = self.position.wrapping_add(n); + Ok((n, end)) + } + #[cfg(can_vector)] + fn is_read_vectored_at(&self) -> bool { + if !self.reading { + return false; + } + self.file.is_read_vectored_at() + } + async fn write(&mut self, buf: &[u8]) -> Result { + if self.reading { + return Err(Error::badf()); + } + let n = self.file.write_at(buf, self.position).await? as i64 as u64; + self.position = self.position.wrapping_add(n); + Ok(n) + } + async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { + if self.reading { + return Err(Error::badf()); + } + let n = self.file.write_vectored_at(bufs, self.position).await? as i64 as u64; + self.position = self.position.wrapping_add(n); + Ok(n) + } + #[cfg(can_vector)] + fn is_write_vectored_at(&self) -> bool { + if self.reading { + return false; + } + self.file.is_write_vectored_at() + } + + // TODO: Optimize for file streams. + /* + async fn splice( + &mut self, + dst: &mut dyn WasiStream, + nelem: u64, + ) -> Result { + todo!() + } + */ + + async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> { + // For a zero-length request, don't do the 1 byte check below. + if nelem == 0 { + return self.file.read_at(&mut [], 0).await; + } + + if !self.reading { + return Err(Error::badf()); + } + + let new_position = self + .position + .checked_add(nelem) + .ok_or_else(Error::overflow)?; + + let file_size = self.file.get_filestat().await?.size; + + let short_by = new_position.saturating_sub(file_size); + + self.position = new_position - short_by; + Ok((nelem - short_by, false)) + } + + // TODO: Optimize for file streams. + /* + async fn write_repeated( + &mut self, + byte: u8, + nelem: u64, + ) -> Result { + todo!() + } + */ + + async fn num_ready_bytes(&self) -> Result { + if !self.reading { + return Err(Error::badf()); + } + Ok(0) + } + + async fn readable(&self) -> Result<(), Error> { + if !self.reading { + return Err(Error::badf()); + } + self.file.readable().await + } + + async fn writable(&self) -> Result<(), Error> { + if self.reading { + return Err(Error::badf()); + } + self.file.writable().await + } +} diff --git a/wasi-common/src/lib.rs b/wasi-common/src/lib.rs index 9f122267..a9caa447 100644 --- a/wasi-common/src/lib.rs +++ b/wasi-common/src/lib.rs @@ -38,8 +38,8 @@ //! provide embedders with the same sort of implementation flexibility they //! get with WasiFile/WasiDir: //! -//! * Timekeeping: `WasiSystemClock` and `WasiMonotonicClock` provide the two -//! interfaces for a clock. `WasiSystemClock` represents time as a +//! * Timekeeping: `WasiWallClock` and `WasiMonotonicClock` provide the two +//! interfaces for a clock. `WasiWallClock` represents time as a //! `cap_std::time::SystemTime`, and `WasiMonotonicClock` represents time as //! `cap_std::time::Instant`. //! * Randomness: we re-use the `cap_rand::RngCore` trait to represent a @@ -52,20 +52,27 @@ //! `wasi_cap_std_sync::WasiCtxBuilder::new()` function uses this public //! interface to plug in its own implementations of each of these resources. pub mod clocks; +pub mod connection; mod ctx; pub mod dir; mod error; pub mod file; +pub mod listener; pub mod pipe; pub mod random; pub mod sched; +pub mod stream; pub mod table; +pub use cap_fs_ext::SystemTimeSpec; pub use cap_rand::RngCore; -pub use clocks::{SystemTimeSpec, WasiClocks, WasiMonotonicClock, WasiSystemClock}; +pub use clocks::{WasiClocks, WasiMonotonicClock, WasiWallClock}; +pub use connection::WasiConnection; pub use ctx::WasiCtx; pub use dir::WasiDir; pub use error::{Errno, Error, ErrorExt, I32Exit}; pub use file::WasiFile; +pub use listener::WasiListener; pub use sched::{Poll, WasiSched}; +pub use stream::WasiStream; pub use table::Table; diff --git a/wasi-common/src/listener.rs b/wasi-common/src/listener.rs new file mode 100644 index 00000000..2e2cc925 --- /dev/null +++ b/wasi-common/src/listener.rs @@ -0,0 +1,29 @@ +use crate::connection::WasiConnection; +/// Socket listeners. +use crate::Error; +use std::any::Any; + +/// A socket listener. +#[async_trait::async_trait] +pub trait WasiListener: Send + Sync { + fn as_any(&self) -> &dyn Any; + + async fn sock_accept(&mut self, nonblocking: bool) -> Result, Error>; + + fn get_nonblocking(&mut self) -> Result; + + fn set_nonblocking(&mut self, _flag: bool) -> Result<(), Error>; +} + +pub trait TableListenerExt { + fn get_listener(&self, fd: u32) -> Result<&dyn WasiListener, Error>; + fn get_listener_mut(&mut self, fd: u32) -> Result<&mut Box, Error>; +} +impl TableListenerExt for crate::table::Table { + fn get_listener(&self, fd: u32) -> Result<&dyn WasiListener, Error> { + self.get::>(fd).map(|f| f.as_ref()) + } + fn get_listener_mut(&mut self, fd: u32) -> Result<&mut Box, Error> { + self.get_mut::>(fd) + } +} diff --git a/wasi-common/src/pipe.rs b/wasi-common/src/pipe.rs index 9b9e2c57..dfefed8b 100644 --- a/wasi-common/src/pipe.rs +++ b/wasi-common/src/pipe.rs @@ -7,8 +7,8 @@ //! Some convenience constructors are included for common backing types like `Vec` and `String`, //! but the virtual pipes can be instantiated with any `Read` or `Write` type. //! -use crate::file::{FdFlags, FileType, WasiFile}; -use crate::Error; +use crate::stream::WasiStream; +use crate::{Error, ErrorExt}; use std::any::Any; use std::convert::TryInto; use std::io::{self, Read, Write}; @@ -99,24 +99,45 @@ impl From<&str> for ReadPipe> { } #[async_trait::async_trait] -impl WasiFile for ReadPipe { +impl WasiStream for ReadPipe { fn as_any(&self) -> &dyn Any { self } - async fn get_filetype(&mut self) -> Result { - Ok(FileType::Pipe) - } - async fn read_vectored<'a>(&mut self, bufs: &mut [io::IoSliceMut<'a>]) -> Result { - let n = self.borrow().read_vectored(bufs)?; - Ok(n.try_into()?) + + async fn read(&mut self, buf: &mut [u8]) -> Result<(u64, bool), Error> { + match self.borrow().read(buf) { + Ok(0) => Ok((0, true)), + Ok(n) => Ok((n.try_into()?, false)), + Err(e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, false)), + Err(e) => Err(e.into()), + } } - async fn read_vectored_at<'a>( + + // TODO: Optimize for pipes. + /* + async fn splice( &mut self, - bufs: &mut [io::IoSliceMut<'a>], - _offset: u64, + dst: &mut dyn WasiStream, + nelem: u64, ) -> Result { - let n = self.borrow().read_vectored(bufs)?; - Ok(n.try_into()?) + todo!() + } + */ + + async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> { + let num = io::copy( + &mut io::Read::take(&mut *self.borrow(), nelem), + &mut io::sink(), + )?; + Ok((num, num < nelem)) + } + + async fn readable(&self) -> Result<(), Error> { + Ok(()) + } + + async fn writable(&self) -> Result<(), Error> { + Err(Error::badf()) } } @@ -191,18 +212,40 @@ impl WritePipe>> { } #[async_trait::async_trait] -impl WasiFile for WritePipe { +impl WasiStream for WritePipe { fn as_any(&self) -> &dyn Any { self } - async fn get_filetype(&mut self) -> Result { - Ok(FileType::Pipe) + + async fn write(&mut self, buf: &[u8]) -> Result { + let n = self.borrow().write(buf)?; + Ok(n.try_into()?) } - async fn get_fdflags(&mut self) -> Result { - Ok(FdFlags::APPEND) + + // TODO: Optimize for pipes. + /* + async fn splice( + &mut self, + dst: &mut dyn WasiStream, + nelem: u64, + ) -> Result { + todo!() } - async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { - let n = self.borrow().write_vectored(bufs)?; - Ok(n.try_into()?) + */ + + async fn write_repeated(&mut self, byte: u8, nelem: u64) -> Result { + let num = io::copy( + &mut io::Read::take(io::repeat(byte), nelem), + &mut *self.borrow(), + )?; + Ok(num) + } + + async fn readable(&self) -> Result<(), Error> { + Err(Error::badf()) + } + + async fn writable(&self) -> Result<(), Error> { + Ok(()) } } diff --git a/wasi-common/src/sched.rs b/wasi-common/src/sched.rs index 2af37d84..4d420cc5 100644 --- a/wasi-common/src/sched.rs +++ b/wasi-common/src/sched.rs @@ -1,7 +1,6 @@ use crate::clocks::WasiMonotonicClock; -use crate::file::WasiFile; -use crate::Error; -use cap_std::time::Instant; +use crate::stream::WasiStream; +use crate::{Error, ErrorExt}; pub mod subscription; pub use cap_std::time::Duration; @@ -43,32 +42,35 @@ impl<'a> Poll<'a> { pub fn subscribe_monotonic_clock( &mut self, clock: &'a dyn WasiMonotonicClock, - deadline: Instant, - precision: Duration, + deadline: u64, + absolute: bool, ud: Userdata, - ) { + ) -> Result<(), Error> { + let deadline = if absolute { + deadline + .checked_sub(clock.now()) + .ok_or_else(Error::overflow)? + } else { + deadline + }; self.subs.push(( - Subscription::MonotonicClock(MonotonicClockSubscription { - clock, - deadline, - precision, - }), + Subscription::MonotonicClock(MonotonicClockSubscription { clock, deadline }), ud, )); + Ok(()) } - pub fn subscribe_read(&mut self, file: &'a dyn WasiFile, ud: Userdata) { + pub fn subscribe_read(&mut self, stream: &'a dyn WasiStream, ud: Userdata) { self.subs - .push((Subscription::Read(RwSubscription::new(file)), ud)); + .push((Subscription::Read(RwSubscription::new(stream)), ud)); } - pub fn subscribe_write(&mut self, file: &'a dyn WasiFile, ud: Userdata) { + pub fn subscribe_write(&mut self, stream: &'a dyn WasiStream, ud: Userdata) { self.subs - .push((Subscription::Write(RwSubscription::new(file)), ud)); + .push((Subscription::Write(RwSubscription::new(stream)), ud)); } - pub fn results(self) -> Vec<(SubscriptionResult, Userdata)> { + pub fn results(self) -> impl Iterator + 'a { self.subs .into_iter() .filter_map(|(s, ud)| SubscriptionResult::from_subscription(s).map(|r| (r, ud))) - .collect() } pub fn is_empty(&self) -> bool { self.subs.is_empty() diff --git a/wasi-common/src/sched/subscription.rs b/wasi-common/src/sched/subscription.rs index 06054736..cdc8248d 100644 --- a/wasi-common/src/sched/subscription.rs +++ b/wasi-common/src/sched/subscription.rs @@ -1,8 +1,7 @@ use crate::clocks::WasiMonotonicClock; -use crate::file::WasiFile; +use crate::stream::WasiStream; use crate::Error; use bitflags::bitflags; -use cap_std::time::{Duration, Instant}; bitflags! { pub struct RwEventFlags: u32 { @@ -11,13 +10,16 @@ bitflags! { } pub struct RwSubscription<'a> { - pub file: &'a dyn WasiFile, + pub stream: &'a dyn WasiStream, status: Option>, } impl<'a> RwSubscription<'a> { - pub fn new(file: &'a dyn WasiFile) -> Self { - Self { file, status: None } + pub fn new(stream: &'a dyn WasiStream) -> Self { + Self { + stream, + status: None, + } } pub fn complete(&mut self, size: u64, flags: RwEventFlags) { self.status = Some(Ok((size, flags))) @@ -32,19 +34,18 @@ impl<'a> RwSubscription<'a> { pub struct MonotonicClockSubscription<'a> { pub clock: &'a dyn WasiMonotonicClock, - pub deadline: Instant, - pub precision: Duration, + pub deadline: u64, } impl<'a> MonotonicClockSubscription<'a> { - pub fn now(&self) -> Instant { - self.clock.now(self.precision) + pub fn now(&self) -> u64 { + self.clock.now() } - pub fn duration_until(&self) -> Option { - self.deadline.checked_duration_since(self.now()) + pub fn duration_until(&self) -> Option { + self.deadline.checked_sub(self.now()) } pub fn result(&self) -> Option> { - if self.now().checked_duration_since(self.deadline).is_some() { + if self.now().checked_sub(self.deadline).is_some() { Some(Ok(())) } else { None diff --git a/wasi-common/src/stream.rs b/wasi-common/src/stream.rs new file mode 100644 index 00000000..2bb845aa --- /dev/null +++ b/wasi-common/src/stream.rs @@ -0,0 +1,154 @@ +use crate::{Error, ErrorExt}; +use std::any::Any; + +/// A pseudo-stream. +/// +/// This is "pseudo" because the real streams will be a type in wit, and +/// built into the wit bindings, and will support async and type parameters. +/// This pseudo-stream abstraction is synchronous and only supports bytes. +#[async_trait::async_trait] +pub trait WasiStream: Send + Sync { + fn as_any(&self) -> &dyn Any; + + /// If this stream is reading from a host file descriptor, return it so + /// that it can be polled with a host poll. + #[cfg(unix)] + fn pollable_read(&self) -> Option { + None + } + + /// If this stream is reading from a host file descriptor, return it so + /// that it can be polled with a host poll. + #[cfg(windows)] + fn pollable_read(&self) -> Option { + None + } + + /// If this stream is writing from a host file descriptor, return it so + /// that it can be polled with a host poll. + #[cfg(unix)] + fn pollable_write(&self) -> Option { + None + } + + /// If this stream is writing from a host file descriptor, return it so + /// that it can be polled with a host poll. + #[cfg(windows)] + fn pollable_write(&self) -> Option { + None + } + + /// Read bytes. On success, returns a pair holding the number of bytes read + /// and a flag indicating whether the end of the stream was reached. + async fn read(&mut self, _buf: &mut [u8]) -> Result<(u64, bool), Error> { + Err(Error::badf()) + } + + /// Vectored-I/O form of `read`. + async fn read_vectored<'a>( + &mut self, + _bufs: &mut [std::io::IoSliceMut<'a>], + ) -> Result<(u64, bool), Error> { + Err(Error::badf()) + } + + /// Test whether vectored I/O reads are known to be optimized in the + /// underlying implementation. + fn is_read_vectored(&self) -> bool { + false + } + + /// Write bytes. On success, returns the number of bytes written. + async fn write(&mut self, _buf: &[u8]) -> Result { + Err(Error::badf()) + } + + /// Vectored-I/O form of `write`. + async fn write_vectored<'a>(&mut self, _bufs: &[std::io::IoSlice<'a>]) -> Result { + Err(Error::badf()) + } + + /// Test whether vectored I/O writes are known to be optimized in the + /// underlying implementation. + fn is_write_vectored(&self) -> bool { + false + } + + /// Transfer bytes directly from an input stream to an output stream. + async fn splice(&mut self, dst: &mut dyn WasiStream, nelem: u64) -> Result<(u64, bool), Error> { + let mut nspliced = 0; + let mut saw_end = false; + + // TODO: Optimize by splicing more than one byte at a time. + for _ in 0..nelem { + let mut buf = [0u8]; + let (num, end) = self.read(&mut buf).await?; + dst.write(&buf).await?; + nspliced += num; + if end { + saw_end = true; + break; + } + } + + Ok((nspliced, saw_end)) + } + + /// Read bytes from a stream and discard them. + async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> { + let mut nread = 0; + let mut saw_end = false; + + // TODO: Optimize by reading more than one byte at a time. + for _ in 0..nelem { + let (num, end) = self.read(&mut [0]).await?; + nread += num; + if end { + saw_end = true; + break; + } + } + + Ok((nread, saw_end)) + } + + /// Repeatedly write a byte to a stream. + async fn write_repeated(&mut self, byte: u8, nelem: u64) -> Result { + let mut nwritten = 0; + + // TODO: Optimize by writing more than one byte at a time. + for _ in 0..nelem { + let num = self.write(&[byte]).await?; + if num == 0 { + break; + } + nwritten += num; + } + + Ok(nwritten) + } + + /// Return the number of bytes that may be read without blocking. + async fn num_ready_bytes(&self) -> Result { + Ok(0) + } + + /// Test whether this stream is readable. + async fn readable(&self) -> Result<(), Error>; + + /// Test whether this stream is writeable. + async fn writable(&self) -> Result<(), Error>; +} + +pub trait TableStreamExt { + fn get_stream(&self, fd: u32) -> Result<&dyn WasiStream, Error>; + fn get_stream_mut(&mut self, fd: u32) -> Result<&mut Box, Error>; +} +impl TableStreamExt for crate::table::Table { + fn get_stream(&self, fd: u32) -> Result<&dyn WasiStream, Error> { + self.get::>(fd).map(|f| f.as_ref()) + } + fn get_stream_mut(&mut self, fd: u32) -> Result<&mut Box, Error> { + self.get_mut::>(fd) + } +} diff --git a/wasi-common/tokio/src/file.rs b/wasi-common/tokio/src/file.rs index 030e60e5..15852dc7 100644 --- a/wasi-common/tokio/src/file.rs +++ b/wasi-common/tokio/src/file.rs @@ -103,6 +103,10 @@ macro_rules! wasi_file_impl { fn pollable(&self) -> Option { Some(self.0.as_raw_handle_or_socket()) } + + async fn try_clone(&mut self) -> Result, Error> { + block_on_dummy_executor(|| self.0.try_clone()) + } async fn datasync(&mut self) -> Result<(), Error> { block_on_dummy_executor(|| self.0.datasync()) } @@ -130,12 +134,6 @@ macro_rules! wasi_file_impl { async fn allocate(&mut self, offset: u64, len: u64) -> Result<(), Error> { block_on_dummy_executor(move || self.0.allocate(offset, len)) } - async fn read_vectored<'a>( - &mut self, - bufs: &mut [io::IoSliceMut<'a>], - ) -> Result { - block_on_dummy_executor(move || self.0.read_vectored(bufs)) - } async fn read_vectored_at<'a>( &mut self, bufs: &mut [io::IoSliceMut<'a>], @@ -143,8 +141,8 @@ macro_rules! wasi_file_impl { ) -> Result { block_on_dummy_executor(move || self.0.read_vectored_at(bufs, offset)) } - async fn write_vectored<'a>(&mut self, bufs: &[io::IoSlice<'a>]) -> Result { - block_on_dummy_executor(move || self.0.write_vectored(bufs)) + fn is_read_vectored_at(&self) -> bool { + self.0.is_read_vectored_at() } async fn write_vectored_at<'a>( &mut self, @@ -153,12 +151,12 @@ macro_rules! wasi_file_impl { ) -> Result { block_on_dummy_executor(move || self.0.write_vectored_at(bufs, offset)) } + fn is_write_vectored_at(&self) -> bool { + self.0.is_write_vectored_at() + } async fn seek(&mut self, pos: std::io::SeekFrom) -> Result { block_on_dummy_executor(move || self.0.seek(pos)) } - async fn peek(&mut self, buf: &mut [u8]) -> Result { - block_on_dummy_executor(move || self.0.peek(buf)) - } async fn set_times( &mut self, atime: Option, diff --git a/wasi-common/tokio/src/lib.rs b/wasi-common/tokio/src/lib.rs index 577c6e2e..a649817f 100644 --- a/wasi-common/tokio/src/lib.rs +++ b/wasi-common/tokio/src/lib.rs @@ -97,14 +97,14 @@ impl WasiCtxBuilder { } pub fn preopened_socket(mut self, fd: u32, socket: impl Into) -> Result { let socket: Socket = socket.into(); - let file: Box = socket.into(); + let listener: Box = socket.into(); let caps = FileCaps::FDSTAT_SET_FLAGS | FileCaps::FILESTAT_GET | FileCaps::READ | FileCaps::POLL_READWRITE; - self.0.insert_file(fd, file, caps); + self.0.insert_listener(fd, listener, caps); Ok(self) } diff --git a/wasi-common/tokio/src/sched/unix.rs b/wasi-common/tokio/src/sched/unix.rs index 5ca3b120..6d99122b 100644 --- a/wasi-common/tokio/src/sched/unix.rs +++ b/wasi-common/tokio/src/sched/unix.rs @@ -56,12 +56,12 @@ pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { match s { Subscription::Read(f) => { futures.push(async move { - f.file + f.stream .readable() .await .map_err(|e| e.context("readable future"))?; f.complete( - f.file + f.stream .num_ready_bytes() .await .map_err(|e| e.context("read num_ready_bytes"))?, @@ -73,7 +73,7 @@ pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { Subscription::Write(f) => { futures.push(async move { - f.file + f.stream .writable() .await .map_err(|e| e.context("writable future"))?; diff --git a/wit/wasi.wit b/wit/wasi.wit index 03a3e048..c5d0bb6b 100644 --- a/wit/wasi.wit +++ b/wit/wasi.wit @@ -1,8 +1,13 @@ interface command { + /// TODO: `use` the `wasi-poll` version + type wasi-stream = u32 + + /// TODO: `use` the `wasi-filesystem` version type descriptor = u32 + command: func( - stdin: descriptor, - stdout: descriptor, + stdin: wasi-stream, + stdout: wasi-stream, args: list, env-vars: list>, preopens: list> @@ -37,10 +42,6 @@ interface wasi-clocks { /// It is intended for reporting the current date and time for humans. type wall-clock = u32 - /// This is a timer that counts down from a given starting time down to zero - /// on a monotonic clock. - type monotonic-timer = u32 - /// A timestamp in nanoseconds. type instant = u64 @@ -50,28 +51,19 @@ interface wasi-clocks { nanoseconds: u32, } - /// An asynchronous operation. In the future, this will be replaced by a handle type. + /// An asynchronous operation. See the comments on `wasi-future` in the + /// `wasi-poll` interface for details. /// TODO: `use` the `wasi-poll` version type wasi-future = u32 - /// Create a future which will resolve once the specified time has been reached. - subscribe-wall-clock: func(when: datetime, absolute: bool) -> wasi-future - - /// Create a future which will resolve once the specified time has been reached. - subscribe-monotonic-clock: func(when: instant, absolute: bool) -> wasi-future - /// Read the current value of the clock. /// /// As this the clock is monotonic, calling this function repeatedly will produce /// a sequence of non-decreasing values. - monotonic-clock-now: func(fd: monotonic-clock) -> instant + monotonic-clock-now: func(clock: monotonic-clock) -> instant /// Query the resolution of the clock. - monotonic-clock-resolution: func(fd: monotonic-clock) -> instant - - /// This creates a new `monotonic-timer` with the given starting time. It will - /// count down from this time until it reaches zero. - monotonic-clock-new-timer: func(fd: monotonic-clock, initial: instant) -> monotonic-timer + monotonic-clock-resolution: func(clock: monotonic-clock) -> instant /// Read the current value of the clock. /// @@ -86,15 +78,12 @@ interface wasi-clocks { /// /// [POSIX's Seconds Since the Epoch]: https://pubs.opengroup.org/onlinepubs/9699919799/xrat/V4_xbd_chap04.html#tag_21_04_16 /// [Unix Time]: https://en.wikipedia.org/wiki/Unix_time - wall-clock-now: func(fd: wall-clock) -> datetime + wall-clock-now: func(clock: wall-clock) -> datetime /// Query the resolution of the clock. /// /// The nanoseconds field of the output is always less than 1000000000. - wall-clock-resolution: func(fd: wall-clock) -> datetime - - /// Returns the amount of time left before this timer reaches zero. - monotonic-timer-current: func(fd: monotonic-timer) -> instant + wall-clock-resolution: func(clock: wall-clock) -> datetime } /// # WASI Default Clocks API @@ -106,7 +95,19 @@ interface wasi-default-clocks { type monotonic-clock = u32 type wall-clock = u32 + /// Return a default monotonic clock, suitable for general-purpose application + /// needs. + /// + /// This allocates a new handle, so applications with frequent need of a clock + /// handle should call this function once and reuse the handle instead of + /// calling this function each time. default-monotonic-clock: func() -> monotonic-clock + + /// Return a default wall clock, suitable for general-purpose application + /// needs. + /// + /// This allocates a new handle, so applications with frequent need of a clock + /// handle should call this function once and reuse the handle instead of default-wall-clock: func() -> wall-clock } @@ -142,6 +143,23 @@ interface wasi-logging { log: func(level: level, context: string, message: string) } +/// # Prototype stderr API +/// +/// This is a simple non-line-aware synchronous infallible output device, +/// suitable for implementing stderr. +interface wasi-stderr { + /// Print text to stderr. + print: func(message: string) + + /// Test whether stderr is known to be a terminal. + /// + /// This is similar to `isatty` in POSIX. + is-terminal: func() -> bool + + /// If stderr is a terminal and the number of columns can be determined, + /// return it. + num-columns: func() -> option +} /// # WASI Random API /// @@ -205,6 +223,9 @@ interface wasi-filesystem { /// actual stream. type dir-entry-stream = u32 + /// TODO: `use` the `wasi-poll` version + type wasi-stream = u32 + /// Size of a range of bytes in memory. type size = u32 @@ -250,9 +271,6 @@ interface wasi-filesystem { read, /// Write mode: Data can be written to. write, - /// Append mode: Data written to the file is always appended to the file's - /// end. - append, /// Write according to synchronized I/O data integrity completion. Only the /// data stored in the file is synchronized. dsync, @@ -517,16 +535,6 @@ interface wasi-filesystem { no-reuse, } - /// The position relative to which to set the offset of the descriptor. - variant seek-from { - /// Seek relative to start-of-file. - set(filesize), - /// Seek relative to current position. - cur(filedelta), - /// Seek relative to end-of-file. - end(filesize), - } - /// Provide file advisory information on a descriptor. /// /// This is similar to `posix_fadvise` in POSIX. @@ -613,7 +621,36 @@ interface wasi-filesystem { mtim: new-timestamp, ) -> result<_, errno> - /// Read from a descriptor, without using and updating the descriptor's offset. + /// Return a stream for reading from a file. + /// + /// Note: This allows using `read-stream`, which is similar to `read` in POSIX. + read-via-stream: func( + /// The resource to operate on. + fd: descriptor, + /// The offset within the file at which to start reading. + offset: filesize, + ) -> result + + /// Return a stream for writing to a file. + /// + /// Note: This allows using `write-stream`, which is similar to `write` in POSIX. + write-via-stream: func( + /// The resource to operate on. + fd: descriptor, + /// The offset within the file at which to start writing. + offset: filesize, + ) -> result + + /// Return a stream for appending to a file. + /// + /// Note: This allows using `write-stream`, which is similar to `write` with + /// `O_APPEND` in in POSIX. + append-via-stream: func( + /// The resource to operate on. + fd: descriptor, + ) -> result + + /// Read from a file at a given offset. /// /// Note: This is similar to `pread` in POSIX. pread: func( @@ -623,9 +660,9 @@ interface wasi-filesystem { len: size, /// The offset within the file at which to read. offset: filesize, - ) -> result, errno> + ) -> result, bool>, errno> - /// Write to a descriptor, without using and updating the descriptor's offset. + /// Write to a file at a given offset. /// /// Note: This is similar to `pwrite` in POSIX. pwrite: func( @@ -649,20 +686,6 @@ interface wasi-filesystem { /// Read a single directory entry from a `dir-entry-stream`. read-dir-entry: func(dir-stream: dir-entry-stream) -> result, errno> - /// Move the offset of a descriptor. - /// - /// The meaning of `seek` on a directory is unspecified. - /// - /// Returns new offset of the descriptor, relative to the start of the file. - /// - /// Note: This is similar to `lseek` in POSIX. - seek: func( - /// The resource to operate on. - fd: descriptor, - /// The method to compute the new offset. - %from: seek-from, - ) -> result - /// Synchronize the data and metadata of a file to disk. /// /// Note: This is similar to `fsync` in POSIX. @@ -671,16 +694,6 @@ interface wasi-filesystem { fd: descriptor, ) -> result<_, errno> - /// Return the current offset of a descriptor. - /// - /// Returns the current offset of the descriptor, relative to the start of the file. - /// - /// Note: This is similar to `lseek(fd, 0, SEEK_CUR)` in POSIX. - tell: func( - /// The resource to operate on. - fd: descriptor, - ) -> result - /// Create a directory. /// /// Note: This is similar to `mkdirat` in POSIX. @@ -980,12 +993,110 @@ interface wasi-filesystem { /// WASI Poll is a poll API intended to let users wait for I/O events on /// multiple handles at once. interface wasi-poll { - /// An asynchronous operation. In the future, this will be replaced by a handle type. + /// A "pseudo-future". + /// + /// This conceptually represents a `future<_>`, and serves as temporary + /// scaffolding until component-model's async features are ready. + /// + /// In the shorter term, it is a `u32` which will be replaced by a handle + /// when the component-model handles and resource features are ready. + /// + /// Pseudo-future lifetimes are not automatically managed. Users must + /// ensure that they do not outlive the resource they reference. type wasi-future = u32 + /// A "pseudo-stream". + /// + /// This conceptually represents a `stream`, and serves as temporary + /// scaffolding until component-model's async features are ready. + /// + /// In the shorter term, it is a `u32` which will be replaced by a handle + /// when the component-model handles and resource features are ready. + type wasi-stream = u32 + + /// TODO: `use` the `wasi-clocks` version + type wall-clock = u32 + + /// TODO: `use` the `wasi-clocks` version + type monotonic-clock = u32 + + /// TODO: `use` the `wasi-clocks` version + record datetime { + seconds: u64, + nanoseconds: u32, + } + + /// TODO: `use` the `wasi-clocks` version + type instant = u64 + /// Dispose of the specified future, after which it may no longer be used. drop-future: func(f: wasi-future) + /// Dispose of the specified stream, after which it may no longer be used. + drop-stream: func(f: wasi-stream) + + /// Size of a range of bytes in memory. + type size = u32 + + /// FIXME: This should just be `_` in the `result`'s below, but in the + /// bindings, `()` doesn't impl `std::error::Error`. + record stream-error {} + + /// Read bytes from a stream. + read-stream: func( + /// The stream to operate on. + %stream: wasi-stream, + /// The maximum number of bytes to read. + len: size, + ) -> result, bool>, stream-error> + + /// Write bytes to a stream. + write-stream: func( + /// The stream to operate on. + %stream: wasi-stream, + /// Data to write + buf: list, + ) -> result + + /// Skip bytes from a stream. + skip-stream: func( + /// The stream to operate on. + %stream: wasi-stream, + /// The maximum number of bytes to skip. + len: u64, + ) -> result, stream-error> + + /// Write a byte multiple times to a stream. + write-repeated-stream: func( + /// The stream to operate on. + %stream: wasi-stream, + /// The byte to write + byte: u8, + /// The number of times to write it. + len: u64, + ) -> result + + /// Read from one stream and write to another. + splice-stream: func( + /// The stream to read from. + src: wasi-stream, + /// The stream to write to. + dst: wasi-stream, + /// The number of bytes to splice. + len: u64, + ) -> result, stream-error> + + /// Create a future which will resolve once either the specified stream has bytes + /// available to read or the other end of the stream has been closed. + subscribe-read: func(s: wasi-stream) -> wasi-future + + /// Create a future which will resolve once either the specified stream is ready + /// to accept bytes or the other end of the stream has been closed. + subscribe-write: func(s: wasi-stream) -> wasi-future + + /// Create a future which will resolve once the specified time has been reached. + subscribe-monotonic-clock: func(clock: monotonic-clock, when: instant, absolute: bool) -> wasi-future + /// Poll for completion on a set of futures. /// /// The "oneoff" in the name refers to the fact that this function must do a @@ -1007,7 +1118,8 @@ interface wasi-tcp { /// A socket pseudo-handle. In the future, this will be replaced by a handle type. type socket = u32 - /// An asynchronous operation. In the future, this will be replaced by a handle type. + /// An asynchronous operation. See the comments on `wasi-future` in the + /// `wasi-poll` interface for details. // TODO: `use` the `wasi-poll` version type wasi-future = u32 @@ -1038,20 +1150,13 @@ interface wasi-tcp { /// Query the specified `socket` for the number of bytes ready to be accepted. bytes-writable: func(s: socket) -> result - - /// Create a future which will resolve once either the specified socket has bytes - /// available to read or the other end of the stream has been closed. - subscribe-read: func(s: socket) -> wasi-future - - /// Create a future which will resolve once either the specified socket is ready - /// to accept bytes or the other end of the stream has been closed. - subscribe-write: func(s: socket) -> wasi-future } world wasi { import wasi-clocks: wasi-clocks import wasi-default-clocks: wasi-default-clocks import wasi-logging: wasi-logging + import wasi-stderr: wasi-stderr import wasi-filesystem: wasi-filesystem import wasi-random: wasi-random import wasi-poll: wasi-poll