Skip to content

Commit

Permalink
write_fixed_all (tokio-rs#184)
Browse files Browse the repository at this point in the history
Define write_all functions that take fixed buffers.
    
Adds write_fixed_all for TCP and Unix streams.
    
Adds write_fixed_all_at for File.

Also adds an example: tcp_listener_fixed_buffers that uses read_fixed
and write_fixed_all.
  • Loading branch information
FrankReh committed Dec 3, 2022
1 parent bb5a00c commit 4b92cdb
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 28 deletions.
98 changes: 98 additions & 0 deletions examples/tcp_listener_fixed_buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// An example of an echo server using fixed buffers for reading and writing TCP streams.
// A buffer registry size of two is created, to allow a maximum of two simultaneous connections.

use std::{env, iter, net::SocketAddr};

use tokio_uring::{
buf::{fixed::FixedBufRegistry, BoundedBuf},
net::{TcpListener, TcpStream},
}; // BoundedBuf for slice method

// A contrived example, where just two fixed buffers are created.
const POOL_SIZE: usize = 2;

fn main() {
let args: Vec<_> = env::args().collect();

let socket_addr = if args.len() <= 1 {
"127.0.0.1:0"
} else {
args[1].as_ref()
};
let socket_addr: SocketAddr = socket_addr.parse().unwrap();

tokio_uring::start(accept_loop(socket_addr));
}

// Bind to address and accept connections, spawning an echo handler for each connection.
async fn accept_loop(listen_addr: SocketAddr) {
let listener = TcpListener::bind(listen_addr).unwrap();

println!(
"Listening on {}, fixed buffer pool size only {POOL_SIZE}",
listener.local_addr().unwrap()
);

// Other iterators may be passed to FixedBufRegistry::new also.
let registry = FixedBufRegistry::new(iter::repeat(vec![0; 4096]).take(POOL_SIZE));

// Register the buffers with the kernel, asserting the syscall passed.

registry.register().unwrap();

loop {
let (stream, peer) = listener.accept().await.unwrap();

tokio_uring::spawn(echo_handler(stream, peer, registry.clone()));
}
}

// A loop that echoes input to output. Use one fixed buffer for receiving and sending the response
// back. Once the connection is closed, the function returns and the fixed buffer is dropped,
// getting the fixed buffer index returned to the available pool kept by the registry.
async fn echo_handler(stream: TcpStream, peer: SocketAddr, registry: FixedBufRegistry) {
println!("peer {} connected", peer);

// Get one of the two fixed buffers.
// If neither is unavailable, print reason and return immediately, dropping this connection;
// be nice and shutdown the connection before dropping it so the client sees the connection is
// closed immediately.

let mut fbuf = registry.check_out(0);
if fbuf.is_none() {
fbuf = registry.check_out(1);
};
if fbuf.is_none() {
let _ = stream.shutdown(std::net::Shutdown::Write);
println!("peer {} closed, no fixed buffers available", peer);
return;
};

let mut fbuf = fbuf.unwrap();

let mut n = 0;
loop {
// Each time through the loop, use fbuf and then get it back for the next
// iteration.

let (result, fbuf1) = stream.read_fixed(fbuf).await;
fbuf = {
let read = result.unwrap();
if read == 0 {
break;
}
assert_eq!(4096, fbuf1.len()); // To prove a point.

let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await;

let _ = res.unwrap();
println!("peer {} all {} bytes ping-ponged", peer, read);
n += read;

// Important. One of the points of this example.
nslice.into_inner() // Return the buffer we started with.
};
}
let _ = stream.shutdown(std::net::Shutdown::Write);
println!("peer {} closed, {} total ping-ponged", peer, n);
}
71 changes: 71 additions & 0 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,77 @@ impl File {
op.await
}

/// Attempts to write an entire buffer into this file at the specified offset.
///
/// This method will continuously call [`write_fixed_at`] until there is no more data
/// to be written or an error is returned.
/// This method will not return until the entire buffer has been successfully
/// written or an error occurs.
///
/// If the buffer contains no data, this will never call [`write_fixed_at`].
///
/// # Return
///
/// The method returns the operation result and the same buffer value passed
/// in as an argument.
///
/// # Errors
///
/// This function will return the first error that [`write_fixed_at`] returns.
///
/// [`write_fixed_at`]: Self::write_fixed_at
pub async fn write_fixed_all_at<T>(&self, buf: T, pos: u64) -> crate::BufResult<(), T>
where
T: BoundedBuf<Buf = FixedBuf>,
{
let orig_bounds = buf.bounds();
let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await;
(res, T::from_buf_bounds(buf, orig_bounds))
}

async fn write_fixed_all_at_slice(
&self,
mut buf: Slice<FixedBuf>,
mut pos: u64,
) -> crate::BufResult<(), FixedBuf> {
if pos.checked_add(buf.bytes_init() as u64).is_none() {
return (
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"buffer too large for file",
)),
buf.into_inner(),
);
}

while buf.bytes_init() != 0 {
let (res, slice) = self.write_fixed_at(buf, pos).await;
match res {
Ok(0) => {
return (
Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
slice.into_inner(),
)
}
Ok(n) => {
pos += n as u64;
buf = slice.slice(n..);
}

// No match on an EINTR error is performed because this
// crate's design ensures we are not calling the 'wait' option
// in the ENTER syscall. Only an Enter with 'wait' can generate
// an EINTR according to the io_uring man pages.
Err(e) => return (Err(e), slice.into_inner()),
};
}

(Ok(()), buf.into_inner())
}

/// Attempts to sync all OS-internal metadata to disk.
///
/// This function will attempt to ensure that all in-memory data reaches the
Expand Down
48 changes: 44 additions & 4 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,40 @@ impl Socket {
op.await
}

pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
let orig_bounds = buf.bounds();
let (res, buf) = self.write_all_slice(buf.slice_full()).await;
(res, T::from_buf_bounds(buf, orig_bounds))
}

async fn write_all_slice<T: IoBuf>(&self, mut buf: Slice<T>) -> crate::BufResult<(), T> {
while buf.bytes_init() != 0 {
let res = self.write(buf).await;
match res {
(Ok(0), slice) => {
return (
Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
slice.into_inner(),
)
}
(Ok(n), slice) => {
buf = slice.slice(n..);
}

// No match on an EINTR error is performed because this
// crate's design ensures we are not calling the 'wait' option
// in the ENTER syscall. Only an Enter with 'wait' can generate
// an EINTR according to the io_uring man pages.
(Err(e), slice) => return (Err(e), slice.into_inner()),
}
}

(Ok(()), buf.into_inner())
}

pub(crate) async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
where
T: BoundedBuf<Buf = FixedBuf>,
Expand All @@ -54,15 +88,21 @@ impl Socket {
op.await
}

pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
pub(crate) async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
where
T: BoundedBuf<Buf = FixedBuf>,
{
let orig_bounds = buf.bounds();
let (res, buf) = self.write_all_slice(buf.slice_full()).await;
let (res, buf) = self.write_fixed_all_slice(buf.slice_full()).await;
(res, T::from_buf_bounds(buf, orig_bounds))
}

async fn write_all_slice<T: IoBuf>(&self, mut buf: Slice<T>) -> crate::BufResult<(), T> {
async fn write_fixed_all_slice(
&self,
mut buf: Slice<FixedBuf>,
) -> crate::BufResult<(), FixedBuf> {
while buf.bytes_init() != 0 {
let res = self.write(buf).await;
let res = self.write_fixed(buf).await;
match res {
(Ok(0), slice) => {
return (
Expand Down
56 changes: 38 additions & 18 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,6 @@ impl TcpStream {
self.inner.write(buf).await
}

/// Like [`write`], but using a pre-mapped buffer
/// registered with [`FixedBufRegistry`].
///
/// [`write`]: Self::write
/// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
///
/// # Errors
///
/// In addition to errors that can be reported by `write`,
/// this operation fails if the buffer is not registered in the
/// current `tokio-uring` runtime.
pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
where
T: BoundedBuf<Buf = FixedBuf>,
{
self.inner.write_fixed(buf).await
}

/// Attempts to write an entire buffer to the stream.
///
/// This method will continuously call [`write`] until there is no more data to be
Expand Down Expand Up @@ -172,6 +154,44 @@ impl TcpStream {
self.inner.write_all(buf).await
}

/// Like [`write`], but using a pre-mapped buffer
/// registered with [`FixedBufRegistry`].
///
/// [`write`]: Self::write
/// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
///
/// # Errors
///
/// In addition to errors that can be reported by `write`,
/// this operation fails if the buffer is not registered in the
/// current `tokio-uring` runtime.
pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
where
T: BoundedBuf<Buf = FixedBuf>,
{
self.inner.write_fixed(buf).await
}

/// Attempts to write an entire buffer to the stream.
///
/// This method will continuously call [`write_fixed`] until there is no more data to be
/// written or an error is returned. This method will not return until the entire
/// buffer has been successfully written or an error has occurred.
///
/// If the buffer contains no data, this will never call [`write_fixed`].
///
/// # Errors
///
/// This function will return the first error that [`write_fixed`] returns.
///
/// [`write_fixed`]: Self::write
pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
where
T: BoundedBuf<Buf = FixedBuf>,
{
self.inner.write_fixed_all(buf).await
}

/// Write data from buffers into this socket returning how many bytes were
/// written.
///
Expand Down
32 changes: 26 additions & 6 deletions src/net/unix/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@ impl UnixStream {
self.inner.write(buf).await
}

/// Attempts to write an entire buffer to the stream.
///
/// This method will continuously call [`write`] until there is no more data to be
/// written or an error is returned. This method will not return until the entire
/// buffer has been successfully written or an error has occurred.
///
/// If the buffer contains no data, this will never call [`write`].
///
/// # Errors
///
/// This function will return the first error that [`write`] returns.
///
/// [`write`]: Self::write
pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
self.inner.write_all(buf).await
}

/// Like [`write`], but using a pre-mapped buffer
/// registered with [`FixedBufRegistry`].
///
Expand All @@ -122,19 +139,22 @@ impl UnixStream {

/// Attempts to write an entire buffer to the stream.
///
/// This method will continuously call [`write`] until there is no more data to be
/// This method will continuously call [`write_fixed`] until there is no more data to be
/// written or an error is returned. This method will not return until the entire
/// buffer has been successfully written or an error has occurred.
///
/// If the buffer contains no data, this will never call [`write`].
/// If the buffer contains no data, this will never call [`write_fixed`].
///
/// # Errors
///
/// This function will return the first error that [`write`] returns.
/// This function will return the first error that [`write_fixed`] returns.
///
/// [`write`]: Self::write
pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
self.inner.write_all(buf).await
/// [`write_fixed`]: Self::write
pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
where
T: BoundedBuf<Buf = FixedBuf>,
{
self.inner.write_fixed_all(buf).await
}

/// Write data from buffers into this socket returning how many bytes were
Expand Down

0 comments on commit 4b92cdb

Please sign in to comment.