Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci-preemptive.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci --release
fi

# test IOCP
if [ "${OS}" = "windows-latest" ]; then
cd "${PROJECT_DIR}"/core
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci --release
cd "${PROJECT_DIR}"/open-coroutine
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci --release
fi
10 changes: 10 additions & 0 deletions .github/workflows/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci --release
fi

# test IOCP
if [ "${OS}" = "windows-latest" ]; then
cd "${PROJECT_DIR}"/core
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci --release
cd "${PROJECT_DIR}"/open-coroutine
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci --release
fi
7 changes: 7 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ windows-sys = { workspace = true, features = [
"Win32_Networking_WinSock",
"Win32_System_SystemInformation",
"Win32_System_Diagnostics_Debug",
"Win32_System_WindowsProgramming",
] }
polling = { workspace = true, optional = true }

Expand Down Expand Up @@ -95,5 +96,11 @@ net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"]
# Provide io_uring adaptation, this feature only works in linux.
io_uring = ["net", "io-uring"]

# Provide IOCP adaptation, this feature only works in windows.
iocp = ["net"]

# Provide completion IO adaptation
completion_io = ["io_uring", "iocp"]

# Provide syscall implementation.
syscall = ["net"]
84 changes: 80 additions & 4 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,34 @@ cfg_if::cfg_if! {
}
}

cfg_if::cfg_if! {
if #[cfg(all(windows, feature = "iocp"))] {
use dashmap::DashMap;
use std::ffi::{c_longlong, c_uint};
use windows_sys::core::{PCSTR, PSTR};
use windows_sys::Win32::Networking::WinSock::{
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
};
use windows_sys::Win32::System::IO::OVERLAPPED;
}
}

#[repr(C)]
#[derive(Debug)]
pub(crate) struct EventLoop<'e> {
stop: Arc<(Mutex<bool>, Condvar)>,
shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
cpu: usize,
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
operator: crate::net::operator::Operator<'e>,
#[allow(clippy::type_complexity)]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
syscall_wait_table: DashMap<usize, Arc<(Mutex<Option<c_longlong>>, Condvar)>>,
selector: Poller,
pool: CoroutinePool<'e>,
Expand Down Expand Up @@ -87,9 +105,15 @@ impl<'e> EventLoop<'e> {
stop: Arc::new((Mutex::new(false), Condvar::new())),
shared_stop,
cpu,
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
operator: crate::net::operator::Operator::new(cpu)?,
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
syscall_wait_table: DashMap::new(),
selector: Poller::new()?,
pool: CoroutinePool::new(name, stack_size, min_size, max_size, keep_alive_time),
Expand Down Expand Up @@ -222,6 +246,8 @@ impl<'e> EventLoop<'e> {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
left_time = self.adapt_io_uring(left_time)?;
} else if #[cfg(all(windows, feature = "iocp"))] {
left_time = self.adapt_iocp(left_time)?;
}
}

Expand Down Expand Up @@ -267,6 +293,28 @@ impl<'e> EventLoop<'e> {
Ok(left_time)
}

#[cfg(all(windows, feature = "iocp"))]
fn adapt_iocp(&self, mut left_time: Option<Duration>) -> std::io::Result<Option<Duration>> {
// use IOCP
let (count, mut cq, left) = self.operator.select(left_time, 0)?;
if count > 0 {
for cqe in &mut cq {
let token = cqe.token;
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
let (lock, cvar) = &*pair;
let mut pending = lock.lock().expect("lock failed");
*pending = Some(cqe.result);
cvar.notify_one();
}
unsafe { self.resume(token) };
}
}
if left != left_time {
left_time = Some(left.unwrap_or(Duration::ZERO));
}
Ok(left_time)
}

unsafe fn resume(&self, token: usize) {
if COROUTINE_TOKENS.remove(&token).is_none() {
return;
Expand Down Expand Up @@ -446,6 +494,34 @@ impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);

macro_rules! impl_iocp {
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
#[cfg(all(windows, feature = "iocp"))]
impl EventLoop<'_> {
#[allow(non_snake_case, clippy::too_many_arguments)]
pub(super) fn $syscall(
&self,
$($arg: $arg_type),*
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
let token = EventLoop::token(SyscallName::$syscall);
self.operator.$syscall(token, $($arg, )*)?;
let arc = Arc::new((Mutex::new(None), Condvar::new()));
assert!(
self.syscall_wait_table.insert(token, arc.clone()).is_none(),
"The previous token was not retrieved in a timely manner"
);
Ok(arc)
}
}
}
}

impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);

#[cfg(all(test, not(all(unix, feature = "preemptive"))))]
mod tests {
use crate::net::event_loop::EventLoop;
Expand Down
37 changes: 36 additions & 1 deletion core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,27 @@ cfg_if::cfg_if! {
}
}

cfg_if::cfg_if! {
if #[cfg(all(windows, feature = "iocp"))] {
use std::ffi::c_uint;
use windows_sys::core::{PCSTR, PSTR};
use windows_sys::Win32::Networking::WinSock::{
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
};
use windows_sys::Win32::System::IO::OVERLAPPED;
}
}

/// 做C兼容时会用到
pub type UserFunc = extern "C" fn(usize) -> usize;

mod selector;

#[allow(clippy::too_many_arguments)]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[cfg(any(
all(target_os = "linux", feature = "io_uring"),
all(windows, feature = "iocp")
))]
mod operator;

#[allow(missing_docs)]
Expand Down Expand Up @@ -280,3 +294,24 @@ impl_io_uring!(fsync(fd: c_int) -> c_int);
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);

macro_rules! impl_iocp {
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
#[allow(non_snake_case)]
#[cfg(all(windows, feature = "iocp"))]
impl EventLoops {
#[allow(missing_docs)]
pub fn $syscall(
$($arg: $arg_type),*
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
Self::event_loop().$syscall($($arg, )*)
}
}
}
}

impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
6 changes: 6 additions & 0 deletions core/src/net/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@
mod linux;
#[cfg(all(target_os = "linux", feature = "io_uring"))]
pub(crate) use linux::*;

#[allow(non_snake_case)]
#[cfg(all(windows, feature = "iocp"))]
mod windows;
#[cfg(all(windows, feature = "iocp"))]
pub(crate) use windows::*;
Loading
Loading