Skip to content

Commit

Permalink
Feature: add simple integration for monoio
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Griffon <anthony@griffon.one>
  • Loading branch information
Miaxos committed Feb 16, 2024
1 parent c805b29 commit 09a4e46
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
matrix:
include:
- toolchain: "nightly"
features: "bench,serde,bt,singlethreaded"
features: "bench,serde,bt,singlethreaded,monoio"

steps:
- name: Setup | Checkout
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ derive_more = { version="0.99.9" }
futures = "0.3"
lazy_static = "1.4.0"
maplit = "1.0.2"
monoio = "0.2.2"
pretty_assertions = "1.0.0"
proc-macro2 = "1.0"
quote = "1.0"
Expand Down
3 changes: 3 additions & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rand = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
monoio = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down Expand Up @@ -78,6 +79,8 @@ compat = []
compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"]
compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"]

monoio = ["dep:monoio", "singlethreaded"]

# Allows an application to implement a custom the v2 storage API.
# See `openraft::storage::v2` for more details.
# V2 API are unstable and may change in the future.
Expand Down
85 changes: 85 additions & 0 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,88 @@ impl AsyncRuntime for TokioRuntime {
rand::thread_rng()
}
}

#[cfg(feature = "monoio")]
mod monoio {
use std::fmt::Debug;
use std::future::Future;
use std::time::Duration;

use crate::AsyncRuntime;
use crate::OptionalSend;

#[derive(Debug, Default)]
pub struct MonoioRuntime;

pub type MonoioInstant = monoio::time::Instant;

impl crate::Instant for monoio::time::Instant {
#[inline]
fn now() -> Self {
monoio::time::Instant::now()
}
}

impl AsyncRuntime for MonoioRuntime {
type JoinError = crate::error::Infallible;
type JoinHandle<T: OptionalSend + 'static> = monoio::task::JoinHandle<Result<T, Self::JoinError>>;
type Sleep = monoio::time::Sleep;
type Instant = MonoioInstant;
type TimeoutError = monoio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + OptionalSend> = monoio::time::Timeout<T>;
type ThreadLocalRng = rand::rngs::ThreadRng;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static,
{
monoio::spawn(async move { Ok(future.await) })
}

#[inline]
fn sleep(duration: Duration) -> Self::Sleep {
monoio::time::sleep(duration)
}

#[inline]
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
monoio::time::sleep_until(deadline)
}

#[inline]
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
monoio::time::timeout(duration, future)
}

#[inline]
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(
deadline: Self::Instant,
future: F,
) -> Self::Timeout<R, F> {
monoio::time::timeout_at(deadline, future)
}

#[inline]
fn is_panic(_join_error: &Self::JoinError) -> bool {
// A monoio task shouldn't panic or it would bubble the panic in case of a join
false
}

#[inline]
fn abort<T: OptionalSend + 'static>(_join_handle: &Self::JoinHandle<T>) {
// No abort in monoio task, it's a little complicated, as it's using io-uring when
// available, you register for update from the kernel, but when a task is dropped, you
// need to "cancel" this registration to the kernel too.
//
// We would need to transmit a `[monoio::io::Canceller]` to the future we want to
// spawn.
}

#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
}
}
}

0 comments on commit 09a4e46

Please sign in to comment.