From cdb4de11d6c3b1b4bbac9fe3f09006ca26cff4ce Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Fri, 9 Feb 2024 22:55:05 +0800 Subject: [PATCH] try pass compile in windows --- .github/workflows/ci.sh | 76 ++++++++------- .github/workflows/ci.yml | 31 +++--- monoio-macros/src/entry.rs | 1 + monoio/Cargo.toml | 19 ++-- monoio/src/buf/io_vec_buf.rs | 20 ++-- monoio/src/buf/raw_buf.rs | 21 +++-- monoio/src/buf/slice.rs | 11 ++- monoio/src/buf/vec_wrapper.rs | 8 +- monoio/src/builder.rs | 3 +- monoio/src/driver/legacy/iocp/afd.rs | 3 + .../driver/legacy/iocp/{iocp.rs => core.rs} | 2 +- monoio/src/driver/legacy/iocp/mod.rs | 4 +- monoio/src/driver/legacy/iocp/state.rs | 9 ++ monoio/src/driver/legacy/iocp/waker.rs | 7 +- monoio/src/driver/legacy/mod.rs | 13 ++- monoio/src/driver/mod.rs | 10 +- monoio/src/driver/op.rs | 1 + monoio/src/driver/op/accept.rs | 12 ++- monoio/src/driver/op/close.rs | 2 +- monoio/src/driver/op/connect.rs | 38 +++++--- monoio/src/driver/op/fsync.rs | 13 +-- monoio/src/driver/op/open.rs | 6 +- monoio/src/driver/op/poll.rs | 2 +- monoio/src/driver/op/read.rs | 59 +++++++++++- monoio/src/driver/op/recv.rs | 94 +++++++++++++++---- monoio/src/driver/op/send.rs | 79 ++++++++++++++-- monoio/src/driver/op/write.rs | 59 +++++++++++- monoio/src/driver/ready.rs | 3 +- monoio/src/driver/shared_fd.rs | 75 +++++++++------ monoio/src/driver/util.rs | 4 +- monoio/src/fs/file.rs | 11 ++- monoio/src/fs/open_options.rs | 8 +- monoio/src/io/util/copy.rs | 7 +- monoio/src/lib.rs | 1 + monoio/src/net/mod.rs | 27 +++++- monoio/src/net/tcp/listener.rs | 35 +++++-- monoio/src/net/tcp/stream.rs | 78 +++++++++++---- monoio/src/net/udp.rs | 74 +++++++++++++-- monoio/src/runtime.rs | 5 +- monoio/src/utils/bind_to_cpu_set.rs | 2 + monoio/src/utils/mod.rs | 2 + 41 files changed, 698 insertions(+), 237 deletions(-) rename monoio/src/driver/legacy/iocp/{iocp.rs => core.rs} (98%) diff --git a/.github/workflows/ci.sh b/.github/workflows/ci.sh index 08aabd87..3224093f 100755 --- a/.github/workflows/ci.sh +++ b/.github/workflows/ci.sh @@ -1,56 +1,60 @@ #!/usr/bin/env sh -set -ex +if [ "${NO_RUN}" != "1" ] && [ "${NO_RUN}" != "true" ]; then -CARGO=cargo -if [ "${CROSS}" = "1" ]; then - export CARGO_NET_RETRY=5 - export CARGO_NET_TIMEOUT=10 + set -ex - cargo install cross - CARGO=cross -fi + CARGO=cargo + if [ "${CROSS}" = "1" ]; then + export CARGO_NET_RETRY=5 + export CARGO_NET_TIMEOUT=10 -# If a test crashes, we want to know which one it was. -export RUST_TEST_THREADS=1 -export RUST_BACKTRACE=1 + cargo install cross + CARGO=cross + fi -# test monoio mod -cd "${PROJECT_DIR}"/monoio + # If a test crashes, we want to know which one it was. + export RUST_TEST_THREADS=1 + export RUST_BACKTRACE=1 -# only enable legacy driver -"${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils" -"${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils" --release + # test monoio mod + cd "${PROJECT_DIR}"/monoio -if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ] || [ "${TARGET}" = "i686-unknown-linux-gnu" ]; then + # only enable legacy driver + "${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils" + "${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils" --release - # only enabled uring driver - "${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils" - "${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils" --release + if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ] || [ "${TARGET}" = "i686-unknown-linux-gnu" ]; then + # only enabled uring driver + "${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils" + "${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils" --release + fi - # enable uring+legacy driver - "${CARGO}" test --target "${TARGET}" - "${CARGO}" test --target "${TARGET}" --release + if [ "${TARGET}" != "aarch64-unknown-linux-gnu" ] && [ "${TARGET}" != "armv7-unknown-linux-gnueabihf" ] && + [ "${TARGET}" != "riscv64gc-unknown-linux-gnu" ] && [ "${TARGET}" != "s390x-unknown-linux-gnu" ]; then + # enable uring+legacy driver + "${CARGO}" test --target "${TARGET}" + "${CARGO}" test --target "${TARGET}" --release + fi - if [ "${CHANNEL}" == "nightly" ]; then + if [ "${CHANNEL}" == "nightly" ] && ( [ "${TARGET}" = "x86_64-unknown-linux-gnu" ] || [ "${TARGET}" = "i686-unknown-linux-gnu" ] ); then "${CARGO}" test --target "${TARGET}" --all-features "${CARGO}" test --target "${TARGET}" --all-features --release fi -fi + # test monoio-compat mod + cd "${PROJECT_DIR}"/monoio-compat -# test monoio-compat mod -cd "${PROJECT_DIR}"/monoio-compat + "${CARGO}" test --target "${TARGET}" + "${CARGO}" test --target "${TARGET}" --release -"${CARGO}" test --target "${TARGET}" -"${CARGO}" test --target "${TARGET}" --release + "${CARGO}" test --target "${TARGET}" --no-default-features --features hyper + "${CARGO}" test --target "${TARGET}" --no-default-features --features hyper --release -"${CARGO}" test --target "${TARGET}" --no-default-features --features hyper -"${CARGO}" test --target "${TARGET}" --no-default-features --features hyper --release + if [ "${CHANNEL}" == "nightly" ]; then + "${CARGO}" test --target "${TARGET}" --all-features + "${CARGO}" test --target "${TARGET}" --all-features --release + fi -if [ "${CHANNEL}" == "nightly" ]; then - "${CARGO}" test --target "${TARGET}" --all-features - "${CARGO}" test --target "${TARGET}" --all-features --release + # todo maybe we should test examples here ? fi - -# todo maybe we should test examples here ? diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 36142e01..b07fb910 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,6 +51,7 @@ jobs: TARGET: ${{ matrix.target }} OS: ${{ matrix.os }} PROJECT_DIR: ${{ github.workspace }} + NO_RUN: ${{ matrix.no_run }} run: sh .github/workflows/ci.sh strategy: @@ -69,11 +70,10 @@ jobs: x86_64-apple-darwin, aarch64-apple-darwin, - # unsupported yet -# x86_64-pc-windows-gnu, -# x86_64-pc-windows-msvc, -# i686-pc-windows-gnu, -# i686-pc-windows-msvc, + x86_64-pc-windows-gnu, + x86_64-pc-windows-msvc, + i686-pc-windows-gnu, + i686-pc-windows-msvc, ] channel: [stable, nightly] include: @@ -99,12 +99,15 @@ jobs: - target: aarch64-apple-darwin os: macos-14 - # unsupported yet -# - target: x86_64-pc-windows-msvc -# os: windows-latest -# - target: x86_64-pc-windows-gnu -# os: windows-latest -# - target: i686-pc-windows-msvc -# os: windows-latest -# - target: i686-pc-windows-gnu -# os: windows-latest + - target: x86_64-pc-windows-msvc + os: windows-latest + no_run: 1 + - target: x86_64-pc-windows-gnu + os: windows-latest + no_run: 1 + - target: i686-pc-windows-msvc + os: windows-latest + no_run: 1 + - target: i686-pc-windows-gnu + os: windows-latest + no_run: 1 diff --git a/monoio-macros/src/entry.rs b/monoio-macros/src/entry.rs index 7efa0e69..ed9d7a6a 100644 --- a/monoio-macros/src/entry.rs +++ b/monoio-macros/src/entry.rs @@ -377,6 +377,7 @@ fn token_stream_with_error(mut tokens: TokenStream, error: syn::Error) -> TokenS tokens } +#[cfg(unix)] pub(crate) fn main(args: TokenStream, item: TokenStream) -> TokenStream { // If any of the steps for this macro fail, we still want to expand to an item that is as close // to the expected output as possible. This helps out IDEs such that completions and other diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 93011eff..6a5cce1c 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -24,23 +24,28 @@ memchr = "2.7" bytes = { version = "1", optional = true } flume = { version = "0.11", optional = true } mio = { version = "0.8", features = [ - "net", - "os-poll", - "os-ext", + "net", + "os-poll", + "os-ext", ], optional = true } threadpool = { version = "1", optional = true } tokio = { version = "1", default-features = false, optional = true } tracing = { version = "0.1", default-features = false, features = [ - "std", + "std", ], optional = true } ctrlc = { version = "3", optional = true } lazy_static = { version = "1", optional = true } once_cell = { version = "1.19.0", optional = true } # windows dependencies(will be added when windows support finished) -[target.'cfg(windows)'.dependencies.windows-sys] -features = ["Win32_Foundation", "Win32_Networking_WinSock"] -version = "0.48.0" +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.48.0", features = [ + "Win32_Foundation", + "Win32_Networking_WinSock", + "Win32_System_IO", + "Win32_Storage_FileSystem", + "Win32_Security" +] } # unix dependencies [target.'cfg(unix)'.dependencies] diff --git a/monoio/src/buf/io_vec_buf.rs b/monoio/src/buf/io_vec_buf.rs index 135fa392..8302f3b1 100644 --- a/monoio/src/buf/io_vec_buf.rs +++ b/monoio/src/buf/io_vec_buf.rs @@ -24,16 +24,18 @@ pub unsafe trait IoVecBuf: Unpin + 'static { #[cfg(unix)] fn read_iovec_ptr(&self) -> *const libc::iovec; - #[cfg(unix)] /// Returns the count of iovec struct behind the pointer. /// /// # Safety /// There must be really that number of iovec here. + #[cfg(unix)] fn read_iovec_len(&self) -> usize; + /// Returns a raw pointer to WSABUF struct. #[cfg(windows)] fn read_wsabuf_ptr(&self) -> *const WSABUF; + /// Returns the count of WSABUF struct behind the pointer. #[cfg(windows)] fn read_wsabuf_len(&self) -> usize; } @@ -110,7 +112,7 @@ impl From>> for VecBuf { .iter() .map(|v| WSABUF { buf: v.as_ptr() as _, - len: v.len(), + len: v.len() as _, }) .collect(); Self { wsabufs, raw: vs } @@ -187,7 +189,6 @@ impl From for Vec> { /// See the safety note of the methods. #[allow(clippy::unnecessary_safety_doc)] pub unsafe trait IoVecBufMut: Unpin + 'static { - #[cfg(unix)] /// Returns a raw mutable pointer to iovec struct. /// struct iovec { /// void *iov_base; /* Starting address */ @@ -200,15 +201,18 @@ pub unsafe trait IoVecBufMut: Unpin + 'static { /// The implementation must ensure that, while the runtime owns the value, /// the pointer returned by `write_iovec_ptr` **does not** change. /// Also, the value pointed must be a valid iovec struct. + #[cfg(unix)] fn write_iovec_ptr(&mut self) -> *mut libc::iovec; /// Returns the count of iovec struct behind the pointer. #[cfg(unix)] fn write_iovec_len(&mut self) -> usize; + /// Returns a raw mutable pointer to WSABUF struct. #[cfg(windows)] fn write_wsabuf_ptr(&mut self) -> *mut WSABUF; + /// Returns the count of WSABUF struct behind the pointer. #[cfg(windows)] fn write_wsabuf_len(&mut self) -> usize; @@ -252,19 +256,19 @@ unsafe impl IoVecBufMut for VecBuf { #[cfg(windows)] unsafe impl IoVecBufMut for VecBuf { fn write_wsabuf_ptr(&mut self) -> *mut WSABUF { - self.write_wsabuf_ptr() as *mut _ + self.read_wsabuf_ptr() as *mut _ } fn write_wsabuf_len(&mut self) -> usize { - self.write_wsabuf_len() + self.read_wsabuf_len() } unsafe fn set_init(&mut self, mut len: usize) { for (idx, wsabuf) in self.wsabufs.iter_mut().enumerate() { - if wsabuf.len <= len { + if wsabuf.len as usize <= len { // set_init all - self.raw[idx].set_len(wsabuf.len); - len -= wsabuf.len; + self.raw[idx].set_len(wsabuf.len as _); + len -= wsabuf.len as usize; } else { if len > 0 { self.raw[idx].set_len(len); diff --git a/monoio/src/buf/raw_buf.rs b/monoio/src/buf/raw_buf.rs index 6828de67..ad64bc41 100644 --- a/monoio/src/buf/raw_buf.rs +++ b/monoio/src/buf/raw_buf.rs @@ -69,18 +69,21 @@ impl RawBuf { /// make sure the pointer and length is valid when RawBuf is used. #[inline] pub unsafe fn new_from_iovec_mut(data: &mut T) -> Option { - if data.write_iovec_len() == 0 { - return None; - } #[cfg(unix)] { + if data.write_iovec_len() == 0 { + return None; + } let iovec = *data.write_iovec_ptr(); Some(Self::new(iovec.iov_base as *const u8, iovec.iov_len)) } #[cfg(windows)] { + if data.write_wsabuf_len() == 0 { + return None; + } let wsabuf = *data.write_wsabuf_ptr(); - Some(Self::new(wsabuf.buf as *const u8, wsabuf.len)) + Some(Self::new(wsabuf.buf as *const u8, wsabuf.len as _)) } } @@ -103,7 +106,7 @@ impl RawBuf { return None; } let wsabuf = *data.read_wsabuf_ptr(); - Some(Self::new(wsabuf.buf as *const u8, wsabuf.len)) + Some(Self::new(wsabuf.buf as *const u8, wsabuf.len as _)) } } } @@ -129,6 +132,10 @@ impl RawBufVectored { pub const unsafe fn new(ptr: *const libc::iovec, len: usize) -> Self { Self { ptr, len } } + + /// Create a new RawBuf with given pointer and length. + /// # Safety + /// make sure the pointer and length is valid when RawBuf is used. #[cfg(windows)] #[inline] pub const unsafe fn new(ptr: *const WSABUF, len: usize) -> Self { @@ -175,13 +182,13 @@ unsafe impl IoVecBufMut for RawBufVectored { #[cfg(windows)] #[inline] - fn write_wsabuf_ptr(&self) -> *mut WSABUF { + fn write_wsabuf_ptr(&mut self) -> *mut WSABUF { self.ptr as *mut WSABUF } #[cfg(windows)] #[inline] - fn write_wsabuf_len(&self) -> usize { + fn write_wsabuf_len(&mut self) -> usize { self.len } diff --git a/monoio/src/buf/slice.rs b/monoio/src/buf/slice.rs index fac2e864..05dfc443 100644 --- a/monoio/src/buf/slice.rs +++ b/monoio/src/buf/slice.rs @@ -310,7 +310,7 @@ unsafe impl IoBuf for IoVecWrapper { #[cfg(windows)] { let wsabuf = unsafe { *self.raw.read_wsabuf_ptr() }; - wsabuf.len + wsabuf.len as _ } } } @@ -325,9 +325,14 @@ impl IoVecWrapperMut { /// Create a new IoVecWrapperMut with something that impl IoVecBufMut. #[inline] pub fn new(mut iovec_buf: T) -> Result { + #[cfg(unix)] if iovec_buf.write_iovec_len() == 0 { return Err(iovec_buf); } + #[cfg(windows)] + if iovec_buf.write_wsabuf_len() == 0 { + return Err(iovec_buf); + } Ok(Self { raw: iovec_buf }) } @@ -348,7 +353,7 @@ unsafe impl IoBufMut for IoVecWrapperMut { #[cfg(windows)] { let wsabuf = unsafe { *self.raw.write_wsabuf_ptr() }; - wsabuf.buf as *mut u8 + wsabuf.buf } } @@ -361,7 +366,7 @@ unsafe impl IoBufMut for IoVecWrapperMut { #[cfg(windows)] { let wsabuf = unsafe { *self.raw.write_wsabuf_ptr() }; - wsabuf.len + wsabuf.len as _ } } diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs index 18e799e5..3da704b5 100644 --- a/monoio/src/buf/vec_wrapper.rs +++ b/monoio/src/buf/vec_wrapper.rs @@ -1,5 +1,5 @@ #[cfg(windows)] -use windows_sys::Win32::Networking::WinSock::WSABUF; +use {std::ops::Add, windows_sys::Win32::Networking::WinSock::WSABUF}; use super::{IoVecBuf, IoVecBufMut}; @@ -44,6 +44,7 @@ pub(crate) fn read_vec_meta(buf: &T) -> IoVecMeta { data.push(wsabuf); len += wsabuf.len; } + let len = len as _; IoVecMeta { data, offset: 0, @@ -84,6 +85,7 @@ pub(crate) fn write_vec_meta(buf: &mut T) -> IoVecMeta { data.push(wsabuf); len += wsabuf.len; } + let len = len as _; IoVecMeta { data, offset: 0, @@ -93,6 +95,7 @@ pub(crate) fn write_vec_meta(buf: &mut T) -> IoVecMeta { } impl IoVecMeta { + #[allow(unused_mut)] pub(crate) fn consume(&mut self, mut amt: usize) { #[cfg(unix)] { @@ -124,6 +127,7 @@ impl IoVecMeta { } #[cfg(windows)] { + let mut amt = amt as _; if amt == 0 { return; } @@ -141,7 +145,7 @@ impl IoVecMeta { return; } std::cmp::Ordering::Greater => { - unsafe { wsabuf.len.add(amt) }; + _ = wsabuf.len.add(amt); wsabuf.len -= amt; self.offset = offset; return; diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index 6f6e9baa..4734c998 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -4,10 +4,11 @@ use std::{io, marker::PhantomData}; use crate::driver::IoUringDriver; #[cfg(all(unix, feature = "legacy"))] use crate::driver::LegacyDriver; +#[cfg(all(unix, any(feature = "legacy", feature = "iouring")))] +use crate::utils::thread_id::gen_id; use crate::{ driver::Driver, time::{driver::TimeDriver, Clock}, - utils::thread_id::gen_id, Runtime, }; diff --git a/monoio/src/driver/legacy/iocp/afd.rs b/monoio/src/driver/legacy/iocp/afd.rs index a4681a8f..05d73056 100644 --- a/monoio/src/driver/legacy/iocp/afd.rs +++ b/monoio/src/driver/legacy/iocp/afd.rs @@ -67,6 +67,7 @@ pub const KNOWN_EVENTS: u32 = POLL_RECEIVE | POLL_CONNECT_FAIL; #[repr(C)] +#[derive(Debug)] pub struct AfdPollHandleInfo { pub handle: HANDLE, pub events: u32, @@ -74,6 +75,7 @@ pub struct AfdPollHandleInfo { } #[repr(C)] +#[derive(Debug)] pub struct AfdPollInfo { pub timeout: i64, pub number_of_handles: u32, @@ -81,6 +83,7 @@ pub struct AfdPollInfo { pub handles: [AfdPollHandleInfo; 1], } +#[derive(Debug)] pub struct Afd { file: File, } diff --git a/monoio/src/driver/legacy/iocp/iocp.rs b/monoio/src/driver/legacy/iocp/core.rs similarity index 98% rename from monoio/src/driver/legacy/iocp/iocp.rs rename to monoio/src/driver/legacy/iocp/core.rs index 4b1a5c5f..d871fd10 100644 --- a/monoio/src/driver/legacy/iocp/iocp.rs +++ b/monoio/src/driver/legacy/iocp/core.rs @@ -31,7 +31,7 @@ impl CompletionPort { let result = unsafe { CreateIoCompletionPort(handle, self.handle, token, 0) }; if result == 0 { - return Err(std::io::Error::last_os_error()); + Err(std::io::Error::last_os_error()) } else { Ok(()) } diff --git a/monoio/src/driver/legacy/iocp/mod.rs b/monoio/src/driver/legacy/iocp/mod.rs index bfdcec64..59dc8b37 100644 --- a/monoio/src/driver/legacy/iocp/mod.rs +++ b/monoio/src/driver/legacy/iocp/mod.rs @@ -1,9 +1,10 @@ mod afd; +mod core; mod event; -mod iocp; mod state; mod waker; +pub use core::*; use std::{ collections::VecDeque, os::windows::prelude::RawSocket, @@ -17,7 +18,6 @@ use std::{ pub use afd::*; pub use event::*; -pub use iocp::*; pub use state::*; pub use waker::*; use windows_sys::Win32::{ diff --git a/monoio/src/driver/legacy/iocp/state.rs b/monoio/src/driver/legacy/iocp/state.rs index 8fb51010..a550eb6e 100644 --- a/monoio/src/driver/legacy/iocp/state.rs +++ b/monoio/src/driver/legacy/iocp/state.rs @@ -1,3 +1,4 @@ +use core::fmt::Debug; use std::{ marker::PhantomPinned, os::windows::prelude::RawSocket, @@ -23,6 +24,7 @@ pub enum SockPollStatus { Cancelled, } +#[derive(Debug)] pub struct SocketState { pub socket: RawSocket, pub inner: Option>>>, @@ -223,6 +225,13 @@ impl SockState { } } +impl Debug for SockState { + #[allow(unused_variables)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } +} + impl Drop for SockState { fn drop(&mut self) { self.mark_delete(); diff --git a/monoio/src/driver/legacy/iocp/waker.rs b/monoio/src/driver/legacy/iocp/waker.rs index b5d9b6b5..d5aa9a99 100644 --- a/monoio/src/driver/legacy/iocp/waker.rs +++ b/monoio/src/driver/legacy/iocp/waker.rs @@ -9,17 +9,18 @@ pub struct Waker { } impl Waker { + #[allow(unreachable_code, unused_variables)] pub fn new(poller: &Poller, token: mio::Token) -> io::Result { Ok(Waker { token, - port: poller.cp.clone(), + // port: poller.cp.clone(), + port: unimplemented!(), }) } pub fn wake(&self) -> io::Result<()> { let mut ev = Event::new(self.token); ev.set_readable(); - - self.port.post(ev.to_completion_status()) + self.port.post(ev.to_entry()) } } diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 8a351a56..9d0f796e 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -16,6 +16,7 @@ use super::{ }; use crate::utils::slab::Slab; +#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)] #[cfg(windows)] pub(super) mod iocp; @@ -44,6 +45,7 @@ pub(crate) struct LegacyInner { } /// Driver with Poll-like syscall. +#[allow(unreachable_pub)] pub struct LegacyDriver { inner: Rc>, @@ -55,6 +57,7 @@ pub struct LegacyDriver { #[cfg(feature = "sync")] const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); +#[allow(dead_code)] impl LegacyDriver { const DEFAULT_ENTRIES: u32 = 1024; @@ -154,7 +157,11 @@ impl LegacyDriver { Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } - for event in events.iter() { + #[cfg(unix)] + let iter = events.iter(); + #[cfg(windows)] + let iter = events.events.iter(); + for event in iter { let token = event.token(); #[cfg(feature = "sync")] @@ -171,7 +178,7 @@ impl LegacyDriver { #[cfg(windows)] pub(crate) fn register( this: &Rc>, - state: &mut iocp::SockState, + state: &mut iocp::SocketState, interest: mio::Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; @@ -191,7 +198,7 @@ impl LegacyDriver { pub(crate) fn deregister( this: &Rc>, token: usize, - state: &mut iocp::SockState, + state: &mut iocp::SocketState, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index 6aeebe99..a3e753df 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -1,4 +1,5 @@ /// Monoio Driver. +#[allow(dead_code)] pub(crate) mod op; #[cfg(feature = "poll-io")] pub(crate) mod poll; @@ -6,6 +7,7 @@ pub(crate) mod poll; pub(crate) mod ready; #[cfg(any(feature = "legacy", feature = "poll-io"))] pub(crate) mod scheduled_io; +#[allow(dead_code)] pub(crate) mod shared_fd; #[cfg(feature = "sync")] pub(crate) mod thread; @@ -23,7 +25,8 @@ use std::{ time::Duration, }; -#[cfg(feature = "legacy")] +#[allow(unreachable_pub)] +#[cfg(all(feature = "legacy", unix))] pub use self::legacy::LegacyDriver; #[cfg(feature = "legacy")] use self::legacy::LegacyInner; @@ -96,8 +99,6 @@ pub(crate) enum Inner { impl Inner { fn submit_with(&self, data: T) -> io::Result> { match self { - #[cfg(windows)] - _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::submit_with_data(this, data), #[cfg(feature = "legacy")] @@ -107,7 +108,10 @@ impl Inner { not(all(target_os = "linux", feature = "iouring")) ))] _ => { + #[cfg(unix)] util::feature_panic(); + #[cfg(windows)] + unimplemented!(); } } } diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index b3902320..3a1e60ef 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -61,6 +61,7 @@ pub(crate) trait OpAble { /// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way. /// This can provide better compatibility for crates programmed in poll-like way. +#[allow(dead_code)] #[cfg(any(feature = "legacy", feature = "poll-io"))] pub(crate) trait PollLegacy { #[cfg(feature = "legacy")] diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs index 596a8399..0b17fd50 100644 --- a/monoio/src/driver/op/accept.rs +++ b/monoio/src/driver/op/accept.rs @@ -1,3 +1,5 @@ +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use std::os::unix::prelude::AsRawFd; use std::{ io, mem::{size_of, MaybeUninit}, @@ -13,12 +15,12 @@ use { accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE, }, }; -#[cfg(any(feature = "legacy", feature = "poll-io"))] -use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use crate::syscall_u32; /// Accept pub(crate) struct Accept { @@ -62,7 +64,7 @@ impl OpAble for Accept { .build() } - #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { self.fd.registered_index().map(|idx| (Direction::Read, idx)) @@ -74,10 +76,10 @@ impl OpAble for Accept { let addr = self.addr.0.as_mut_ptr() as *mut _; let len = &mut self.addr.1; - syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET) + syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let addr = self.addr.0.as_mut_ptr() as *mut _; diff --git a/monoio/src/driver/op/close.rs b/monoio/src/driver/op/close.rs index 19557c5f..0ac7a731 100644 --- a/monoio/src/driver/op/close.rs +++ b/monoio/src/driver/op/close.rs @@ -50,6 +50,6 @@ impl OpAble for Close { return crate::syscall_u32!(close(self.fd)); #[cfg(windows)] - return syscall!(closesocket(self.fd), PartialEq::ne, 0); + return syscall!(closesocket(self.fd as _), PartialEq::ne, 0); } } diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs index 6fcd60c1..8326f8ae 100644 --- a/monoio/src/driver/op/connect.rs +++ b/monoio/src/driver/op/connect.rs @@ -96,17 +96,22 @@ impl OpAble for Connect { } #[cfg(windows)] - match crate::syscall!( - connect( - self.fd.raw_socket(), - self.socket_addr.as_ptr(), - self.socket_addr_len, - ), - PartialEq::eq, - SOCKET_ERROR - ) { - Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err), - _ => Ok(self.fd.raw_fd() as u32), + { + let res = unsafe { + connect( + self.fd.raw_socket() as _, + self.socket_addr.as_ptr().cast(), + self.socket_addr_len, + ) + }; + if res == SOCKET_ERROR { + let err = io::Error::last_os_error(); + if err.kind() != io::ErrorKind::WouldBlock { + return Err(err); + } + } + #[allow(clippy::unnecessary_cast)] + Ok(self.fd.raw_socket() as u32) } } } @@ -152,7 +157,7 @@ impl OpAble for ConnectUnix { None } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { match crate::syscall_u32!(connect( self.fd.raw_fd(), @@ -163,6 +168,11 @@ impl OpAble for ConnectUnix { _ => Ok(self.fd.raw_fd() as u32), } } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + unimplemented!() + } } /// A type with the same memory layout as `libc::sockaddr`. Used in converting Rust level @@ -201,7 +211,7 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) { }; let sockaddr_in = SOCKADDR_IN { - sin_family: AF_INET as u16, // 1 + sin_family: AF_INET, // 1 sin_port: addr.port().to_be(), sin_addr, sin_zero: [0; 8], @@ -223,7 +233,7 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) { }; let sockaddr_in6 = SOCKADDR_IN6 { - sin6_family: AF_INET6 as u16, // 23 + sin6_family: AF_INET6, // 23 sin6_port: addr.port().to_be(), sin6_addr, sin6_flowinfo: addr.flowinfo(), diff --git a/monoio/src/driver/op/fsync.rs b/monoio/src/driver/op/fsync.rs index db363b3d..0dfd1707 100644 --- a/monoio/src/driver/op/fsync.rs +++ b/monoio/src/driver/op/fsync.rs @@ -3,14 +3,15 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; #[cfg(windows)] -use windows_sys::Win32::Storage::FileSystem::FlushFileBuffers; +use { + crate::syscall, std::os::windows::prelude::AsRawHandle, + windows_sys::Win32::Storage::FileSystem::FlushFileBuffers, +}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -#[cfg(windows)] -use crate::syscall; -#[cfg(any(feature = "legacy", feature = "poll-io"))] +#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] use crate::syscall_u32; pub(crate) struct Fsync { @@ -57,13 +58,13 @@ impl OpAble for Fsync { #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { syscall!( - FlushFileBuffers(self.handle.as_raw_handle()), + FlushFileBuffers(self.fd.as_raw_handle() as _), PartialEq::eq, 0 ) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] if self.data_sync { diff --git a/monoio/src/driver/op/open.rs b/monoio/src/driver/op/open.rs index ad2badc9..1cb81dbd 100644 --- a/monoio/src/driver/op/open.rs +++ b/monoio/src/driver/op/open.rs @@ -10,7 +10,7 @@ use super::{Op, OpAble}; use crate::driver::ready::Direction; #[cfg(windows)] use crate::syscall; -#[cfg(any(feature = "legacy", feature = "poll-io"))] +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] use crate::syscall_u32; use crate::{driver::util::cstr, fs::OpenOptions}; @@ -81,13 +81,13 @@ impl OpAble for Open { fn legacy_call(&mut self) -> io::Result { syscall!( CreateFileW( - self.path.as_c_str().as_ptr(), + self.path.as_c_str().as_ptr().cast(), self.opts.access_mode()?, self.opts.share_mode, self.opts.security_attributes, self.opts.creation_mode()?, self.opts.get_flags_and_attributes(), - std::ptr::null_mut(), + 0, ), PartialEq::eq, INVALID_HANDLE_VALUE diff --git a/monoio/src/driver/op/poll.rs b/monoio/src/driver/op/poll.rs index 1d8bc2c4..56aaae0e 100644 --- a/monoio/src/driver/op/poll.rs +++ b/monoio/src/driver/op/poll.rs @@ -107,7 +107,7 @@ impl OpAble for PollAdd { fn legacy_call(&mut self) -> io::Result { if !self.relaxed { let mut pollfd = WSAPOLLFD { - fd: self.fd.as_raw_socket(), + fd: self.fd.as_raw_socket() as _, events: if self.is_read { POLLIN as _ } else { diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index ff0d7357..17c17ba4 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -2,13 +2,19 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(any(feature = "legacy", feature = "poll-io"))] +#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::{driver::ready::Direction, syscall_u32}, - std::os::unix::prelude::AsRawFd, + crate::syscall, + std::ffi::c_void, + std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{recv, WSAGetLastError, WSARecv, SOCKET_ERROR}, }; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; use crate::{ buf::{IoBufMut, IoVecBufMut}, BufResult, @@ -72,7 +78,7 @@ impl OpAble for Read { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let seek_offset = libc::off_t::try_from(self.offset) @@ -93,6 +99,24 @@ impl OpAble for Read { seek_offset )); } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let fd = self.fd.as_raw_socket(); + let seek_offset = libc::off_t::try_from(self.offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + syscall!( + recv( + fd as _, + (self.buf.write_ptr().cast::() as usize + seek_offset as usize) + as *mut c_void as *mut _, + self.buf.bytes_total() as i32 - seek_offset, + 0 + ), + PartialOrd::ge, + 0 + ) + } } pub(crate) struct ReadVec { @@ -139,7 +163,7 @@ impl OpAble for ReadVec { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { syscall_u32!(readv( self.fd.raw_fd(), @@ -147,4 +171,29 @@ impl OpAble for ReadVec { self.buf_vec.write_iovec_len().min(i32::MAX as usize) as _ )) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let mut bytes_recved = 0; + let ret = unsafe { + WSARecv( + self.fd.raw_socket() as _, + self.buf_vec.write_wsabuf_ptr(), + self.buf_vec.write_wsabuf_len() as _, + &mut bytes_recved, + std::ptr::null_mut(), + std::ptr::null_mut(), + None, + ) + }; + match ret { + 0 => return Err(std::io::ErrorKind::WouldBlock.into()), + SOCKET_ERROR => { + let error = unsafe { WSAGetLastError() }; + return Err(std::io::Error::from_raw_os_error(error)); + } + _ => (), + } + Ok(bytes_recved) + } } diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index b0609d65..aac1116e 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -1,19 +1,26 @@ -use std::{ - io, - mem::{transmute, MaybeUninit}, - net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, -}; +use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(any(feature = "legacy", feature = "poll-io"))] +#[cfg(unix)] use { - crate::{driver::ready::Direction, syscall_u32}, - std::os::unix::prelude::AsRawFd, + crate::net::unix::SocketAddr as UnixSocketAddr, + libc::{socklen_t, AF_INET, AF_INET6}, + std::mem::{transmute, MaybeUninit}, + std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, }; +#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] +use { + crate::syscall, std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::recv, +}; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; -use crate::{buf::IoBufMut, net::unix::SocketAddr as UnixSocketAddr, BufResult}; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; +use crate::{buf::IoBufMut, BufResult}; pub(crate) struct Recv { /// Holds a strong ref to the FD, preventing the file from being closed @@ -70,7 +77,7 @@ impl OpAble for Recv { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); syscall_u32!(recv( @@ -80,6 +87,21 @@ impl OpAble for Recv { 0 )) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let fd = self.fd.as_raw_socket(); + syscall!( + recv( + fd as _, + self.buf.write_ptr(), + self.buf.bytes_total() as _, + 0 + ), + PartialOrd::ge, + 0 + ) + } } pub(crate) struct RecvMsg { @@ -90,6 +112,7 @@ pub(crate) struct RecvMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, + #[cfg(unix)] pub(crate) info: Box<( MaybeUninit, [libc::iovec; 1], @@ -97,6 +120,7 @@ pub(crate) struct RecvMsg { )>, } +#[cfg(unix)] impl Op> { pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { let iovec = [libc::iovec { @@ -112,7 +136,7 @@ impl Op> { info.2.msg_iov = info.1.as_mut_ptr(); info.2.msg_iovlen = 1; info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as libc::socklen_t; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; Op::submit_with(RecvMsg { fd, buf, info }) } @@ -127,7 +151,7 @@ impl Op> { let addr = unsafe { match storage.ss_family as libc::c_int { - libc::AF_INET => { + AF_INET => { // Safety: if the ss_family field is AF_INET then storage must be a // sockaddr_in. let addr: &libc::sockaddr_in = transmute(&storage); @@ -135,7 +159,7 @@ impl Op> { let port = u16::from_be(addr.sin_port); SocketAddr::V4(SocketAddrV4::new(ip, port)) } - libc::AF_INET6 => { + AF_INET6 => { // Safety: if the ss_family field is AF_INET6 then storage must be a // sockaddr_in6. let addr: &libc::sockaddr_in6 = transmute(&storage); @@ -165,6 +189,18 @@ impl Op> { } } +#[cfg(windows)] +impl Op> { + #[allow(unused_mut, unused_variables)] + pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { + unimplemented!() + } + + pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { + unimplemented!() + } +} + impl OpAble for RecvMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { @@ -177,11 +213,17 @@ impl OpAble for RecvMsg { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let _fd = self.fd.as_raw_socket(); + unimplemented!(); + } } pub(crate) struct RecvMsgUnix { @@ -192,6 +234,7 @@ pub(crate) struct RecvMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, + #[cfg(unix)] pub(crate) info: Box<( MaybeUninit, [libc::iovec; 1], @@ -199,6 +242,7 @@ pub(crate) struct RecvMsgUnix { )>, } +#[cfg(unix)] impl Op> { pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result { let iovec = [libc::iovec { @@ -214,7 +258,7 @@ impl Op> { info.2.msg_iov = info.1.as_mut_ptr(); info.2.msg_iovlen = 1; info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as libc::socklen_t; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; Op::submit_with(RecvMsgUnix { fd, buf, info }) } @@ -244,6 +288,18 @@ impl Op> { } } +#[cfg(windows)] +impl Op> { + #[allow(unused_mut, unused_variables)] + pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result { + unimplemented!() + } + + pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { + unimplemented!() + } +} + impl OpAble for RecvMsgUnix { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { @@ -256,9 +312,15 @@ impl OpAble for RecvMsgUnix { self.fd.registered_index().map(|idx| (Direction::Read, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let _fd = self.fd.as_raw_socket(); + unimplemented!(); + } } diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index 9a8e8508..6f8002bb 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -2,15 +2,20 @@ use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -use socket2::SockAddr; -#[cfg(any(feature = "legacy", feature = "poll-io"))] +#[cfg(unix)] +use {crate::net::unix::SocketAddr as UnixSocketAddr, socket2::SockAddr}; +#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::{driver::ready::Direction, syscall_u32}, - std::os::unix::prelude::AsRawFd, + crate::syscall, std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::send, }; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; -use crate::{buf::IoBuf, net::unix::SocketAddr as UnixSocketAddr, BufResult}; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; +use crate::{buf::IoBuf, BufResult}; pub(crate) struct Send { /// Holds a strong ref to the FD, preventing the file from being closed @@ -82,7 +87,7 @@ impl OpAble for Send { .map(|idx| (Direction::Write, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); #[cfg(target_os = "linux")] @@ -98,6 +103,16 @@ impl OpAble for Send { flags )) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let fd = self.fd.as_raw_socket(); + syscall!( + send(fd as _, self.buf.read_ptr(), self.buf.bytes_init() as _, 0), + PartialOrd::ge, + 0 + ) + } } pub(crate) struct SendMsg { @@ -108,9 +123,11 @@ pub(crate) struct SendMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, + #[cfg(unix)] pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, } +#[cfg(unix)] impl Op> { pub(crate) fn send_msg( fd: SharedFd, @@ -151,6 +168,22 @@ impl Op> { } } +#[cfg(windows)] +impl Op> { + #[allow(unused_variables)] + pub(crate) fn send_msg( + fd: SharedFd, + buf: T, + socket_addr: Option, + ) -> io::Result { + unimplemented!() + } + + pub(crate) async fn wait(self) -> BufResult { + unimplemented!() + } +} + impl OpAble for SendMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { @@ -169,7 +202,7 @@ impl OpAble for SendMsg { .map(|idx| (Direction::Write, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] #[allow(deprecated)] @@ -179,6 +212,12 @@ impl OpAble for SendMsg { let fd = self.fd.as_raw_fd(); syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let _fd = self.fd.as_raw_socket(); + unimplemented!(); + } } pub(crate) struct SendMsgUnix { @@ -189,9 +228,11 @@ pub(crate) struct SendMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, + #[cfg(unix)] pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, } +#[cfg(unix)] impl Op> { pub(crate) fn send_msg_unix( fd: SharedFd, @@ -232,6 +273,22 @@ impl Op> { } } +#[cfg(windows)] +impl Op> { + #[allow(unused_variables)] + pub(crate) fn send_msg_unix( + fd: SharedFd, + buf: T, + socket_addr: Option, + ) -> io::Result { + unimplemented!() + } + + pub(crate) async fn wait(self) -> BufResult { + unimplemented!() + } +} + impl OpAble for SendMsgUnix { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { @@ -250,7 +307,7 @@ impl OpAble for SendMsgUnix { .map(|idx| (Direction::Write, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] #[inline] fn legacy_call(&mut self) -> io::Result { #[cfg(target_os = "linux")] @@ -261,4 +318,10 @@ impl OpAble for SendMsgUnix { let fd = self.fd.as_raw_fd(); syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let _fd = self.fd.as_raw_socket(); + unimplemented!(); + } } diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index f70e2c53..c437705d 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -2,13 +2,19 @@ use std::io; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(any(feature = "legacy", feature = "poll-io"))] +#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::{driver::ready::Direction, syscall_u32}, - std::os::unix::prelude::AsRawFd, + crate::syscall, + std::ffi::c_void, + std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{send, WSAGetLastError, WSASend, SOCKET_ERROR}, }; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +use crate::driver::ready::Direction; use crate::{ buf::{IoBuf, IoVecBuf}, BufResult, @@ -59,7 +65,7 @@ impl OpAble for Write { .map(|idx| (Direction::Write, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); let seek_offset = libc::off_t::try_from(self.offset) @@ -80,6 +86,24 @@ impl OpAble for Write { seek_offset )); } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let fd = self.fd.as_raw_socket(); + let seek_offset = libc::off_t::try_from(self.offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + syscall!( + send( + fd as _, + (self.buf.read_ptr().cast::() as usize + seek_offset as usize) + as *mut c_void as *mut _, + self.buf.bytes_init() as i32 - seek_offset, + 0 + ), + PartialOrd::ge, + 0 + ) + } } pub(crate) struct WriteVec { @@ -129,7 +153,7 @@ impl OpAble for WriteVec { .map(|idx| (Direction::Write, idx)) } - #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { syscall_u32!(writev( self.fd.raw_fd(), @@ -137,4 +161,29 @@ impl OpAble for WriteVec { self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _ )) } + + #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] + fn legacy_call(&mut self) -> io::Result { + let mut bytes_sent = 0; + let ret = unsafe { + WSASend( + self.fd.raw_socket() as _, + self.buf_vec.read_wsabuf_ptr(), + self.buf_vec.read_wsabuf_len() as _, + &mut bytes_sent, + 0, + std::ptr::null_mut(), + None, + ) + }; + match ret { + 0 => return Err(std::io::ErrorKind::WouldBlock.into()), + SOCKET_ERROR => { + let error = unsafe { WSAGetLastError() }; + return Err(std::io::Error::from_raw_os_error(error)); + } + _ => (), + } + Ok(bytes_sent) + } } diff --git a/monoio/src/driver/ready.rs b/monoio/src/driver/ready.rs index 38d6a7a7..6262359a 100644 --- a/monoio/src/driver/ready.rs +++ b/monoio/src/driver/ready.rs @@ -46,7 +46,7 @@ impl Ready { pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); #[cfg(windows)] - pub(crate) fn from_mio(event: &super::iocp::Event) -> Ready { + pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready { let mut ready = Ready::EMPTY; if event.is_readable() { @@ -242,4 +242,5 @@ impl Direction { } } +#[allow(dead_code)] pub(crate) const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index ddf719ce..499ed3d2 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -1,7 +1,9 @@ #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; #[cfg(windows)] -use std::os::windows::io::{AsRawSocket, FromRawSocket, OwnedSocket, RawSocket}; +use std::os::windows::io::{ + AsRawHandle, AsRawSocket, FromRawSocket, OwnedSocket, RawHandle, RawSocket, +}; use std::{cell::UnsafeCell, io, rc::Rc}; #[cfg(windows)] @@ -27,7 +29,7 @@ struct Inner { enum State { #[cfg(all(target_os = "linux", feature = "iouring"))] Uring(UringState), - #[cfg(all(unix, feature = "legacy"))] + #[cfg(feature = "legacy")] Legacy(Option), } @@ -149,6 +151,13 @@ impl AsRawSocket for SharedFd { } } +#[cfg(windows)] +impl AsRawHandle for SharedFd { + fn as_raw_handle(&self) -> RawHandle { + self.raw_handle() + } +} + impl SharedFd { #[cfg(unix)] #[allow(unreachable_code, unused)] @@ -309,6 +318,11 @@ impl SharedFd { self.inner.fd.socket } + #[cfg(windows)] + pub(crate) fn raw_handle(&self) -> RawHandle { + unimplemented!() + } + #[cfg(unix)] /// Try unwrap Rc, then deregister if registered and return rawfd. /// Note: this action will consume self and return rawfd without closing it. @@ -363,32 +377,33 @@ impl SharedFd { /// Try unwrap Rc, then deregister if registered and return rawfd. /// Note: this action will consume self and return rawfd without closing it. pub(crate) fn try_unwrap(self) -> Result { - let fd = self.inner.fd; - match Rc::try_unwrap(self.inner) { - Ok(_inner) => { - let state = unsafe { &*_inner.state.get() }; - - #[allow(irrefutable_let_patterns)] - if let State::Legacy(idx) = state { - if CURRENT.is_set() { - CURRENT.with(|inner| { - match inner { - super::Inner::Legacy(inner) => { - // deregister it from driver(Poll and slab) and close fd - if let Some(idx) = idx { - let _ = super::legacy::LegacyDriver::deregister( - inner, *idx, &mut fd, - ); - } - } - } - }) - } - } - Ok(fd.socket) - } - Err(inner) => Err(Self { inner }), - } + // let mut fd = self.inner.fd; + // match Rc::try_unwrap(self.inner) { + // Ok(_inner) => { + // let state = unsafe { &*_inner.state.get() }; + // + // #[allow(irrefutable_let_patterns)] + // if let State::Legacy(idx) = state { + // if CURRENT.is_set() { + // CURRENT.with(|inner| { + // match inner { + // super::Inner::Legacy(inner) => { + // // deregister it from driver(Poll and slab) and close fd + // if let Some(idx) = idx { + // let _ = super::legacy::LegacyDriver::deregister( + // inner, *idx, &mut fd, + // ); + // } + // } + // } + // }) + // } + // } + // Ok(fd.socket) + // } + // Err(inner) => Err(Self { inner }), + // } + unimplemented!() } #[allow(unused)] @@ -499,6 +514,7 @@ impl Inner { } } +#[cfg(unix)] impl Drop for Inner { fn drop(&mut self) { let fd = self.fd; @@ -520,8 +536,9 @@ impl Drop for Inner { } } +#[allow(unused_mut)] #[cfg(feature = "legacy")] -fn drop_legacy(fd: RawFd, idx: Option) { +fn drop_legacy(mut fd: RawFd, idx: Option) { if CURRENT.is_set() { CURRENT.with(|inner| { #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index a5834006..53992338 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -1,5 +1,6 @@ use std::{ffi::CString, io, path::Path}; +#[allow(unused_variables)] pub(super) fn cstr(p: &Path) -> io::Result { #[cfg(unix)] { @@ -35,6 +36,7 @@ macro_rules! syscall { }}; } +/// Do syscall and return Result #[cfg(windows)] #[macro_export] macro_rules! syscall { @@ -43,7 +45,7 @@ macro_rules! syscall { if $err_test(&res, &$err_value) { Err(io::Error::last_os_error()) } else { - Ok(res) + Ok(res.try_into().unwrap()) } }}; } diff --git a/monoio/src/fs/file.rs b/monoio/src/fs/file.rs index 6df697d4..a354eb93 100644 --- a/monoio/src/fs/file.rs +++ b/monoio/src/fs/file.rs @@ -1,11 +1,14 @@ #[cfg(windows)] use std::os::windows::io::{AsRawHandle, RawHandle}; #[cfg(unix)] -use std::os::{ - fd::IntoRawFd, - unix::io::{AsRawFd, RawFd}, +use std::{ + fs::File as StdFile, + os::{ + fd::IntoRawFd, + unix::io::{AsRawFd, RawFd}, + }, }; -use std::{fs::File as StdFile, io, path::Path}; +use std::{io, path::Path}; use crate::{ buf::{IoBuf, IoBufMut}, diff --git a/monoio/src/fs/open_options.rs b/monoio/src/fs/open_options.rs index cabd3a8d..73c3d737 100644 --- a/monoio/src/fs/open_options.rs +++ b/monoio/src/fs/open_options.rs @@ -378,7 +378,7 @@ impl OpenOptions { (false, _, true, None) => Ok(FILE_GENERIC_WRITE & !FILE_WRITE_DATA), (true, _, true, None) => Ok(GENERIC_READ | (FILE_GENERIC_WRITE & !FILE_WRITE_DATA)), (false, false, false, None) => { - Err(io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER)) + Err(io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER as _)) } } } @@ -414,12 +414,12 @@ impl OpenOptions { (true, false) => {} (false, false) => { if self.truncate || self.create || self.create_new { - return Err(io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER)); + return Err(io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER as _)); } } (_, true) => { if self.truncate && !self.create_new { - return Err(io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER)); + return Err(io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER as _)); } } } @@ -439,7 +439,7 @@ impl OpenOptions { | self.attributes | self.security_qos_flags | if self.create_new { - FILE_FLAG_OPEN_REPARSE_POINT + FILE_FLAG_OPEN_REPARSE_POINT as _ } else { 0 } diff --git a/monoio/src/io/util/copy.rs b/monoio/src/io/util/copy.rs index 40278500..40800ee8 100644 --- a/monoio/src/io/util/copy.rs +++ b/monoio/src/io/util/copy.rs @@ -2,10 +2,9 @@ use std::io; -use crate::{ - io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt}, - net::unix::new_pipe, -}; +use crate::io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt}; +#[cfg(unix)] +use crate::net::unix::new_pipe; const BUF_SIZE: usize = 4 * 1024; diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index b3a28f4a..513c1f1a 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -14,6 +14,7 @@ pub use monoio_macros::select_priv_declare_output_enum; #[macro_use] mod driver; pub(crate) mod builder; +#[allow(dead_code)] pub(crate) mod runtime; mod scheduler; pub mod time; diff --git a/monoio/src/net/mod.rs b/monoio/src/net/mod.rs index ebcdfbda..e1ddc3f0 100644 --- a/monoio/src/net/mod.rs +++ b/monoio/src/net/mod.rs @@ -7,6 +7,9 @@ pub mod udp; #[cfg(unix)] pub mod unix; +#[cfg(windows)] +use std::os::windows::prelude::{AsRawSocket, RawSocket}; + pub use listener_config::ListenerOpts; #[deprecated(since = "0.2.0", note = "use ListenerOpts")] pub use listener_config::ListenerOpts as ListenerConfig; @@ -15,6 +18,7 @@ pub use tcp::{TcpConnectOpts, TcpListener, TcpStream}; pub use unix::{Pipe, UnixDatagram, UnixListener, UnixStream}; // Copied from mio. +#[cfg(unix)] pub(crate) fn new_socket( domain: libc::c_int, socket_type: libc::c_int, @@ -73,8 +77,25 @@ pub(crate) fn new_socket( }) }); - #[cfg(windows)] - let socket: std::io::Result<_> = unimplemented!(); - socket } + +#[cfg(windows)] +pub(crate) fn new_socket( + domain: libc::c_int, + socket_type: libc::c_int, +) -> std::io::Result { + let socket = socket2::Socket::new( + socket2::Domain::from(Into::::into(domain)), + socket2::Type::from(socket_type), + None, + )?; + let raw_socket = socket.as_raw_socket(); + socket.set_nonblocking(true).map_err(|e| { + // If either of the `ioctlsocket` calls failed, ensure the socket is + // closed and return the error. + unsafe { windows_sys::Win32::Networking::WinSock::closesocket(raw_socket as _) }; + e + })?; + Ok(raw_socket) +} diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 61ae8990..449c86eb 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -1,11 +1,15 @@ -#[cfg(unix)] -use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; #[cfg(windows)] -use std::os::windows::prelude::{AsRawHandle, FromRawSocket, RawHandle}; +use std::os::windows::prelude::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; use std::{ cell::UnsafeCell, io, - net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, + net::{SocketAddr, ToSocketAddrs}, +}; + +#[cfg(unix)] +use { + std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, + std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, }; use super::stream::TcpStream; @@ -25,6 +29,7 @@ pub struct TcpListener { } impl TcpListener { + #[allow(unreachable_code, clippy::diverging_sub_expression, unused_variables)] pub(crate) fn from_shared_fd(fd: SharedFd) -> Self { #[cfg(unix)] let sys_listener = unsafe { std::net::TcpListener::from_raw_fd(fd.raw_fd()) }; @@ -87,7 +92,7 @@ impl TcpListener { let fd = SharedFd::new::(sys_listener.into_raw_fd())?; #[cfg(windows)] - let fd = unimplemented!(); + let fd = SharedFd::new(sys_listener.into_raw_socket())?; Ok(Self::from_shared_fd(fd)) } @@ -246,6 +251,7 @@ impl TcpListener { } /// Creates new `TcpListener` from a `std::net::TcpListener`. + #[cfg(unix)] pub fn from_std(stdl: std::net::TcpListener) -> io::Result { match SharedFd::new::(stdl.as_raw_fd()) { Ok(shared) => { @@ -255,6 +261,18 @@ impl TcpListener { Err(e) => Err(e), } } + + /// Creates new `TcpListener` from a `std::net::TcpListener`. + #[cfg(windows)] + pub fn from_std(stdl: std::net::TcpListener) -> io::Result { + match SharedFd::new(stdl.as_raw_socket()) { + Ok(shared) => { + stdl.into_raw_socket(); + Ok(Self::from_shared_fd(shared)) + } + Err(e) => Err(e), + } + } } impl Stream for TcpListener { @@ -281,9 +299,10 @@ impl AsRawFd for TcpListener { } #[cfg(windows)] -impl AsRawHandle for TcpListener { - fn as_raw_handle(&self) -> RawHandle { - self.fd.raw_handle() +impl AsRawSocket for TcpListener { + #[inline] + fn as_raw_socket(&self) -> RawSocket { + self.fd.raw_socket() } } diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index d081e0a2..2f1a58b7 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -1,7 +1,3 @@ -#[cfg(unix)] -use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -#[cfg(windows)] -use std::os::windows::prelude::{AsRawHandle, IntoRawHandle, RawHandle}; use std::{ cell::UnsafeCell, future::Future, @@ -10,6 +6,17 @@ use std::{ time::Duration, }; +#[cfg(unix)] +use { + libc::{AF_INET, AF_INET6, SOCK_STREAM}, + std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, +}; +#[cfg(windows)] +use { + std::os::windows::prelude::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}, + windows_sys::Win32::Networking::WinSock::{AF_INET, AF_INET6, SOCK_STREAM}, +}; + use crate::{ buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, driver::{op::Op, shared_fd::SharedFd}, @@ -71,7 +78,7 @@ impl TcpStream { #[cfg(unix)] let meta = StreamMeta::new(fd.raw_fd()); #[cfg(windows)] - let meta = StreamMeta::new(fd.raw_handle()); + let meta = StreamMeta::new(fd.raw_socket()); #[cfg(feature = "zero-copy")] // enable SOCK_ZEROCOPY meta.set_zero_copy(); @@ -102,9 +109,9 @@ impl TcpStream { Self::connect_addr_with_config(addr, &DEFAULT_OPTS).await } - #[cfg(windows)] /// Establish a connection to the specified `addr`. - pub async fn connect_addr(addr: SocketAddr) -> io::Result { + #[cfg(windows)] + pub async fn connect_addr(_addr: SocketAddr) -> io::Result { unimplemented!() } @@ -114,10 +121,13 @@ impl TcpStream { opts: &TcpConnectOpts, ) -> io::Result { let domain = match addr { - SocketAddr::V4(_) => libc::AF_INET, - SocketAddr::V6(_) => libc::AF_INET6, + SocketAddr::V4(_) => AF_INET, + SocketAddr::V6(_) => AF_INET6, }; - let socket = crate::net::new_socket(domain, libc::SOCK_STREAM)?; + #[cfg(unix)] + let socket = crate::net::new_socket(domain, SOCK_STREAM)?; + #[cfg(windows)] + let socket = crate::net::new_socket(domain.into(), SOCK_STREAM)?; #[allow(unused_mut)] let mut tfo = opts.tcp_fast_open; @@ -130,7 +140,10 @@ impl TcpStream { tfo = false; } } + #[cfg(unix)] let op = Op::connect(SharedFd::new::(socket)?, addr, tfo)?; + #[cfg(windows)] + let op = Op::connect(SharedFd::new(socket)?, addr, tfo)?; let completion = op.await; completion.meta.result?; @@ -159,9 +172,16 @@ impl TcpStream { stream.writable(true).await?; // getsockopt libc::SO_ERROR + #[cfg(unix)] let sys_socket = unsafe { std::net::TcpStream::from_raw_fd(stream.fd.raw_fd()) }; + #[cfg(windows)] + let sys_socket = + unsafe { std::net::TcpStream::from_raw_socket(stream.fd.raw_socket()) }; let err = sys_socket.take_error(); + #[cfg(unix)] let _ = sys_socket.into_raw_fd(); + #[cfg(windows)] + let _ = sys_socket.into_raw_socket(); if let Some(e) = err? { return Err(e); } @@ -205,6 +225,7 @@ impl TcpStream { } /// Creates new `TcpStream` from a `std::net::TcpStream`. + #[cfg(unix)] pub fn from_std(stream: std::net::TcpStream) -> io::Result { match SharedFd::new::(stream.as_raw_fd()) { Ok(shared) => { @@ -215,6 +236,18 @@ impl TcpStream { } } + /// Creates new `TcpStream` from a `std::net::TcpStream`. + #[cfg(windows)] + pub fn from_std(stream: std::net::TcpStream) -> io::Result { + match SharedFd::new(stream.as_raw_socket()) { + Ok(shared) => { + stream.into_raw_socket(); + Ok(Self::from_shared_fd(shared)) + } + Err(e) => Err(e), + } + } + /// Wait for read readiness. /// Note: Do not use it before every io. It is different from other runtimes! /// @@ -278,19 +311,20 @@ impl AsRawFd for TcpStream { } #[cfg(windows)] -impl IntoRawHandle for TcpStream { +impl IntoRawSocket for TcpStream { #[inline] - fn into_raw_handle(self) -> RawHandle { + fn into_raw_socket(self) -> RawSocket { self.fd .try_unwrap() .expect("unexpected multiple reference to rawfd") } } + #[cfg(windows)] -impl AsRawHandle for TcpStream { +impl AsRawSocket for TcpStream { #[inline] - fn as_raw_handle(&self) -> RawHandle { - self.fd.raw_handle() + fn as_raw_socket(&self) -> RawSocket { + self.fd.raw_socket() } } @@ -333,8 +367,8 @@ impl AsyncWriteRent for TcpStream { } #[cfg(windows)] - fn shutdown(&mut self) -> impl Future> { - async { unimplemented!() } + async fn shutdown(&mut self) -> std::io::Result<()> { + unimplemented!() } } @@ -392,8 +426,8 @@ impl CancelableAsyncWriteRent for TcpStream { } #[cfg(windows)] - fn cancelable_shutdown(&mut self, _c: CancelHandle) -> impl Future> { - async { unimplemented!() } + async fn cancelable_shutdown(&mut self, _c: CancelHandle) -> io::Result<()> { + unimplemented!() } } @@ -544,8 +578,11 @@ impl StreamMeta { meta: Default::default(), } } + + /// When operating files, we should use RawHandle; + /// When operating sockets, we should use RawSocket; #[cfg(windows)] - fn new(fd: RawHandle) -> Self { + fn new(_: RawSocket) -> Self { unimplemented!() } @@ -593,6 +630,7 @@ impl StreamMeta { self.socket.as_ref().unwrap().set_nodelay(no_delay) } + #[allow(unused_variables)] fn set_tcp_keepalive( &self, time: Option, diff --git a/monoio/src/net/udp.rs b/monoio/src/net/udp.rs index abeb3764..6b0ddded 100644 --- a/monoio/src/net/udp.rs +++ b/monoio/src/net/udp.rs @@ -1,9 +1,12 @@ //! UDP impl. +#[cfg(unix)] +use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd}; +#[cfg(windows)] +use std::os::windows::prelude::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; use std::{ io, net::{SocketAddr, ToSocketAddrs}, - os::fd::{AsRawFd, FromRawFd, IntoRawFd}, }; use crate::{ @@ -66,7 +69,7 @@ impl UdpSocket { #[cfg(unix)] let fd = SharedFd::new::(socket.into_raw_fd())?; #[cfg(windows)] - let fd = unimplemented!(); + let fd = SharedFd::new(socket.into_raw_socket())?; Ok(Self::from_shared_fd(fd)) } @@ -91,9 +94,15 @@ impl UdpSocket { /// Returns the socket address of the remote peer this socket was connected to. pub fn peer_addr(&self) -> io::Result { + #[cfg(unix)] let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) }; + #[cfg(windows)] + let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) }; let addr = socket.peer_addr(); + #[cfg(unix)] socket.into_raw_fd(); + #[cfg(windows)] + socket.into_raw_socket(); addr? .as_socket() .ok_or_else(|| io::ErrorKind::InvalidInput.into()) @@ -101,9 +110,15 @@ impl UdpSocket { /// Returns the socket address that this socket was created from. pub fn local_addr(&self) -> io::Result { + #[cfg(unix)] let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) }; + #[cfg(windows)] + let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) }; let addr = socket.local_addr(); + #[cfg(unix)] socket.into_raw_fd(); + #[cfg(windows)] + socket.into_raw_socket(); addr? .as_socket() .ok_or_else(|| io::ErrorKind::InvalidInput.into()) @@ -133,6 +148,7 @@ impl UdpSocket { } /// Creates new `UdpSocket` from a `std::net::UdpSocket`. + #[cfg(unix)] pub fn from_std(socket: std::net::UdpSocket) -> io::Result { match SharedFd::new::(socket.as_raw_fd()) { Ok(shared) => { @@ -143,19 +159,53 @@ impl UdpSocket { } } + /// Creates new `UdpSocket` from a `std::net::UdpSocket`. + #[cfg(windows)] + pub fn from_std(socket: std::net::UdpSocket) -> io::Result { + match SharedFd::new(socket.as_raw_socket()) { + Ok(shared) => { + socket.into_raw_socket(); + Ok(Self::from_shared_fd(shared)) + } + Err(e) => Err(e), + } + } + /// Set value for the `SO_REUSEADDR` option on this socket. + #[allow(unused_variables)] pub fn set_reuse_address(&self, reuse: bool) -> io::Result<()> { - let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) }; - let r = socket.set_reuse_address(reuse); - socket.into_raw_fd(); + #[cfg(unix)] + let r = { + let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) }; + let r = socket.set_reuse_address(reuse); + socket.into_raw_fd(); + r + }; + #[cfg(windows)] + let r = { + let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) }; + socket.into_raw_socket(); + Ok(()) + }; r } /// Set value for the `SO_REUSEPORT` option on this socket. + #[allow(unused_variables)] pub fn set_reuse_port(&self, reuse: bool) -> io::Result<()> { - let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) }; - let r = socket.set_reuse_port(reuse); - socket.into_raw_fd(); + #[cfg(unix)] + let r = { + let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) }; + let r = socket.set_reuse_port(reuse); + socket.into_raw_fd(); + r + }; + #[cfg(windows)] + let r = { + let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) }; + socket.into_raw_socket(); + Ok(()) + }; r } @@ -190,12 +240,20 @@ impl UdpSocket { } } +#[cfg(unix)] impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> std::os::fd::RawFd { self.fd.raw_fd() } } +#[cfg(windows)] +impl AsRawSocket for UdpSocket { + fn as_raw_socket(&self) -> RawSocket { + self.fd.raw_socket() + } +} + /// Cancelable related methods impl UdpSocket { /// Receives a single datagram message on the socket. On success, returns the number diff --git a/monoio/src/runtime.rs b/monoio/src/runtime.rs index bffd1a70..e1e4d7bb 100644 --- a/monoio/src/runtime.rs +++ b/monoio/src/runtime.rs @@ -1,6 +1,9 @@ use std::future::Future; -#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] +#[cfg(all( + unix, + any(all(target_os = "linux", feature = "iouring"), feature = "legacy") +))] use crate::time::TimeDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use crate::IoUringDriver; diff --git a/monoio/src/utils/bind_to_cpu_set.rs b/monoio/src/utils/bind_to_cpu_set.rs index fab9218e..64438355 100644 --- a/monoio/src/utils/bind_to_cpu_set.rs +++ b/monoio/src/utils/bind_to_cpu_set.rs @@ -2,6 +2,7 @@ #[cfg(unix)] pub type BindError = nix::Result; +/// Bind error #[cfg(windows)] pub type BindError = std::io::Result; @@ -25,6 +26,7 @@ pub fn bind_to_cpu_set(_: impl IntoIterator) -> BindError<()> { Ok(()) } +/// Bind current thread to given cpus #[cfg(windows)] pub fn bind_to_cpu_set(_: impl IntoIterator) -> BindError<()> { Ok(()) diff --git a/monoio/src/utils/mod.rs b/monoio/src/utils/mod.rs index cd5c32e1..4b2deb0c 100644 --- a/monoio/src/utils/mod.rs +++ b/monoio/src/utils/mod.rs @@ -2,7 +2,9 @@ pub(crate) mod box_into_inner; pub(crate) mod linked_list; +#[allow(dead_code)] pub(crate) mod slab; +#[allow(dead_code)] pub(crate) mod thread_id; pub(crate) mod uring_detect;