diff --git a/Cargo.toml b/Cargo.toml index bfc40470..5a6de363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,21 @@ repository = "https://github.com/Berrysoft/compio" [package.metadata.docs.rs] all-features = true default-target = "x86_64-pc-windows-msvc" -targets = ["x86_64-pc-windows-msvc", "x86_64-unknown-linux-gnu"] +targets = [ + "x86_64-pc-windows-msvc", + "x86_64-unknown-linux-gnu", + "x86_64-apple-darwin", + "aarch64-apple-ios", + "aarch64-linux-android", + "x86_64-unknown-dragonfly", + "x86_64-unknown-freebsd", + "x86_64-unknown-illumos", + "x86_64-unknown-linux-gnu", + "x86_64-unknown-netbsd", + "x86_64-unknown-openbsd", +] +# Shared dependencies for all platforms [dependencies] async-task = { version = "4", optional = true } bytes = { version = "1", optional = true } @@ -25,12 +38,14 @@ once_cell = "1" slab = { version = "0.4", optional = true } socket2 = { version = "0.5", features = ["all"] } +# Shared dev dependencies for all platforms [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"] } futures-channel = "0.3" tempfile = "3" tokio = { version = "1", features = ["fs", "io-util", "macros", "net", "rt"] } +# Windows specific dependencies [target.'cfg(target_os = "windows")'.dependencies] widestring = "1" windows-sys = { version = "0.48", features = [ @@ -45,13 +60,20 @@ windows-sys = { version = "0.48", features = [ "Win32_System_Threading", ] } +# Windows specific dev dependencies [target.'cfg(target_os = "windows")'.dev-dependencies] windows-sys = { version = "0.48", features = ["Win32_Security_Authorization"] } +# Linux specific dependencies [target.'cfg(target_os = "linux")'.dependencies] io-uring = "0.6" libc = "0.2" +# Other platform dependencies +[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] +mio = { version = "0.8.8", features = ["os-ext"] } +libc = "0.2" + [features] default = ["runtime"] runtime = ["dep:async-task", "dep:futures-util", "dep:slab"] diff --git a/README.md b/README.md index 636cefe7..024a8b5b 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![docs.rs](https://img.shields.io/badge/docs.rs-compio-latest)](https://docs.rs/compio) [![Azure DevOps builds](https://strawberry-vs.visualstudio.com/compio/_apis/build/status/Berrysoft.compio?branch=master)](https://strawberry-vs.visualstudio.com/compio/_build) -A thread-per-core Rust runtime with IOCP/io_uring. +A thread-per-core Rust runtime with IOCP/io_uring/mio. The name comes from "completion-based IO". This crate is inspired by [monoio](https://github.com/bytedance/monoio/). @@ -22,6 +22,7 @@ and `tokio` won't public APIs to control `mio` before `mio` reaches 1.0. ## Quick start With `runtime` feature enabled, we can use the high level APIs to perform fs & net IO. + ```rust,no_run use compio::{fs::File, task::block_on}; @@ -36,6 +37,7 @@ println!("{}", buffer); ``` While you can also control the low-level driver manually: + ```rust,no_run use compio::{ buf::IntoInner, diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b88e10bf..64614160 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -45,6 +45,25 @@ jobs: cargo test --features all displayName: TestStable + - job: Test_Mac + strategy: + matrix: + ventura: + image: macOS-13 + monterey: + image: macOS-12 + pool: + vmImage: $(image) + + steps: + - script: | + rustup toolchain install nightly + cargo +nightly test --features all,nightly --no-default-features + displayName: TestNightly + - script: | + cargo test --features all + displayName: TestStable + - job: Doc strategy: matrix: @@ -52,11 +71,13 @@ jobs: image: windows-latest linux: image: ubuntu-latest + macos: + image: macOS-latest pool: vmImage: $(image) steps: - script: | rustup toolchain install nightly - cargo +nightly doc --features all --no-deps + cargo +nightly doc --all-features --no-deps displayName: Build docs diff --git a/src/driver/iour/op.rs b/src/driver/iour/op.rs index bbaf6f79..0aac5b7b 100644 --- a/src/driver/iour/op.rs +++ b/src/driver/iour/op.rs @@ -1,16 +1,14 @@ -use std::io::{IoSlice, IoSliceMut}; - use io_uring::{ opcode, squeue::Entry, types::{Fd, FsyncFlags}, }; -use libc::{sockaddr_storage, socklen_t}; -use socket2::SockAddr; +use libc::sockaddr_storage; +pub use crate::driver::unix::op::*; use crate::{ - buf::{AsIoSlices, AsIoSlicesMut, IntoInner, IoBuf, IoBufMut, OneOrVec}, - driver::{OpCode, RawFd}, + buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, + driver::OpCode, op::*, }; @@ -44,29 +42,6 @@ impl OpCode for Sync { } } -/// Accept a connection. -pub struct Accept { - pub(crate) fd: RawFd, - pub(crate) buffer: sockaddr_storage, - pub(crate) addr_len: socklen_t, -} - -impl Accept { - /// Create [`Accept`]. - pub fn new(fd: RawFd) -> Self { - Self { - fd, - buffer: unsafe { std::mem::zeroed() }, - addr_len: std::mem::size_of::() as _, - } - } - - /// Get the remote address from the inner buffer. - pub fn into_addr(self) -> SockAddr { - unsafe { SockAddr::new(self.buffer, self.addr_len) } - } -} - impl OpCode for Accept { fn create_entry(&mut self) -> Entry { opcode::Accept::new( @@ -108,96 +83,18 @@ impl OpCode for SendImpl { } } -/// Receive data and source address. -pub struct RecvFromImpl { - pub(crate) fd: RawFd, - pub(crate) buffer: T, - pub(crate) addr: sockaddr_storage, - pub(crate) slices: OneOrVec>, - msg: libc::msghdr, -} - -impl RecvFromImpl { - /// Create [`RecvFrom`] or [`RecvFromVectored`]. - pub fn new(fd: RawFd, buffer: T::Inner) -> Self { - Self { - fd, - buffer: T::new(buffer), - addr: unsafe { std::mem::zeroed() }, - slices: OneOrVec::One(IoSliceMut::new(&mut [])), - msg: unsafe { std::mem::zeroed() }, - } - } -} - -impl IntoInner for RecvFromImpl { - type Inner = (T, sockaddr_storage, socklen_t); - - fn into_inner(self) -> Self::Inner { - (self.buffer, self.addr, self.msg.msg_namelen) - } -} - impl OpCode for RecvFromImpl { #[allow(clippy::no_effect)] fn create_entry(&mut self) -> Entry { - self.slices = unsafe { self.buffer.as_io_slices_mut() }; - self.msg = libc::msghdr { - msg_name: &mut self.addr as *mut _ as _, - msg_namelen: 128, - msg_iov: self.slices.as_mut_ptr() as _, - msg_iovlen: self.slices.len(), - msg_control: std::ptr::null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; + self.set_msg(); opcode::RecvMsg::new(Fd(self.fd), &mut self.msg).build() } } -/// Send data to specified address. -pub struct SendToImpl { - pub(crate) fd: RawFd, - pub(crate) buffer: T, - pub(crate) addr: SockAddr, - pub(crate) slices: OneOrVec>, - msg: libc::msghdr, -} - -impl SendToImpl { - /// Create [`SendTo`] or [`SendToVectored`]. - pub fn new(fd: RawFd, buffer: T::Inner, addr: SockAddr) -> Self { - Self { - fd, - buffer: T::new(buffer), - addr, - slices: OneOrVec::One(IoSlice::new(&[])), - msg: unsafe { std::mem::zeroed() }, - } - } -} - -impl IntoInner for SendToImpl { - type Inner = T; - - fn into_inner(self) -> Self::Inner { - self.buffer - } -} - impl OpCode for SendToImpl { #[allow(clippy::no_effect)] fn create_entry(&mut self) -> Entry { - self.slices = unsafe { self.buffer.as_io_slices() }; - self.msg = libc::msghdr { - msg_name: self.addr.as_ptr() as _, - msg_namelen: self.addr.len(), - msg_iov: self.slices.as_mut_ptr() as _, - msg_iovlen: self.slices.len(), - msg_control: std::ptr::null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; + self.set_msg(); opcode::SendMsg::new(Fd(self.fd), &self.msg).build() } } diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs new file mode 100644 index 00000000..c8b7ffb1 --- /dev/null +++ b/src/driver/mio/mod.rs @@ -0,0 +1,253 @@ +#[doc(no_inline)] +pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::{ + collections::{HashMap, VecDeque}, + io, + mem::MaybeUninit, + ops::ControlFlow, + time::Duration, +}; + +pub(crate) use libc::{sockaddr_storage, socklen_t}; +use mio::{ + event::{Event, Source}, + unix::SourceFd, + Events, Interest, Poll, Token, +}; + +use crate::driver::{Entry, Poller}; + +pub(crate) mod op; + +/// Abstraction of operations. +pub trait OpCode { + /// Perform the operation before submit, and return [`Decision`] to + /// indicate whether submitting the operation to mio is required. + fn pre_submit(&mut self) -> io::Result; + + /// Perform the operation after received corresponding + /// event. + fn on_event(&mut self, event: &Event) -> io::Result>; +} + +/// Result of [`OpCode::pre_submit`]. +pub enum Decision { + /// Instant operation, no need to submit + Completed(usize), + /// Async operation, needs to submit + Wait(WaitArg), +} + +impl Decision { + /// Decide to wait for the given fd with the given interest. + pub fn wait_for(fd: RawFd, interest: Interest) -> Self { + Self::Wait(WaitArg { fd, interest }) + } + + /// Decide to wait for the given fd to be readable. + pub fn wait_readable(fd: RawFd) -> Self { + Self::wait_for(fd, Interest::READABLE) + } + + /// Decide to wait for the given fd to be writable. + pub fn wait_writable(fd: RawFd) -> Self { + Self::wait_for(fd, Interest::WRITABLE) + } +} + +/// Meta of mio operations. +#[derive(Debug, Clone, Copy)] +pub struct WaitArg { + fd: RawFd, + interest: Interest, +} + +/// Low-level driver of mio. +pub struct Driver { + squeue: VecDeque, + cqueue: VecDeque, + events: Events, + poll: Poll, + waiting: HashMap, +} + +/// Entry in squeue +#[derive(Debug)] +struct MioEntry { + op: *mut dyn OpCode, + user_data: usize, +} + +impl MioEntry { + /// Safety: Caller mut guarantee that the op will live until it is + /// completed. + unsafe fn new(op: &mut (impl OpCode + 'static), user_data: usize) -> Self { + Self { + op: op as *mut dyn OpCode, + user_data, + } + } + + fn op_mut(&mut self) -> &mut dyn OpCode { + unsafe { &mut *self.op } + } +} + +/// Entry waiting for events +struct WaitEntry { + op: *mut dyn OpCode, + arg: WaitArg, + user_data: usize, +} + +impl WaitEntry { + fn new(mio_entry: MioEntry, arg: WaitArg) -> Self { + Self { + op: mio_entry.op, + arg, + user_data: mio_entry.user_data, + } + } + + fn op_mut(&mut self) -> &mut dyn OpCode { + unsafe { &mut *self.op } + } +} + +impl Driver { + /// Create a new mio driver with 1024 entries. + pub fn new() -> io::Result { + Self::with_entries(1024) + } + + /// Create a new mio driver with the given number of entries. + pub fn with_entries(entries: u32) -> io::Result { + let entries = entries as usize; // for the sake of consistency, use u32 like iour + + Ok(Self { + squeue: VecDeque::with_capacity(entries), + cqueue: VecDeque::with_capacity(entries), + events: Events::with_capacity(entries), + poll: Poll::new()?, + waiting: HashMap::new(), + }) + } +} + +impl Driver { + fn submit(&mut self, entry: MioEntry, arg: WaitArg) -> io::Result<()> { + let token = Token(entry.user_data); + + SourceFd(&arg.fd).register(self.poll.registry(), token, arg.interest)?; + + // Only insert the entry after it was registered successfully + self.waiting + .insert(entry.user_data, WaitEntry::new(entry, arg)); + + Ok(()) + } + + /// Register all operations in the squeue to mio. + fn submit_squeue(&mut self) -> io::Result<()> { + while let Some(mut entry) = self.squeue.pop_front() { + match entry.op_mut().pre_submit() { + Ok(Decision::Wait(arg)) => { + self.submit(entry, arg)?; + } + Ok(Decision::Completed(res)) => { + self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); + } + Err(err) => { + self.cqueue.push_back(Entry::new(entry.user_data, Err(err))); + } + } + } + + Ok(()) + } + + /// Poll all events from mio, call `perform` on op and push them into + /// cqueue. + fn poll_impl(&mut self, timeout: Option) -> io::Result<()> { + self.poll.poll(&mut self.events, timeout)?; + for event in &self.events { + let token = event.token(); + let entry = self + .waiting + .get_mut(&token.0) + .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? + match entry.op_mut().on_event(event) { + Ok(ControlFlow::Continue(_)) => { + continue; + } + Ok(ControlFlow::Break(res)) => { + self.cqueue.push_back(Entry::new(entry.user_data, Ok(res))); + } + Err(err) => { + self.cqueue.push_back(Entry::new(entry.user_data, Err(err))); + } + } + self.poll + .registry() + .deregister(&mut SourceFd(&entry.arg.fd))?; + self.waiting.remove(&token.0); + } + Ok(()) + } + + fn poll_completed(&mut self, entries: &mut [MaybeUninit]) -> usize { + let len = self.cqueue.len().min(entries.len()); + for entry in &mut entries[..len] { + entry.write(self.cqueue.pop_front().unwrap()); + } + len + } +} + +impl Poller for Driver { + fn attach(&mut self, _fd: RawFd) -> io::Result<()> { + Ok(()) + } + + unsafe fn push( + &mut self, + op: &mut (impl OpCode + 'static), + user_data: usize, + ) -> io::Result<()> { + self.squeue.push_back(MioEntry::new(op, user_data)); + Ok(()) + } + + fn cancel(&mut self, user_data: usize) { + let Some(entry) = self.waiting.remove(&user_data) else { + return; + }; + self.poll + .registry() + .deregister(&mut SourceFd(&entry.arg.fd)) + .ok(); + } + + fn poll( + &mut self, + timeout: Option, + entries: &mut [MaybeUninit], + ) -> io::Result { + self.submit_squeue()?; + if entries.is_empty() { + return Ok(0); + } + let len = self.poll_completed(entries); + if len > 0 { + return Ok(len); + } + self.poll_impl(timeout)?; + Ok(self.poll_completed(entries)) + } +} + +impl AsRawFd for Driver { + fn as_raw_fd(&self) -> RawFd { + self.poll.as_raw_fd() + } +} diff --git a/src/driver/mio/op.rs b/src/driver/mio/op.rs new file mode 100644 index 00000000..ef649d5b --- /dev/null +++ b/src/driver/mio/op.rs @@ -0,0 +1,202 @@ +use std::{io, ops::ControlFlow}; + +use mio::event::Event; + +pub use crate::driver::unix::op::*; +use crate::{ + buf::{AsIoSlices, AsIoSlicesMut, IoBuf, IoBufMut}, + driver::{Decision, OpCode}, + op::*, +}; + +/// Helper macro to execute a system call +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + #[allow(unused_unsafe)] + let res = unsafe { ::libc::$fn($($arg, )*) }; + if res == -1 { + Err(::std::io::Error::last_os_error()) + } else { + Ok(res as usize) + } + }}; + (break $fn: ident ( $($arg: expr),* $(,)* )) => { + syscall!( $fn ( $($arg, )* )).map(ControlFlow::Break) + }; + ($fn: ident ( $($arg: expr),* $(,)* ) or wait_writable($fd:expr)) => { + match syscall!( $fn ( $($arg, )* )) { + Ok(fd) => Ok(Decision::Completed(fd)), + Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error() == Some(libc::EINPROGRESS) + => Ok(Decision::wait_writable($fd)), + Err(e) => Err(e), + } + }; + ($fn: ident ( $($arg: expr),* $(,)* ) or wait_readable($fd:expr)) => { + match syscall!( $fn ( $($arg, )* )) { + Ok(fd) => Ok(Decision::Completed(fd)), + Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error() == Some(libc::EINPROGRESS) + => Ok(Decision::wait_readable($fd)), + Err(e) => Err(e), + } + } + +} + +impl OpCode for ReadAt { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_readable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_readable()); + + let slice = self.buffer.as_uninit_slice(); + + syscall!( + break pread( + self.fd, + slice.as_mut_ptr() as _, + slice.len() as _, + self.offset as _ + ) + ) + } +} + +impl OpCode for WriteAt { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_writable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_writable()); + + let slice = self.buffer.as_slice(); + + syscall!( + break pwrite( + self.fd, + slice.as_ptr() as _, + slice.len() as _, + self.offset as _ + ) + ) + } +} + +impl OpCode for Sync { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::Completed(syscall!(fsync(self.fd))?)) + } + + fn on_event(&mut self, _: &Event) -> std::io::Result> { + unreachable!("Sync operation should not be submitted to mio") + } +} + +impl OpCode for Accept { + fn pre_submit(&mut self) -> io::Result { + syscall!( + accept( + self.fd, + &mut self.buffer as *mut _ as *mut _, + &mut self.addr_len + ) or wait_readable(self.fd) + ) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_readable()); + + match syscall!(accept( + self.fd, + &mut self.buffer as *mut _ as *mut _, + &mut self.addr_len + )) { + Ok(fd) => Ok(ControlFlow::Break(fd)), + Err(e) if e.raw_os_error() == Some(libc::EINPROGRESS) => Ok(ControlFlow::Continue(())), + Err(e) => Err(e), + } + } +} + +impl OpCode for Connect { + fn pre_submit(&mut self) -> io::Result { + syscall!( + connect(self.fd, self.addr.as_ptr(), self.addr.len()) or wait_writable(self.fd) + ) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_writable()); + + let mut err: libc::c_int = 0; + let mut err_len = std::mem::size_of::() as libc::socklen_t; + + syscall!(getsockopt( + self.fd, + libc::SOL_SOCKET, + libc::SO_ERROR, + &mut err as *mut _ as *mut _, + &mut err_len + ))?; + + if err == 0 { + Ok(ControlFlow::Break(0)) + } else { + Err(io::Error::from_raw_os_error(err)) + } + } +} + +impl OpCode for RecvImpl { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_readable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_readable()); + + self.slices = unsafe { self.buffer.as_io_slices_mut() }; + syscall!(break readv(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) + } +} + +impl OpCode for SendImpl { + fn pre_submit(&mut self) -> io::Result { + Ok(Decision::wait_writable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_writable()); + + self.slices = unsafe { self.buffer.as_io_slices() }; + syscall!(break writev(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) + } +} + +impl OpCode for RecvFromImpl { + fn pre_submit(&mut self) -> io::Result { + self.set_msg(); + syscall!(recvmsg(self.fd, &mut self.msg, 0) or wait_readable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_readable()); + + syscall!(break recvmsg(self.fd, &mut self.msg, 0)) + } +} + +impl OpCode for SendToImpl { + fn pre_submit(&mut self) -> io::Result { + self.set_msg(); + syscall!(sendmsg(self.fd, &self.msg, 0) or wait_writable(self.fd)) + } + + fn on_event(&mut self, event: &Event) -> std::io::Result> { + debug_assert!(event.is_writable()); + + syscall!(break sendmsg(self.fd, &self.msg, 0)) + } +} diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 51bb41d0..30e85488 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -2,6 +2,8 @@ //! Some types differ by compilation target. use std::{io, mem::MaybeUninit, time::Duration}; +#[cfg(unix)] +mod unix; cfg_if::cfg_if! { if #[cfg(target_os = "windows")] { @@ -10,6 +12,9 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; + } else if #[cfg(unix)]{ + mod mio; + pub use self::mio::*; } } @@ -80,6 +85,7 @@ pub trait Poller { /// # Safety /// /// - `op` should be alive until [`Poller::poll`] returns its result. + /// - `user_data` should be unique. unsafe fn push(&mut self, op: &mut (impl OpCode + 'static), user_data: usize) -> io::Result<()>; diff --git a/src/driver/unix/mod.rs b/src/driver/unix/mod.rs new file mode 100644 index 00000000..73890845 --- /dev/null +++ b/src/driver/unix/mod.rs @@ -0,0 +1,4 @@ +//! This mod doesn't actually contain any driver, but meant to provide some +//! common op type and utilities for unix platform (for iour and mio). + +pub(crate) mod op; diff --git a/src/driver/unix/op.rs b/src/driver/unix/op.rs new file mode 100644 index 00000000..8e1a73c4 --- /dev/null +++ b/src/driver/unix/op.rs @@ -0,0 +1,120 @@ +use std::io::{IoSlice, IoSliceMut}; + +use libc::{sockaddr_storage, socklen_t}; +use socket2::SockAddr; + +#[cfg(doc)] +use crate::op::*; +use crate::{ + buf::{AsIoSlices, AsIoSlicesMut, IntoInner, OneOrVec}, + driver::RawFd, +}; + +/// Accept a connection. +pub struct Accept { + pub(crate) fd: RawFd, + pub(crate) buffer: sockaddr_storage, + pub(crate) addr_len: socklen_t, +} + +impl Accept { + /// Create [`Accept`]. + pub fn new(fd: RawFd) -> Self { + Self { + fd, + buffer: unsafe { std::mem::zeroed() }, + addr_len: std::mem::size_of::() as _, + } + } + + /// Get the remote address from the inner buffer. + pub fn into_addr(self) -> SockAddr { + unsafe { SockAddr::new(self.buffer, self.addr_len) } + } +} + +/// Receive data and source address. +pub struct RecvFromImpl { + pub(crate) fd: RawFd, + pub(crate) buffer: T, + pub(crate) addr: sockaddr_storage, + pub(crate) slices: OneOrVec>, + pub(crate) msg: libc::msghdr, +} + +impl RecvFromImpl { + /// Create [`RecvFrom`] or [`RecvFromVectored`]. + pub fn new(fd: RawFd, buffer: T::Inner) -> Self { + Self { + fd, + buffer: T::new(buffer), + addr: unsafe { std::mem::zeroed() }, + slices: OneOrVec::One(IoSliceMut::new(&mut [])), + msg: unsafe { std::mem::zeroed() }, + } + } + + pub(crate) fn set_msg(&mut self) { + self.slices = unsafe { self.buffer.as_io_slices_mut() }; + self.msg = libc::msghdr { + msg_name: &mut self.addr as *mut _ as _, + msg_namelen: std::mem::size_of_val(&self.addr) as _, + msg_iov: self.slices.as_mut_ptr() as _, + msg_iovlen: self.slices.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + } +} + +impl IntoInner for RecvFromImpl { + type Inner = (T, sockaddr_storage, socklen_t); + + fn into_inner(self) -> Self::Inner { + (self.buffer, self.addr, self.msg.msg_namelen) + } +} + +/// Send data to specified address. +pub struct SendToImpl { + pub(crate) fd: RawFd, + pub(crate) buffer: T, + pub(crate) addr: SockAddr, + pub(crate) slices: OneOrVec>, + pub(crate) msg: libc::msghdr, +} + +impl SendToImpl { + /// Create [`SendTo`] or [`SendToVectored`]. + pub fn new(fd: RawFd, buffer: T::Inner, addr: SockAddr) -> Self { + Self { + fd, + buffer: T::new(buffer), + addr, + slices: OneOrVec::One(IoSlice::new(&[])), + msg: unsafe { std::mem::zeroed() }, + } + } + + pub(crate) fn set_msg(&mut self) { + self.slices = unsafe { self.buffer.as_io_slices() }; + self.msg = libc::msghdr { + msg_name: self.addr.as_ptr() as _, + msg_namelen: self.addr.len(), + msg_iov: self.slices.as_mut_ptr() as _, + msg_iovlen: self.slices.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + } +} + +impl IntoInner for SendToImpl { + type Inner = T; + + fn into_inner(self) -> Self::Inner { + self.buffer + } +} diff --git a/src/event/mio.rs b/src/event/mio.rs new file mode 100644 index 00000000..f710dc11 --- /dev/null +++ b/src/event/mio.rs @@ -0,0 +1,74 @@ +use std::{ + io, + os::fd::{AsRawFd, BorrowedFd, RawFd}, +}; + +use mio::unix::pipe::{Receiver, Sender}; + +use crate::{op::Recv, task::RUNTIME}; + +/// An event that won't wake until [`EventHandle::notify`] is called +/// successfully. +#[derive(Debug)] +pub struct Event { + sender: Sender, + receiver: Receiver, +} + +impl Event { + /// Create [`Event`]. + pub fn new() -> io::Result { + let (sender, receiver) = mio::unix::pipe::new()?; + + Ok(Self { sender, receiver }) + } + + /// Get a notify handle. + pub fn handle(&self) -> EventHandle { + EventHandle::new(unsafe { BorrowedFd::borrow_raw(self.sender.as_raw_fd()) }) + } + + /// Wait for [`EventHandle::notify`] called. + pub async fn wait(&self) -> io::Result<()> { + let buffer = Vec::with_capacity(8); + // Trick: Recv uses readv which doesn't seek. + let op = Recv::new(self.receiver.as_raw_fd(), buffer); + let (res, _) = RUNTIME.with(|runtime| runtime.submit(op)).await; + res?; + Ok(()) + } +} + +impl AsRawFd for Event { + fn as_raw_fd(&self) -> RawFd { + self.receiver.as_raw_fd() + } +} + +/// A handle to [`Event`]. +pub struct EventHandle<'a> { + fd: BorrowedFd<'a>, +} + +impl<'a> EventHandle<'a> { + pub(crate) fn new(fd: BorrowedFd<'a>) -> Self { + Self { fd } + } + + /// Notify the event. + pub fn notify(&self) -> io::Result<()> { + let data = 1u64; + let res = unsafe { + libc::write( + self.fd.as_raw_fd(), + &data as *const _ as *const _, + std::mem::size_of::(), + ) + }; + if res < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } +} diff --git a/src/event/mod.rs b/src/event/mod.rs index d17ca66f..ea31b29a 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -9,5 +9,8 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "linux")] { mod iour; pub use iour::*; + } else if #[cfg(unix)] { + mod mio; + pub use self::mio::*; } } diff --git a/src/net/socket.rs b/src/net/socket.rs index b8b11d89..4e9111d4 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -78,7 +78,10 @@ impl Socket { } pub fn new(domain: Domain, ty: Type, protocol: Option) -> io::Result { - Self::from_socket2(Socket2::new(domain, ty, protocol)?) + let socket = Socket2::new(domain, ty, protocol)?; + #[cfg(unix)] + socket.set_nonblocking(true)?; + Self::from_socket2(socket) } pub fn bind(addr: &SockAddr, ty: Type, protocol: Option) -> io::Result { @@ -109,13 +112,13 @@ impl Socket { _op.update_context()?; Ok(()) } - #[cfg(target_os = "linux")] + #[cfg(unix)] { res.map(|_| ()) } } - #[cfg(all(feature = "runtime", target_os = "linux"))] + #[cfg(all(feature = "runtime", unix))] pub async fn accept(&self) -> io::Result<(Self, SockAddr)> { use std::os::fd::FromRawFd; diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 97f0e82c..345d7abe 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -92,8 +92,8 @@ impl TcpListener { /// /// let addr = listener.local_addr().expect("Couldn't get local address"); /// assert_eq!( - /// addr, - /// SockAddr::from(SocketAddr::V4(SocketAddrV4::new( + /// addr.as_socket().unwrap(), + /// SocketAddr::from(SocketAddr::V4(SocketAddrV4::new( /// Ipv4Addr::new(127, 0, 0, 1), /// 8080 /// ))) diff --git a/src/net/udp.rs b/src/net/udp.rs index e1981e01..e22011d4 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -159,10 +159,10 @@ impl UdpSocket { /// use compio::net::UdpSocket; /// use socket2::SockAddr; /// - /// let addr: SockAddr = "127.0.0.1:8080".parse::().unwrap().into(); + /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); /// let sock = UdpSocket::bind(&addr).unwrap(); /// // the address the socket is bound to - /// let local_addr = sock.local_addr().unwrap(); + /// let local_addr = sock.local_addr().unwrap().as_socket().unwrap(); /// assert_eq!(local_addr, addr); /// ``` pub fn local_addr(&self) -> io::Result { diff --git a/src/op.rs b/src/op.rs index 0cbbdb96..14ab5a28 100644 --- a/src/op.rs +++ b/src/op.rs @@ -7,6 +7,9 @@ use std::io::{IoSlice, IoSliceMut}; use socket2::SockAddr; +#[cfg(target_os = "windows")] +pub use crate::driver::op::ConnectNamedPipe; +pub use crate::driver::op::{Accept, RecvFromImpl, SendToImpl}; use crate::{ buf::{ AsIoSlices, AsIoSlicesMut, BufWrapper, IntoInner, IoBuf, IoBufMut, OneOrVec, @@ -58,10 +61,6 @@ impl RecvResultExt for BufResult { } } -#[cfg(target_os = "windows")] -pub use crate::driver::op::ConnectNamedPipe; -pub use crate::driver::op::{Accept, RecvFromImpl, SendToImpl}; - /// Read a file at specified position into specified buffer. #[derive(Debug)] pub struct ReadAt { diff --git a/src/signal/mod.rs b/src/signal/mod.rs index 67c64460..ff7b8f83 100644 --- a/src/signal/mod.rs +++ b/src/signal/mod.rs @@ -20,11 +20,11 @@ pub mod windows; #[doc(no_inline)] pub use windows::ctrl_c; -#[cfg(target_os = "linux")] +#[cfg(unix)] pub mod unix; /// Completes when a "ctrl-c" notification is sent to the process. -#[cfg(target_os = "linux")] +#[cfg(unix)] pub async fn ctrl_c() -> std::io::Result<()> { unix::signal(libc::SIGINT).await } diff --git a/tests/tcp_accept.rs b/tests/tcp_accept.rs index ab266a5e..1c196841 100644 --- a/tests/tcp_accept.rs +++ b/tests/tcp_accept.rs @@ -19,6 +19,7 @@ macro_rules! test_accept { $( #[test] fn $ident() { + println!("Testing {}...", stringify!($ident)); compio::task::block_on(test_impl($target)) } )* diff --git a/tests/tcp_connect.rs b/tests/tcp_connect.rs index 4fefd703..63af7b41 100644 --- a/tests/tcp_connect.rs +++ b/tests/tcp_connect.rs @@ -51,7 +51,10 @@ async fn test_connect_impl(mapping: impl FnOnce(&TcpListener) -> }; let client = async { - assert!(TcpStream::connect(addr).await.is_ok()); + match TcpStream::connect(addr).await { + Ok(_) => (), + Err(e) => panic!("Failed to connect: {}", e), + } }; futures_util::join!(server, client);