From 4f20e4833de95eeb5629058d358950a6e3f609a5 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Sat, 4 Jan 2025 22:55:04 +0800 Subject: [PATCH] add sendmsg/recvmsg test --- open-coroutine/examples/socket_co.rs | 57 +++++++++++++++++++++ open-coroutine/examples/socket_co_client.rs | 56 ++++++++++++++++++++ open-coroutine/examples/socket_co_server.rs | 57 +++++++++++++++++++++ open-coroutine/examples/socket_not_co.rs | 56 ++++++++++++++++++++ 4 files changed, 226 insertions(+) diff --git a/open-coroutine/examples/socket_co.rs b/open-coroutine/examples/socket_co.rs index ca854fdb..0680b85b 100644 --- a/open-coroutine/examples/socket_co.rs +++ b/open-coroutine/examples/socket_co.rs @@ -1,6 +1,8 @@ use open_coroutine::task; use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Read, Write}; use std::net::{Shutdown, TcpListener, ToSocketAddrs}; +#[cfg(unix)] +use std::os::fd::AsRawFd; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; @@ -36,6 +38,30 @@ pub fn start_co_server(addr: A, server_finished: Arc<(Mutex(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { + libc::recvmsg(socket.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Server Received Message: {} {}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + assert_eq!(512, unsafe { libc::sendmsg(socket.as_raw_fd(), &msg, 0) }); + eprintln!("Server Send Message"); + } println!("Server Shutdown Write"); if socket.shutdown(Shutdown::Write).is_ok() { println!("Server Closed Connection"); @@ -87,6 +113,37 @@ pub fn start_co_client(addr: A) { String::from_utf8_lossy(&buffer2) ); } + #[cfg(unix)] + for i in 0..3 { + let mut request1 = format!("MessagePart{i}1").into_bytes(); + let mut request2 = format!("MessagePart{i}2").into_bytes(); + let mut buffers = [ + IoSliceMut::new(request1.as_mut_slice()), + IoSliceMut::new(request2.as_mut_slice()), + ]; + let mut msg = libc::msghdr { + msg_name: std::ptr::null_mut(), + msg_namelen: 0, + msg_iov: buffers.as_mut_ptr().cast::(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { libc::sendmsg(stream.as_raw_fd(), &msg, 0) }); + eprintln!("Client Send Message"); + buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)]; + msg.msg_iov = buffers.as_mut_ptr().cast::(); + msg.msg_iovlen = buffers.len() as _; + assert_eq!(512, unsafe { + libc::recvmsg(stream.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Client Received Message: {}{}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + } println!("Client Shutdown Write"); stream.shutdown(Shutdown::Write).expect("shutdown failed"); println!("Client Closed"); diff --git a/open-coroutine/examples/socket_co_client.rs b/open-coroutine/examples/socket_co_client.rs index a7d497ee..82df216b 100644 --- a/open-coroutine/examples/socket_co_client.rs +++ b/open-coroutine/examples/socket_co_client.rs @@ -1,6 +1,8 @@ use open_coroutine::task; use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Read, Write}; use std::net::{Shutdown, TcpListener, ToSocketAddrs}; +#[cfg(unix)] +use std::os::fd::AsRawFd; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; @@ -34,6 +36,29 @@ pub fn start_server(addr: A, server_finished: Arc<(Mutex ); println!("Server Send Multiple"); } + #[cfg(unix)] + for _ in 0..3 { + let mut buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)]; + let mut msg = libc::msghdr { + msg_name: std::ptr::null_mut(), + msg_namelen: 0, + msg_iov: buffers.as_mut_ptr().cast::(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { + libc::recvmsg(socket.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Server Received Message: {} {}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + assert_eq!(512, unsafe { libc::sendmsg(socket.as_raw_fd(), &msg, 0) }); + eprintln!("Server Send Message"); + } println!("Server Shutdown Write"); if socket.shutdown(Shutdown::Write).is_ok() { println!("Server Closed Connection"); @@ -83,6 +108,37 @@ pub fn start_co_client(addr: A) { String::from_utf8_lossy(&buffer2) ); } + #[cfg(unix)] + for i in 0..3 { + let mut request1 = format!("MessagePart{i}1").into_bytes(); + let mut request2 = format!("MessagePart{i}2").into_bytes(); + let mut buffers = [ + IoSliceMut::new(request1.as_mut_slice()), + IoSliceMut::new(request2.as_mut_slice()), + ]; + let mut msg = libc::msghdr { + msg_name: std::ptr::null_mut(), + msg_namelen: 0, + msg_iov: buffers.as_mut_ptr().cast::(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { libc::sendmsg(stream.as_raw_fd(), &msg, 0) }); + eprintln!("Client Send Message"); + buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)]; + msg.msg_iov = buffers.as_mut_ptr().cast::(); + msg.msg_iovlen = buffers.len() as _; + assert_eq!(512, unsafe { + libc::recvmsg(stream.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Client Received Message: {}{}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + } println!("Client Shutdown Write"); stream.shutdown(Shutdown::Write).expect("shutdown failed"); println!("Client Closed"); diff --git a/open-coroutine/examples/socket_co_server.rs b/open-coroutine/examples/socket_co_server.rs index 888af1d5..c14d7ed3 100644 --- a/open-coroutine/examples/socket_co_server.rs +++ b/open-coroutine/examples/socket_co_server.rs @@ -1,6 +1,8 @@ use open_coroutine::task; use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Read, Write}; use std::net::{Shutdown, TcpListener, ToSocketAddrs}; +#[cfg(unix)] +use std::os::fd::AsRawFd; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; @@ -36,6 +38,30 @@ pub fn start_co_server(addr: A, server_finished: Arc<(Mutex(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { + libc::recvmsg(socket.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Server Received Message: {} {}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + assert_eq!(512, unsafe { libc::sendmsg(socket.as_raw_fd(), &msg, 0) }); + eprintln!("Server Send Message"); + } println!("Server Shutdown Write"); if socket.shutdown(Shutdown::Write).is_ok() { println!("Server Closed Connection"); @@ -87,6 +113,37 @@ pub fn start_client(addr: A) { String::from_utf8_lossy(&buffer2) ); } + #[cfg(unix)] + for i in 0..3 { + let mut request1 = format!("MessagePart{i}1").into_bytes(); + let mut request2 = format!("MessagePart{i}2").into_bytes(); + let mut buffers = [ + IoSliceMut::new(request1.as_mut_slice()), + IoSliceMut::new(request2.as_mut_slice()), + ]; + let mut msg = libc::msghdr { + msg_name: std::ptr::null_mut(), + msg_namelen: 0, + msg_iov: buffers.as_mut_ptr().cast::(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { libc::sendmsg(stream.as_raw_fd(), &msg, 0) }); + eprintln!("Client Send Message"); + buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)]; + msg.msg_iov = buffers.as_mut_ptr().cast::(); + msg.msg_iovlen = buffers.len() as _; + assert_eq!(512, unsafe { + libc::recvmsg(stream.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Client Received Message: {}{}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + } println!("Client Shutdown Write"); stream.shutdown(Shutdown::Write).expect("shutdown failed"); println!("Client Closed"); diff --git a/open-coroutine/examples/socket_not_co.rs b/open-coroutine/examples/socket_not_co.rs index 4b71eeff..c162f006 100644 --- a/open-coroutine/examples/socket_not_co.rs +++ b/open-coroutine/examples/socket_not_co.rs @@ -1,5 +1,7 @@ use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Read, Write}; use std::net::{Shutdown, TcpListener, ToSocketAddrs}; +#[cfg(unix)] +use std::os::fd::AsRawFd; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; @@ -33,6 +35,29 @@ fn start_server(addr: A, server_finished: Arc<(Mutex, Co ); eprintln!("Server Send Multiple"); } + #[cfg(unix)] + for _ in 0..3 { + let mut buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)]; + let mut msg = libc::msghdr { + msg_name: std::ptr::null_mut(), + msg_namelen: 0, + msg_iov: buffers.as_mut_ptr().cast::(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { + libc::recvmsg(socket.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Server Received Message: {} {}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + assert_eq!(512, unsafe { libc::sendmsg(socket.as_raw_fd(), &msg, 0) }); + eprintln!("Server Send Message"); + } eprintln!("Server Shutdown Write"); if socket.shutdown(Shutdown::Write).is_ok() { eprintln!("Server Closed Connection"); @@ -82,6 +107,37 @@ fn start_client(addr: A) { String::from_utf8_lossy(&buffer2) ); } + #[cfg(unix)] + for i in 0..3 { + let mut request1 = format!("MessagePart{i}1").into_bytes(); + let mut request2 = format!("MessagePart{i}2").into_bytes(); + let mut buffers = [ + IoSliceMut::new(request1.as_mut_slice()), + IoSliceMut::new(request2.as_mut_slice()), + ]; + let mut msg = libc::msghdr { + msg_name: std::ptr::null_mut(), + msg_namelen: 0, + msg_iov: buffers.as_mut_ptr().cast::(), + msg_iovlen: buffers.len() as _, + msg_control: std::ptr::null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + assert_eq!(26, unsafe { libc::sendmsg(stream.as_raw_fd(), &msg, 0) }); + eprintln!("Client Send Message"); + buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)]; + msg.msg_iov = buffers.as_mut_ptr().cast::(); + msg.msg_iovlen = buffers.len() as _; + assert_eq!(512, unsafe { + libc::recvmsg(stream.as_raw_fd(), &mut msg, 0) + }); + eprintln!( + "Client Received Message: {}{}", + String::from_utf8_lossy(&buffer1), + String::from_utf8_lossy(&buffer2) + ); + } eprintln!("Client Shutdown Write"); stream.shutdown(Shutdown::Write).expect("shutdown failed"); eprintln!("Client Closed");