Skip to content

Commit

Permalink
Feature: add simple integration for monoio
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Griffon <anthony@griffon.one>
  • Loading branch information
Miaxos committed Feb 19, 2024
1 parent dc39e37 commit a59cdd1
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
matrix:
include:
- toolchain: "nightly"
features: "bench,serde,bt,singlethreaded"
features: "bench,serde,bt,singlethreaded,monoio"

steps:
- name: Setup | Checkout
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ derive_more = { version="0.99.9" }
futures = "0.3"
lazy_static = "1.4.0"
maplit = "1.0.2"
monoio = "0.2.2"
pretty_assertions = "1.0.0"
proc-macro2 = "1.0"
quote = "1.0"
Expand All @@ -38,7 +39,7 @@ tokio = { version="1.8", default-features=false, features=["fs", "io-util", "mac
tracing = { version = "0.1.40" }
tracing-appender = "0.2.0"
tracing-futures = "0.2.4"
tracing-subscriber = { version = "0.3.3", features=["env-filter"] }
tracing-subscriber = { version = "0.3.18", features=["env-filter"] }
validit = { version = "0.2.2" }

[workspace]
Expand Down
5 changes: 5 additions & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rand = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
monoio = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down Expand Up @@ -78,6 +79,9 @@ compat = []
compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"]
compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"]

monoio = ["dep:monoio", "singlethreaded"]
monoio-sync = ["dep:monoio", "monoio/sync"]

# Allows an application to implement a custom the v2 storage API.
# See `openraft::storage::v2` for more details.
# V2 API are unstable and may change in the future.
Expand Down Expand Up @@ -115,6 +119,7 @@ generic-snapshot-data = []
tracing-log = [ "tracing/log" ]

# default = ["single-term-leader"]
# default = ["monoio"]

[package.metadata.docs.rs]

Expand Down
75 changes: 75 additions & 0 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,78 @@ impl AsyncRuntime for TokioRuntime {
rand::thread_rng()
}
}

#[cfg(feature = "monoio")]
pub mod monoio {
use std::fmt::Debug;
use std::future::Future;
use std::time::Duration;

use crate::AsyncRuntime;
use crate::OptionalSend;

#[derive(Debug, Default)]
pub struct MonoioRuntime;

pub type MonoioInstant = monoio::time::Instant;

impl crate::Instant for monoio::time::Instant {
#[inline]
fn now() -> Self {
monoio::time::Instant::now()
}
}

impl AsyncRuntime for MonoioRuntime {
type JoinError = crate::error::Infallible;
type JoinHandle<T: OptionalSend + 'static> = monoio::task::JoinHandle<Result<T, Self::JoinError>>;
type Sleep = monoio::time::Sleep;
type Instant = MonoioInstant;
type TimeoutError = monoio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + OptionalSend> = monoio::time::Timeout<T>;
type ThreadLocalRng = rand::rngs::ThreadRng;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static,
{
monoio::spawn(async move { Ok(future.await) })
}

#[inline]
fn sleep(duration: Duration) -> Self::Sleep {
monoio::time::sleep(duration)
}

#[inline]
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
monoio::time::sleep_until(deadline)
}

#[inline]
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
monoio::time::timeout(duration, future)
}

#[inline]
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(
deadline: Self::Instant,
future: F,
) -> Self::Timeout<R, F> {
monoio::time::timeout_at(deadline, future)
}

#[inline]
fn is_panic(_join_error: &Self::JoinError) -> bool {
// A monoio task shouldn't panic or it would bubble the panic in case of a join
false
}

#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
}
}
}
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub use network::RaftNetwork;
pub use network::RaftNetworkFactory;
pub use type_config::RaftTypeConfig;

#[cfg(feature = "monoio")] pub use crate::async_runtime::monoio;
pub use crate::async_runtime::AsyncRuntime;
pub use crate::async_runtime::TokioRuntime;
pub use crate::change_members::ChangeMembers;
Expand Down
1 change: 1 addition & 0 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ tracing-subscriber = { workspace = true }
bt = ["openraft/bt"]
single-term-leader = ["openraft/single-term-leader"]
loosen-follower-log-revert = ["openraft/loosen-follower-log-revert"]
monoio = ["openraft/monoio"]

0 comments on commit a59cdd1

Please sign in to comment.