Skip to content

Commit

Permalink
chore: use afit and rpitit (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Nov 6, 2023
1 parent d5bf594 commit b29ec77
Show file tree
Hide file tree
Showing 32 changed files with 707 additions and 1,237 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ A thread-per-core Rust runtime with io_uring/epoll/kqueue.
[zh-readme-url]: README-zh.md

## Design Goal
Monoio is a pure io_uring/epoll/kqueue Rust async runtime. Part of the design has been borrowed from Tokio and Tokio-uring. However, unlike Tokio-uring, Monoio does not run on top of another runtime, rendering it more efficient.
Monoio is a pure io_uring/epoll/kqueue Rust async runtime. Part of the design has been borrowed from Tokio and Tokio-uring. However, unlike Tokio-uring, Monoio does not run on top of another runtime, rendering it more efficient.

Moreover, Monoio is designed with a thread-per-core model in mind. Users do not need to worry about tasks being `Send` or `Sync`, as thread local storage can be used safely. In other words, the data does not escape the thread on await points, unlike on work-stealing runtimes such as Tokio. This is because for some use cases, specifically those targeted by this runtime, it is not necessary to make task schedulable between threads. For example, if we were to write a load balancer like NGINX, we would write it in a thread-per-core way. The thread local data does not need to be shared between threads, so the `Sync` and `Send` do not need to be implemented in the first place.

Expand Down
16 changes: 3 additions & 13 deletions docs/en/io-cancel.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
title: Cancel IO
date: 2023-02-27 15:09:00
updated: 2023-11-06 16:49:00
author: ihciah
---

Expand All @@ -14,28 +15,17 @@ Taking reads as an example, the following code defines a cancelable asynchronous
```rust
/// CancelableAsyncReadRent: async read with an ownership of a buffer and ability to cancel io.
pub trait CancelableAsyncReadRent: AsyncReadRent {
/// The future of read Result<size, buffer>
type CancelableReadFuture<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: IoBufMut + 'a;
/// The future of readv Result<size, buffer>
type CancelableReadvFuture<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: IoVecBufMut + 'a;

fn cancelable_read<T: IoBufMut>(
&mut self,
buf: T,
c: CancelHandle,
) -> Self::CancelableReadFuture<'_, T>;
) -> impl Future<Output = BufResult<usize, T>>;

fn cancelable_readv<T: IoVecBufMut>(
&mut self,
buf: T,
c: CancelHandle,
) -> Self::CancelableReadvFuture<'_, T>;
) -> impl Future<Output = BufResult<usize, T>>;
}
```

Expand Down
15 changes: 15 additions & 0 deletions docs/en/why-GAT.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
title: Why GAT
date: 2021-11-24 20:00:00
updated: 2023-11-06 16:49:00
author: ihciah
---

Expand Down Expand Up @@ -39,3 +40,17 @@ trait AsyncReadRent {
```

The only problem here is, if you use GAT style, you should always use it. Providing `poll` style based on GAT is not easy. As an example, `monoio-compat` implement tokio `AsyncRead` and `AsyncWrite` based on GAT style future with some unsafe hack(and also with a `Box` cost).

## async_fn_in_trait
`async_fn_in_trait` and `return_position_impl_trait_in_trait` is stable now in rust and can be used to replace GAT usage here(related [issue](https://github.com/rust-lang/rust/issues/91611)).

Now we can define and impl async trait easier:
```rust
trait AsyncReadRent {
fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
}

impl AsyncReadRent for Demo {
async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> { ... }
}
```
16 changes: 3 additions & 13 deletions docs/zh/io-cancel.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
title: 取消 IO
date: 2023-02-27 15:09:00
updated: 2023-11-06 16:49:00
author: ihciah
---

Expand All @@ -14,28 +15,17 @@ author: ihciah
```rust
/// CancelableAsyncReadRent: async read with a ownership of a buffer and ability to cancel io.
pub trait CancelableAsyncReadRent: AsyncReadRent {
/// The future of read Result<size, buffer>
type CancelableReadFuture<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: IoBufMut + 'a;
/// The future of readv Result<size, buffer>
type CancelableReadvFuture<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: IoVecBufMut + 'a;

fn cancelable_read<T: IoBufMut>(
&mut self,
buf: T,
c: CancelHandle,
) -> Self::CancelableReadFuture<'_, T>;
) -> impl Future<Output = BufResult<usize, T>>;

fn cancelable_readv<T: IoVecBufMut>(
&mut self,
buf: T,
c: CancelHandle,
) -> Self::CancelableReadvFuture<'_, T>;
) -> impl Future<Output = BufResult<usize, T>>;
}
```

Expand Down
15 changes: 15 additions & 0 deletions docs/zh/why-GAT.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
title: 为什么使用 GAT
date: 2021-11-24 20:00:00
updated: 2023-11-06 16:49:00
author: ihciah
---

Expand Down Expand Up @@ -42,3 +43,17 @@ trait AsyncReadRent {
```

这是银弹吗?不是。唯一的问题在于,如果使用了 GAT 这一套模式,就要总是使用它。如果你在 `poll` 形式和 GAT 形式之间反复横跳,那你会十分痛苦。基于 `poll` 形式接口自行维护状态,确实可以实现 Future(最简单的实现如 `poll_fn`);但反过来就很难受了:你很难存储一个带生命周期的 Future。虽然使用一些 unsafe 的 hack 可以做(也有 cost)这件事,但是仍旧,限制很多且并不推荐这么做。`monoio-compat` 基于 GAT 的 future 实现了 Tokio 的 `AsyncRead``AsyncWrite`,如果你非要试一试,可以参考它。

## async_fn_in_trait
Rust 已经稳定了 `async_fn_in_trait`,结合 `return_position_impl_trait_in_trait` 可以替代这里的 GAT(相关 [issue](https://github.com/rust-lang/rust/issues/91611))。

现在我们可以更简单地定义并实现 trait:
```rust
trait AsyncReadRent {
fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
}

impl AsyncReadRent for Demo {
async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> { ... }
}
```
6 changes: 3 additions & 3 deletions monoio-compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ license = "MIT/Apache-2.0"
name = "monoio-compat"
readme = "README.md"
repository = "https://github.com/bytedance/monoio"
version = "0.1.2"
version = "0.2.0"

[dependencies]
monoio = {version = "0.1.0", path = "../monoio", default-features = false}
monoio = {version = "0.2.0", path = "../monoio", default-features = false}
reusable-box-future = "0.2"
tokio = {version = "1", default-features = false, features = ["io-util"]}

[dev-dependencies]
monoio = {version = "0.1.0", path = "../monoio", features = ["async-cancel", "macros"]}
monoio = {version = "0.2.0", path = "../monoio", features = ["async-cancel", "macros"]}
2 changes: 1 addition & 1 deletion monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT/Apache-2.0"
name = "monoio"
readme = "../README.md"
repository = "https://github.com/bytedance/monoio"
version = "0.1.9"
version = "0.2.0"

# common dependencies
[dependencies]
Expand Down
7 changes: 1 addition & 6 deletions monoio/src/io/async_buf_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@ use crate::io::AsyncReadRent;

/// AsyncBufRead: async read with buffered content
pub trait AsyncBufRead: AsyncReadRent {
/// The returned future of fill_buf
type FillBufFuture<'a>: Future<Output = std::io::Result<&'a [u8]>>
where
Self: 'a;

/// Try read data and get a reference to the internal buffer
fn fill_buf(&mut self) -> Self::FillBufFuture<'_>;
fn fill_buf(&mut self) -> impl Future<Output = std::io::Result<&[u8]>>;
/// Mark how much data is read
fn consume(&mut self, amt: usize);
}
64 changes: 28 additions & 36 deletions monoio/src/io/async_buf_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ where

/// AsyncBufReadExt
pub trait AsyncBufReadExt {
/// The future of read until Result<usize>
type ReadUntilFuture<'a>: Future<Output = Result<usize>>
where
Self: 'a;

/// This function will read bytes from the underlying stream until the delimiter or EOF is
/// found. Once found, all bytes up to, and including, the delimiter (if found) will be appended
/// to buf.
Expand All @@ -69,12 +64,11 @@ pub trait AsyncBufReadExt {
/// # Errors
/// This function will ignore all instances of ErrorKind::Interrupted and will otherwise return
/// any errors returned by fill_buf.
fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> Self::ReadUntilFuture<'a>;

/// The future of read line Result<usize>
type ReadLineFuture<'a>: Future<Output = Result<usize>>
where
Self: 'a;
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> impl Future<Output = Result<usize>>;

/// This function will read bytes from the underlying stream until the newline delimiter (the
/// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if
Expand All @@ -88,41 +82,39 @@ pub trait AsyncBufReadExt {
/// This function has the same error semantics as read_until and will also return an error if
/// the read bytes are not valid UTF-8. If an I/O error is encountered then buf may contain some
/// bytes already read in the event that all data read so far was valid UTF-8.
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Self::ReadLineFuture<'a>;
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> impl Future<Output = Result<usize>>;
}

impl<A> AsyncBufReadExt for A
where
A: AsyncBufRead + ?Sized,
{
type ReadUntilFuture<'a> = impl Future<Output = Result<usize>> + 'a where Self: 'a;

fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> Self::ReadUntilFuture<'a> {
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> impl Future<Output = Result<usize>> {
read_until(self, byte, buf)
}

type ReadLineFuture<'a> = impl Future<Output = Result<usize>> + 'a where Self: 'a;

fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Self::ReadLineFuture<'a> {
async {
unsafe {
let mut g = Guard {
len: buf.len(),
buf: buf.as_mut_vec(),
};
async fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Result<usize> {
unsafe {
let mut g = Guard {
len: buf.len(),
buf: buf.as_mut_vec(),
};

let ret = read_until(self, b'\n', g.buf).await;
if from_utf8(&g.buf[g.len..]).is_err() {
ret.and_then(|_| {
Err(Error::new(
ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
})
} else {
g.len = g.buf.len();
ret
}
let ret = read_until(self, b'\n', g.buf).await;
if from_utf8(&g.buf[g.len..]).is_err() {
ret.and_then(|_| {
Err(Error::new(
ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
})
} else {
g.len = g.buf.len();
ret
}
}
}
Expand Down
52 changes: 12 additions & 40 deletions monoio/src/io/async_read_rent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,36 @@ use crate::{

/// AsyncReadRent: async read with a ownership of a buffer
pub trait AsyncReadRent {
/// The future of read Result<size, buffer>
type ReadFuture<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: IoBufMut + 'a;
/// The future of readv Result<size, buffer>
type ReadvFuture<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: IoVecBufMut + 'a;

/// Same as read(2)
fn read<T: IoBufMut>(&mut self, buf: T) -> Self::ReadFuture<'_, T>;
fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
/// Same as readv(2)
fn readv<T: IoVecBufMut>(&mut self, buf: T) -> Self::ReadvFuture<'_, T>;
fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
}

/// AsyncReadRentAt: async read with a ownership of a buffer and a position
pub trait AsyncReadRentAt {
/// The future of Result<size, buffer>
type Future<'a, T>: Future<Output = BufResult<usize, T>>
where
Self: 'a,
T: 'a;

/// Same as Read(2)
fn read_at<T: IoBufMut>(&mut self, buf: T, pos: usize) -> Self::Future<'_, T>;
/// Same as pread(2)
fn read_at<T: IoBufMut>(
&mut self,
buf: T,
pos: usize,
) -> impl Future<Output = BufResult<usize, T>>;
}

impl<A: ?Sized + AsyncReadRent> AsyncReadRent for &mut A {
type ReadFuture<'a, T> = A::ReadFuture<'a, T>
where
Self: 'a,
T: IoBufMut + 'a;

type ReadvFuture<'a, T> = A::ReadvFuture<'a, T>
where
Self: 'a,
T: IoVecBufMut + 'a;

#[inline]
fn read<T: IoBufMut>(&mut self, buf: T) -> Self::ReadFuture<'_, T> {
fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
(**self).read(buf)
}

#[inline]
fn readv<T: IoVecBufMut>(&mut self, buf: T) -> Self::ReadvFuture<'_, T> {
fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
(**self).readv(buf)
}
}

impl AsyncReadRent for &[u8] {
type ReadFuture<'a, B> = impl std::future::Future<Output = crate::BufResult<usize, B>> where
B: IoBufMut + 'a, Self: 'a;
type ReadvFuture<'a, B> = impl std::future::Future<Output = crate::BufResult<usize, B>> where
B: IoVecBufMut + 'a, Self: 'a;

fn read<T: IoBufMut>(&mut self, mut buf: T) -> Self::ReadFuture<'_, T> {
fn read<T: IoBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
let amt = std::cmp::min(self.len(), buf.bytes_total());
let (a, b) = self.split_at(amt);
unsafe {
Expand All @@ -75,7 +47,7 @@ impl AsyncReadRent for &[u8] {
async move { (Ok(amt), buf) }
}

fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> Self::ReadvFuture<'_, T> {
fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
// # Safety
// We do it in pure sync way.
let n = match unsafe { RawBuf::new_from_iovec_mut(&mut buf) } {
Expand Down
Loading

0 comments on commit b29ec77

Please sign in to comment.