Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub enum Syscall {
kevent,
#[cfg(windows)]
iocp,
setsockopt,
recv,
#[cfg(windows)]
WSARecv,
Expand Down
5 changes: 4 additions & 1 deletion core/src/syscall/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
pub use crate::syscall::{is_blocking, is_non_blocking, set_blocking, set_errno, set_non_blocking};
pub use crate::syscall::{
is_blocking, is_non_blocking, recv_time_limit, send_time_limit, set_blocking, set_errno,
set_non_blocking,
};

pub extern "C" fn reset_errno() {
set_errno(0);
Expand Down
106 changes: 98 additions & 8 deletions core/src/syscall/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::syscall_mod;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use std::ffi::c_int;

macro_rules! impl_facade {
Expand Down Expand Up @@ -131,6 +133,7 @@ macro_rules! impl_nio_read {
if blocking {
$crate::syscall::common::set_non_blocking($fd);
}
let start_time = $crate::common::now();
let mut r;
loop {
r = self.inner.$syscall(fn_ptr, $fd, $($arg, )*);
Expand All @@ -141,9 +144,13 @@ macro_rules! impl_nio_read {
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait read event
let wait_time = std::time::Duration::from_nanos(start_time
.saturating_add($crate::syscall::common::recv_time_limit($fd))
.saturating_sub($crate::common::now()))
.min($crate::common::constants::SLICE);
if $crate::net::EventLoops::wait_read_event(
$fd,
Some($crate::common::constants::SLICE),
Some(wait_time)
).is_err() {
break;
}
Expand Down Expand Up @@ -182,6 +189,7 @@ macro_rules! impl_nio_read_buf {
if blocking {
$crate::syscall::common::set_non_blocking($fd);
}
let start_time = $crate::common::now();
let mut received = 0;
let mut r = 0;
while received < $len {
Expand All @@ -203,12 +211,14 @@ macro_rules! impl_nio_read_buf {
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait read event
let wait_time = std::time::Duration::from_nanos(start_time
.saturating_add($crate::syscall::common::recv_time_limit($fd))
.saturating_sub($crate::common::now()))
.min($crate::common::constants::SLICE);
if $crate::net::EventLoops::wait_read_event(
$fd,
Some($crate::common::constants::SLICE),
)
.is_err()
{
Some(wait_time)
).is_err() {
break;
}
} else if error_kind != std::io::ErrorKind::Interrupted {
Expand Down Expand Up @@ -247,6 +257,7 @@ macro_rules! impl_nio_read_iovec {
$crate::syscall::common::set_non_blocking($fd);
}
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
let start_time = $crate::common::now();
let mut length = 0;
let mut received = 0usize;
let mut r = 0;
Expand Down Expand Up @@ -296,9 +307,13 @@ macro_rules! impl_nio_read_iovec {
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait read event
let wait_time = std::time::Duration::from_nanos(start_time
.saturating_add($crate::syscall::common::recv_time_limit($fd))
.saturating_sub($crate::common::now()))
.min($crate::common::constants::SLICE);
if $crate::net::EventLoops::wait_read_event(
$fd,
Some($crate::common::constants::SLICE)
Some(wait_time)
).is_err() {
std::mem::forget(vec);
if blocking {
Expand Down Expand Up @@ -350,6 +365,7 @@ macro_rules! impl_nio_write_buf {
if blocking {
$crate::syscall::common::set_non_blocking($fd);
}
let start_time = $crate::common::now();
let mut sent = 0;
let mut r = 0;
while sent < $len {
Expand All @@ -371,9 +387,13 @@ macro_rules! impl_nio_write_buf {
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait write event
let wait_time = std::time::Duration::from_nanos(start_time
.saturating_add($crate::syscall::common::send_time_limit($fd))
.saturating_sub($crate::common::now()))
.min($crate::common::constants::SLICE);
if $crate::net::EventLoops::wait_write_event(
$fd,
Some($crate::common::constants::SLICE),
Some(wait_time),
)
.is_err()
{
Expand Down Expand Up @@ -415,6 +435,7 @@ macro_rules! impl_nio_write_iovec {
$crate::syscall::common::set_non_blocking($fd);
}
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
let start_time = $crate::common::now();
let mut length = 0;
let mut sent = 0usize;
let mut r = 0;
Expand Down Expand Up @@ -458,9 +479,13 @@ macro_rules! impl_nio_write_iovec {
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait write event
let wait_time = std::time::Duration::from_nanos(start_time
.saturating_add($crate::syscall::common::send_time_limit($fd))
.saturating_sub($crate::common::now()))
.min($crate::common::constants::SLICE);
if $crate::net::EventLoops::wait_write_event(
$fd,
Some($crate::common::constants::SLICE)
Some(wait_time)
).is_err() {
std::mem::forget(vec);
if blocking {
Expand Down Expand Up @@ -541,6 +566,7 @@ syscall_mod!(
shutdown;
sleep;
socket;
setsockopt;
usleep;
write;
writev;
Expand All @@ -551,6 +577,10 @@ syscall_mod!(
unlink
);

static SEND_TIME_LIMIT: Lazy<DashMap<c_int, u64>> = Lazy::new(Default::default);

static RECV_TIME_LIMIT: Lazy<DashMap<c_int, u64>> = Lazy::new(Default::default);

extern "C" {
#[cfg(not(any(target_os = "dragonfly", target_os = "vxworks")))]
#[cfg_attr(
Expand Down Expand Up @@ -636,3 +666,63 @@ pub extern "C" fn is_non_blocking(fd: c_int) -> bool {
}
(flags & libc::O_NONBLOCK) != 0
}

#[must_use]
pub extern "C" fn send_time_limit(fd: c_int) -> u64 {
SEND_TIME_LIMIT.get(&fd).map_or_else(
|| unsafe {
let mut tv: libc::timeval = std::mem::zeroed();
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
assert_eq!(
0,
libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_SNDTIMEO,
std::ptr::from_mut(&mut tv).cast(),
&mut len,
)
);
let mut time_limit = (tv.tv_sec as u64)
.saturating_mul(1_000_000_000)
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
if 0 == time_limit {
// 取消超时
time_limit = u64::MAX;
}
assert!(SEND_TIME_LIMIT.insert(fd, time_limit).is_none());
time_limit
},
|v| *v.value(),
)
}

#[must_use]
pub extern "C" fn recv_time_limit(fd: c_int) -> u64 {
RECV_TIME_LIMIT.get(&fd).map_or_else(
|| unsafe {
let mut tv: libc::timeval = std::mem::zeroed();
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
assert_eq!(
0,
libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVTIMEO,
std::ptr::from_mut(&mut tv).cast(),
&mut len,
)
);
let mut time_limit = (tv.tv_sec as u64)
.saturating_mul(1_000_000_000)
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
if 0 == time_limit {
// 取消超时
time_limit = u64::MAX;
}
assert!(RECV_TIME_LIMIT.insert(fd, time_limit).is_none());
time_limit
},
|v| *v.value(),
)
}
82 changes: 82 additions & 0 deletions core/src/syscall/unix/setsockopt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::ffi::{c_int, c_void};
use libc::socklen_t;
use once_cell::sync::Lazy;
use crate::syscall::unix::{RECV_TIME_LIMIT, SEND_TIME_LIMIT};

#[must_use]
pub extern "C" fn setsockopt(
fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *const c_void, socklen_t) -> c_int>,
socket: c_int,
level: c_int,
name: c_int,
value: *const c_void,
option_len: socklen_t
) -> c_int{
static CHAIN: Lazy<SetsockoptSyscallFacade<NioSetsockoptSyscall<RawSetsockoptSyscall>>> =
Lazy::new(Default::default);
CHAIN.setsockopt(fn_ptr, socket, level, name, value, option_len)
}

trait SetsockoptSyscall {
extern "C" fn setsockopt(
&self,
fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *const c_void, socklen_t) -> c_int>,
socket: c_int,
level: c_int,
name: c_int,
value: *const c_void,
option_len: socklen_t
) -> c_int;
}

impl_facade!(SetsockoptSyscallFacade, SetsockoptSyscall,
setsockopt(socket: c_int, level: c_int, name: c_int, value: *const c_void, option_len: socklen_t) -> c_int
);

#[repr(C)]
#[derive(Debug, Default)]
struct NioSetsockoptSyscall<I: SetsockoptSyscall> {
inner: I,
}

impl<I: SetsockoptSyscall> SetsockoptSyscall for NioSetsockoptSyscall<I> {
extern "C" fn setsockopt(
&self,
fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *const c_void, socklen_t) -> c_int>,
socket: c_int,
level: c_int,
name: c_int,
value: *const c_void,
option_len: socklen_t
) -> c_int {
let r= self.inner.setsockopt(fn_ptr, socket, level, name, value, option_len);
if 0 == r && libc::SOL_SOCKET == level {
if libc::SO_SNDTIMEO == name {
let tv = unsafe { &*value.cast::<libc::timeval>() };
let mut time_limit = (tv.tv_sec as u64)
.saturating_mul(1_000_000_000)
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
if 0 == time_limit {
// 取消超时
time_limit = u64::MAX;
}
assert!(SEND_TIME_LIMIT.insert(socket, time_limit).is_none());
} else if libc::SO_RCVTIMEO == name {
let tv = unsafe { &*value.cast::<libc::timeval>() };
let mut time_limit = (tv.tv_sec as u64)
.saturating_mul(1_000_000_000)
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
if 0 == time_limit {
// 取消超时
time_limit = u64::MAX;
}
assert!(RECV_TIME_LIMIT.insert(socket, time_limit).is_none());
}
}
r
}
}

impl_raw!(RawSetsockoptSyscall, SetsockoptSyscall,
setsockopt(socket: c_int, level: c_int, name: c_int, value: *const c_void, option_len: socklen_t) -> c_int
);
Loading
Loading