Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,22 @@ jobs:
toolchain: ${{ matrix.target == 'i686-pc-windows-gnu' && format('{0}-i686-pc-windows-gnu', matrix.channel) || matrix.channel }}
target: ${{ matrix.target }}
override: true
components: rustfmt
- uses: actions-rs/cargo@v1
components: rustfmt, clippy
- name: Check code format
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- name: Check clippy with default features
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all -- -D warnings
- name: Check clippy with all features
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all --all-features -- -D warnings
- name: Run cargo deny
if: ${{ contains(matrix.os, 'ubuntu') }}
uses: EmbarkStudios/cargo-deny-action@v2
Expand Down
2 changes: 2 additions & 0 deletions core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub enum Syscall {
SetFilePointerEx,
#[cfg(windows)]
WaitOnAddress,
#[cfg(windows)]
WSAPoll,
}

impl Syscall {
Expand Down
2 changes: 1 addition & 1 deletion core/src/coroutine/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl<'c> CoroutineLocal<'c> {
pub fn put<V>(&self, key: &'c str, val: V) -> Option<V> {
let v = Box::leak(Box::new(val));
self.0
.insert(key, std::ptr::from_mut::<V>(v) as usize)
.insert(key, std::ptr::from_mut(v) as usize)
.map(|ptr| unsafe { *Box::from_raw((ptr as *mut c_void).cast::<V>()) })
}

Expand Down
5 changes: 2 additions & 3 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<'e> EventLoop<'e> {
let syscall_mask = <Syscall as Into<&str>>::into(syscall).as_ptr() as usize;
let token = thread_id as usize ^ syscall_mask;
if Syscall::nio() != syscall {
eprintln!("{syscall} {token}");
eprintln!("generate token:{token} for {syscall}");
}
token
}
Expand Down Expand Up @@ -255,8 +255,7 @@ impl<'e> EventLoop<'e> {
continue;
}
// resolve completed read/write tasks
let result = cqe.result() as c_longlong;
eprintln!("io_uring finish {token} {result}");
let result = c_longlong::from(cqe.result());
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
let (lock, cvar) = &*pair;
let mut pending = lock.lock().expect("lock failed");
Expand Down
12 changes: 8 additions & 4 deletions core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// 做C兼容时会用到
pub type UserFunc = extern "C" fn(usize) -> usize;

cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t};
use std::ffi::{c_longlong, c_void};
}
}

/// 做C兼容时会用到
pub type UserFunc = extern "C" fn(usize) -> usize;

mod selector;

#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::too_many_arguments
)]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
mod operator;

Expand Down
1 change: 0 additions & 1 deletion core/src/net/operator/linux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,6 @@ impl Operator<'_> {
)
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn sendto(
&self,
user_data: usize,
Expand Down
4 changes: 2 additions & 2 deletions core/src/syscall/unix/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
{
break;
}
let mut err: c_int = 0;
let mut err = 0;
unsafe {
let mut len: socklen_t = std::mem::zeroed();
r = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_ERROR,
(std::ptr::addr_of_mut!(err)).cast::<c_void>(),
std::ptr::addr_of_mut!(err).cast::<c_void>(),
&mut len,
);
}
Expand Down
13 changes: 5 additions & 8 deletions core/src/syscall/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ macro_rules! impl_facade {
let new_state = $crate::common::constants::SyscallState::Executing;
if co.syscall((), syscall, new_state).is_err() {
$crate::error!("{} change to syscall {} {} failed !",
co.name(), syscall, new_state);
co.name(), syscall, new_state
);
}
}
let r = self.inner.$syscall(fn_ptr, $($arg, )*);
Expand All @@ -32,7 +33,7 @@ macro_rules! impl_facade {
$crate::error!("{} change to running state failed !", co.name());
}
}
$crate::info!("exit syscall {}", syscall);
$crate::info!("exit syscall {} {:?}", syscall, r);
r
}
}
Expand Down Expand Up @@ -66,9 +67,7 @@ macro_rules! impl_io_uring {
if co.syscall((), syscall, new_state).is_err() {
$crate::error!(
"{} change to syscall {} {} failed !",
co.name(),
syscall,
new_state
co.name(), syscall, new_state
);
}
}
Expand All @@ -84,9 +83,7 @@ macro_rules! impl_io_uring {
if co.syscall((), syscall, new_state).is_err() {
$crate::error!(
"{} change to syscall {} {} failed !",
co.name(),
syscall,
new_state
co.name(), syscall, new_state
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/syscall/unix/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<I: PollSyscall> PollSyscall for NioPollSyscall<I> {
let mut t = if timeout < 0 { c_int::MAX } else { timeout };
let mut x = 1;
let mut r;
// just check select every x ms
// just check poll every x ms
loop {
r = self.inner.poll(fn_ptr, fds, nfds, 0);
if r != 0 || t == 0 {
Expand Down
20 changes: 15 additions & 5 deletions core/src/syscall/unix/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ trait SelectSyscall {
}

impl_facade!(SelectSyscallFacade, SelectSyscall,
select(nfds: c_int, readfds: *mut fd_set, writefds: *mut fd_set,
errorfds: *mut fd_set, timeout: *mut timeval) -> c_int
select(
nfds: c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
errorfds: *mut fd_set,
timeout: *mut timeval
) -> c_int
);

#[repr(C)]
Expand Down Expand Up @@ -78,7 +83,7 @@ impl<I: SelectSyscall> SelectSyscall for NioSelectSyscall<I> {
}
let mut x = 1;
let mut r;
// just check poll every x ms
// just check select every x ms
loop {
r = self
.inner
Expand Down Expand Up @@ -111,6 +116,11 @@ impl<I: SelectSyscall> SelectSyscall for NioSelectSyscall<I> {
}

impl_raw!(RawSelectSyscall, SelectSyscall,
select(nfds: c_int, readfds: *mut fd_set, writefds: *mut fd_set,
errorfds: *mut fd_set, timeout: *mut timeval) -> c_int
select(
nfds: c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
errorfds: *mut fd_set,
timeout: *mut timeval
) -> c_int
);
70 changes: 70 additions & 0 deletions core/src/syscall/windows/WSAPoll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::net::EventLoops;
use once_cell::sync::Lazy;
use std::ffi::{c_int, c_uint};
use std::time::Duration;
use windows_sys::Win32::Networking::WinSock::WSAPOLLFD;

#[must_use]
pub extern "system" fn WSAPoll(
fn_ptr: Option<&extern "system" fn(*mut WSAPOLLFD, c_uint, c_int) -> c_int>,
fds: *mut WSAPOLLFD,
nfds: c_uint,
timeout: c_int,
) -> c_int {
static CHAIN: Lazy<PollSyscallFacade<NioPollSyscall<RawPollSyscall>>> =
Lazy::new(Default::default);
CHAIN.WSAPoll(fn_ptr, fds, nfds, timeout)
}

trait PollSyscall {
extern "system" fn WSAPoll(
&self,
fn_ptr: Option<&extern "system" fn(*mut WSAPOLLFD, c_uint, c_int) -> c_int>,
fds: *mut WSAPOLLFD,
nfds: c_uint,
timeout: c_int,
) -> c_int;
}

impl_facade!(PollSyscallFacade, PollSyscall,
WSAPoll(fds: *mut WSAPOLLFD, nfds: c_uint, timeout: c_int) -> c_int
);

#[repr(C)]
#[derive(Debug, Default)]
struct NioPollSyscall<I: PollSyscall> {
inner: I,
}

impl<I: PollSyscall> PollSyscall for NioPollSyscall<I> {
extern "system" fn WSAPoll(
&self,
fn_ptr: Option<&extern "system" fn(*mut WSAPOLLFD, c_uint, c_int) -> c_int>,
fds: *mut WSAPOLLFD,
nfds: c_uint,
timeout: c_int,
) -> c_int {
let mut t = if timeout < 0 { c_int::MAX } else { timeout };
let mut x = 1;
let mut r;
// just check poll every x ms
loop {
r = self.inner.WSAPoll(fn_ptr, fds, nfds, 0);
if r != 0 || t == 0 {
break;
}
_ = EventLoops::wait_event(Some(Duration::from_millis(t.min(x) as u64)));
if t != c_int::MAX {
t = if t > x { t - x } else { 0 };
}
if x < 16 {
x <<= 1;
}
}
r
}
}

impl_raw!(RawPollSyscall, PollSyscall, windows_sys::Win32::Networking::WinSock,
WSAPoll(fds: *mut WSAPOLLFD, nfds: c_uint, timeout: c_int) -> c_int
);
108 changes: 108 additions & 0 deletions core/src/syscall/windows/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::net::EventLoops;
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_errno, set_non_blocking};
use once_cell::sync::Lazy;
use std::ffi::c_int;
use std::io::Error;
use windows_sys::Win32::Networking::WinSock::{getpeername, getsockopt, SO_ERROR, SOCKADDR, SOCKET, SOL_SOCKET, WSAEALREADY, WSAEINPROGRESS, WSAEINTR, WSAETIMEDOUT};

#[must_use]
pub extern "system" fn connect(
fn_ptr: Option<&extern "system" fn(SOCKET, *const SOCKADDR, c_int) -> c_int>,
socket: SOCKET,
address: *const SOCKADDR,
len: c_int,
) -> c_int {
static CHAIN: Lazy<ConnectSyscallFacade<NioConnectSyscall<RawConnectSyscall>>> =
Lazy::new(Default::default);
CHAIN.connect(fn_ptr, socket, address, len)
}

trait ConnectSyscall {
extern "system" fn connect(
&self,
fn_ptr: Option<&extern "system" fn(SOCKET, *const SOCKADDR, c_int) -> c_int>,
fd: SOCKET,
address: *const SOCKADDR,
len: c_int,
) -> c_int;
}

impl_facade!(ConnectSyscallFacade, ConnectSyscall,
connect(fd: SOCKET, address: *const SOCKADDR, len: c_int) -> c_int
);

#[repr(C)]
#[derive(Debug, Default)]
struct NioConnectSyscall<I: ConnectSyscall> {
inner: I,
}

impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
extern "system" fn connect(
&self,
fn_ptr: Option<&extern "system" fn(SOCKET, *const SOCKADDR, c_int) -> c_int>,
fd: SOCKET,
address: *const SOCKADDR,
len: c_int,
) -> c_int {
let blocking = is_blocking(fd);
if blocking {
set_non_blocking(fd);
}
let mut r = self.inner.connect(fn_ptr, fd, address, len);
loop {
if r == 0 {
reset_errno();
break;
}
let errno = Error::last_os_error().raw_os_error();
if errno == Some(WSAEINPROGRESS) || errno == Some(WSAEALREADY) {
//阻塞,直到写事件发生
if EventLoops::wait_write_event(
fd as _,
Some(crate::common::constants::SLICE)
).is_err() {
break;
}
let mut err = 0;
unsafe {
let mut len: c_int = std::mem::zeroed();
r = getsockopt(
fd,
SOL_SOCKET,
SO_ERROR,
std::ptr::addr_of_mut!(err).cast::<u8>(),
&mut len,
);
}
if r != 0 {
r = -1;
break;
}
if err != 0 {
set_errno(err);
r = -1;
break;
};
unsafe {
let mut address = std::mem::zeroed();
let mut address_len = std::mem::zeroed();
r = getpeername(fd, &mut address, &mut address_len);
}
} else if errno != Some(WSAEINTR) {
break;
}
}
if r == -1 && Error::last_os_error().raw_os_error() == Some(WSAETIMEDOUT) {
set_errno(WSAEINPROGRESS.try_into().expect("overflow"));
}
if blocking {
set_blocking(fd);
}
r
}
}

impl_raw!(RawConnectSyscall, ConnectSyscall, windows_sys::Win32::Networking::WinSock,
connect(fd: SOCKET, address: *const SOCKADDR, len: c_int) -> c_int
);
Loading
Loading