From 0d241886e4753f4ca43a102d663a05e307eb84ab Mon Sep 17 00:00:00 2001 From: George Miao Date: Wed, 6 Sep 2023 19:03:57 -0400 Subject: [PATCH 01/24] feat: init mio driver mod --- src/driver/mio/fs.rs | 0 src/driver/mio/mod.rs | 3 +++ src/driver/mio/net.rs | 0 src/driver/mio/op.rs | 0 src/driver/mod.rs | 3 +++ 5 files changed, 6 insertions(+) create mode 100644 src/driver/mio/fs.rs create mode 100644 src/driver/mio/mod.rs create mode 100644 src/driver/mio/net.rs create mode 100644 src/driver/mio/op.rs diff --git a/src/driver/mio/fs.rs b/src/driver/mio/fs.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs new file mode 100644 index 00000000..39e1384f --- /dev/null +++ b/src/driver/mio/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod fs; +pub(crate) mod net; +pub(crate) mod op; diff --git a/src/driver/mio/net.rs b/src/driver/mio/net.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 51bb41d0..4a6fea3d 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -10,6 +10,9 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; + } else { + mod mio; + pub use mio::*; } } From 1df835e8d7f6791befa1b822f9f95788adbd587c Mon Sep 17 00:00:00 2001 From: George Miao Date: Wed, 6 Sep 2023 19:32:14 -0400 Subject: [PATCH 02/24] chore: add mio dependency --- Cargo.toml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index bfc40470..a6c02c2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ all-features = true default-target = "x86_64-pc-windows-msvc" targets = ["x86_64-pc-windows-msvc", "x86_64-unknown-linux-gnu"] +# Shared dependencies for all platforms [dependencies] async-task = { version = "4", optional = true } bytes = { version = "1", optional = true } @@ -25,12 +26,14 @@ once_cell = "1" slab = { version = "0.4", optional = true } socket2 = { version = "0.5", features = ["all"] } +# Shared dev dependencies for all platforms [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"] } futures-channel = "0.3" tempfile = "3" tokio = { version = "1", features = ["fs", "io-util", "macros", "net", "rt"] } +# Windows specific dependencies [target.'cfg(target_os = "windows")'.dependencies] widestring = "1" windows-sys = { version = "0.48", features = [ @@ -45,13 +48,19 @@ windows-sys = { version = "0.48", features = [ "Win32_System_Threading", ] } +# Windows specific dev dependencies [target.'cfg(target_os = "windows")'.dev-dependencies] windows-sys = { version = "0.48", features = ["Win32_Security_Authorization"] } +# Linux specific dependencies [target.'cfg(target_os = "linux")'.dependencies] io-uring = "0.6" libc = "0.2" +# Other platform dependencies +[target.'cfg(not(any(target_os = "windows", target_os = "linux")))'.dependencies] +mio = "0.8.8" + [features] default = ["runtime"] runtime = ["dep:async-task", "dep:futures-util", "dep:slab"] From 745542acf9244cc32d9aa604167ea87f43be2c33 Mon Sep 17 00:00:00 2001 From: George Miao Date: Fri, 8 Sep 2023 16:39:05 -0400 Subject: [PATCH 03/24] feat: basic driver --- Cargo.toml | 5 +- src/driver/mio/fs.rs | 33 ++++++++ src/driver/mio/mod.rs | 174 ++++++++++++++++++++++++++++++++++++++++++ src/driver/mio/net.rs | 24 ++++++ src/driver/mio/op.rs | 1 + src/driver/mod.rs | 6 +- 6 files changed, 238 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6c02c2c..cef69e4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,8 +58,9 @@ io-uring = "0.6" libc = "0.2" # Other platform dependencies -[target.'cfg(not(any(target_os = "windows", target_os = "linux")))'.dependencies] -mio = "0.8.8" +[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] +mio = { version = "0.8.8", features = ["os-ext"] } +slab = { version = "0.4" } [features] default = ["runtime"] diff --git a/src/driver/mio/fs.rs b/src/driver/mio/fs.rs index e69de29b..8edd7a92 100644 --- a/src/driver/mio/fs.rs +++ b/src/driver/mio/fs.rs @@ -0,0 +1,33 @@ +use std::{io, path::Path}; + +use crate::{ + driver::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, + fs::File, +}; + +pub fn file_with_options( + path: impl AsRef, + options: std::fs::OpenOptions, +) -> io::Result { + options.open(path) +} + +impl AsRawFd for File { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl FromRawFd for File { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + Self { + inner: std::fs::File::from_raw_fd(fd), + } + } +} + +impl IntoRawFd for File { + fn into_raw_fd(self) -> RawFd { + self.inner.into_raw_fd() + } +} diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index 39e1384f..36e1308c 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -1,3 +1,177 @@ +#[doc(no_inline)] +pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::{cell::RefCell, io, mem::MaybeUninit, ops::DerefMut, time::Duration}; + +use mio::{ + event::{Event, Source}, + unix::SourceFd, + Events, Interest, Poll, Token, +}; +use slab::Slab; + +use crate::driver::{queue_with_capacity, Entry, Poller, Queue}; + pub(crate) mod fs; pub(crate) mod net; pub(crate) mod op; + +/// Abstraction of mio operations. +pub trait OpCode { + fn interests(&self) -> Interest; + fn source_fd(&self) -> SourceFd<'_>; + fn perform(&mut self, event: &Event) -> io::Result; +} + +/// Low-level driver of mio. +pub struct Driver { + inner: RefCell, + squeue: Queue, + cqueue: Queue, +} + +/// Inner state of [`Driver`]. +pub struct DriverInner { + events: Events, + poll: Poll, + registered: Slab, +} + +struct MioEntry { + op: *mut dyn OpCode, + user_data: usize, +} + +impl MioEntry { + /// Safety: Caller mut guarantee that the op will live until it is + /// completed. + unsafe fn new(op: &mut (impl OpCode + 'static), user_data: usize) -> Self { + Self { + op: op as *mut dyn OpCode, + user_data, + } + } + + fn op_mut(&self) -> &mut dyn OpCode { + unsafe { &mut *self.op } + } + + fn op(&self) -> &dyn OpCode { + unsafe { &*self.op } + } +} + +impl Driver { + /// Create a new mio driver with 1024 entries. + pub fn new() -> io::Result { + Self::with_entries(1024) + } + + /// Create a new mio driver with the given number of entries. + pub fn with_entries(entries: u32) -> io::Result { + let entries = entries as usize; // for the sake of consistency, use u32 like iour + + Ok(Self { + squeue: queue_with_capacity(entries), + cqueue: queue_with_capacity(entries), + inner: RefCell::new(DriverInner { + events: Events::with_capacity(entries), + poll: Poll::new()?, + registered: Slab::new(), + }), + }) + } + + /// Register all operations in the squeue to mio. + fn submit_squeue(&self) -> io::Result<()> { + self.with_inner(|inner| { + while let Some(entry) = self.squeue.pop() { + inner.submit(entry)?; + } + + Ok(()) + }) + } + + /// Poll all events from mio, call `perform` on op and push them into + /// cqueue. + fn poll(&self, timeout: Option) -> io::Result<()> { + self.with_inner(|inner| { + inner.poll.poll(&mut inner.events, timeout)?; + + for event in &inner.events { + let token = event.token(); + let entry = inner + .registered + .try_remove(token.0) + .expect("Unknown token returned by mio"); + let res = entry.op_mut().perform(event); + + self.cqueue.push(Entry::new(token.into(), res)) + } + Ok(()) + }) + } + + fn get_completed(&self, entries: &mut [MaybeUninit]) -> usize { + let len = self.cqueue.len().min(entries.len()); + for entry in &mut entries[..len] { + entry.write(self.cqueue.pop().unwrap()); + } + len + } + + fn with_inner(&self, f: F) -> R + where + F: FnOnce(&mut DriverInner) -> R, + { + f(self.inner.borrow_mut().deref_mut()) + } +} + +impl DriverInner { + fn submit(&mut self, entry: MioEntry) -> io::Result<()> { + let slot = self.registered.vacant_entry(); + let token = Token(slot.key()); + + entry + .op() + .source_fd() + .register(self.poll.registry(), token, entry.op().interests())?; + + // Only insert the entry after it was registered successfully + slot.insert(entry); + + Ok(()) + } +} + +impl Poller for Driver { + fn attach(&self, _fd: RawFd) -> io::Result<()> { + Ok(()) + } + + unsafe fn push(&self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()> { + self.squeue.push(MioEntry::new(op, user_data)); + Ok(()) + } + + fn post(&self, user_data: usize, result: usize) -> io::Result<()> { + todo!() + } + + fn poll( + &self, + timeout: Option, + entries: &mut [MaybeUninit], + ) -> io::Result { + self.submit_squeue()?; + if entries.is_empty() { + return Ok(0); + } + if self.get_completed(entries) > 0 { + return Ok(entries.len()); + } + self.poll(timeout)?; + Ok(self.get_completed(entries)) + } +} diff --git a/src/driver/mio/net.rs b/src/driver/mio/net.rs index e69de29b..d71765c8 100644 --- a/src/driver/mio/net.rs +++ b/src/driver/mio/net.rs @@ -0,0 +1,24 @@ +use crate::{ + driver::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, + net::Socket, +}; + +impl AsRawFd for Socket { + fn as_raw_fd(&self) -> RawFd { + self.as_socket2().as_raw_fd() + } +} + +impl FromRawFd for Socket { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + Self { + socket: socket2::Socket::from_raw_fd(fd), + } + } +} + +impl IntoRawFd for Socket { + fn into_raw_fd(self) -> RawFd { + self.into_socket2().into_raw_fd() + } +} diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index e69de29b..8b137891 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -0,0 +1 @@ + diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 4a6fea3d..3fabd7cd 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -10,9 +10,9 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; - } else { + } else if #[cfg(all(not(target_os = "linux"), unix))]{ mod mio; - pub use mio::*; + pub use self::mio::*; } } @@ -82,7 +82,7 @@ pub trait Poller { /// /// # Safety /// - /// - `op` should be alive until [`Poller::poll`] returns its result. + /// `op` should be alive until [`Poller::poll`] returns its result. unsafe fn push(&mut self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()>; From 2e794e3c6072c581d357dd38e0c73967a11a91d0 Mon Sep 17 00:00:00 2001 From: George Miao Date: Sat, 9 Sep 2023 02:02:27 -0400 Subject: [PATCH 04/24] refactor: extract unix op into separate file from iour --- src/driver/iour/op.rs | 93 ++----------------------------------------- src/driver/mod.rs | 2 + src/driver/unix_op.rs | 92 ++++++++++++++++++++++++++++++++++++++++++ src/op.rs | 7 ++-- 4 files changed, 101 insertions(+), 93 deletions(-) create mode 100644 src/driver/unix_op.rs diff --git a/src/driver/iour/op.rs b/src/driver/iour/op.rs index bbaf6f79..f74a170e 100644 --- a/src/driver/iour/op.rs +++ b/src/driver/iour/op.rs @@ -1,16 +1,14 @@ -use std::io::{IoSlice, IoSliceMut}; - use io_uring::{ opcode, squeue::Entry, types::{Fd, FsyncFlags}, }; -use libc::{sockaddr_storage, socklen_t}; -use socket2::SockAddr; +use libc::sockaddr_storage; +pub use crate::driver::unix_op::*; use crate::{ - buf::{AsIoSlices, AsIoSlicesMut, IntoInner, IoBuf, IoBufMut, OneOrVec}, - driver::{OpCode, RawFd}, + buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, + driver::OpCode, op::*, }; @@ -44,29 +42,6 @@ impl OpCode for Sync { } } -/// Accept a connection. -pub struct Accept { - pub(crate) fd: RawFd, - pub(crate) buffer: sockaddr_storage, - pub(crate) addr_len: socklen_t, -} - -impl Accept { - /// Create [`Accept`]. - pub fn new(fd: RawFd) -> Self { - Self { - fd, - buffer: unsafe { std::mem::zeroed() }, - addr_len: std::mem::size_of::() as _, - } - } - - /// Get the remote address from the inner buffer. - pub fn into_addr(self) -> SockAddr { - unsafe { SockAddr::new(self.buffer, self.addr_len) } - } -} - impl OpCode for Accept { fn create_entry(&mut self) -> Entry { opcode::Accept::new( @@ -108,36 +83,6 @@ impl OpCode for SendImpl { } } -/// Receive data and source address. -pub struct RecvFromImpl { - pub(crate) fd: RawFd, - pub(crate) buffer: T, - pub(crate) addr: sockaddr_storage, - pub(crate) slices: OneOrVec>, - msg: libc::msghdr, -} - -impl RecvFromImpl { - /// Create [`RecvFrom`] or [`RecvFromVectored`]. - pub fn new(fd: RawFd, buffer: T::Inner) -> Self { - Self { - fd, - buffer: T::new(buffer), - addr: unsafe { std::mem::zeroed() }, - slices: OneOrVec::One(IoSliceMut::new(&mut [])), - msg: unsafe { std::mem::zeroed() }, - } - } -} - -impl IntoInner for RecvFromImpl { - type Inner = (T, sockaddr_storage, socklen_t); - - fn into_inner(self) -> Self::Inner { - (self.buffer, self.addr, self.msg.msg_namelen) - } -} - impl OpCode for RecvFromImpl { #[allow(clippy::no_effect)] fn create_entry(&mut self) -> Entry { @@ -155,36 +100,6 @@ impl OpCode for RecvFromImpl { } } -/// Send data to specified address. -pub struct SendToImpl { - pub(crate) fd: RawFd, - pub(crate) buffer: T, - pub(crate) addr: SockAddr, - pub(crate) slices: OneOrVec>, - msg: libc::msghdr, -} - -impl SendToImpl { - /// Create [`SendTo`] or [`SendToVectored`]. - pub fn new(fd: RawFd, buffer: T::Inner, addr: SockAddr) -> Self { - Self { - fd, - buffer: T::new(buffer), - addr, - slices: OneOrVec::One(IoSlice::new(&[])), - msg: unsafe { std::mem::zeroed() }, - } - } -} - -impl IntoInner for SendToImpl { - type Inner = T; - - fn into_inner(self) -> Self::Inner { - self.buffer - } -} - impl OpCode for SendToImpl { #[allow(clippy::no_effect)] fn create_entry(&mut self) -> Entry { diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 3fabd7cd..6d122b70 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -2,6 +2,8 @@ //! Some types differ by compilation target. use std::{io, mem::MaybeUninit, time::Duration}; +#[cfg(unix)] +mod unix_op; cfg_if::cfg_if! { if #[cfg(target_os = "windows")] { diff --git a/src/driver/unix_op.rs b/src/driver/unix_op.rs new file mode 100644 index 00000000..08a175a4 --- /dev/null +++ b/src/driver/unix_op.rs @@ -0,0 +1,92 @@ +use std::io::{IoSlice, IoSliceMut}; + +use libc::{sockaddr_storage, socklen_t}; +use socket2::SockAddr; + +use crate::{ + buf::{AsIoSlices, AsIoSlicesMut, IntoInner, OneOrVec}, + driver::RawFd, +}; + +/// Accept a connection. +pub struct Accept { + pub(crate) fd: RawFd, + pub(crate) buffer: sockaddr_storage, + pub(crate) addr_len: socklen_t, +} + +impl Accept { + /// Create [`Accept`]. + pub fn new(fd: RawFd) -> Self { + Self { + fd, + buffer: unsafe { std::mem::zeroed() }, + addr_len: std::mem::size_of::() as _, + } + } + + /// Get the remote address from the inner buffer. + pub fn into_addr(self) -> SockAddr { + unsafe { SockAddr::new(self.buffer, self.addr_len) } + } +} + +/// Receive data and source address. +pub struct RecvFromImpl { + pub(crate) fd: RawFd, + pub(crate) buffer: T, + pub(crate) addr: sockaddr_storage, + pub(crate) slices: OneOrVec>, + pub(crate) msg: libc::msghdr, +} + +impl RecvFromImpl { + /// Create [`RecvFrom`] or [`RecvFromVectored`]. + pub fn new(fd: RawFd, buffer: T::Inner) -> Self { + Self { + fd, + buffer: T::new(buffer), + addr: unsafe { std::mem::zeroed() }, + slices: OneOrVec::One(IoSliceMut::new(&mut [])), + msg: unsafe { std::mem::zeroed() }, + } + } +} + +impl IntoInner for RecvFromImpl { + type Inner = (T, sockaddr_storage, socklen_t); + + fn into_inner(self) -> Self::Inner { + (self.buffer, self.addr, self.msg.msg_namelen) + } +} + +/// Send data to specified address. +pub struct SendToImpl { + pub(crate) fd: RawFd, + pub(crate) buffer: T, + pub(crate) addr: SockAddr, + pub(crate) slices: OneOrVec>, + pub(crate) msg: libc::msghdr, +} + +impl SendToImpl { + /// Create [`SendTo`] or [`SendToVectored`]. + pub fn new(fd: RawFd, buffer: T::Inner, addr: SockAddr) -> Self { + Self { + fd, + buffer: T::new(buffer), + addr, + slices: OneOrVec::One(IoSlice::new(&[])), + msg: unsafe { std::mem::zeroed() }, + } + } +} + +impl IntoInner for SendToImpl { + type Inner = T; + + fn into_inner(self) -> Self::Inner { + self.buffer + } +} diff --git a/src/op.rs b/src/op.rs index 0cbbdb96..14ab5a28 100644 --- a/src/op.rs +++ b/src/op.rs @@ -7,6 +7,9 @@ use std::io::{IoSlice, IoSliceMut}; use socket2::SockAddr; +#[cfg(target_os = "windows")] +pub use crate::driver::op::ConnectNamedPipe; +pub use crate::driver::op::{Accept, RecvFromImpl, SendToImpl}; use crate::{ buf::{ AsIoSlices, AsIoSlicesMut, BufWrapper, IntoInner, IoBuf, IoBufMut, OneOrVec, @@ -58,10 +61,6 @@ impl RecvResultExt for BufResult { } } -#[cfg(target_os = "windows")] -pub use crate::driver::op::ConnectNamedPipe; -pub use crate::driver::op::{Accept, RecvFromImpl, SendToImpl}; - /// Read a file at specified position into specified buffer. #[derive(Debug)] pub struct ReadAt { From b9a901e852a783a5e667db5b350ce33ae2b51f6e Mon Sep 17 00:00:00 2001 From: George Miao Date: Sat, 9 Sep 2023 02:03:22 -0400 Subject: [PATCH 05/24] feat: mio driver & op structure --- Cargo.toml | 1 + src/driver/mio/mod.rs | 107 +++++++++++++++++++++++++++++++++--------- src/driver/mio/op.rs | 100 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cef69e4a..1a82bf6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ libc = "0.2" [target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] mio = { version = "0.8.8", features = ["os-ext"] } slab = { version = "0.4" } +libc = "0.2" [features] default = ["runtime"] diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index 36e1308c..fe9ad0e7 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -2,6 +2,7 @@ pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::{cell::RefCell, io, mem::MaybeUninit, ops::DerefMut, time::Duration}; +pub(crate) use libc::{sockaddr_storage, socklen_t}; use mio::{ event::{Event, Source}, unix::SourceFd, @@ -15,11 +16,66 @@ pub(crate) mod fs; pub(crate) mod net; pub(crate) mod op; -/// Abstraction of mio operations. +/// Helper macro to execute a system call that returns an `io::Result`. +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + #[allow(unused_unsafe)] + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res as _) + } + }}; +} + +pub(crate) use syscall; + +/// Abstraction of operations. pub trait OpCode { - fn interests(&self) -> Interest; - fn source_fd(&self) -> SourceFd<'_>; - fn perform(&mut self, event: &Event) -> io::Result; + /// Perform the operation before submit, and return [`Decision`] to + /// indicate whether submitting the operation to mio is required. + fn pre_submit(&mut self) -> io::Result; + + /// Perform the operation after received corresponding + /// event. + fn on_event(&mut self, event: &Event) -> io::Result; +} + +/// Result of [`OpCode::pre_submit`]. +pub enum Decision { + /// Instant operation, no need to submit + Completed(usize), + /// Async operation, needs to submit + Wait(OpMeta), +} + +impl Decision { + /// Decide to complete the operation with the given result. + pub fn complete(result: usize) -> Self { + Self::Completed(result) + } + + /// Decide to wait for the given fd with the given interest. + pub fn wait_for(fd: RawFd, interest: Interest) -> Self { + Self::Wait(OpMeta { fd, interest }) + } + + /// Decide to wait for the given fd to be readable. + pub fn wait_readable(fd: RawFd) -> Self { + Self::wait_for(fd, Interest::READABLE) + } + + /// Decide to wait for the given fd to be writable. + pub fn wait_writable(fd: RawFd) -> Self { + Self::wait_for(fd, Interest::WRITABLE) + } +} + +/// Meta of mio operations. +pub struct OpMeta { + fd: RawFd, + interest: Interest, } /// Low-level driver of mio. @@ -30,12 +86,13 @@ pub struct Driver { } /// Inner state of [`Driver`]. -pub struct DriverInner { +struct DriverInner { events: Events, poll: Poll, registered: Slab, } +/// Internal representation of operation being submitted into the driver. struct MioEntry { op: *mut dyn OpCode, user_data: usize, @@ -51,7 +108,7 @@ impl MioEntry { } } - fn op_mut(&self) -> &mut dyn OpCode { + fn op_mut(&mut self) -> &mut dyn OpCode { unsafe { &mut *self.op } } @@ -84,8 +141,18 @@ impl Driver { /// Register all operations in the squeue to mio. fn submit_squeue(&self) -> io::Result<()> { self.with_inner(|inner| { - while let Some(entry) = self.squeue.pop() { - inner.submit(entry)?; + while let Some(mut entry) = self.squeue.pop() { + match entry.op_mut().pre_submit() { + Ok(Decision::Wait(meta)) => { + inner.submit(entry, meta)?; + } + Ok(Decision::Completed(res)) => { + self.cqueue.push(Entry::new(entry.user_data, Ok(res))); + } + Err(err) => { + self.cqueue.push(Entry::new(entry.user_data, Err(err))); + } + } } Ok(()) @@ -100,19 +167,19 @@ impl Driver { for event in &inner.events { let token = event.token(); - let entry = inner + let mut entry = inner .registered .try_remove(token.0) - .expect("Unknown token returned by mio"); - let res = entry.op_mut().perform(event); + .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? + let res = entry.op_mut().on_event(event); - self.cqueue.push(Entry::new(token.into(), res)) + self.cqueue.push(Entry::new(entry.user_data, res)) } Ok(()) }) } - fn get_completed(&self, entries: &mut [MaybeUninit]) -> usize { + fn poll_completed(&self, entries: &mut [MaybeUninit]) -> usize { let len = self.cqueue.len().min(entries.len()); for entry in &mut entries[..len] { entry.write(self.cqueue.pop().unwrap()); @@ -129,14 +196,11 @@ impl Driver { } impl DriverInner { - fn submit(&mut self, entry: MioEntry) -> io::Result<()> { + fn submit(&mut self, entry: MioEntry, meta: OpMeta) -> io::Result<()> { let slot = self.registered.vacant_entry(); let token = Token(slot.key()); - entry - .op() - .source_fd() - .register(self.poll.registry(), token, entry.op().interests())?; + SourceFd(&meta.fd).register(self.poll.registry(), token, meta.interest)?; // Only insert the entry after it was registered successfully slot.insert(entry); @@ -156,7 +220,8 @@ impl Poller for Driver { } fn post(&self, user_data: usize, result: usize) -> io::Result<()> { - todo!() + self.cqueue.push(Entry::new(user_data, Ok(result))); + Ok(()) } fn poll( @@ -168,10 +233,10 @@ impl Poller for Driver { if entries.is_empty() { return Ok(0); } - if self.get_completed(entries) > 0 { + if self.poll_completed(entries) > 0 { return Ok(entries.len()); } self.poll(timeout)?; - Ok(self.get_completed(entries)) + Ok(self.poll_completed(entries)) } } diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index 8b137891..bae1c1ac 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -1 +1,101 @@ +use std::io; +use mio::event::Event; + +pub use crate::driver::unix_op::*; +use crate::{ + buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, + driver::{syscall, Decision, OpCode}, + op::*, +}; + +impl OpCode for ReadAt { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_readable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for WriteAt { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_writable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for Sync { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::Completed(0)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + unreachable!("Sync operation should not be submitted to mio") + } +} + +impl OpCode for Accept { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_readable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for Connect { + fn pre_submit(&mut self) -> io::Result { + let res = syscall!(connect(self.fd, self.addr.as_ptr(), self.addr.len(),))?; + Ok(Decision::Completed(res)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for RecvImpl { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_readable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for SendImpl { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_writable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for RecvFromImpl { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::Completed(0)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} + +impl OpCode for SendToImpl { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::Completed(0)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result { + todo!() + } +} From 53595cf11227ea9ced81fc5747a368893e5245db Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 02:18:56 -0400 Subject: [PATCH 06/24] feat: add nonblocking for unix socket --- src/net/socket.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/net/socket.rs b/src/net/socket.rs index b8b11d89..72fcddbc 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -78,7 +78,16 @@ impl Socket { } pub fn new(domain: Domain, ty: Type, protocol: Option) -> io::Result { - Self::from_socket2(Socket2::new(domain, ty, protocol)?) + #[cfg(not(unix))] + { + Self::from_socket2(Socket2::new(domain, ty, protocol)?) + } + #[cfg(unix)] + { + let socket = Socket2::new(domain, ty, protocol)?; + socket.set_nonblocking(true)?; + Self::from_socket2(socket) + } } pub fn bind(addr: &SockAddr, ty: Type, protocol: Option) -> io::Result { From 5065825a4bb2e0fb570268f07f8865edeeaee7d2 Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 03:53:47 -0400 Subject: [PATCH 07/24] ci: add mac tests --- azure-pipelines.yml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b88e10bf..f20b2a5f 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -45,6 +45,25 @@ jobs: cargo test --features all displayName: TestStable + - job: Test_Mac + strategy: + matrix: + ventura: + image: macOS-13 + monterey: + image: macOS-12 + pool: + vmImage: $(image) + + steps: + - script: | + rustup toolchain install nightly + cargo +nightly test --features all,nightly --no-default-features + displayName: TestNightly + - script: | + cargo test --features all + displayName: TestStable + - job: Doc strategy: matrix: From 5099aafef08cee12209c19d287b7d641c8dcbac2 Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 03:56:34 -0400 Subject: [PATCH 08/24] feat: finishing up mio feature --- src/driver/mio/fs.rs | 33 ------- src/driver/mio/mod.rs | 211 +++++++++++++++++++++--------------------- src/driver/mio/net.rs | 24 ----- src/driver/mio/op.rs | 128 +++++++++++++++++++++---- src/event/mio.rs | 68 ++++++++++++++ src/event/mod.rs | 3 + src/net/socket.rs | 4 +- src/signal/mod.rs | 4 +- 8 files changed, 291 insertions(+), 184 deletions(-) delete mode 100644 src/driver/mio/fs.rs delete mode 100644 src/driver/mio/net.rs create mode 100644 src/event/mio.rs diff --git a/src/driver/mio/fs.rs b/src/driver/mio/fs.rs deleted file mode 100644 index 8edd7a92..00000000 --- a/src/driver/mio/fs.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::{io, path::Path}; - -use crate::{ - driver::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, - fs::File, -}; - -pub fn file_with_options( - path: impl AsRef, - options: std::fs::OpenOptions, -) -> io::Result { - options.open(path) -} - -impl AsRawFd for File { - fn as_raw_fd(&self) -> RawFd { - self.inner.as_raw_fd() - } -} - -impl FromRawFd for File { - unsafe fn from_raw_fd(fd: RawFd) -> Self { - Self { - inner: std::fs::File::from_raw_fd(fd), - } - } -} - -impl IntoRawFd for File { - fn into_raw_fd(self) -> RawFd { - self.inner.into_raw_fd() - } -} diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index fe9ad0e7..a423b3c3 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -1,6 +1,6 @@ #[doc(no_inline)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::{cell::RefCell, io, mem::MaybeUninit, ops::DerefMut, time::Duration}; +use std::{cell::RefCell, collections::VecDeque, io, mem::MaybeUninit, time::Duration}; pub(crate) use libc::{sockaddr_storage, socklen_t}; use mio::{ @@ -10,27 +10,10 @@ use mio::{ }; use slab::Slab; -use crate::driver::{queue_with_capacity, Entry, Poller, Queue}; +use crate::driver::{Entry, Poller}; -pub(crate) mod fs; -pub(crate) mod net; pub(crate) mod op; -/// Helper macro to execute a system call that returns an `io::Result`. -macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - #[allow(unused_unsafe)] - let res = unsafe { libc::$fn($($arg, )*) }; - if res == -1 { - Err(std::io::Error::last_os_error()) - } else { - Ok(res as _) - } - }}; -} - -pub(crate) use syscall; - /// Abstraction of operations. pub trait OpCode { /// Perform the operation before submit, and return [`Decision`] to @@ -47,18 +30,13 @@ pub enum Decision { /// Instant operation, no need to submit Completed(usize), /// Async operation, needs to submit - Wait(OpMeta), + Wait(WaitArg), } impl Decision { - /// Decide to complete the operation with the given result. - pub fn complete(result: usize) -> Self { - Self::Completed(result) - } - /// Decide to wait for the given fd with the given interest. pub fn wait_for(fd: RawFd, interest: Interest) -> Self { - Self::Wait(OpMeta { fd, interest }) + Self::Wait(WaitArg { fd, interest }) } /// Decide to wait for the given fd to be readable. @@ -73,26 +51,25 @@ impl Decision { } /// Meta of mio operations. -pub struct OpMeta { +#[derive(Debug, Clone, Copy)] +pub struct WaitArg { fd: RawFd, interest: Interest, } /// Low-level driver of mio. -pub struct Driver { - inner: RefCell, - squeue: Queue, - cqueue: Queue, -} +pub struct Driver(RefCell); /// Inner state of [`Driver`]. struct DriverInner { + squeue: VecDeque, + cqueue: VecDeque, events: Events, poll: Poll, - registered: Slab, + waiting: Slab, } -/// Internal representation of operation being submitted into the driver. +/// Entry in squeue struct MioEntry { op: *mut dyn OpCode, user_data: usize, @@ -111,9 +88,26 @@ impl MioEntry { fn op_mut(&mut self) -> &mut dyn OpCode { unsafe { &mut *self.op } } +} - fn op(&self) -> &dyn OpCode { - unsafe { &*self.op } +/// Entry waiting for events +struct WaitEntry { + op: *mut dyn OpCode, + arg: WaitArg, + user_data: usize, +} + +impl WaitEntry { + fn new(mio_entry: MioEntry, arg: WaitArg) -> Self { + Self { + op: mio_entry.op, + arg, + user_data: mio_entry.user_data, + } + } + + fn op_mut(&mut self) -> &mut dyn OpCode { + unsafe { &mut *self.op } } } @@ -127,86 +121,77 @@ impl Driver { pub fn with_entries(entries: u32) -> io::Result { let entries = entries as usize; // for the sake of consistency, use u32 like iour - Ok(Self { - squeue: queue_with_capacity(entries), - cqueue: queue_with_capacity(entries), - inner: RefCell::new(DriverInner { - events: Events::with_capacity(entries), - poll: Poll::new()?, - registered: Slab::new(), - }), - }) + Ok(Self(RefCell::new(DriverInner { + squeue: VecDeque::with_capacity(entries), + cqueue: VecDeque::with_capacity(entries), + events: Events::with_capacity(entries), + poll: Poll::new()?, + waiting: Slab::new(), + }))) + } + + fn inner(&self) -> std::cell::RefMut<'_, DriverInner> { + self.0.borrow_mut() + } +} + +impl DriverInner { + fn submit(&mut self, entry: MioEntry, meta: WaitArg) -> io::Result<()> { + let slot = self.waiting.vacant_entry(); + let token = Token(slot.key()); + + SourceFd(&meta.fd).register(self.poll.registry(), token, meta.interest)?; + + // Only insert the entry after it was registered successfully + slot.insert(WaitEntry::new(entry, meta)); + + Ok(()) } /// Register all operations in the squeue to mio. - fn submit_squeue(&self) -> io::Result<()> { - self.with_inner(|inner| { - while let Some(mut entry) = self.squeue.pop() { - match entry.op_mut().pre_submit() { - Ok(Decision::Wait(meta)) => { - inner.submit(entry, meta)?; - } - Ok(Decision::Completed(res)) => { - self.cqueue.push(Entry::new(entry.user_data, Ok(res))); - } - Err(err) => { - self.cqueue.push(Entry::new(entry.user_data, Err(err))); - } + fn submit_squeue(&mut self) -> io::Result<()> { + while let Some(mut entry) = self.squeue.pop_front() { + match entry.op_mut().pre_submit() { + Ok(Decision::Wait(meta)) => { + self.submit(entry, meta)?; + } + Ok(Decision::Completed(res)) => { + self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); + } + Err(err) => { + self.cqueue.push_back(Entry::new(entry.user_data, Err(err))); } } + } - Ok(()) - }) + Ok(()) } /// Poll all events from mio, call `perform` on op and push them into /// cqueue. - fn poll(&self, timeout: Option) -> io::Result<()> { - self.with_inner(|inner| { - inner.poll.poll(&mut inner.events, timeout)?; - - for event in &inner.events { - let token = event.token(); - let mut entry = inner - .registered - .try_remove(token.0) - .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? - let res = entry.op_mut().on_event(event); - - self.cqueue.push(Entry::new(entry.user_data, res)) - } - Ok(()) - }) + fn poll(&mut self, timeout: Option) -> io::Result<()> { + self.poll.poll(&mut self.events, timeout)?; + + for event in &self.events { + let token = event.token(); + let entry = self + .waiting + .get_mut(token.0) + .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? + let res = entry.op_mut().on_event(event); + + self.cqueue.push_back(Entry::new(entry.user_data, res)) + } + Ok(()) } - fn poll_completed(&self, entries: &mut [MaybeUninit]) -> usize { + fn poll_completed(&mut self, entries: &mut [MaybeUninit]) -> usize { let len = self.cqueue.len().min(entries.len()); for entry in &mut entries[..len] { - entry.write(self.cqueue.pop().unwrap()); + entry.write(self.cqueue.pop_front().unwrap()); } len } - - fn with_inner(&self, f: F) -> R - where - F: FnOnce(&mut DriverInner) -> R, - { - f(self.inner.borrow_mut().deref_mut()) - } -} - -impl DriverInner { - fn submit(&mut self, entry: MioEntry, meta: OpMeta) -> io::Result<()> { - let slot = self.registered.vacant_entry(); - let token = Token(slot.key()); - - SourceFd(&meta.fd).register(self.poll.registry(), token, meta.interest)?; - - // Only insert the entry after it was registered successfully - slot.insert(entry); - - Ok(()) - } } impl Poller for Driver { @@ -215,13 +200,23 @@ impl Poller for Driver { } unsafe fn push(&self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()> { - self.squeue.push(MioEntry::new(op, user_data)); + self.0 + .borrow_mut() + .squeue + .push_back(MioEntry::new(op, user_data)); Ok(()) } - fn post(&self, user_data: usize, result: usize) -> io::Result<()> { - self.cqueue.push(Entry::new(user_data, Ok(result))); - Ok(()) + fn cancel(&self, user_data: usize) { + let mut inner = self.inner(); + + let Some(entry) = inner.waiting.try_remove(user_data) + else { return }; + inner + .poll + .registry() + .deregister(&mut SourceFd(&entry.arg.fd)) + .ok(); } fn poll( @@ -229,14 +224,16 @@ impl Poller for Driver { timeout: Option, entries: &mut [MaybeUninit], ) -> io::Result { - self.submit_squeue()?; + let mut inner = self.inner(); + + inner.submit_squeue()?; if entries.is_empty() { return Ok(0); } - if self.poll_completed(entries) > 0 { + if inner.poll_completed(entries) > 0 { return Ok(entries.len()); } - self.poll(timeout)?; - Ok(self.poll_completed(entries)) + inner.poll(timeout)?; + Ok(inner.poll_completed(entries)) } } diff --git a/src/driver/mio/net.rs b/src/driver/mio/net.rs deleted file mode 100644 index d71765c8..00000000 --- a/src/driver/mio/net.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::{ - driver::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, - net::Socket, -}; - -impl AsRawFd for Socket { - fn as_raw_fd(&self) -> RawFd { - self.as_socket2().as_raw_fd() - } -} - -impl FromRawFd for Socket { - unsafe fn from_raw_fd(fd: RawFd) -> Self { - Self { - socket: socket2::Socket::from_raw_fd(fd), - } - } -} - -impl IntoRawFd for Socket { - fn into_raw_fd(self) -> RawFd { - self.into_socket2().into_raw_fd() - } -} diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index bae1c1ac..7a80d418 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -5,17 +5,55 @@ use mio::event::Event; pub use crate::driver::unix_op::*; use crate::{ buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, - driver::{syscall, Decision, OpCode}, + driver::{Decision, OpCode}, op::*, }; +/// Helper macro to execute a system call that returns an `io::Result`. +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + #[allow(unused_unsafe)] + let res = unsafe { ::libc::$fn($($arg, )*) }; + if res == -1 { + Err(::std::io::Error::last_os_error()) + } else { + Ok(res as _) + } + }}; +} + +/// Execute a system call, if would block, wait for it to be readable. +macro_rules! syscall_or_wait_writable { + ($fn: ident ( $($arg: expr),* $(,)* ), $fd:expr) => {{ + match syscall!( $fn ( $($arg, )* )) { + Ok(fd) => Ok(Decision::Completed(fd)), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Decision::wait_writable($fd)), + Err(e) => Err(e), + } + }}; +} + +/// Execute a system call, if would block, wait for it to be writable. +macro_rules! syscall_or_wait_readable { + ($fn: ident ( $($arg: expr),* $(,)* ), $fd:expr) => {{ + match syscall!( $fn ( $($arg, )* )) { + Ok(fd) => Ok(Decision::Completed(fd)), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(Decision::wait_readable($fd)), + Err(e) => Err(e), + } + }}; +} + impl OpCode for ReadAt { fn pre_submit(&mut self) -> io::Result { Ok(Decision::wait_readable(self.fd)) } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_readable()); + + let (ptr, len) = (self.buffer.as_buf_mut_ptr(), self.buffer.buf_len()); + syscall!(pread(self.fd, ptr as _, len, self.offset as _)) } } @@ -25,38 +63,58 @@ impl OpCode for WriteAt { } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_writable()); + + let (ptr, len) = (self.buffer.as_buf_ptr(), self.buffer.buf_len()); + syscall!(pwrite(self.fd, ptr as _, len, self.offset as _)) } } impl OpCode for Sync { fn pre_submit(&mut self) -> io::Result { - Ok(Decision::Completed(0)) + Ok(Decision::Completed(syscall!(fsync(self.fd))?)) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, _: &Event) -> std::io::Result { unreachable!("Sync operation should not be submitted to mio") } } impl OpCode for Accept { fn pre_submit(&mut self) -> io::Result { - Ok(Decision::wait_readable(self.fd)) + syscall_or_wait_writable!( + accept( + self.fd, + &mut self.buffer as *mut _ as *mut _, + &mut self.addr_len + ), + self.fd + ) } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_readable()); + + syscall!(accept( + self.fd, + &mut self.buffer as *mut _ as *mut _, + &mut self.addr_len + )) } } impl OpCode for Connect { fn pre_submit(&mut self) -> io::Result { - let res = syscall!(connect(self.fd, self.addr.as_ptr(), self.addr.len(),))?; - Ok(Decision::Completed(res)) + syscall_or_wait_readable!( + connect(self.fd, self.addr.as_ptr(), self.addr.len()), + self.fd + ) } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_writable()); + + syscall!(connect(self.fd, self.addr.as_ptr(), self.addr.len())) } } @@ -66,7 +124,14 @@ impl OpCode for RecvImpl { } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_readable()); + + self.slices = unsafe { self.buffer.as_io_slices_mut() }; + syscall!(readv( + self.fd, + self.slices.as_ptr() as _, + self.slices.len() as _, + )) } } @@ -76,26 +141,57 @@ impl OpCode for SendImpl { } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_writable()); + + self.slices = unsafe { self.buffer.as_io_slices() }; + syscall!(writev( + self.fd, + self.slices.as_ptr() as _, + self.slices.len() as _, + )) } } impl OpCode for RecvFromImpl { fn pre_submit(&mut self) -> io::Result { - Ok(Decision::Completed(0)) + self.slices = unsafe { self.buffer.as_io_slices_mut() }; + self.msg = libc::msghdr { + msg_name: &mut self.addr as *mut _ as _, + msg_namelen: 128, + msg_iov: self.slices.as_mut_ptr() as _, + msg_iovlen: self.slices.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + syscall_or_wait_readable!(recvmsg(self.fd, &mut self.msg, 0), self.fd) } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + debug_assert!(event.is_readable()); + + syscall!(recvmsg(self.fd, &mut self.msg, 0)) } } impl OpCode for SendToImpl { fn pre_submit(&mut self) -> io::Result { - Ok(Decision::Completed(0)) + self.slices = unsafe { self.buffer.as_io_slices() }; + self.msg = libc::msghdr { + msg_name: &mut self.addr as *mut _ as _, + msg_namelen: 128, + msg_iov: self.slices.as_mut_ptr() as _, + msg_iovlen: self.slices.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + syscall_or_wait_writable!(sendmsg(self.fd, &self.msg, 0), self.fd) } fn on_event(&mut self, event: &Event) -> std::io::Result { - todo!() + assert!(event.is_writable()); + + syscall!(sendmsg(self.fd, &self.msg, 0)) } } diff --git a/src/event/mio.rs b/src/event/mio.rs new file mode 100644 index 00000000..15da1f3f --- /dev/null +++ b/src/event/mio.rs @@ -0,0 +1,68 @@ +use std::{ + io, + os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd}, +}; + +use crate::{impl_raw_fd, op::ReadAt, task::RUNTIME}; + +/// An event that won't wake until [`EventHandle::notify`] is called +/// successfully. +#[derive(Debug)] +pub struct Event { + fd: OwnedFd, +} + +impl Event { + /// Create [`Event`]. + pub fn new() -> io::Result { + let (sender, _) = mio::unix::pipe::new()?; + + Ok(Self { + fd: unsafe { OwnedFd::from_raw_fd(sender.into_raw_fd()) }, + }) + } + + /// Get a notify handle. + pub fn handle(&self) -> EventHandle { + EventHandle::new(self.fd.as_fd()) + } + + /// Wait for [`EventHandle::notify`] called. + pub async fn wait(&self) -> io::Result<()> { + let buffer = Vec::with_capacity(8); + let op = ReadAt::new(self.as_raw_fd(), 0, buffer); + let (res, _) = RUNTIME.with(|runtime| runtime.submit(op)).await; + res?; + Ok(()) + } +} + +impl_raw_fd!(Event, fd); + +/// A handle to [`Event`]. +pub struct EventHandle<'a> { + fd: BorrowedFd<'a>, +} + +impl<'a> EventHandle<'a> { + pub(crate) fn new(fd: BorrowedFd<'a>) -> Self { + Self { fd } + } + + /// Notify the event. + pub fn notify(&self) -> io::Result<()> { + let data = 1u64; + let res = unsafe { + libc::write( + self.fd.as_raw_fd(), + &data as *const _ as *const _, + std::mem::size_of::(), + ) + }; + if res < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } +} diff --git a/src/event/mod.rs b/src/event/mod.rs index d17ca66f..a353e651 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -9,5 +9,8 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; + } else if #[cfg(all(unix, not(target_os = "linux")))] { + mod mio; + pub use self::mio::*; } } diff --git a/src/net/socket.rs b/src/net/socket.rs index 72fcddbc..2b495048 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -118,13 +118,13 @@ impl Socket { _op.update_context()?; Ok(()) } - #[cfg(target_os = "linux")] + #[cfg(unix)] { res.map(|_| ()) } } - #[cfg(all(feature = "runtime", target_os = "linux"))] + #[cfg(all(feature = "runtime", unix))] pub async fn accept(&self) -> io::Result<(Self, SockAddr)> { use std::os::fd::FromRawFd; diff --git a/src/signal/mod.rs b/src/signal/mod.rs index 67c64460..ff7b8f83 100644 --- a/src/signal/mod.rs +++ b/src/signal/mod.rs @@ -20,11 +20,11 @@ pub mod windows; #[doc(no_inline)] pub use windows::ctrl_c; -#[cfg(target_os = "linux")] +#[cfg(unix)] pub mod unix; /// Completes when a "ctrl-c" notification is sent to the process. -#[cfg(target_os = "linux")] +#[cfg(unix)] pub async fn ctrl_c() -> std::io::Result<()> { unix::signal(libc::SIGINT).await } From fafc9e07c2146c5c8e280983a378e76be709f38c Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 04:00:05 -0400 Subject: [PATCH 09/24] refactor: rename meta to arg --- src/driver/mio/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index a423b3c3..ad25d68d 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -136,14 +136,14 @@ impl Driver { } impl DriverInner { - fn submit(&mut self, entry: MioEntry, meta: WaitArg) -> io::Result<()> { + fn submit(&mut self, entry: MioEntry, arg: WaitArg) -> io::Result<()> { let slot = self.waiting.vacant_entry(); let token = Token(slot.key()); - SourceFd(&meta.fd).register(self.poll.registry(), token, meta.interest)?; + SourceFd(&arg.fd).register(self.poll.registry(), token, arg.interest)?; // Only insert the entry after it was registered successfully - slot.insert(WaitEntry::new(entry, meta)); + slot.insert(WaitEntry::new(entry, arg)); Ok(()) } @@ -152,8 +152,8 @@ impl DriverInner { fn submit_squeue(&mut self) -> io::Result<()> { while let Some(mut entry) = self.squeue.pop_front() { match entry.op_mut().pre_submit() { - Ok(Decision::Wait(meta)) => { - self.submit(entry, meta)?; + Ok(Decision::Wait(arg)) => { + self.submit(entry, arg)?; } Ok(Decision::Completed(res)) => { self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); From 8ef8e2a381ad7d207991e3c5c3de02fd0433aaba Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 05:14:59 -0400 Subject: [PATCH 10/24] refactor: remove unnecessary cfg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 王宇逸 --- src/driver/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 6d122b70..22f2a769 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -12,7 +12,7 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; - } else if #[cfg(all(not(target_os = "linux"), unix))]{ + } else if #[cfg(unix)]{ mod mio; pub use self::mio::*; } From 9c9e5f09273ba76ddfac9558694fccad3c420ed3 Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 05:15:36 -0400 Subject: [PATCH 11/24] refactor: remove unnecessary block MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 王宇逸 --- src/net/socket.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/net/socket.rs b/src/net/socket.rs index 2b495048..4e9111d4 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -78,16 +78,10 @@ impl Socket { } pub fn new(domain: Domain, ty: Type, protocol: Option) -> io::Result { - #[cfg(not(unix))] - { - Self::from_socket2(Socket2::new(domain, ty, protocol)?) - } + let socket = Socket2::new(domain, ty, protocol)?; #[cfg(unix)] - { - let socket = Socket2::new(domain, ty, protocol)?; - socket.set_nonblocking(true)?; - Self::from_socket2(socket) - } + socket.set_nonblocking(true)?; + Self::from_socket2(socket) } pub fn bind(addr: &SockAddr, ty: Type, protocol: Option) -> io::Result { From 1f4c723ca5d0da6951f4ab972bcab7a2511e2e54 Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 06:18:42 -0400 Subject: [PATCH 12/24] wip --- src/driver/mio/mod.rs | 28 ++++++++---- src/driver/mio/op.rs | 104 +++++++++++++++++++++++++++++++----------- src/event/mio.rs | 3 +- tests/event.rs | 17 +++++-- tests/tcp_accept.rs | 1 + tests/tcp_connect.rs | 5 +- 6 files changed, 117 insertions(+), 41 deletions(-) diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index ad25d68d..98081d6e 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -1,6 +1,8 @@ #[doc(no_inline)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::{cell::RefCell, collections::VecDeque, io, mem::MaybeUninit, time::Duration}; +use std::{ + cell::RefCell, collections::VecDeque, io, mem::MaybeUninit, ops::ControlFlow, time::Duration, +}; pub(crate) use libc::{sockaddr_storage, socklen_t}; use mio::{ @@ -22,7 +24,7 @@ pub trait OpCode { /// Perform the operation after received corresponding /// event. - fn on_event(&mut self, event: &Event) -> io::Result; + fn on_event(&mut self, event: &Event) -> io::Result>; } /// Result of [`OpCode::pre_submit`]. @@ -70,6 +72,7 @@ struct DriverInner { } /// Entry in squeue +#[derive(Debug)] struct MioEntry { op: *mut dyn OpCode, user_data: usize, @@ -171,16 +174,24 @@ impl DriverInner { /// cqueue. fn poll(&mut self, timeout: Option) -> io::Result<()> { self.poll.poll(&mut self.events, timeout)?; - + // println!("events: {:?}", self.events); for event in &self.events { let token = event.token(); let entry = self .waiting .get_mut(token.0) .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? - let res = entry.op_mut().on_event(event); - - self.cqueue.push_back(Entry::new(entry.user_data, res)) + match entry.op_mut().on_event(event) { + Ok(ControlFlow::Continue(_)) => {} + Ok(ControlFlow::Break(res)) => { + self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); + self.waiting.remove(token.0); + } + Err(err) => { + self.cqueue.push_back(Entry::new(entry.user_data, Err(err))); + self.waiting.remove(token.0); + } + } } Ok(()) } @@ -210,8 +221,9 @@ impl Poller for Driver { fn cancel(&self, user_data: usize) { let mut inner = self.inner(); - let Some(entry) = inner.waiting.try_remove(user_data) - else { return }; + let Some(entry) = inner.waiting.try_remove(user_data) else { + return; + }; inner .poll .registry() diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index 7a80d418..98dd1fdc 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, ops::ControlFlow}; use mio::event::Event; @@ -17,7 +17,20 @@ macro_rules! syscall { if res == -1 { Err(::std::io::Error::last_os_error()) } else { - Ok(res as _) + Ok(res as usize) + } + }}; +} + +/// Helper macro to execute a system call that returns an `io::Result`. +macro_rules! syscall_break { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + #[allow(unused_unsafe)] + let res = unsafe { ::libc::$fn($($arg, )*) }; + if res == -1 { + Err(::std::io::Error::last_os_error()) + } else { + Ok(ControlFlow::Break(res as _)) } }}; } @@ -27,7 +40,8 @@ macro_rules! syscall_or_wait_writable { ($fn: ident ( $($arg: expr),* $(,)* ), $fd:expr) => {{ match syscall!( $fn ( $($arg, )* )) { Ok(fd) => Ok(Decision::Completed(fd)), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Decision::wait_writable($fd)), + Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error().map_or(false, |code| code == libc::EINPROGRESS) + => Ok(Decision::wait_writable($fd)), Err(e) => Err(e), } }}; @@ -38,7 +52,8 @@ macro_rules! syscall_or_wait_readable { ($fn: ident ( $($arg: expr),* $(,)* ), $fd:expr) => {{ match syscall!( $fn ( $($arg, )* )) { Ok(fd) => Ok(Decision::Completed(fd)), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(Decision::wait_readable($fd)), + Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error().map_or(false, |code| code == libc::EINPROGRESS) + => Ok(Decision::wait_readable($fd)), Err(e) => Err(e), } }}; @@ -49,11 +64,17 @@ impl OpCode for ReadAt { Ok(Decision::wait_readable(self.fd)) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_readable()); - let (ptr, len) = (self.buffer.as_buf_mut_ptr(), self.buffer.buf_len()); - syscall!(pread(self.fd, ptr as _, len, self.offset as _)) + let slice = self.buffer.as_uninit_slice(); + + syscall_break!(pread( + self.fd, + slice.as_mut_ptr() as _, + slice.len() as _, + self.offset as _ + )) } } @@ -62,11 +83,17 @@ impl OpCode for WriteAt { Ok(Decision::wait_writable(self.fd)) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_writable()); - let (ptr, len) = (self.buffer.as_buf_ptr(), self.buffer.buf_len()); - syscall!(pwrite(self.fd, ptr as _, len, self.offset as _)) + let slice = self.buffer.as_slice(); + + syscall_break!(pwrite( + self.fd, + slice.as_ptr() as _, + slice.len() as _, + self.offset as _ + )) } } @@ -75,14 +102,14 @@ impl OpCode for Sync { Ok(Decision::Completed(syscall!(fsync(self.fd))?)) } - fn on_event(&mut self, _: &Event) -> std::io::Result { + fn on_event(&mut self, _: &Event) -> std::io::Result> { unreachable!("Sync operation should not be submitted to mio") } } impl OpCode for Accept { fn pre_submit(&mut self) -> io::Result { - syscall_or_wait_writable!( + syscall_or_wait_readable!( accept( self.fd, &mut self.buffer as *mut _ as *mut _, @@ -92,29 +119,54 @@ impl OpCode for Accept { ) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_readable()); - syscall!(accept( + match syscall!(accept( self.fd, &mut self.buffer as *mut _ as *mut _, &mut self.addr_len - )) + )) { + Ok(fd) => Ok(ControlFlow::Break(fd)), + Err(e) + if e.kind() == io::ErrorKind::WouldBlock + || e.raw_os_error() + .map_or(false, |code| code == libc::EINPROGRESS) => + { + Ok(ControlFlow::Continue(())) + } + Err(e) => Err(e), + } } } impl OpCode for Connect { fn pre_submit(&mut self) -> io::Result { - syscall_or_wait_readable!( + syscall_or_wait_writable!( connect(self.fd, self.addr.as_ptr(), self.addr.len()), self.fd ) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_writable()); - syscall!(connect(self.fd, self.addr.as_ptr(), self.addr.len())) + let mut err: libc::c_int = 0; + let mut err_len = std::mem::size_of::() as libc::socklen_t; + + syscall!(getsockopt( + self.fd, + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + ))?; + + if err == 0 { + Ok(ControlFlow::Break(0)) + } else { + Err(io::Error::from_raw_os_error(err)) + } } } @@ -123,11 +175,11 @@ impl OpCode for RecvImpl { Ok(Decision::wait_readable(self.fd)) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_readable()); self.slices = unsafe { self.buffer.as_io_slices_mut() }; - syscall!(readv( + syscall_break!(readv( self.fd, self.slices.as_ptr() as _, self.slices.len() as _, @@ -140,11 +192,11 @@ impl OpCode for SendImpl { Ok(Decision::wait_writable(self.fd)) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_writable()); self.slices = unsafe { self.buffer.as_io_slices() }; - syscall!(writev( + syscall_break!(writev( self.fd, self.slices.as_ptr() as _, self.slices.len() as _, @@ -167,10 +219,10 @@ impl OpCode for RecvFromImpl { syscall_or_wait_readable!(recvmsg(self.fd, &mut self.msg, 0), self.fd) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_readable()); - syscall!(recvmsg(self.fd, &mut self.msg, 0)) + syscall_break!(recvmsg(self.fd, &mut self.msg, 0)) } } @@ -189,9 +241,9 @@ impl OpCode for SendToImpl { syscall_or_wait_writable!(sendmsg(self.fd, &self.msg, 0), self.fd) } - fn on_event(&mut self, event: &Event) -> std::io::Result { + fn on_event(&mut self, event: &Event) -> std::io::Result> { assert!(event.is_writable()); - syscall!(sendmsg(self.fd, &self.msg, 0)) + syscall_break!(sendmsg(self.fd, &self.msg, 0)) } } diff --git a/src/event/mio.rs b/src/event/mio.rs index 15da1f3f..61ce05b3 100644 --- a/src/event/mio.rs +++ b/src/event/mio.rs @@ -15,7 +15,8 @@ pub struct Event { impl Event { /// Create [`Event`]. pub fn new() -> io::Result { - let (sender, _) = mio::unix::pipe::new()?; + let (sender, receiver) = mio::unix::pipe::new()?; + receiver.into_raw_fd(); // Preventing from closing. Ok(Self { fd: unsafe { OwnedFd::from_raw_fd(sender.into_raw_fd()) }, diff --git a/tests/event.rs b/tests/event.rs index 14b8c5ff..5a99adb8 100644 --- a/tests/event.rs +++ b/tests/event.rs @@ -2,12 +2,19 @@ use compio::event::Event; #[test] fn event_handle() { - compio::task::block_on(async { - let event = Event::new().unwrap(); + let event = Event::new().unwrap(); + + std::thread::scope(|scope| { let handle = event.handle(); - std::thread::scope(|scope| { - scope.spawn(|| handle.notify().unwrap()); + let wait = event.wait(); + scope.spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + handle.notify().unwrap() + }); + scope.spawn(move || { + compio::task::block_on(async { + wait.await.unwrap(); + }) }); - event.wait().await.unwrap(); }); } diff --git a/tests/tcp_accept.rs b/tests/tcp_accept.rs index ab266a5e..1c196841 100644 --- a/tests/tcp_accept.rs +++ b/tests/tcp_accept.rs @@ -19,6 +19,7 @@ macro_rules! test_accept { $( #[test] fn $ident() { + println!("Testing {}...", stringify!($ident)); compio::task::block_on(test_impl($target)) } )* diff --git a/tests/tcp_connect.rs b/tests/tcp_connect.rs index 4fefd703..63af7b41 100644 --- a/tests/tcp_connect.rs +++ b/tests/tcp_connect.rs @@ -51,7 +51,10 @@ async fn test_connect_impl(mapping: impl FnOnce(&TcpListener) -> }; let client = async { - assert!(TcpStream::connect(addr).await.is_ok()); + match TcpStream::connect(addr).await { + Ok(_) => (), + Err(e) => panic!("Failed to connect: {}", e), + } }; futures_util::join!(server, client); From 9f0fa774a0e403a6b9e0bf1628a01fd279aaf9fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 20:02:17 +0800 Subject: [PATCH 13/24] Remove DriverInner. --- src/driver/mio/mod.rs | 61 +++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index 98081d6e..136b58a5 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -1,8 +1,6 @@ #[doc(no_inline)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::{ - cell::RefCell, collections::VecDeque, io, mem::MaybeUninit, ops::ControlFlow, time::Duration, -}; +use std::{collections::VecDeque, io, mem::MaybeUninit, ops::ControlFlow, time::Duration}; pub(crate) use libc::{sockaddr_storage, socklen_t}; use mio::{ @@ -60,10 +58,7 @@ pub struct WaitArg { } /// Low-level driver of mio. -pub struct Driver(RefCell); - -/// Inner state of [`Driver`]. -struct DriverInner { +pub struct Driver { squeue: VecDeque, cqueue: VecDeque, events: Events, @@ -124,21 +119,17 @@ impl Driver { pub fn with_entries(entries: u32) -> io::Result { let entries = entries as usize; // for the sake of consistency, use u32 like iour - Ok(Self(RefCell::new(DriverInner { + Ok(Self { squeue: VecDeque::with_capacity(entries), cqueue: VecDeque::with_capacity(entries), events: Events::with_capacity(entries), poll: Poll::new()?, waiting: Slab::new(), - }))) - } - - fn inner(&self) -> std::cell::RefMut<'_, DriverInner> { - self.0.borrow_mut() + }) } } -impl DriverInner { +impl Driver { fn submit(&mut self, entry: MioEntry, arg: WaitArg) -> io::Result<()> { let slot = self.waiting.vacant_entry(); let token = Token(slot.key()); @@ -172,7 +163,7 @@ impl DriverInner { /// Poll all events from mio, call `perform` on op and push them into /// cqueue. - fn poll(&mut self, timeout: Option) -> io::Result<()> { + fn poll_impl(&mut self, timeout: Option) -> io::Result<()> { self.poll.poll(&mut self.events, timeout)?; // println!("events: {:?}", self.events); for event in &self.events { @@ -206,46 +197,48 @@ impl DriverInner { } impl Poller for Driver { - fn attach(&self, _fd: RawFd) -> io::Result<()> { + fn attach(&mut self, _fd: RawFd) -> io::Result<()> { Ok(()) } - unsafe fn push(&self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()> { - self.0 - .borrow_mut() - .squeue - .push_back(MioEntry::new(op, user_data)); + unsafe fn push( + &mut self, + op: &mut (impl OpCode + 'static), + user_data: usize, + ) -> io::Result<()> { + self.squeue.push_back(MioEntry::new(op, user_data)); Ok(()) } - fn cancel(&self, user_data: usize) { - let mut inner = self.inner(); - - let Some(entry) = inner.waiting.try_remove(user_data) else { + fn cancel(&mut self, user_data: usize) { + let Some(entry) = self.waiting.try_remove(user_data) else { return; }; - inner - .poll + self.poll .registry() .deregister(&mut SourceFd(&entry.arg.fd)) .ok(); } fn poll( - &self, + &mut self, timeout: Option, entries: &mut [MaybeUninit], ) -> io::Result { - let mut inner = self.inner(); - - inner.submit_squeue()?; + self.submit_squeue()?; if entries.is_empty() { return Ok(0); } - if inner.poll_completed(entries) > 0 { + if self.poll_completed(entries) > 0 { return Ok(entries.len()); } - inner.poll(timeout)?; - Ok(inner.poll_completed(entries)) + self.poll_impl(timeout)?; + Ok(self.poll_completed(entries)) + } +} + +impl AsRawFd for Driver { + fn as_raw_fd(&self) -> RawFd { + self.poll.as_raw_fd() } } From 14781259b69d3614c01baa643c34bc6897bc845f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 20:15:24 +0800 Subject: [PATCH 14/24] Fix return value of poll. --- src/driver/mio/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index 136b58a5..4c33b701 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -229,8 +229,9 @@ impl Poller for Driver { if entries.is_empty() { return Ok(0); } - if self.poll_completed(entries) > 0 { - return Ok(entries.len()); + let len = self.poll_completed(entries); + if len > 0 { + return Ok(len); } self.poll_impl(timeout)?; Ok(self.poll_completed(entries)) From cbfd0a11796cc7eb942af7d84bccb288ecdc682f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 20:33:42 +0800 Subject: [PATCH 15/24] Avoid comparing SockAddr. --- src/net/tcp.rs | 4 ++-- src/net/udp.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 97f0e82c..345d7abe 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -92,8 +92,8 @@ impl TcpListener { /// /// let addr = listener.local_addr().expect("Couldn't get local address"); /// assert_eq!( - /// addr, - /// SockAddr::from(SocketAddr::V4(SocketAddrV4::new( + /// addr.as_socket().unwrap(), + /// SocketAddr::from(SocketAddr::V4(SocketAddrV4::new( /// Ipv4Addr::new(127, 0, 0, 1), /// 8080 /// ))) diff --git a/src/net/udp.rs b/src/net/udp.rs index e1981e01..e22011d4 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -159,10 +159,10 @@ impl UdpSocket { /// use compio::net::UdpSocket; /// use socket2::SockAddr; /// - /// let addr: SockAddr = "127.0.0.1:8080".parse::().unwrap().into(); + /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); /// let sock = UdpSocket::bind(&addr).unwrap(); /// // the address the socket is bound to - /// let local_addr = sock.local_addr().unwrap(); + /// let local_addr = sock.local_addr().unwrap().as_socket().unwrap(); /// assert_eq!(local_addr, addr); /// ``` pub fn local_addr(&self) -> io::Result { From 66c4d7969efef159ec6acb07b559c951d53fd1fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 20:46:20 +0800 Subject: [PATCH 16/24] Fix event. --- src/event/mio.rs | 25 +++++++++++++++---------- src/event/mod.rs | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/event/mio.rs b/src/event/mio.rs index 61ce05b3..f710dc11 100644 --- a/src/event/mio.rs +++ b/src/event/mio.rs @@ -1,44 +1,49 @@ use std::{ io, - os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd}, + os::fd::{AsRawFd, BorrowedFd, RawFd}, }; -use crate::{impl_raw_fd, op::ReadAt, task::RUNTIME}; +use mio::unix::pipe::{Receiver, Sender}; + +use crate::{op::Recv, task::RUNTIME}; /// An event that won't wake until [`EventHandle::notify`] is called /// successfully. #[derive(Debug)] pub struct Event { - fd: OwnedFd, + sender: Sender, + receiver: Receiver, } impl Event { /// Create [`Event`]. pub fn new() -> io::Result { let (sender, receiver) = mio::unix::pipe::new()?; - receiver.into_raw_fd(); // Preventing from closing. - Ok(Self { - fd: unsafe { OwnedFd::from_raw_fd(sender.into_raw_fd()) }, - }) + Ok(Self { sender, receiver }) } /// Get a notify handle. pub fn handle(&self) -> EventHandle { - EventHandle::new(self.fd.as_fd()) + EventHandle::new(unsafe { BorrowedFd::borrow_raw(self.sender.as_raw_fd()) }) } /// Wait for [`EventHandle::notify`] called. pub async fn wait(&self) -> io::Result<()> { let buffer = Vec::with_capacity(8); - let op = ReadAt::new(self.as_raw_fd(), 0, buffer); + // Trick: Recv uses readv which doesn't seek. + let op = Recv::new(self.receiver.as_raw_fd(), buffer); let (res, _) = RUNTIME.with(|runtime| runtime.submit(op)).await; res?; Ok(()) } } -impl_raw_fd!(Event, fd); +impl AsRawFd for Event { + fn as_raw_fd(&self) -> RawFd { + self.receiver.as_raw_fd() + } +} /// A handle to [`Event`]. pub struct EventHandle<'a> { diff --git a/src/event/mod.rs b/src/event/mod.rs index a353e651..ea31b29a 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -9,7 +9,7 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; - } else if #[cfg(all(unix, not(target_os = "linux")))] { + } else if #[cfg(unix)] { mod mio; pub use self::mio::*; } From 1c8e5d7c35a206405041ceccff243aa3b52ca49b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 20:57:05 +0800 Subject: [PATCH 17/24] Revert change of event test. --- tests/event.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/tests/event.rs b/tests/event.rs index 5a99adb8..14b8c5ff 100644 --- a/tests/event.rs +++ b/tests/event.rs @@ -2,19 +2,12 @@ use compio::event::Event; #[test] fn event_handle() { - let event = Event::new().unwrap(); - - std::thread::scope(|scope| { + compio::task::block_on(async { + let event = Event::new().unwrap(); let handle = event.handle(); - let wait = event.wait(); - scope.spawn(move || { - std::thread::sleep(std::time::Duration::from_secs(1)); - handle.notify().unwrap() - }); - scope.spawn(move || { - compio::task::block_on(async { - wait.await.unwrap(); - }) + std::thread::scope(|scope| { + scope.spawn(|| handle.notify().unwrap()); }); + event.wait().await.unwrap(); }); } From 2bb38a10676cd4956a7005a49b4bfe05cc27f3cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 21:01:08 +0800 Subject: [PATCH 18/24] Add mac doc build to CI. --- Cargo.toml | 6 +++++- azure-pipelines.yml | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1a82bf6e..d113d3db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,11 @@ repository = "https://github.com/Berrysoft/compio" [package.metadata.docs.rs] all-features = true default-target = "x86_64-pc-windows-msvc" -targets = ["x86_64-pc-windows-msvc", "x86_64-unknown-linux-gnu"] +targets = [ + "x86_64-pc-windows-msvc", + "x86_64-unknown-linux-gnu", + "x86_64-apple-darwin", +] # Shared dependencies for all platforms [dependencies] diff --git a/azure-pipelines.yml b/azure-pipelines.yml index f20b2a5f..64614160 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -71,11 +71,13 @@ jobs: image: windows-latest linux: image: ubuntu-latest + macos: + image: macOS-latest pool: vmImage: $(image) steps: - script: | rustup toolchain install nightly - cargo +nightly doc --features all --no-deps + cargo +nightly doc --all-features --no-deps displayName: Build docs From 2cc77b9afff03bb3107ffa4b2cdeb285b1951362 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 21:33:20 +0800 Subject: [PATCH 19/24] Deregister correctly. --- Cargo.toml | 1 - src/driver/mio/mod.rs | 33 ++++++++++++++++++++++----------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d113d3db..02de64a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,6 @@ libc = "0.2" # Other platform dependencies [target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] mio = { version = "0.8.8", features = ["os-ext"] } -slab = { version = "0.4" } libc = "0.2" [features] diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index 4c33b701..6f8834c2 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -1,6 +1,12 @@ #[doc(no_inline)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::{collections::VecDeque, io, mem::MaybeUninit, ops::ControlFlow, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + io, + mem::MaybeUninit, + ops::ControlFlow, + time::Duration, +}; pub(crate) use libc::{sockaddr_storage, socklen_t}; use mio::{ @@ -8,7 +14,6 @@ use mio::{ unix::SourceFd, Events, Interest, Poll, Token, }; -use slab::Slab; use crate::driver::{Entry, Poller}; @@ -63,7 +68,7 @@ pub struct Driver { cqueue: VecDeque, events: Events, poll: Poll, - waiting: Slab, + waiting: HashMap, } /// Entry in squeue @@ -124,20 +129,20 @@ impl Driver { cqueue: VecDeque::with_capacity(entries), events: Events::with_capacity(entries), poll: Poll::new()?, - waiting: Slab::new(), + waiting: HashMap::new(), }) } } impl Driver { fn submit(&mut self, entry: MioEntry, arg: WaitArg) -> io::Result<()> { - let slot = self.waiting.vacant_entry(); - let token = Token(slot.key()); + let token = Token(entry.user_data); SourceFd(&arg.fd).register(self.poll.registry(), token, arg.interest)?; // Only insert the entry after it was registered successfully - slot.insert(WaitEntry::new(entry, arg)); + self.waiting + .insert(entry.user_data, WaitEntry::new(entry, arg)); Ok(()) } @@ -170,17 +175,23 @@ impl Driver { let token = event.token(); let entry = self .waiting - .get_mut(token.0) + .get_mut(&token.0) .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? match entry.op_mut().on_event(event) { Ok(ControlFlow::Continue(_)) => {} Ok(ControlFlow::Break(res)) => { self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); - self.waiting.remove(token.0); + self.poll + .registry() + .deregister(&mut SourceFd(&entry.arg.fd))?; + self.waiting.remove(&token.0); } Err(err) => { self.cqueue.push_back(Entry::new(entry.user_data, Err(err))); - self.waiting.remove(token.0); + self.poll + .registry() + .deregister(&mut SourceFd(&entry.arg.fd))?; + self.waiting.remove(&token.0); } } } @@ -211,7 +222,7 @@ impl Poller for Driver { } fn cancel(&mut self, user_data: usize) { - let Some(entry) = self.waiting.try_remove(user_data) else { + let Some(entry) = self.waiting.remove(&user_data) else { return; }; self.poll From 892f04e5de02bdc1f0e5fb9e74982e568c2e2e12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 21:35:00 +0800 Subject: [PATCH 20/24] Move unix_op to unix::op. --- src/driver/iour/op.rs | 2 +- src/driver/mio/op.rs | 2 +- src/driver/mod.rs | 2 +- src/driver/unix/mod.rs | 1 + src/driver/{unix_op.rs => unix/op.rs} | 0 5 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 src/driver/unix/mod.rs rename src/driver/{unix_op.rs => unix/op.rs} (100%) diff --git a/src/driver/iour/op.rs b/src/driver/iour/op.rs index f74a170e..e733bc88 100644 --- a/src/driver/iour/op.rs +++ b/src/driver/iour/op.rs @@ -5,7 +5,7 @@ use io_uring::{ }; use libc::sockaddr_storage; -pub use crate::driver::unix_op::*; +pub use crate::driver::unix::op::*; use crate::{ buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, driver::OpCode, diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index 98dd1fdc..7850d451 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -2,7 +2,7 @@ use std::{io, ops::ControlFlow}; use mio::event::Event; -pub use crate::driver::unix_op::*; +pub use crate::driver::unix::op::*; use crate::{ buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, driver::{Decision, OpCode}, diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 22f2a769..d91f196a 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -3,7 +3,7 @@ use std::{io, mem::MaybeUninit, time::Duration}; #[cfg(unix)] -mod unix_op; +mod unix; cfg_if::cfg_if! { if #[cfg(target_os = "windows")] { diff --git a/src/driver/unix/mod.rs b/src/driver/unix/mod.rs new file mode 100644 index 00000000..c99151af --- /dev/null +++ b/src/driver/unix/mod.rs @@ -0,0 +1 @@ +pub(crate) mod op; diff --git a/src/driver/unix_op.rs b/src/driver/unix/op.rs similarity index 100% rename from src/driver/unix_op.rs rename to src/driver/unix/op.rs From 51257b9f9add290036f0f406e0560db8b52bbd91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 21:40:54 +0800 Subject: [PATCH 21/24] Extract msg_hdr set method. --- src/driver/iour/op.rs | 22 ++-------------------- src/driver/mio/op.rs | 22 ++-------------------- src/driver/unix/op.rs | 26 ++++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 40 deletions(-) diff --git a/src/driver/iour/op.rs b/src/driver/iour/op.rs index e733bc88..0aac5b7b 100644 --- a/src/driver/iour/op.rs +++ b/src/driver/iour/op.rs @@ -86,16 +86,7 @@ impl OpCode for SendImpl { impl OpCode for RecvFromImpl { #[allow(clippy::no_effect)] fn create_entry(&mut self) -> Entry { - self.slices = unsafe { self.buffer.as_io_slices_mut() }; - self.msg = libc::msghdr { - msg_name: &mut self.addr as *mut _ as _, - msg_namelen: 128, - msg_iov: self.slices.as_mut_ptr() as _, - msg_iovlen: self.slices.len(), - msg_control: std::ptr::null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; + self.set_msg(); opcode::RecvMsg::new(Fd(self.fd), &mut self.msg).build() } } @@ -103,16 +94,7 @@ impl OpCode for RecvFromImpl { impl OpCode for SendToImpl { #[allow(clippy::no_effect)] fn create_entry(&mut self) -> Entry { - self.slices = unsafe { self.buffer.as_io_slices() }; - self.msg = libc::msghdr { - msg_name: self.addr.as_ptr() as _, - msg_namelen: self.addr.len(), - msg_iov: self.slices.as_mut_ptr() as _, - msg_iovlen: self.slices.len(), - msg_control: std::ptr::null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; + self.set_msg(); opcode::SendMsg::new(Fd(self.fd), &self.msg).build() } } diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index 7850d451..94f1b1b4 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -206,16 +206,7 @@ impl OpCode for SendImpl { impl OpCode for RecvFromImpl { fn pre_submit(&mut self) -> io::Result { - self.slices = unsafe { self.buffer.as_io_slices_mut() }; - self.msg = libc::msghdr { - msg_name: &mut self.addr as *mut _ as _, - msg_namelen: 128, - msg_iov: self.slices.as_mut_ptr() as _, - msg_iovlen: self.slices.len() as _, - msg_control: std::ptr::null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; + self.set_msg(); syscall_or_wait_readable!(recvmsg(self.fd, &mut self.msg, 0), self.fd) } @@ -228,16 +219,7 @@ impl OpCode for RecvFromImpl { impl OpCode for SendToImpl { fn pre_submit(&mut self) -> io::Result { - self.slices = unsafe { self.buffer.as_io_slices() }; - self.msg = libc::msghdr { - msg_name: &mut self.addr as *mut _ as _, - msg_namelen: 128, - msg_iov: self.slices.as_mut_ptr() as _, - msg_iovlen: self.slices.len() as _, - msg_control: std::ptr::null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; + self.set_msg(); syscall_or_wait_writable!(sendmsg(self.fd, &self.msg, 0), self.fd) } diff --git a/src/driver/unix/op.rs b/src/driver/unix/op.rs index 08a175a4..2f868e09 100644 --- a/src/driver/unix/op.rs +++ b/src/driver/unix/op.rs @@ -51,6 +51,19 @@ impl RecvFromImpl { msg: unsafe { std::mem::zeroed() }, } } + + pub(crate) fn set_msg(&mut self) { + self.slices = unsafe { self.buffer.as_io_slices_mut() }; + self.msg = libc::msghdr { + msg_name: &mut self.addr as *mut _ as _, + msg_namelen: std::mem::size_of_val(&self.addr) as _, + msg_iov: self.slices.as_mut_ptr() as _, + msg_iovlen: self.slices.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + } } impl IntoInner for RecvFromImpl { @@ -81,6 +94,19 @@ impl SendToImpl { msg: unsafe { std::mem::zeroed() }, } } + + pub(crate) fn set_msg(&mut self) { + self.slices = unsafe { self.buffer.as_io_slices() }; + self.msg = libc::msghdr { + msg_name: self.addr.as_ptr() as _, + msg_namelen: self.addr.len(), + msg_iov: self.slices.as_mut_ptr() as _, + msg_iovlen: self.slices.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + } } impl IntoInner for SendToImpl { From 4d91e9667a803b1a554d8b563a067956ee297602 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 10 Sep 2023 21:47:00 +0800 Subject: [PATCH 22/24] Fix doc warnings. --- src/driver/unix/op.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/driver/unix/op.rs b/src/driver/unix/op.rs index 2f868e09..8e1a73c4 100644 --- a/src/driver/unix/op.rs +++ b/src/driver/unix/op.rs @@ -3,6 +3,8 @@ use std::io::{IoSlice, IoSliceMut}; use libc::{sockaddr_storage, socklen_t}; use socket2::SockAddr; +#[cfg(doc)] +use crate::op::*; use crate::{ buf::{AsIoSlices, AsIoSlicesMut, IntoInner, OneOrVec}, driver::RawFd, From f39a24b0b196afba5eeac1b8ba4e39a64c23e7f2 Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 14:10:09 -0400 Subject: [PATCH 23/24] refactor: code style --- src/driver/mio/mod.rs | 17 +++---- src/driver/mio/op.rs | 107 +++++++++++++++--------------------------- 2 files changed, 46 insertions(+), 78 deletions(-) diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs index 6f8834c2..c8b7ffb1 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/mio/mod.rs @@ -170,7 +170,6 @@ impl Driver { /// cqueue. fn poll_impl(&mut self, timeout: Option) -> io::Result<()> { self.poll.poll(&mut self.events, timeout)?; - // println!("events: {:?}", self.events); for event in &self.events { let token = event.token(); let entry = self @@ -178,22 +177,20 @@ impl Driver { .get_mut(&token.0) .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? match entry.op_mut().on_event(event) { - Ok(ControlFlow::Continue(_)) => {} + Ok(ControlFlow::Continue(_)) => { + continue; + } Ok(ControlFlow::Break(res)) => { self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); - self.poll - .registry() - .deregister(&mut SourceFd(&entry.arg.fd))?; - self.waiting.remove(&token.0); } Err(err) => { self.cqueue.push_back(Entry::new(entry.user_data, Err(err))); - self.poll - .registry() - .deregister(&mut SourceFd(&entry.arg.fd))?; - self.waiting.remove(&token.0); } } + self.poll + .registry() + .deregister(&mut SourceFd(&entry.arg.fd))?; + self.waiting.remove(&token.0); } Ok(()) } diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs index 94f1b1b4..ef649d5b 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/mio/op.rs @@ -9,7 +9,7 @@ use crate::{ op::*, }; -/// Helper macro to execute a system call that returns an `io::Result`. +/// Helper macro to execute a system call macro_rules! syscall { ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ #[allow(unused_unsafe)] @@ -20,43 +20,26 @@ macro_rules! syscall { Ok(res as usize) } }}; -} - -/// Helper macro to execute a system call that returns an `io::Result`. -macro_rules! syscall_break { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - #[allow(unused_unsafe)] - let res = unsafe { ::libc::$fn($($arg, )*) }; - if res == -1 { - Err(::std::io::Error::last_os_error()) - } else { - Ok(ControlFlow::Break(res as _)) - } - }}; -} - -/// Execute a system call, if would block, wait for it to be readable. -macro_rules! syscall_or_wait_writable { - ($fn: ident ( $($arg: expr),* $(,)* ), $fd:expr) => {{ + (break $fn: ident ( $($arg: expr),* $(,)* )) => { + syscall!( $fn ( $($arg, )* )).map(ControlFlow::Break) + }; + ($fn: ident ( $($arg: expr),* $(,)* ) or wait_writable($fd:expr)) => { match syscall!( $fn ( $($arg, )* )) { Ok(fd) => Ok(Decision::Completed(fd)), - Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error().map_or(false, |code| code == libc::EINPROGRESS) + Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error() == Some(libc::EINPROGRESS) => Ok(Decision::wait_writable($fd)), Err(e) => Err(e), } - }}; -} - -/// Execute a system call, if would block, wait for it to be writable. -macro_rules! syscall_or_wait_readable { - ($fn: ident ( $($arg: expr),* $(,)* ), $fd:expr) => {{ + }; + ($fn: ident ( $($arg: expr),* $(,)* ) or wait_readable($fd:expr)) => { match syscall!( $fn ( $($arg, )* )) { Ok(fd) => Ok(Decision::Completed(fd)), - Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error().map_or(false, |code| code == libc::EINPROGRESS) + Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error() == Some(libc::EINPROGRESS) => Ok(Decision::wait_readable($fd)), Err(e) => Err(e), } - }}; + } + } impl OpCode for ReadAt { @@ -69,12 +52,14 @@ impl OpCode for ReadAt { let slice = self.buffer.as_uninit_slice(); - syscall_break!(pread( - self.fd, - slice.as_mut_ptr() as _, - slice.len() as _, - self.offset as _ - )) + syscall!( + break pread( + self.fd, + slice.as_mut_ptr() as _, + slice.len() as _, + self.offset as _ + ) + ) } } @@ -88,12 +73,14 @@ impl OpCode for WriteAt { let slice = self.buffer.as_slice(); - syscall_break!(pwrite( - self.fd, - slice.as_ptr() as _, - slice.len() as _, - self.offset as _ - )) + syscall!( + break pwrite( + self.fd, + slice.as_ptr() as _, + slice.len() as _, + self.offset as _ + ) + ) } } @@ -109,13 +96,12 @@ impl OpCode for Sync { impl OpCode for Accept { fn pre_submit(&mut self) -> io::Result { - syscall_or_wait_readable!( + syscall!( accept( self.fd, &mut self.buffer as *mut _ as *mut _, &mut self.addr_len - ), - self.fd + ) or wait_readable(self.fd) ) } @@ -128,13 +114,7 @@ impl OpCode for Accept { &mut self.addr_len )) { Ok(fd) => Ok(ControlFlow::Break(fd)), - Err(e) - if e.kind() == io::ErrorKind::WouldBlock - || e.raw_os_error() - .map_or(false, |code| code == libc::EINPROGRESS) => - { - Ok(ControlFlow::Continue(())) - } + Err(e) if e.raw_os_error() == Some(libc::EINPROGRESS) => Ok(ControlFlow::Continue(())), Err(e) => Err(e), } } @@ -142,9 +122,8 @@ impl OpCode for Accept { impl OpCode for Connect { fn pre_submit(&mut self) -> io::Result { - syscall_or_wait_writable!( - connect(self.fd, self.addr.as_ptr(), self.addr.len()), - self.fd + syscall!( + connect(self.fd, self.addr.as_ptr(), self.addr.len()) or wait_writable(self.fd) ) } @@ -179,11 +158,7 @@ impl OpCode for RecvImpl { debug_assert!(event.is_readable()); self.slices = unsafe { self.buffer.as_io_slices_mut() }; - syscall_break!(readv( - self.fd, - self.slices.as_ptr() as _, - self.slices.len() as _, - )) + syscall!(break readv(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) } } @@ -196,36 +171,32 @@ impl OpCode for SendImpl { debug_assert!(event.is_writable()); self.slices = unsafe { self.buffer.as_io_slices() }; - syscall_break!(writev( - self.fd, - self.slices.as_ptr() as _, - self.slices.len() as _, - )) + syscall!(break writev(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) } } impl OpCode for RecvFromImpl { fn pre_submit(&mut self) -> io::Result { self.set_msg(); - syscall_or_wait_readable!(recvmsg(self.fd, &mut self.msg, 0), self.fd) + syscall!(recvmsg(self.fd, &mut self.msg, 0) or wait_readable(self.fd)) } fn on_event(&mut self, event: &Event) -> std::io::Result> { debug_assert!(event.is_readable()); - syscall_break!(recvmsg(self.fd, &mut self.msg, 0)) + syscall!(break recvmsg(self.fd, &mut self.msg, 0)) } } impl OpCode for SendToImpl { fn pre_submit(&mut self) -> io::Result { self.set_msg(); - syscall_or_wait_writable!(sendmsg(self.fd, &self.msg, 0), self.fd) + syscall!(sendmsg(self.fd, &self.msg, 0) or wait_writable(self.fd)) } fn on_event(&mut self, event: &Event) -> std::io::Result> { - assert!(event.is_writable()); + debug_assert!(event.is_writable()); - syscall_break!(sendmsg(self.fd, &self.msg, 0)) + syscall!(break sendmsg(self.fd, &self.msg, 0)) } } From c5c38a1b1de35602094c8003662a0f7abaeed20b Mon Sep 17 00:00:00 2001 From: George Miao Date: Sun, 10 Sep 2023 14:13:04 -0400 Subject: [PATCH 24/24] doc: add some doc --- Cargo.toml | 8 ++++++++ README.md | 4 +++- src/driver/mod.rs | 3 ++- src/driver/unix/mod.rs | 3 +++ 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 02de64a2..5a6de363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,14 @@ targets = [ "x86_64-pc-windows-msvc", "x86_64-unknown-linux-gnu", "x86_64-apple-darwin", + "aarch64-apple-ios", + "aarch64-linux-android", + "x86_64-unknown-dragonfly", + "x86_64-unknown-freebsd", + "x86_64-unknown-illumos", + "x86_64-unknown-linux-gnu", + "x86_64-unknown-netbsd", + "x86_64-unknown-openbsd", ] # Shared dependencies for all platforms diff --git a/README.md b/README.md index 636cefe7..024a8b5b 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![docs.rs](https://img.shields.io/badge/docs.rs-compio-latest)](https://docs.rs/compio) [![Azure DevOps builds](https://strawberry-vs.visualstudio.com/compio/_apis/build/status/Berrysoft.compio?branch=master)](https://strawberry-vs.visualstudio.com/compio/_build) -A thread-per-core Rust runtime with IOCP/io_uring. +A thread-per-core Rust runtime with IOCP/io_uring/mio. The name comes from "completion-based IO". This crate is inspired by [monoio](https://github.com/bytedance/monoio/). @@ -22,6 +22,7 @@ and `tokio` won't public APIs to control `mio` before `mio` reaches 1.0. ## Quick start With `runtime` feature enabled, we can use the high level APIs to perform fs & net IO. + ```rust,no_run use compio::{fs::File, task::block_on}; @@ -36,6 +37,7 @@ println!("{}", buffer); ``` While you can also control the low-level driver manually: + ```rust,no_run use compio::{ buf::IntoInner, diff --git a/src/driver/mod.rs b/src/driver/mod.rs index d91f196a..30e85488 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -84,7 +84,8 @@ pub trait Poller { /// /// # Safety /// - /// `op` should be alive until [`Poller::poll`] returns its result. + /// - `op` should be alive until [`Poller::poll`] returns its result. + /// - `user_data` should be unique. unsafe fn push(&mut self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()>; diff --git a/src/driver/unix/mod.rs b/src/driver/unix/mod.rs index c99151af..73890845 100644 --- a/src/driver/unix/mod.rs +++ b/src/driver/unix/mod.rs @@ -1 +1,4 @@ +//! This mod doesn't actually contain any driver, but meant to provide some +//! common op type and utilities for unix platform (for iour and mio). + pub(crate) mod op;