Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,61 @@ fn crate_task(input: i32) {
);
}

pub fn start_server<A: ToSocketAddrs>(
addr: A,
server_finished: Arc<(Mutex<bool>, Condvar)>,
) -> std::io::Result<()> {
let listener = TcpListener::bind(addr)?;
pub fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
let listener = TcpListener::bind(addr).expect("start server failed");
for stream in listener.incoming() {
let mut stream = stream?;
let mut socket = stream.expect("accept new connection failed");
let mut buffer1 = [0; 256];
for _ in 0..3 {
assert_eq!(12, stream.read(&mut buffer1)?);
assert_eq!(12, socket.read(&mut buffer1).expect("recv failed"));
println!("Server Received: {}", String::from_utf8_lossy(&buffer1));
assert_eq!(256, stream.write(&buffer1)?);
assert_eq!(256, socket.write(&buffer1).expect("send failed"));
println!("Server Send");
}
let mut buffer2 = [0; 256];
for _ in 0..3 {
let mut buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)];
assert_eq!(26, stream.read_vectored(&mut buffers)?);
assert_eq!(
26,
socket.read_vectored(&mut buffers).expect("readv failed")
);
println!(
"Server Received Multiple: {}{}",
String::from_utf8_lossy(&buffer1),
String::from_utf8_lossy(&buffer2)
);
let responses = [IoSlice::new(&buffer1), IoSlice::new(&buffer2)];
assert_eq!(512, stream.write_vectored(&responses)?);
assert_eq!(
512,
socket.write_vectored(&responses).expect("writev failed")
);
println!("Server Send Multiple");
}
println!("Server Shutdown Write");
stream.shutdown(Shutdown::Write).map(|()| {
if socket.shutdown(Shutdown::Write).is_ok() {
println!("Server Closed Connection");
})?;
let (lock, cvar) = &*server_finished;
let mut pending = lock.lock().unwrap();
*pending = false;
cvar.notify_one();
let (lock, cvar) = &*server_finished;
let mut pending = lock.lock().unwrap();
*pending = false;
cvar.notify_one();
println!("Server Closed");
return;
}
}
Ok(())
}

pub fn start_client<A: ToSocketAddrs>(addr: A) -> std::io::Result<()> {
let mut stream = connect_timeout(addr, Duration::from_secs(3))?;
pub fn start_client<A: ToSocketAddrs>(addr: A) {
let mut stream = connect_timeout(addr, Duration::from_secs(3)).expect("connect failed");
let mut buffer1 = [0; 256];
for i in 0..3 {
assert_eq!(12, stream.write(format!("RequestPart{i}").as_ref())?);
assert_eq!(
12,
stream
.write(format!("RequestPart{i}").as_ref())
.expect("send failed")
);
println!("Client Send");
assert_eq!(256, stream.read(&mut buffer1)?);
assert_eq!(256, stream.read(&mut buffer1).expect("recv failed"));
println!("Client Received: {}", String::from_utf8_lossy(&buffer1));
}
let mut buffer2 = [0; 256];
Expand All @@ -70,20 +79,22 @@ pub fn start_client<A: ToSocketAddrs>(addr: A) -> std::io::Result<()> {
IoSlice::new(request1.as_ref()),
IoSlice::new(request2.as_ref()),
];
assert_eq!(26, stream.write_vectored(&requests)?);
assert_eq!(26, stream.write_vectored(&requests).expect("writev failed"));
println!("Client Send Multiple");
let mut buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)];
assert_eq!(512, stream.read_vectored(&mut buffers)?);
assert_eq!(
512,
stream.read_vectored(&mut buffers).expect("readv failed")
);
println!(
"Client Received Multiple: {}{}",
String::from_utf8_lossy(&buffer1),
String::from_utf8_lossy(&buffer2)
);
}
println!("Client Shutdown Write");
stream.shutdown(Shutdown::Write).map(|()| {
println!("Client Closed");
})
stream.shutdown(Shutdown::Write).expect("shutdown failed");
println!("Client Closed");
}

fn connect_timeout<A: ToSocketAddrs>(addr: A, timeout: Duration) -> std::io::Result<TcpStream> {
Expand Down Expand Up @@ -111,7 +122,7 @@ pub fn crate_server(
crate_task(1);
let mut data: [u8; 512] = unsafe { std::mem::zeroed() };
data[511] = b'\n';
let listener = TcpListener::bind(format!("127.0.0.1:{port}"))
let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port))
.unwrap_or_else(|_| panic!("bind to 127.0.0.1:{port} failed !"));
server_started.store(true, Ordering::Release);
//invoke by libc::accept
Expand Down Expand Up @@ -205,7 +216,7 @@ pub fn crate_co_server(
crate_task(11);
let mut data: [u8; 512] = unsafe { std::mem::zeroed() };
data[511] = b'\n';
let listener = TcpListener::bind(format!("127.0.0.1:{port}"))
let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port))
.unwrap_or_else(|_| panic!("bind to 127.0.0.1:{port} failed !"));
server_started.store(true, Ordering::Release);
//invoke by libc::accept
Expand Down
10 changes: 2 additions & 8 deletions open-coroutine-core/src/net/selector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ pub(crate) trait Selector<I: Interest, E: Event, S: EventIterator<E>> {
READABLE_RECORDS.remove(&fd).is_some(),
"Clean READABLE_RECORDS failed !"
);
assert!(
READABLE_TOKEN_RECORDS.remove(&fd).is_some(),
"Clean READABLE_TOKEN_RECORDS failed !"
);
_ = READABLE_TOKEN_RECORDS.remove(&fd);
} else {
self.del_event(fd)?;
}
Expand All @@ -158,10 +155,7 @@ pub(crate) trait Selector<I: Interest, E: Event, S: EventIterator<E>> {
WRITABLE_RECORDS.remove(&fd).is_some(),
"Clean WRITABLE_RECORDS failed !"
);
assert!(
WRITABLE_TOKEN_RECORDS.remove(&fd).is_some(),
"Clean WRITABLE_TOKEN_RECORDS failed !"
);
_ = WRITABLE_TOKEN_RECORDS.remove(&fd);
} else {
self.del_event(fd)?;
}
Expand Down
20 changes: 0 additions & 20 deletions open-coroutine-core/src/syscall/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,6 @@ pub extern "C" fn pread(
CHAIN.pread(fn_ptr, fd, buf, count, offset)
}

#[must_use]
pub extern "C" fn readv(
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
CHAIN.readv(fn_ptr, fd, iov, iovcnt)
}

#[must_use]
pub extern "C" fn preadv(
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down Expand Up @@ -156,16 +146,6 @@ pub extern "C" fn pwrite(
CHAIN.pwrite(fn_ptr, fd, buf, count, offset)
}

#[must_use]
pub extern "C" fn writev(
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
CHAIN.writev(fn_ptr, fd, iov, iovcnt)
}

#[must_use]
pub extern "C" fn pwritev(
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down
20 changes: 0 additions & 20 deletions open-coroutine-core/src/syscall/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
impl_io_uring!(self, pread, fn_ptr, fd, buf, count, offset)
}

extern "C" fn readv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
impl_io_uring!(self, readv, fn_ptr, fd, iov, iovcnt)
}

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down Expand Up @@ -168,16 +158,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
impl_io_uring!(self, pwrite, fn_ptr, fd, buf, count, offset)
}

extern "C" fn writev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
impl_io_uring!(self, writev, fn_ptr, fd, iov, iovcnt)
}

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down
16 changes: 0 additions & 16 deletions open-coroutine-core/src/syscall/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ pub trait UnixSyscall {
offset: off_t,
) -> ssize_t;

extern "C" fn readv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t;

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down Expand Up @@ -155,14 +147,6 @@ pub trait UnixSyscall {
offset: off_t,
) -> ssize_t;

extern "C" fn writev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t;

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down
20 changes: 0 additions & 20 deletions open-coroutine-core/src/syscall/nio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,16 +397,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
impl_expected_read_hook!(self.inner, pread, fn_ptr, fd, buf, count, offset)
}

extern "C" fn readv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
impl_expected_batch_read_hook!(self.inner, readv, fn_ptr, fd, iov, iovcnt,)
}

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down Expand Up @@ -561,16 +551,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
impl_expected_write_hook!(self.inner, pwrite, fn_ptr, fd, buf, count, offset)
}

extern "C" fn writev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
impl_expected_batch_write_hook!(self.inner, writev, fn_ptr, fd, iov, iovcnt,)
}

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down
28 changes: 0 additions & 28 deletions open-coroutine-core/src/syscall/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,6 @@ impl UnixSyscall for RawLinuxSyscall {
}
}

extern "C" fn readv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
if let Some(f) = fn_ptr {
(f)(fd, iov, iovcnt)
} else {
unsafe { libc::readv(fd, iov, iovcnt) }
}
}

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down Expand Up @@ -203,20 +189,6 @@ impl UnixSyscall for RawLinuxSyscall {
}
}

extern "C" fn writev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
if let Some(f) = fn_ptr {
(f)(fd, iov, iovcnt)
} else {
unsafe { libc::writev(fd, iov, iovcnt) }
}
}

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down
20 changes: 0 additions & 20 deletions open-coroutine-core/src/syscall/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ impl<I: UnixSyscall> UnixSyscall for StateLinuxSyscall<I> {
syscall_state!(self, pread, fn_ptr, fd, buf, count, offset)
}

extern "C" fn readv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
syscall_state!(self, readv, fn_ptr, fd, iov, iovcnt)
}

extern "C" fn preadv(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down Expand Up @@ -178,16 +168,6 @@ impl<I: UnixSyscall> UnixSyscall for StateLinuxSyscall<I> {
syscall_state!(self, pwrite, fn_ptr, fd, buf, count, offset)
}

extern "C" fn writev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int) -> ssize_t>,
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
) -> ssize_t {
syscall_state!(self, writev, fn_ptr, fd, iov, iovcnt)
}

extern "C" fn pwritev(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
Expand Down
Loading