Skip to content

Commit

Permalink
Add async socket layer
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Apr 24, 2024
1 parent 0b0fedf commit f73a965
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 27 deletions.
5 changes: 1 addition & 4 deletions dgram/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ name = "dgram"
version = "0.1.0"
edition = "2021"

[features]
async = []

[dependencies]
libc = "0.2.76"
nix = "0.26.2"
smallvec = { version = "1.10", features = ["union"] }

tokio = { version = "1.29", features = ["full", "test-util"] }
94 changes: 94 additions & 0 deletions dgram/src/async_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::syscalls::RecvData;
use std::io::ErrorKind;
use std::io::Result;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;

Check failure on line 5 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_windows (x86_64-pc-windows-msvc)

unresolved import `std::os::fd`

Check failure on line 5 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_windows (i686-pc-windows-msvc)

unresolved import `std::os::fd`
use std::time::Instant;
use tokio::net::UdpSocket;

#[cfg(target_os = "linux")]
mod linux_imports {
pub(super) use crate::syscalls::recv_msg;
pub(super) use crate::syscalls::send_msg;
pub(super) use nix::sys::socket::MsgFlags;
pub(super) use nix::sys::socket::SockaddrStorage;
pub(super) use tokio::io::Interest;
}

#[cfg(target_os = "linux")]
use self::linux_imports::*;

#[cfg(target_os = "linux")]
pub async fn send_to(
socket: &UdpSocket, send_buf: &[u8], segment_size: usize, num_pkts: usize,
tx_time: Option<Instant>, client_addr: &SocketAddr,
) -> Result<usize> {
loop {
// Important to use try_io so that Tokio can clear the socket's readiness
// flag
let res = socket.try_io(Interest::WRITABLE, || {
let fd = socket.as_raw_fd();
send_msg(
fd,
send_buf,
segment_size,
num_pkts,
tx_time,
&SockaddrStorage::from(*client_addr),
)
.map_err(Into::into)
});

match res {
Err(e) if e.kind() == ErrorKind::WouldBlock =>
socket.writable().await?,
res => return res,
}
}
}

#[cfg(target_os = "linux")]
pub async fn recv_from(
socket: &UdpSocket, read_buf: &mut [u8], cmsg_space: &mut Vec<u8>,
msg_flags: MsgFlags,
) -> Result<RecvData> {
loop {
// Important to use try_io so that Tokio can clear the socket's readiness
// flag
let res = socket.try_io(Interest::READABLE, || {
let fd = socket.as_raw_fd();
recv_msg(fd, read_buf, cmsg_space, Some(msg_flags))
.map_err(Into::into)
});

match res {
Err(e) if e.kind() == ErrorKind::WouldBlock =>
socket.readable().await?,
_ => return res,
}
}
}

#[cfg(not(target_os = "linux"))]
pub async fn send_to(
socket: &tokio::net::UdpSocket, client_addr: SocketAddr, send_buf: &[u8],
_segment_size: usize, _num_pkts: usize, _tx_time: Option<Instant>,
) -> io::Result<usize> {

Check failure on line 76 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_macos (macos-latest)

failed to resolve: use of undeclared crate or module `io`

Check failure on line 76 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_macos (macos-14)

failed to resolve: use of undeclared crate or module `io`

Check failure on line 76 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_ios (aarch64-apple-ios)

failed to resolve: use of undeclared crate or module `io`

Check failure on line 76 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / android_ndk_lts (aarch64-linux-android)

failed to resolve: use of undeclared crate or module `io`

Check failure on line 76 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_windows (x86_64-pc-windows-msvc)

failed to resolve: use of undeclared crate or module `io`

Check failure on line 76 in dgram/src/async_socket.rs

View workflow job for this annotation

GitHub Actions / quiche_windows (i686-pc-windows-msvc)

failed to resolve: use of undeclared crate or module `io`
socket.send_to(send_buf, client_addr).await
}

// Signature changes because we can't use MessageFlags outside of a *NIX context
#[cfg(not(target_os = "linux"))]
pub async fn recv_from(
socket: &UdpSocket, read_buf: &mut [u8], _cmsg_space: &mut Vec<u8>,
) -> Result<RecvData> {
let recv = socket.recv(read_buf).await?;

Ok(RecvData {
bytes: recv,
peer_addr: None,
cmsgs: vec![],
gro: None,
rx_time: None,
})
}
23 changes: 0 additions & 23 deletions dgram/src/syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,27 +313,4 @@ mod tests {

Ok(())
}

#[test]
fn recv_from_control_message() -> Result<()> {
let (send, recv) = new_sockets()?;
let addr = getsockname::<SockaddrStorage>(recv).unwrap();

let send_buf = b"jets";
let iov = [IoSlice::new(send_buf)];
sendmsg(send, &iov, &vec![], MsgFlags::empty(), Some(&addr))?;

let mut cmsg_space = cmsg_space!(TimeVal);
let mut read_buf = [0; 4];

let recv_data = recv_msg(recv, &mut read_buf, &mut cmsg_space, None)?;

println!("cmsgs: {:?}", recv_data.cmsgs);

// TODO: test cmsgs get transferred to cmsg_space
assert_eq!(recv_data.bytes, 4);
assert_eq!(&read_buf, b"jets");

Ok(())
}
}

0 comments on commit f73a965

Please sign in to comment.