Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion open-coroutine-core/src/net/event_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ static EVENT_LOOP_WORKERS: OnceCell<Box<[std::thread::JoinHandle<()>]>> = OnceCe

static EVENT_LOOP_STARTED: Lazy<AtomicBool> = Lazy::new(AtomicBool::default);

static EVENT_LOOP_START_COUNT: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));

static EVENT_LOOP_STOP: Lazy<Arc<(Mutex<AtomicUsize>, Condvar)>> =
Lazy::new(|| Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new())));

Expand Down Expand Up @@ -151,6 +153,7 @@ impl EventLoops {
.name(format!("open-coroutine-event-loop-{i}"))
.spawn(move || {
warn!("open-coroutine-event-loop-{i} has started");
_ = EVENT_LOOP_START_COUNT.fetch_add(1, Ordering::Release);
if set_for_current(CoreId { id: i }) {
warn!("pin event-loop-{i} thread to CPU core-{i} failed !");
}
Expand Down Expand Up @@ -185,7 +188,10 @@ impl EventLoops {
.wait_timeout_while(
lock.lock().unwrap(),
Duration::from_millis(30000),
|stopped| stopped.load(Ordering::Acquire) < unsafe { EVENT_LOOPS.len() } - 1,
|stopped| {
stopped.load(Ordering::Acquire)
< EVENT_LOOP_START_COUNT.load(Ordering::Acquire) - 1
},
)
.unwrap()
.1;
Expand Down
24 changes: 1 addition & 23 deletions open-coroutine-core/src/syscall/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::syscall::LinuxSyscall;
use crate::syscall::UnixSyscall;
#[cfg(target_os = "linux")]
use libc::epoll_event;
use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
use once_cell::sync::Lazy;
use std::ffi::{c_int, c_void};

Expand Down Expand Up @@ -44,17 +44,6 @@ pub extern "C" fn pread(
CHAIN.pread(fn_ptr, fd, buf, count, offset)
}

#[must_use]
pub extern "C" fn preadv(
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t {
CHAIN.preadv(fn_ptr, fd, iov, iovcnt, offset)
}

#[must_use]
pub extern "C" fn recvmsg(
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
Expand Down Expand Up @@ -103,17 +92,6 @@ pub extern "C" fn pwrite(
CHAIN.pwrite(fn_ptr, fd, buf, count, offset)
}

#[must_use]
pub extern "C" fn pwritev(
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t {
CHAIN.pwritev(fn_ptr, fd, iov, iovcnt, offset)
}

#[must_use]
pub extern "C" fn sendmsg(
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,
Expand Down
24 changes: 1 addition & 23 deletions open-coroutine-core/src/syscall/io_uring.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::syscall::LinuxSyscall;
use crate::syscall::UnixSyscall;
use libc::epoll_event;
use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
use std::ffi::{c_int, c_void};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -46,17 +46,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
impl_io_uring!(self, pread, fn_ptr, fd, buf, count, offset)
}

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t {
impl_io_uring!(self, preadv, fn_ptr, fd, iov, iovcnt, offset)
}

extern "C" fn recvmsg(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
Expand Down Expand Up @@ -110,17 +99,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
impl_io_uring!(self, pwrite, fn_ptr, fd, buf, count, offset)
}

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t {
impl_io_uring!(self, pwritev, fn_ptr, fd, iov, iovcnt, offset)
}

extern "C" fn sendmsg(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,
Expand Down
20 changes: 1 addition & 19 deletions open-coroutine-core/src/syscall/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(target_os = "linux")]
use libc::epoll_event;
#[cfg(unix)]
use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
#[cfg(unix)]
use std::ffi::{c_int, c_void};

Expand Down Expand Up @@ -47,15 +47,6 @@ pub trait UnixSyscall {
offset: off_t,
) -> ssize_t;

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t;

extern "C" fn recvmsg(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
Expand Down Expand Up @@ -103,15 +94,6 @@ pub trait UnixSyscall {
offset: off_t,
) -> ssize_t;

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t;

extern "C" fn sendmsg(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,
Expand Down
170 changes: 0 additions & 170 deletions open-coroutine-core/src/syscall/nio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,80 +61,6 @@ macro_rules! impl_expected_read_hook {
}};
}

macro_rules! impl_expected_batch_read_hook {
( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $iov:expr, $length:expr, $($arg: expr),* $(,)* ) => {{
let socket = $socket;
let blocking = $crate::syscall::common::is_blocking(socket);
if blocking {
$crate::syscall::common::set_non_blocking(socket);
}
let mut vec = std::collections::VecDeque::from(unsafe {
Vec::from_raw_parts($iov.cast_mut(), $length as usize, $length as usize)
});
let mut length = 0;
let mut pices = std::collections::VecDeque::new();
for iovec in &vec {
length += iovec.iov_len;
pices.push_back(length);
}
let mut received = 0;
let mut r = 0;
while received < length {
// find from-index
let mut from_index = 0;
for (i, v) in pices.iter().enumerate() {
if received < *v {
from_index = i;
break;
}
}
// calculate offset
let current_received_offset = if from_index > 0 {
received.saturating_sub(pices[from_index.saturating_sub(1)])
} else {
received
};
// remove already received
for _ in 0..from_index {
_ = vec.pop_front();
_ = pices.pop_front();
}
// build syscall args
vec[0] = iovec {
iov_base: (vec[0].iov_base as usize + current_received_offset) as *mut c_void,
iov_len: vec[0].iov_len - current_received_offset,
};
r = $invoker.$syscall($fn_ptr, $socket, vec.get(0).unwrap(), c_int::try_from(vec.len()).unwrap(), $($arg, )*);
if r != -1 {
$crate::syscall::common::reset_errno();
received += r as usize;
if received >= length || r == 0 {
r = received as ssize_t;
break;
}
}
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait read event
if $crate::net::event_loop::EventLoops::wait_read_event(
socket,
Some(std::time::Duration::from_millis(10)),
)
.is_err()
{
break;
}
} else if error_kind != std::io::ErrorKind::Interrupted {
break;
}
}
if blocking {
$crate::syscall::common::set_blocking(socket);
}
r
}};
}

macro_rules! impl_expected_write_hook {
( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $buffer:expr, $length:expr, $($arg: expr),* $(,)* ) => {{
let socket = $socket;
Expand Down Expand Up @@ -182,80 +108,6 @@ macro_rules! impl_expected_write_hook {
}};
}

macro_rules! impl_expected_batch_write_hook {
( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $iov:expr, $length:expr, $($arg: expr),* $(,)* ) => {{
let socket = $socket;
let blocking = $crate::syscall::common::is_blocking(socket);
if blocking {
$crate::syscall::common::set_non_blocking(socket);
}
let mut vec = std::collections::VecDeque::from(unsafe {
Vec::from_raw_parts($iov.cast_mut(), $length as usize, $length as usize)
});
let mut length = 0;
let mut pices = std::collections::VecDeque::new();
for iovec in &vec {
length += iovec.iov_len;
pices.push_back(length);
}
let mut sent = 0;
let mut r = 0;
while sent < length {
// find from-index
let mut from_index = 0;
for (i, v) in pices.iter().enumerate() {
if sent < *v {
from_index = i;
break;
}
}
// calculate offset
let current_sent_offset = if from_index > 0 {
sent.saturating_sub(pices[from_index.saturating_sub(1)])
} else {
sent
};
// remove already sent
for _ in 0..from_index {
_ = vec.pop_front();
_ = pices.pop_front();
}
// build syscall args
vec[0] = iovec {
iov_base: (vec[0].iov_base as usize + current_sent_offset) as *mut c_void,
iov_len: vec[0].iov_len - current_sent_offset,
};
r = $invoker.$syscall($fn_ptr, $socket, vec.get(0).unwrap(), c_int::try_from(vec.len()).unwrap(), $($arg, )*);
if r != -1 {
$crate::syscall::common::reset_errno();
sent += r as usize;
if sent >= length {
r = sent as ssize_t;
break;
}
}
let error_kind = std::io::Error::last_os_error().kind();
if error_kind == std::io::ErrorKind::WouldBlock {
//wait write event
if $crate::net::event_loop::EventLoops::wait_write_event(
socket,
Some(std::time::Duration::from_millis(10)),
)
.is_err()
{
break;
}
} else if error_kind != std::io::ErrorKind::Interrupted {
break;
}
}
if blocking {
$crate::syscall::common::set_blocking(socket);
}
r
}};
}

impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
extern "C" fn read(
&self,
Expand All @@ -278,17 +130,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
impl_expected_read_hook!(self.inner, pread, fn_ptr, fd, buf, count, offset)
}

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t {
impl_expected_batch_read_hook!(self.inner, preadv, fn_ptr, fd, iov, iovcnt, offset)
}

extern "C" fn recvmsg(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
Expand Down Expand Up @@ -432,17 +273,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
impl_expected_write_hook!(self.inner, pwrite, fn_ptr, fd, buf, count, offset)
}

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
) -> ssize_t {
impl_expected_batch_write_hook!(self.inner, pwritev, fn_ptr, fd, iov, iovcnt, offset)
}

extern "C" fn sendmsg(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,
Expand Down
Loading