Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions compio-io/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,43 @@ impl Inner {
#[inline]
fn reset(&mut self) {
self.pos = 0;
unsafe { self.buf.set_len(0) };
self.buf.clear();
}

#[inline]
fn slice(&self) -> &[u8] {
&self.buf[self.pos..]
}

pub fn reserve_exact(&mut self, additional: usize) {
self.buf.reserve_exact(additional);
}

pub fn extend_from_slice(&mut self, data: &[u8]) {
self.buf.extend_from_slice(data);
}

fn compact_to(&mut self, capacity: usize, max_capacity: usize) {
if self.pos > 0 && self.pos < self.buf.len() {
let buf_len = self.buf.len();
let remaining = buf_len - self.pos;
self.buf.copy_within(self.pos..buf_len, 0);

// SAFETY: We're setting the length to the amount of data we just moved.
// The data from 0..remaining is initialized (just moved from read_pos..buf_len)
unsafe {
self.buf.set_len(remaining);
}
self.pos = 0;
} else if self.pos >= self.buf.len() {
// All data consumed, reset buffer
self.reset();
if self.buf.capacity() > max_capacity {
self.buf.shrink_to(capacity);
}
}
}

#[inline]
pub(crate) fn into_slice(self) -> Slice<Self> {
let pos = self.pos;
Expand Down Expand Up @@ -138,6 +167,12 @@ impl Buffer {
self.inner_mut().buf.reserve(additional);
}

/// Compact the buffer to the given capacity, if the current capacity is
/// larger than the given maximum capacity.
pub fn compact_to(&mut self, capacity: usize, max_capacity: usize) {
self.inner_mut().compact_to(capacity, max_capacity);
}

/// Execute a funcition with ownership of the buffer, and restore the buffer
/// afterwards
pub async fn with<R, Fut, F>(&mut self, func: F) -> IoResult<R>
Expand Down Expand Up @@ -175,7 +210,7 @@ impl Buffer {
.await?;
if written == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
std::io::ErrorKind::WriteZero,
"cannot flush all buffer data",
));
}
Expand Down
172 changes: 2 additions & 170 deletions compio-io/src/compat.rs → compio-io/src/compat/async_stream.rs
Original file line number Diff line number Diff line change
@@ -1,180 +1,12 @@
//! Compat wrappers for interop with other crates.

use std::{
fmt::Debug,
io::{self, BufRead, Read, Write},
io::{self, BufRead},
mem::MaybeUninit,
pin::Pin,
task::{Context, Poll},
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};

use crate::{PinBoxFuture, buffer::Buffer, util::DEFAULT_BUF_SIZE};

/// A wrapper for [`AsyncRead`](crate::AsyncRead) +
/// [`AsyncWrite`](crate::AsyncWrite), providing sync traits impl.
///
/// The sync methods will return [`io::ErrorKind::WouldBlock`] error if the
/// inner buffer needs more data.
#[derive(Debug)]
pub struct SyncStream<S> {
stream: S,
eof: bool,
read_buffer: Buffer,
write_buffer: Buffer,
}

impl<S> SyncStream<S> {
/// Create [`SyncStream`] with the stream and default buffer size.
pub fn new(stream: S) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, stream)
}

/// Create [`SyncStream`] with the stream and buffer size.
pub fn with_capacity(cap: usize, stream: S) -> Self {
Self {
stream,
eof: false,
read_buffer: Buffer::with_capacity(cap),
write_buffer: Buffer::with_capacity(cap),
}
}

/// Get if the stream is at EOF.
pub fn is_eof(&self) -> bool {
self.eof
}

/// Get the reference of the inner stream.
pub fn get_ref(&self) -> &S {
&self.stream
}

/// Get the mutable reference of the inner stream.
pub fn get_mut(&mut self) -> &mut S {
&mut self.stream
}

fn flush_impl(&mut self) -> io::Result<()> {
if !self.write_buffer.is_empty() {
Err(would_block("need to flush the write buffer"))
} else {
Ok(())
}
}

/// Pull some bytes from this source into the specified buffer.
pub fn read_buf_uninit(&mut self, buf: &mut [MaybeUninit<u8>]) -> io::Result<usize> {
let slice = self.fill_buf()?;
let amt = buf.len().min(slice.len());
// SAFETY: the length is valid
buf[..amt]
.copy_from_slice(unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), amt) });
self.consume(amt);
Ok(amt)
}
}

impl<S> Read for SyncStream<S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut slice = self.fill_buf()?;
slice.read(buf).inspect(|res| {
self.consume(*res);
})
}

#[cfg(feature = "read_buf")]
fn read_buf(&mut self, mut buf: io::BorrowedCursor<'_>) -> io::Result<()> {
let mut slice = self.fill_buf()?;
let old_written = buf.written();
slice.read_buf(buf.reborrow())?;
let len = buf.written() - old_written;
self.consume(len);
Ok(())
}
}

impl<S> BufRead for SyncStream<S> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if self.read_buffer.all_done() {
self.read_buffer.reset();
}

if self.read_buffer.slice().is_empty() && !self.eof {
return Err(would_block("need to fill the read buffer"));
}

Ok(self.read_buffer.slice())
}

fn consume(&mut self, amt: usize) {
self.read_buffer.advance(amt);
}
}

impl<S> Write for SyncStream<S> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.write_buffer.need_flush() {
self.flush_impl()?;
}

let written = self.write_buffer.with_sync(|mut inner| {
let len = buf.len().min(inner.buf_capacity() - inner.buf_len());
unsafe {
std::ptr::copy_nonoverlapping(
buf.as_ptr(),
inner.as_buf_mut_ptr().add(inner.buf_len()),
len,
);
inner.set_buf_init(inner.buf_len() + len);
}
BufResult(Ok(len), inner)
})?;

Ok(written)
}

fn flush(&mut self) -> io::Result<()> {
// Related PR:
// https://github.com/sfackler/rust-openssl/pull/1922
// After this PR merged, we can use self.flush_impl()
Ok(())
}
}

fn would_block(msg: &str) -> io::Error {
io::Error::new(io::ErrorKind::WouldBlock, msg)
}

impl<S: crate::AsyncRead> SyncStream<S> {
/// Fill the read buffer.
pub async fn fill_read_buf(&mut self) -> io::Result<usize> {
let stream = &mut self.stream;
let len = self
.read_buffer
.with(|b| async move {
let len = b.buf_len();
let b = b.slice(len..);
stream.read(b).await.into_inner()
})
.await?;
if len == 0 {
self.eof = true;
}
Ok(len)
}
}

impl<S: crate::AsyncWrite> SyncStream<S> {
/// Flush all data in the write buffer.
pub async fn flush_write_buf(&mut self) -> io::Result<usize> {
let stream = &mut self.stream;
let len = self.write_buffer.flush_to(stream).await?;
stream.flush().await?;
Ok(len)
}
}
use crate::{PinBoxFuture, compat::SyncStream};

/// A stream wrapper for [`futures_util::io`] traits.
pub struct AsyncStream<S> {
Expand Down
7 changes: 7 additions & 0 deletions compio-io/src/compat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Compat wrappers for interop with other crates.

mod sync_stream;
pub use sync_stream::*;

mod async_stream;
pub use async_stream::*;
Loading
Loading