Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ compio-driver = { path = "./compio-driver", version = "0.9.0", default-features
compio-runtime = { path = "./compio-runtime", version = "0.9.0" }
compio-macros = { path = "./compio-macros", version = "0.1.2" }
compio-fs = { path = "./compio-fs", version = "0.9.0" }
compio-io = { path = "./compio-io", version = "0.8.0" }
compio-io = { path = "./compio-io", version = "0.8.2" }
compio-net = { path = "./compio-net", version = "0.9.0" }
compio-signal = { path = "./compio-signal", version = "0.7.0" }
compio-dispatcher = { path = "./compio-dispatcher", version = "0.8.0" }
compio-log = { path = "./compio-log", version = "0.1.0" }
compio-tls = { path = "./compio-tls", version = "0.7.0", default-features = false }
compio-tls = { path = "./compio-tls", version = "0.7.1", default-features = false }
compio-process = { path = "./compio-process", version = "0.6.0" }
compio-quic = { path = "./compio-quic", version = "0.5.0", default-features = false }

Expand All @@ -46,6 +46,7 @@ criterion = "0.7.0"
crossbeam-queue = "0.3.8"
flume = { version = "0.11.0", default-features = false }
futures-channel = "0.3.29"
futures-rustls = { version = "0.26.0", default-features = false }
futures-util = "0.3.29"
libc = "0.2.164"
nix = "0.30.1"
Expand Down
5 changes: 2 additions & 3 deletions compio-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "compio-io"
version = "0.8.1"
version = "0.8.2"
description = "IO traits for completion based async IO"
categories = ["asynchronous"]
keywords = ["async", "io"]
Expand All @@ -15,7 +15,6 @@ compio-buf = { workspace = true, features = ["arrayvec", "bytes"] }
futures-util = { workspace = true, features = ["sink"] }
paste = { workspace = true }
thiserror = { workspace = true, optional = true }
pin-project-lite = { workspace = true, optional = true }
serde = { version = "1.0.219", optional = true }
serde_json = { version = "1.0.140", optional = true }

Expand All @@ -29,7 +28,7 @@ futures-executor = "0.3.30"

[features]
default = []
compat = ["dep:pin-project-lite", "futures-util/io"]
compat = ["futures-util/io"]

# Codecs
# Serde json codec
Expand Down
99 changes: 45 additions & 54 deletions compio-io/src/compat.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Compat wrappers for interop with other crates.

use std::{
fmt::Debug,
io::{self, BufRead, Read, Write},
mem::MaybeUninit,
pin::Pin,
task::{Context, Poll},
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
use pin_project_lite::pin_project;

use crate::{PinBoxFuture, buffer::Buffer, util::DEFAULT_BUF_SIZE};

Expand Down Expand Up @@ -176,15 +176,14 @@ impl<S: crate::AsyncWrite> SyncStream<S> {
}
}

pin_project! {
/// A stream wrapper for [`futures_util::io`] traits.
pub struct AsyncStream<S> {
#[pin]
inner: SyncStream<S>,
read_future: Option<PinBoxFuture<io::Result<usize>>>,
write_future: Option<PinBoxFuture<io::Result<usize>>>,
shutdown_future: Option<PinBoxFuture<io::Result<()>>>,
}
/// A stream wrapper for [`futures_util::io`] traits.
pub struct AsyncStream<S> {
// The futures keep the reference to the inner stream, so we need to pin
// the inner stream to make sure the reference is valid.
inner: Pin<Box<SyncStream<S>>>,
read_future: Option<PinBoxFuture<io::Result<usize>>>,
write_future: Option<PinBoxFuture<io::Result<usize>>>,
shutdown_future: Option<PinBoxFuture<io::Result<()>>>,
}

impl<S> AsyncStream<S> {
Expand All @@ -200,7 +199,7 @@ impl<S> AsyncStream<S> {

fn new_impl(inner: SyncStream<S>) -> Self {
Self {
inner,
inner: Box::pin(inner),
read_future: None,
write_future: None,
shutdown_future: None,
Expand Down Expand Up @@ -253,20 +252,18 @@ macro_rules! poll_future_would_block {

impl<S: crate::AsyncRead + 'static> futures_util::AsyncRead for AsyncStream<S> {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.project();
// Safety:
// - The futures won't live longer than the stream.
// - `self` is pinned.
// - The inner stream won't be moved.
// - The inner stream is pinned.
let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };

poll_future_would_block!(
this.read_future,
self.read_future,
cx,
inner.fill_read_buf(),
io::Read::read(inner, buf)
Expand All @@ -279,16 +276,14 @@ impl<S: crate::AsyncRead + 'static> AsyncStream<S> {
///
/// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
pub fn poll_read_uninit(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [MaybeUninit<u8>],
) -> Poll<io::Result<usize>> {
let this = self.project();

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
poll_future_would_block!(
this.read_future,
self.read_future,
cx,
inner.fill_read_buf(),
inner.read_buf_uninit(buf)
Expand All @@ -297,79 +292,75 @@ impl<S: crate::AsyncRead + 'static> AsyncStream<S> {
}

impl<S: crate::AsyncRead + 'static> futures_util::AsyncBufRead for AsyncStream<S> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let this = self.project();

fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
poll_future_would_block!(
this.read_future,
self.read_future,
cx,
inner.fill_read_buf(),
// Safety: anyway the slice won't be used after free.
io::BufRead::fill_buf(inner).map(|slice| unsafe { &*(slice as *const _) })
)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
inner.consume(amt)
fn consume(mut self: Pin<&mut Self>, amt: usize) {
unsafe { self.inner.as_mut().get_unchecked_mut().consume(amt) }
}
}

impl<S: crate::AsyncWrite + 'static> futures_util::AsyncWrite for AsyncStream<S> {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.project();

if this.shutdown_future.is_some() {
debug_assert!(this.write_future.is_none());
if self.shutdown_future.is_some() {
debug_assert!(self.write_future.is_none());
return Poll::Pending;
}

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
poll_future_would_block!(
this.write_future,
self.write_future,
cx,
inner.flush_write_buf(),
io::Write::write(inner, buf)
)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.project();

if this.shutdown_future.is_some() {
debug_assert!(this.write_future.is_none());
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.shutdown_future.is_some() {
debug_assert!(self.write_future.is_none());
return Poll::Pending;
}

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
let res = poll_future!(this.write_future, cx, inner.flush_write_buf());
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
let res = poll_future!(self.write_future, cx, inner.flush_write_buf());
Poll::Ready(res.map(|_| ()))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.project();

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// Avoid shutdown on flush because the inner buffer might be passed to the
// driver.
if this.write_future.is_some() {
debug_assert!(this.shutdown_future.is_none());
if self.write_future.is_some() {
debug_assert!(self.shutdown_future.is_none());
return Poll::Pending;
}

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
let res = poll_future!(this.shutdown_future, cx, inner.get_mut().shutdown());
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
let res = poll_future!(self.shutdown_future, cx, inner.get_mut().shutdown());
Poll::Ready(res)
}
}

impl<S: Debug> Debug for AsyncStream<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncStream")
.field("inner", &self.inner)
.finish_non_exhaustive()
}
}
20 changes: 15 additions & 5 deletions compio-tls/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "compio-tls"
version = "0.7.0"
version = "0.7.1"
description = "TLS adaptor with compio"
categories = ["asynchronous", "network-programming"]
keywords = ["async", "net", "tls"]
Expand All @@ -25,6 +25,12 @@ rustls = { workspace = true, default-features = false, optional = true, features
"tls12",
] }

futures-rustls = { workspace = true, default-features = false, optional = true, features = [
"logging",
"tls12",
] }
futures-util = { workspace = true, optional = true }

[dev-dependencies]
compio-net = { workspace = true }
compio-runtime = { workspace = true }
Expand All @@ -33,14 +39,18 @@ compio-macros = { workspace = true }
rustls = { workspace = true, default-features = false, features = ["ring"] }
rustls-native-certs = { workspace = true }

futures-rustls = { workspace = true, default-features = false, features = [
"ring",
] }

[features]
default = ["native-tls"]
all = ["native-tls", "rustls"]
rustls = ["dep:rustls"]
rustls = ["dep:rustls", "dep:futures-rustls", "dep:futures-util"]

ring = ["rustls", "rustls/ring"]
aws-lc-rs = ["rustls", "rustls/aws-lc-rs"]
aws-lc-rs-fips = ["aws-lc-rs", "rustls/fips"]
ring = ["rustls", "rustls/ring", "futures-rustls/ring"]
aws-lc-rs = ["rustls", "rustls/aws-lc-rs", "futures-rustls/aws-lc-rs"]
aws-lc-rs-fips = ["aws-lc-rs", "rustls/fips", "futures-rustls/fips"]

read_buf = ["compio-buf/read_buf", "compio-io/read_buf", "rustls?/read_buf"]
nightly = ["read_buf"]
Loading
Loading