From 941e988193fbfad4146712890b84791860d9666b Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Fri, 16 Feb 2024 15:46:41 +0100 Subject: [PATCH 01/17] Feature: add simple integration for monoio Signed-off-by: Anthony Griffon --- .github/workflows/ci.yaml | 2 +- Cargo.toml | 3 +- openraft/Cargo.toml | 5 +++ openraft/src/async_runtime.rs | 75 +++++++++++++++++++++++++++++++++++ openraft/src/lib.rs | 1 + tests/Cargo.toml | 1 + 6 files changed, 85 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 21362beba..8119a46b8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,7 +15,7 @@ jobs: matrix: include: - toolchain: "nightly" - features: "bench,serde,bt,singlethreaded" + features: "bench,serde,bt,singlethreaded,monoio" steps: - name: Setup | Checkout diff --git a/Cargo.toml b/Cargo.toml index 9dbd6fb86..de830b539 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ derive_more = { version="0.99.9" } futures = "0.3" lazy_static = "1.4.0" maplit = "1.0.2" +monoio = "0.2.2" pretty_assertions = "1.0.0" proc-macro2 = "1.0" quote = "1.0" @@ -38,7 +39,7 @@ tokio = { version="1.8", default-features=false, features=["fs", "io-util", "mac tracing = { version = "0.1.40" } tracing-appender = "0.2.0" tracing-futures = "0.2.4" -tracing-subscriber = { version = "0.3.3", features=["env-filter"] } +tracing-subscriber = { version = "0.3.18", features=["env-filter"] } validit = { version = "0.2.2" } [workspace] diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 4b3ad2d4b..0044a37b9 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -26,6 +26,7 @@ rand = { workspace = true } serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } +monoio = { workspace = true, optional = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } @@ -78,6 +79,9 @@ compat = [] compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"] compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"] +monoio = ["dep:monoio", "singlethreaded"] +monoio-sync = ["dep:monoio", "monoio/sync"] + # Allows an application to implement a custom the v2 storage API. # See `openraft::storage::v2` for more details. # V2 API are unstable and may change in the future. @@ -115,6 +119,7 @@ generic-snapshot-data = [] tracing-log = [ "tracing/log" ] # default = ["single-term-leader"] +# default = ["monoio"] [package.metadata.docs.rs] diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index c602d48a7..67e6ff114 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -192,3 +192,78 @@ impl Debug for TokioOneShotSender { f.debug_tuple("TokioSendWrapper").finish() } } + +#[cfg(feature = "monoio")] +pub mod monoio { + use std::fmt::Debug; + use std::future::Future; + use std::time::Duration; + + use crate::AsyncRuntime; + use crate::OptionalSend; + + #[derive(Debug, Default)] + pub struct MonoioRuntime; + + pub type MonoioInstant = monoio::time::Instant; + + impl crate::Instant for monoio::time::Instant { + #[inline] + fn now() -> Self { + monoio::time::Instant::now() + } + } + + impl AsyncRuntime for MonoioRuntime { + type JoinError = crate::error::Infallible; + type JoinHandle = monoio::task::JoinHandle>; + type Sleep = monoio::time::Sleep; + type Instant = MonoioInstant; + type TimeoutError = monoio::time::error::Elapsed; + type Timeout + OptionalSend> = monoio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + monoio::spawn(async move { Ok(future.await) }) + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + monoio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + monoio::time::sleep_until(deadline) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + monoio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>( + deadline: Self::Instant, + future: F, + ) -> Self::Timeout { + monoio::time::timeout_at(deadline, future) + } + + #[inline] + fn is_panic(_join_error: &Self::JoinError) -> bool { + // A monoio task shouldn't panic or it would bubble the panic in case of a join + false + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } + } +} diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 583c1b0fc..9ebd362e5 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -75,6 +75,7 @@ pub use network::RaftNetwork; pub use network::RaftNetworkFactory; pub use type_config::RaftTypeConfig; +#[cfg(feature = "monoio")] pub use crate::async_runtime::monoio; pub use crate::async_runtime::AsyncRuntime; pub use crate::async_runtime::TokioRuntime; pub use crate::change_members::ChangeMembers; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index ce6d7bd17..82c9728cb 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -41,3 +41,4 @@ tracing-subscriber = { workspace = true } bt = ["openraft/bt"] single-term-leader = ["openraft/single-term-leader"] loosen-follower-log-revert = ["openraft/loosen-follower-log-revert"] +monoio = ["openraft/monoio"] From b998768e34c9a7056ea37d218cfaf1acd26ce58b Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 19 Feb 2024 18:07:56 +0100 Subject: [PATCH 02/17] Feature: Add monoio feature on memstore Signed-off-by: Anthony Griffon --- memstore/Cargo.toml | 1 + memstore/src/lib.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/memstore/Cargo.toml b/memstore/Cargo.toml index b0fac485a..827ec79ae 100644 --- a/memstore/Cargo.toml +++ b/memstore/Cargo.toml @@ -24,6 +24,7 @@ tracing = { workspace = true } [dev-dependencies] [features] +monoio = ["openraft/monoio"] [package.metadata.docs.rs] all-features = true diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 815de24f5..ec0aaa398 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -72,6 +72,7 @@ pub struct ClientResponse(pub Option); pub type MemNodeId = u64; +#[cfg(not(feature = "monoio"))] openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: @@ -84,6 +85,19 @@ openraft::declare_raft_types!( AsyncRuntime = TokioRuntime ); +#[cfg(feature = "monoio")] +openraft::declare_raft_types!( + /// Declare the type configuration for `MemStore`. + pub TypeConfig: + D = ClientRequest, + R = ClientResponse, + NodeId = MemNodeId, + Node = (), + Entry = Entry, + SnapshotData = Cursor>, + AsyncRuntime = openraft::monoio::MonoioRuntime +); + /// The application snapshot type which the `MemStore` works with. #[derive(Debug)] pub struct MemStoreSnapshot { From 15a9bfc01dadc7d919e50e133ab5ef0c79d2b6f5 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 19 Feb 2024 18:08:07 +0100 Subject: [PATCH 03/17] Test: Starting the work to have monoio available for integration tests Signed-off-by: Anthony Griffon --- tests/Cargo.toml | 3 ++- .../append_entries/t10_see_higher_vote.rs | 9 +++++--- .../append_entries/t11_append_conflicts.rs | 8 ++++++- .../append_entries/t60_enable_heartbeat.rs | 2 ++ .../t61_heartbeat_reject_vote.rs | 2 ++ tests/tests/fixtures/mod.rs | 22 ++++++++++++++----- .../t12_concurrent_write_and_add_learner.rs | 2 ++ 7 files changed, 38 insertions(+), 10 deletions(-) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 82c9728cb..b96e410c0 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -20,6 +20,7 @@ repository = { workspace = true } [dev-dependencies] openraft = { path="../openraft", version = "0.9.0" } openraft-memstore = { path="../memstore" } +monoio = "0.2.2" anyerror = { workspace = true } anyhow = { workspace = true } @@ -41,4 +42,4 @@ tracing-subscriber = { workspace = true } bt = ["openraft/bt"] single-term-leader = ["openraft/single-term-leader"] loosen-follower-log-revert = ["openraft/loosen-follower-log-revert"] -monoio = ["openraft/monoio"] +monoio = ["openraft/monoio", "async-entry/monoio", "openraft-memstore/monoio"] diff --git a/tests/tests/append_entries/t10_see_higher_vote.rs b/tests/tests/append_entries/t10_see_higher_vote.rs index 726c992a6..0837722a2 100644 --- a/tests/tests/append_entries/t10_see_higher_vote.rs +++ b/tests/tests/append_entries/t10_see_higher_vote.rs @@ -3,6 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +#[cfg(feature = "monoio")] use monoio::spawn; +#[cfg(feature = "monoio")] use monoio::time::sleep; use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; @@ -13,7 +15,8 @@ use openraft::LogId; use openraft::ServerState; use openraft::Vote; use openraft_memstore::ClientRequest; -use tokio::time::sleep; +#[cfg(not(feature = "monoio"))] use tokio::spawn; +#[cfg(not(feature = "monoio"))] use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -66,7 +69,7 @@ async fn append_sees_higher_vote() -> Result<()> { router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?; let n0 = router.get_raft_handle(&0)?; - tokio::spawn(async move { + spawn(async move { let res = n0 .client_write(ClientRequest { client: "0".to_string(), @@ -78,7 +81,7 @@ async fn append_sees_higher_vote() -> Result<()> { tracing::debug!("--- client_write res: {:?}", res); }); - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; router .wait(&0, timeout()) diff --git a/tests/tests/append_entries/t11_append_conflicts.rs b/tests/tests/append_entries/t11_append_conflicts.rs index af3f68394..f0ba5014e 100644 --- a/tests/tests/append_entries/t11_append_conflicts.rs +++ b/tests/tests/append_entries/t11_append_conflicts.rs @@ -227,7 +227,13 @@ where C: RaftTypeConfig, LS: RaftLogStorage, { - let logs = log_store.get_log_entries(..).await?; + let logs = match log_store.get_log_entries(..).await { + Ok(elt) => elt, + Err(err) => { + dbg!(err); + anyhow::bail!("blbl") + } + }; let skip = 0; let want: Vec> = terms .iter() diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index 319661300..767bb8b7b 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -1,3 +1,4 @@ +/* use std::sync::Arc; use std::time::Duration; @@ -53,3 +54,4 @@ async fn enable_heartbeat() -> Result<()> { fn timeout() -> Option { Some(Duration::from_millis(1_000)) } +*/ diff --git a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs index 577fc310c..c7f17573e 100644 --- a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs +++ b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs @@ -1,3 +1,4 @@ +/* use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -89,3 +90,4 @@ async fn heartbeat_reject_vote() -> Result<()> { fn timeout() -> Option { Some(Duration::from_millis(1_000)) } +*/ diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 58bcae1ad..428ea5178 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -33,6 +33,8 @@ use openraft::error::RemoteError; use openraft::error::Unreachable; use openraft::metrics::Wait; use openraft::network::RPCOption; +#[cfg(feature = "monoio")] use openraft::monoio::MonoioInstant; +#[cfg(feature = "monoio")] use openraft::monoio::MonoioRuntime; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; @@ -58,8 +60,8 @@ use openraft::RaftMetrics; use openraft::RaftState; use openraft::RaftTypeConfig; use openraft::ServerState; -use openraft::TokioInstant; -use openraft::TokioRuntime; +#[cfg(not(feature = "monoio"))] use openraft::TokioInstant; +#[cfg(not(feature = "monoio"))] use openraft::TokioRuntime; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::ClientResponse; @@ -74,6 +76,16 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::fixtures::logging::init_file_logging; +#[cfg(not(feature = "monoio"))] +type Runtime = TokioRuntime; +#[cfg(feature = "monoio")] +type Runtime = MonoioRuntime; + +#[cfg(not(feature = "monoio"))] +type Instant = TokioInstant; +#[cfg(feature = "monoio")] +type Instant = MonoioInstant; + pub mod logging; pub type MemLogStore = Adaptor>; @@ -622,7 +634,7 @@ impl TypedRaftRouter { Ok(rst) } - pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { + pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { let node = { let rt = self.nodes.lock().unwrap(); rt.get(node_id).expect("target node not found in routing table").clone().0 @@ -765,7 +777,7 @@ impl TypedRaftRouter { /// Send external request to the particular node. pub async fn with_raft_state(&self, target: MemNodeId, func: F) -> Result> where - F: FnOnce(&RaftState) -> V + Send + 'static, + F: FnOnce(&RaftState) -> V + Send + 'static, V: Send + 'static, { let r = self.get_raft_handle(&target).unwrap(); @@ -773,7 +785,7 @@ impl TypedRaftRouter { } /// Send external request to the particular node. - pub fn external_request) + Send + 'static>( + pub fn external_request) + Send + 'static>( &self, target: MemNodeId, req: F, diff --git a/tests/tests/membership/t12_concurrent_write_and_add_learner.rs b/tests/tests/membership/t12_concurrent_write_and_add_learner.rs index 94702af3f..869e26b7f 100644 --- a/tests/tests/membership/t12_concurrent_write_and_add_learner.rs +++ b/tests/tests/membership/t12_concurrent_write_and_add_learner.rs @@ -1,3 +1,4 @@ +/* use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -162,3 +163,4 @@ async fn wait_log(router: &RaftRouter, node_ids: &BTreeSet, want_log: u64) fn timeout() -> Option { Some(Duration::from_millis(500)) } +*/ From 509a462708ee45c61afb8a8c16c9d8dfea8caf66 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 19 Feb 2024 18:34:42 +0100 Subject: [PATCH 04/17] Test: Add more test running with monoio Signed-off-by: Anthony Griffon --- tests/Cargo.toml | 1 + tests/tests/client_api/t11_client_reads.rs | 6 ++++-- .../client_api/t51_write_when_leader_quit.rs | 12 +++++++----- tests/tests/fixtures/mod.rs | 17 ++++------------- tests/tests/fixtures/runtime.rs | 10 ++++++++++ tests/tests/life_cycle/t11_shutdown.rs | 2 ++ tests/tests/membership/t11_add_learner.rs | 5 +++-- .../tests/membership/t30_commit_joint_config.rs | 5 +++-- .../membership/t30_elect_with_new_config.rs | 2 +- tests/tests/membership/t31_remove_leader.rs | 3 ++- 10 files changed, 37 insertions(+), 26 deletions(-) create mode 100644 tests/tests/fixtures/runtime.rs diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b96e410c0..3b976d334 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -21,6 +21,7 @@ repository = { workspace = true } openraft = { path="../openraft", version = "0.9.0" } openraft-memstore = { path="../memstore" } monoio = "0.2.2" +local-sync = "0.1" anyerror = { workspace = true } anyhow = { workspace = true } diff --git a/tests/tests/client_api/t11_client_reads.rs b/tests/tests/client_api/t11_client_reads.rs index 7db692e2e..2bce9fd00 100644 --- a/tests/tests/client_api/t11_client_reads.rs +++ b/tests/tests/client_api/t11_client_reads.rs @@ -11,6 +11,8 @@ use openraft::LogIdOptionExt; use openraft::RPCTypes; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; +use crate::fixtures::runtime::spawn; use crate::fixtures::RPCRequest; use crate::fixtures::RaftRouter; @@ -114,7 +116,7 @@ async fn get_read_log_id() -> Result<()> { router.set_rpc_pre_hook(RPCTypes::AppendEntries, block_to_n0); // Expire current leader - tokio::time::sleep(Duration::from_millis(200)).await; + sleep(Duration::from_millis(200)).await; tracing::info!("--- let node 1 to become leader, append a blank log"); let n1 = router.get_raft_handle(&1).unwrap(); @@ -162,7 +164,7 @@ async fn get_read_log_id() -> Result<()> { router.set_rpc_pre_hook(RPCTypes::AppendEntries, block_to_n0); let r = router.clone(); - tokio::spawn(async move { + spawn(async move { // This will block for ever let _x = r.client_request_many(1, "foo", 1).await; }); diff --git a/tests/tests/client_api/t51_write_when_leader_quit.rs b/tests/tests/client_api/t51_write_when_leader_quit.rs index 6466c5f37..c617ca94d 100644 --- a/tests/tests/client_api/t51_write_when_leader_quit.rs +++ b/tests/tests/client_api/t51_write_when_leader_quit.rs @@ -12,9 +12,11 @@ use openraft::Config; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; -use tokio::sync::oneshot; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::oneshot; +use crate::fixtures::runtime::sleep; +use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; /// Client write will receive a [`ForwardToLeader`] error because of log reversion, when leader @@ -48,14 +50,14 @@ async fn write_when_leader_quit_and_log_revert() -> Result<()> { tracing::info!(log_index, "--- write a log in another task"); { let n0 = router.get_raft_handle(&0)?; - tokio::spawn(async move { + spawn(async move { let res = n0.client_write(ClientRequest::make_request("cli", 1)).await; tx.send(res).unwrap(); }); } // wait for log to be appended on leader, and response channel is installed. - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; tracing::info!(log_index, "--- force node 0 to give up leadership"); { @@ -122,14 +124,14 @@ async fn write_when_leader_switched() -> Result<()> { tracing::info!(log_index, "--- write a log in another task"); { let n0 = router.get_raft_handle(&0)?; - tokio::spawn(async move { + spawn(async move { let res = n0.client_write(ClientRequest::make_request("cli", 1)).await; tx.send(res).unwrap(); }); } // wait for log to be appended on leader, and response channel is installed. - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; tracing::info!(log_index, "--- force node 0 to give up leadership, inform it to commit"); { diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 428ea5178..0fe409e3d 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -2,6 +2,9 @@ #![allow(dead_code)] +pub mod runtime; +use runtime::{Runtime, Instant, sleep}; + #[cfg(feature = "bt")] use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::collections::BTreeSet; @@ -60,8 +63,6 @@ use openraft::RaftMetrics; use openraft::RaftState; use openraft::RaftTypeConfig; use openraft::ServerState; -#[cfg(not(feature = "monoio"))] use openraft::TokioInstant; -#[cfg(not(feature = "monoio"))] use openraft::TokioRuntime; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::ClientResponse; @@ -76,16 +77,6 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::fixtures::logging::init_file_logging; -#[cfg(not(feature = "monoio"))] -type Runtime = TokioRuntime; -#[cfg(feature = "monoio")] -type Runtime = MonoioRuntime; - -#[cfg(not(feature = "monoio"))] -type Instant = TokioInstant; -#[cfg(feature = "monoio")] -type Instant = MonoioInstant; - pub mod logging; pub type MemLogStore = Adaptor>; @@ -324,7 +315,7 @@ impl TypedRaftRouter { let r = rand::random::() % send_delay; let timeout = Duration::from_millis(r); - tokio::time::sleep(timeout).await; + sleep(timeout).await; } pub fn set_append_entries_quota(&mut self, quota: Option) { diff --git a/tests/tests/fixtures/runtime.rs b/tests/tests/fixtures/runtime.rs new file mode 100644 index 000000000..7cce8a995 --- /dev/null +++ b/tests/tests/fixtures/runtime.rs @@ -0,0 +1,10 @@ +#[cfg(feature = "monoio")] pub use local_sync::oneshot; +#[cfg(feature = "monoio")] pub use monoio::spawn; +#[cfg(feature = "monoio")] pub use monoio::time::sleep; +#[cfg(feature = "monoio")] pub use openraft::monoio::MonoioInstant as Instant; +#[cfg(feature = "monoio")] pub use openraft::monoio::MonoioRuntime as Runtime; +#[cfg(not(feature = "monoio"))] pub use openraft::TokioInstant as Instant; +#[cfg(not(feature = "monoio"))] pub use openraft::TokioRuntime as Runtime; +#[cfg(not(feature = "monoio"))] pub use tokio::spawn; +#[cfg(not(feature = "monoio"))] pub use tokio::sync::oneshot; +#[cfg(not(feature = "monoio"))] pub use tokio::time::sleep; diff --git a/tests/tests/life_cycle/t11_shutdown.rs b/tests/tests/life_cycle/t11_shutdown.rs index 7764cdaf9..4df8777ea 100644 --- a/tests/tests/life_cycle/t11_shutdown.rs +++ b/tests/tests/life_cycle/t11_shutdown.rs @@ -1,3 +1,4 @@ +/* use std::sync::Arc; use anyhow::Result; @@ -108,3 +109,4 @@ async fn return_error_after_shutdown() -> Result<()> { Ok(()) } +*/ diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index b2da38aa6..ec49064ea 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -14,9 +14,10 @@ use openraft::Config; use openraft::LogId; use openraft::Membership; use openraft::StorageHelper; -use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; +use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] @@ -211,7 +212,7 @@ async fn add_learner_when_previous_membership_not_committed() -> Result<()> { router.set_network_error(1, true); let node = router.get_raft_handle(&0)?; - tokio::spawn(async move { + spawn(async move { let res = node.change_membership([0, 1], false).await; tracing::info!("do not expect res: {:?}", res); unreachable!("do not expect any res"); diff --git a/tests/tests/membership/t30_commit_joint_config.rs b/tests/tests/membership/t30_commit_joint_config.rs index ad73e381c..d86ae8d01 100644 --- a/tests/tests/membership/t30_commit_joint_config.rs +++ b/tests/tests/membership/t30_commit_joint_config.rs @@ -8,6 +8,7 @@ use openraft::Config; use openraft::LogIdOptionExt; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; /// A leader must wait for learner to commit member-change from [0] to [0,1,2]. @@ -64,7 +65,7 @@ async fn commit_joint_config_during_0_to_012() -> Result<()> { tracing::info!(log_index, "--- changing cluster config, should timeout"); - tokio::spawn({ + spawn({ let router = router.clone(); async move { let node = router.get_raft_handle(&0).unwrap(); @@ -124,7 +125,7 @@ async fn commit_joint_config_during_012_to_234() -> Result<()> { { let router = router.clone(); // this is expected to be blocked since 3 and 4 are isolated. - tokio::spawn( + spawn( async move { let node = router.get_raft_handle(&0)?; node.change_membership([2, 3, 4], false).await?; diff --git a/tests/tests/membership/t30_elect_with_new_config.rs b/tests/tests/membership/t30_elect_with_new_config.rs index c4cf42c11..08d7b950d 100644 --- a/tests/tests/membership/t30_elect_with_new_config.rs +++ b/tests/tests/membership/t30_elect_with_new_config.rs @@ -5,9 +5,9 @@ use anyhow::Result; use maplit::btreeset; use openraft::Config; use openraft::LogIdOptionExt; -use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Dynamic membership test. diff --git a/tests/tests/membership/t31_remove_leader.rs b/tests/tests/membership/t31_remove_leader.rs index f2a475a9b..f638393f2 100644 --- a/tests/tests/membership/t31_remove_leader.rs +++ b/tests/tests/membership/t31_remove_leader.rs @@ -12,6 +12,7 @@ use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Change membership from {0,1} to {1,2,3}. @@ -158,7 +159,7 @@ async fn remove_leader_and_convert_to_learner() -> Result<()> { tracing::info!(log_index, "--- wait 1 sec, old leader(non-voter) stays as a leader"); { - tokio::time::sleep(Duration::from_millis(1_000)).await; + sleep(Duration::from_millis(1_000)).await; router .wait(&0, timeout()) From e08475a739322af689a7b3b4a5ea7ec22fd5646c Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 19 Feb 2024 18:51:03 +0100 Subject: [PATCH 05/17] Improve: Change 'tokio' calls to 'AsyncRuntime' Signed-off-by: Anthony Griffon --- memstore/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index ec0aaa398..7dc5e9023 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -15,6 +15,7 @@ use openraft::storage::LogState; use openraft::storage::RaftLogReader; use openraft::storage::RaftSnapshotBuilder; use openraft::storage::Snapshot; +use openraft::AsyncRuntime; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; @@ -26,7 +27,6 @@ use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; -use openraft::TokioRuntime; use openraft::Vote; use serde::Deserialize; use serde::Serialize; @@ -82,7 +82,7 @@ openraft::declare_raft_types!( Node = (), Entry = Entry, SnapshotData = Cursor>, - AsyncRuntime = TokioRuntime + AsyncRuntime = openraft::TokioRuntime ); #[cfg(feature = "monoio")] @@ -246,7 +246,7 @@ impl RaftSnapshotBuilder for Arc { if let Some(d) = self.get_blocking(&BlockOperation::DelayBuildingSnapshot) { tracing::info!(?d, "delay snapshot build"); - tokio::time::sleep(d).await; + ::AsyncRuntime::sleep(d).await; } { @@ -259,7 +259,7 @@ impl RaftSnapshotBuilder for Arc { if let Some(d) = self.get_blocking(&BlockOperation::BuildSnapshot) { tracing::info!(?d, "blocking snapshot build"); - tokio::time::sleep(d).await; + ::AsyncRuntime::sleep(d).await; } } @@ -382,7 +382,7 @@ impl RaftStorage for Arc { if let Some(d) = self.get_blocking(&BlockOperation::PurgeLog) { tracing::info!(?d, "block purging log"); - tokio::time::sleep(d).await; + ::AsyncRuntime::sleep(d).await; } { From b13720e9a380a2cad73d80ed431fc38a28b7e8ef Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 19 Feb 2024 18:52:24 +0100 Subject: [PATCH 06/17] Test: add some new tests available for 'monoio' Signed-off-by: Anthony Griffon --- tests/tests/append_entries/t60_enable_heartbeat.rs | 8 +++----- .../append_entries/t61_heartbeat_reject_vote.rs | 12 +++++------- tests/tests/fixtures/runtime.rs | 2 ++ tests/tests/membership/t11_add_learner.rs | 1 + tests/tests/metrics/t30_leader_metrics.rs | 2 +- .../t10_append_entries_partial_success.rs | 3 ++- .../replication/t50_append_entries_backoff_rejoin.rs | 3 ++- .../t35_building_snapshot_does_not_block_append.rs | 6 ++++-- .../t35_building_snapshot_does_not_block_apply.rs | 6 ++++-- .../snapshot_streaming/t30_purge_in_snapshot_logs.rs | 2 +- .../t34_replication_does_not_block_purge.rs | 2 +- 11 files changed, 26 insertions(+), 21 deletions(-) diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index 767bb8b7b..1ae8c0c17 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -1,4 +1,3 @@ -/* use std::sync::Arc; use std::time::Duration; @@ -6,9 +5,9 @@ use anyhow::Result; use maplit::btreeset; use openraft::AsyncRuntime; use openraft::Config; -use openraft::TokioRuntime; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::Runtime; use crate::fixtures::RaftRouter; /// Enable heartbeat, heartbeat should be replicated. @@ -31,8 +30,8 @@ async fn enable_heartbeat() -> Result<()> { node0.runtime_config().heartbeat(true); for _i in 0..3 { - let now = ::Instant::now(); - TokioRuntime::sleep(Duration::from_millis(500)).await; + let now = ::Instant::now(); + Runtime::sleep(Duration::from_millis(500)).await; for node_id in [1, 2, 3] { // no new log will be sent, . @@ -54,4 +53,3 @@ async fn enable_heartbeat() -> Result<()> { fn timeout() -> Option { Some(Duration::from_millis(1_000)) } -*/ diff --git a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs index c7f17573e..0d1bae9ee 100644 --- a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs +++ b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs @@ -1,4 +1,3 @@ -/* use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -8,11 +7,11 @@ use maplit::btreeset; use openraft::raft::VoteRequest; use openraft::testing::log_id; use openraft::Config; -use openraft::TokioInstant; use openraft::Vote; -use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; +use crate::fixtures::runtime::Instant; use crate::fixtures::RaftRouter; /// If a follower receives heartbeat, it should reject vote request until leader lease expired. @@ -29,12 +28,12 @@ async fn heartbeat_reject_vote() -> Result<()> { ); let mut router = RaftRouter::new(config.clone()); - let now = TokioInstant::now(); + let now = Instant::now(); sleep(Duration::from_millis(1)).await; let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {3}).await?; - let vote_modified_time = Arc::new(Mutex::new(Some(TokioInstant::now()))); + let vote_modified_time = Arc::new(Mutex::new(Some(Instant::now()))); tracing::info!(log_index, "--- leader lease is set by heartbeat"); { let m = vote_modified_time.clone(); @@ -45,7 +44,7 @@ async fn heartbeat_reject_vote() -> Result<()> { assert!(state.vote_last_modified() > Some(now)); }); - let now = TokioInstant::now(); + let now = Instant::now(); sleep(Duration::from_millis(700)).await; let m = vote_modified_time.clone(); @@ -90,4 +89,3 @@ async fn heartbeat_reject_vote() -> Result<()> { fn timeout() -> Option { Some(Duration::from_millis(1_000)) } -*/ diff --git a/tests/tests/fixtures/runtime.rs b/tests/tests/fixtures/runtime.rs index 7cce8a995..f0edf2d34 100644 --- a/tests/tests/fixtures/runtime.rs +++ b/tests/tests/fixtures/runtime.rs @@ -1,3 +1,5 @@ +#![allow(unused_imports)] + #[cfg(feature = "monoio")] pub use local_sync::oneshot; #[cfg(feature = "monoio")] pub use monoio::spawn; #[cfg(feature = "monoio")] pub use monoio::time::sleep; diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index ec49064ea..4f82f74c7 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -195,6 +195,7 @@ async fn add_learner_with_set_nodes() -> Result<()> { /// Because adding learner is also a change-membership operation, a new membership config log will /// let raft consider the previous membership config log as committed, which is actually not. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +#[cfg_attr(feature = "monoio", ignore)] // Crashing the future is causing a whole crash with monoio async fn add_learner_when_previous_membership_not_committed() -> Result<()> { let config = Arc::new( Config { diff --git a/tests/tests/metrics/t30_leader_metrics.rs b/tests/tests/metrics/t30_leader_metrics.rs index 70bbb55fb..d48120d9e 100644 --- a/tests/tests/metrics/t30_leader_metrics.rs +++ b/tests/tests/metrics/t30_leader_metrics.rs @@ -12,9 +12,9 @@ use openraft::LogId; use openraft::ServerState; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; -use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Cluster leader_metrics test. diff --git a/tests/tests/replication/t10_append_entries_partial_success.rs b/tests/tests/replication/t10_append_entries_partial_success.rs index a5573564f..ecf3e6019 100644 --- a/tests/tests/replication/t10_append_entries_partial_success.rs +++ b/tests/tests/replication/t10_append_entries_partial_success.rs @@ -6,6 +6,7 @@ use maplit::btreeset; use openraft::Config; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; /// RaftNetwork::append_entries can return a partial success. @@ -33,7 +34,7 @@ async fn append_entries_partial_success() -> Result<()> { router.set_append_entries_quota(Some(quota)); let r = router.clone(); - tokio::spawn(async move { + spawn(async move { // client request will be blocked due to limited quota=2 r.client_request_many(0, "0", n as usize).await.unwrap(); }); diff --git a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs index e5ee4f6b0..f350aebd5 100644 --- a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs +++ b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs @@ -7,6 +7,7 @@ use openraft::Config; use openraft::ServerState; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// A restarted unreachable node should recover correctly, and catch up with the leader: @@ -43,7 +44,7 @@ async fn append_entries_backoff_rejoin() -> Result<()> { tracing::info!(log_index, "--- elect node-1"); { // Timeout leader lease otherwise vote-request will be rejected by node-2 - tokio::time::sleep(Duration::from_millis(1_000)).await; + sleep(Duration::from_millis(1_000)).await; n1.trigger().elect().await?; n1.wait(timeout()).state(ServerState::Leader, "node-1 elect").await?; diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs index c96afa921..3dd84b185 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs @@ -9,11 +9,13 @@ use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; use openraft::testing::log_id; +use openraft::AsyncRuntime; use openraft::Config; use openraft::Vote; use openraft_memstore::BlockOperation; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::Runtime; use crate::fixtures::RaftRouter; /// When building a snapshot, append-entries request should not be blocked. @@ -46,7 +48,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { follower.trigger().snapshot().await?; tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started"); - tokio::time::sleep(Duration::from_millis(500)).await; + Runtime::sleep(Duration::from_millis(500)).await; let res = router .wait(&1, Some(Duration::from_millis(500))) @@ -70,7 +72,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { let mut cli = router.new_client(1, &()).await; let option = RPCOption::new(Duration::from_millis(1_000)); let fu = cli.append_entries(rpc, option); - let fu = tokio::time::timeout(Duration::from_millis(500), fu); + let fu = Runtime::timeout(Duration::from_millis(500), fu); let resp = fu.await??; assert!(resp.is_success()); } diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs index 7a9c3b43b..ce418a18b 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs @@ -9,11 +9,13 @@ use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; use openraft::testing::log_id; +use openraft::AsyncRuntime; use openraft::Config; use openraft::Vote; use openraft_memstore::BlockOperation; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::Runtime; use crate::fixtures::RaftRouter; /// When building a snapshot, applying-entries request should not be blocked. @@ -50,7 +52,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { follower.trigger().snapshot().await?; tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started"); - tokio::time::sleep(Duration::from_millis(500)).await; + Runtime::sleep(Duration::from_millis(500)).await; let res = router .wait(&1, Some(Duration::from_millis(500))) @@ -78,7 +80,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { let option = RPCOption::new(Duration::from_millis(1_000)); let fu = cli.append_entries(rpc, option); - let fu = tokio::time::timeout(Duration::from_millis(500), fu); + let fu = Runtime::timeout(Duration::from_millis(500), fu); let resp = fu.await??; assert!(resp.is_success()); diff --git a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs index 35560b72f..223b33b6b 100644 --- a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs @@ -7,9 +7,9 @@ use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; use openraft::RaftLogReader; -use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Leader logs should be deleted upto snapshot.last_log_id-max_in_snapshot_log_to_keep after diff --git a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs index 483d03f7f..1c227821a 100644 --- a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs +++ b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs @@ -7,9 +7,9 @@ use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; use openraft::RaftLogReader; -use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Replication blocks purge, but it should not purge for ever. From c816202cb5db70728c1f663bca2ac4d24ca59b5b Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 19 Feb 2024 19:01:28 +0100 Subject: [PATCH 07/17] Chore: Update README.md to add 'monoio' & fmt Signed-off-by: Anthony Griffon --- README.md | 2 +- tests/tests/fixtures/mod.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index eeba2683b..61c2cb09b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@

Openraft

- Advanced Raft in 🦀 Rust using Tokio. Please ⭐ on github! + Advanced Raft in 🦀 Rust using Tokio or Monoio. Please ⭐ on github!

diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 0fe409e3d..31dbbd066 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -3,8 +3,6 @@ #![allow(dead_code)] pub mod runtime; -use runtime::{Runtime, Instant, sleep}; - #[cfg(feature = "bt")] use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::collections::BTreeSet; @@ -73,6 +71,9 @@ use openraft_memstore::TypeConfig; use openraft_memstore::TypeConfig as MemConfig; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; +use runtime::sleep; +use runtime::Instant; +use runtime::Runtime; use tracing_appender::non_blocking::WorkerGuard; use crate::fixtures::logging::init_file_logging; From aa0d447b9b1d2f0183e09181f75683ac51979299 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Tue, 20 Feb 2024 16:28:59 +0100 Subject: [PATCH 08/17] Improve: Split runtime into separate files Signed-off-by: Anthony Griffon --- openraft/src/async_runtime/mod.rs | 78 ++++++++++++++++++++++++++++ openraft/src/async_runtime/monoio.rs | 68 ++++++++++++++++++++++++ openraft/src/async_runtime/tokio.rs | 76 +++++++++++++++++++++++++++ openraft/src/instant.rs | 9 ---- openraft/src/lib.rs | 7 +-- 5 files changed, 226 insertions(+), 12 deletions(-) create mode 100644 openraft/src/async_runtime/mod.rs create mode 100644 openraft/src/async_runtime/monoio.rs create mode 100644 openraft/src/async_runtime/tokio.rs diff --git a/openraft/src/async_runtime/mod.rs b/openraft/src/async_runtime/mod.rs new file mode 100644 index 000000000..23155012d --- /dev/null +++ b/openraft/src/async_runtime/mod.rs @@ -0,0 +1,78 @@ +use std::fmt::Debug; +use std::fmt::Display; +use std::future::Future; +use std::time::Duration; + +use crate::Instant; +use crate::OptionalSend; +use crate::OptionalSync; + +pub mod tokio; + +#[cfg(feature = "monoio")] pub mod monoio; + +/// A trait defining interfaces with an asynchronous runtime. +/// +/// The intention of this trait is to allow an application using this crate to bind an asynchronous +/// runtime that suits it the best. +/// +/// Some additional related functions are also exposed by this trait. +/// +/// ## Note +/// +/// The default asynchronous runtime is `tokio`. +pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static { + /// The error type of [`Self::JoinHandle`]. + type JoinError: Debug + Display + OptionalSend; + + /// The return type of [`Self::spawn`]. + type JoinHandle: Future> + + OptionalSend + + OptionalSync + + Unpin; + + /// The type that enables the user to sleep in an asynchronous runtime. + type Sleep: Future + OptionalSend + OptionalSync; + + /// A measurement of a monotonically non-decreasing clock. + type Instant: Instant; + + /// The timeout error type. + type TimeoutError: Debug + Display + OptionalSend; + + /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user + /// to await the outcome of a [`Future`]. + type Timeout + OptionalSend>: Future> + OptionalSend; + + /// Type of a thread-local random number generator. + type ThreadLocalRng: rand::Rng; + + /// Spawn a new task. + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static; + + /// Wait until `duration` has elapsed. + fn sleep(duration: Duration) -> Self::Sleep; + + /// Wait until `deadline` is reached. + fn sleep_until(deadline: Self::Instant) -> Self::Sleep; + + /// Require a [`Future`] to complete before the specified duration has elapsed. + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout; + + /// Require a [`Future`] to complete before the specified instant in time. + fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout; + + /// Check if the [`Self::JoinError`] is `panic`. + fn is_panic(join_error: &Self::JoinError) -> bool; + + /// Get the random number generator to use for generating random numbers. + /// + /// # Note + /// + /// This is a per-thread instance, which cannot be shared across threads or + /// sent to another thread. + fn thread_rng() -> Self::ThreadLocalRng; +} diff --git a/openraft/src/async_runtime/monoio.rs b/openraft/src/async_runtime/monoio.rs new file mode 100644 index 000000000..7284c28f6 --- /dev/null +++ b/openraft/src/async_runtime/monoio.rs @@ -0,0 +1,68 @@ +use std::fmt::Debug; +use std::future::Future; +use std::time::Duration; + +use crate::AsyncRuntime; +use crate::OptionalSend; + +#[derive(Debug, Default)] +pub struct MonoioRuntime; + +pub type MonoioInstant = monoio::time::Instant; + +impl crate::Instant for monoio::time::Instant { + #[inline] + fn now() -> Self { + monoio::time::Instant::now() + } +} + +impl AsyncRuntime for MonoioRuntime { + type JoinError = crate::error::Infallible; + type JoinHandle = monoio::task::JoinHandle>; + type Sleep = monoio::time::Sleep; + type Instant = MonoioInstant; + type TimeoutError = monoio::time::error::Elapsed; + type Timeout + OptionalSend> = monoio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + monoio::spawn(async move { Ok(future.await) }) + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + monoio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + monoio::time::sleep_until(deadline) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + monoio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { + monoio::time::timeout_at(deadline, future) + } + + #[inline] + fn is_panic(_join_error: &Self::JoinError) -> bool { + // A monoio task shouldn't panic or it would bubble the panic in case of a join + false + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } +} diff --git a/openraft/src/async_runtime/tokio.rs b/openraft/src/async_runtime/tokio.rs new file mode 100644 index 000000000..3814155b9 --- /dev/null +++ b/openraft/src/async_runtime/tokio.rs @@ -0,0 +1,76 @@ +use std::fmt::Debug; +use std::future::Future; +use std::time::Duration; + +use crate::AsyncRuntime; +use crate::Instant; +use crate::OptionalSend; + +/// `Tokio` is the default asynchronous executor. +#[derive(Debug, Default)] +pub struct TokioRuntime; + +pub type TokioInstant = tokio::time::Instant; + +impl Instant for tokio::time::Instant { + #[inline] + fn now() -> Self { + tokio::time::Instant::now() + } +} + +impl AsyncRuntime for TokioRuntime { + type JoinError = tokio::task::JoinError; + type JoinHandle = tokio::task::JoinHandle; + type Sleep = tokio::time::Sleep; + type Instant = TokioInstant; + type TimeoutError = tokio::time::error::Elapsed; + type Timeout + OptionalSend> = tokio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + #[cfg(feature = "singlethreaded")] + { + tokio::task::spawn_local(future) + } + #[cfg(not(feature = "singlethreaded"))] + { + tokio::task::spawn(future) + } + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + tokio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + tokio::time::sleep_until(deadline) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + tokio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { + tokio::time::timeout_at(deadline, future) + } + + #[inline] + fn is_panic(join_error: &Self::JoinError) -> bool { + join_error.is_panic() + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } +} diff --git a/openraft/src/instant.rs b/openraft/src/instant.rs index 66d0ebc65..801a48485 100644 --- a/openraft/src/instant.rs +++ b/openraft/src/instant.rs @@ -34,12 +34,3 @@ pub trait Instant: /// Return the current instant. fn now() -> Self; } - -pub type TokioInstant = tokio::time::Instant; - -impl Instant for tokio::time::Instant { - #[inline] - fn now() -> Self { - tokio::time::Instant::now() - } -} diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 9ebd362e5..b71813192 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -75,9 +75,11 @@ pub use network::RaftNetwork; pub use network::RaftNetworkFactory; pub use type_config::RaftTypeConfig; -#[cfg(feature = "monoio")] pub use crate::async_runtime::monoio; +#[cfg(feature = "monoio")] pub use crate::async_runtime::monoio::MonoioInstant; +#[cfg(feature = "monoio")] pub use crate::async_runtime::monoio::MonoioRuntime; +pub use crate::async_runtime::tokio::TokioInstant; +pub use crate::async_runtime::tokio::TokioRuntime; pub use crate::async_runtime::AsyncRuntime; -pub use crate::async_runtime::TokioRuntime; pub use crate::change_members::ChangeMembers; pub use crate::config::Config; pub use crate::config::ConfigError; @@ -86,7 +88,6 @@ pub use crate::core::ServerState; pub use crate::entry::Entry; pub use crate::entry::EntryPayload; pub use crate::instant::Instant; -pub use crate::instant::TokioInstant; pub use crate::log_id::LogId; pub use crate::log_id::LogIdOptionExt; pub use crate::log_id::LogIndexOptionExt; From e4445d593f7ec6a943f16e35ed64da5ded070fb7 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Tue, 20 Feb 2024 16:30:15 +0100 Subject: [PATCH 09/17] Test: Add constraint on NodeId to allow multiple runtimes & singlethreaded tests (mb fixup) Signed-off-by: Anthony Griffon --- tests/tests/append_entries/t11_append_conflicts.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/tests/append_entries/t11_append_conflicts.rs b/tests/tests/append_entries/t11_append_conflicts.rs index f0ba5014e..1b63c14e7 100644 --- a/tests/tests/append_entries/t11_append_conflicts.rs +++ b/tests/tests/append_entries/t11_append_conflicts.rs @@ -226,14 +226,9 @@ async fn check_logs(log_store: &mut LS, terms: Vec) -> Result<()> where C: RaftTypeConfig, LS: RaftLogStorage, + C::NodeId: Sync + Send, { - let logs = match log_store.get_log_entries(..).await { - Ok(elt) => elt, - Err(err) => { - dbg!(err); - anyhow::bail!("blbl") - } - }; + let logs = log_store.get_log_entries(..).await?; let skip = 0; let want: Vec> = terms .iter() From 14bd18098fa5c4da7bb88d1c4ff711457738f438 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Tue, 20 Feb 2024 16:56:18 +0100 Subject: [PATCH 10/17] Fix: openraft::MonoioRuntiem (fixup 587ac720edb5afb568025d5120ffd614e217f6b6) Signed-off-by: Anthony Griffon --- memstore/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 7dc5e9023..f39713913 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -95,7 +95,7 @@ openraft::declare_raft_types!( Node = (), Entry = Entry, SnapshotData = Cursor>, - AsyncRuntime = openraft::monoio::MonoioRuntime + AsyncRuntime = openraft::MonoioRuntime ); /// The application snapshot type which the `MemStore` works with. From 5a6406b0916aff95b6596b60d45436d7a296c747 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Tue, 20 Feb 2024 16:56:50 +0100 Subject: [PATCH 11/17] Test: use 'AsyncRuntime' Signed-off-by: Anthony Griffon --- .../append_entries/t10_see_higher_vote.rs | 13 +++++----- .../append_entries/t60_enable_heartbeat.rs | 9 ++++--- .../t61_heartbeat_reject_vote.rs | 15 +++++++----- tests/tests/client_api/t11_client_reads.rs | 9 +++---- .../client_api/t51_write_when_leader_quit.rs | 13 +++++----- tests/tests/fixtures/mod.rs | 24 +++++++++++++------ tests/tests/fixtures/runtime.rs | 8 ------- tests/tests/membership/t11_add_learner.rs | 11 +++++---- .../membership/t30_commit_joint_config.rs | 8 ++++--- .../membership/t30_elect_with_new_config.rs | 6 +++-- tests/tests/membership/t31_remove_leader.rs | 6 +++-- tests/tests/metrics/t30_leader_metrics.rs | 6 +++-- .../t10_append_entries_partial_success.rs | 6 +++-- .../t50_append_entries_backoff_rejoin.rs | 6 +++-- ...building_snapshot_does_not_block_append.rs | 7 +++--- ..._building_snapshot_does_not_block_apply.rs | 7 +++--- .../t30_purge_in_snapshot_logs.rs | 8 ++++--- .../t34_replication_does_not_block_purge.rs | 6 +++-- 18 files changed, 98 insertions(+), 70 deletions(-) diff --git a/tests/tests/append_entries/t10_see_higher_vote.rs b/tests/tests/append_entries/t10_see_higher_vote.rs index 0837722a2..cf883e5d7 100644 --- a/tests/tests/append_entries/t10_see_higher_vote.rs +++ b/tests/tests/append_entries/t10_see_higher_vote.rs @@ -3,20 +3,19 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -#[cfg(feature = "monoio")] use monoio::spawn; -#[cfg(feature = "monoio")] use monoio::time::sleep; use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::VoteRequest; +use openraft::AsyncRuntime; use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; +use openraft::RaftTypeConfig; use openraft::ServerState; use openraft::Vote; use openraft_memstore::ClientRequest; -#[cfg(not(feature = "monoio"))] use tokio::spawn; -#[cfg(not(feature = "monoio"))] use tokio::time::sleep; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -42,7 +41,7 @@ async fn append_sees_higher_vote() -> Result<()> { tracing::info!("--- upgrade vote on node-1"); { // Let leader lease expire - sleep(Duration::from_millis(800)).await; + ::AsyncRuntime::sleep(Duration::from_millis(800)).await; let option = RPCOption::new(Duration::from_millis(1_000)); @@ -69,7 +68,7 @@ async fn append_sees_higher_vote() -> Result<()> { router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?; let n0 = router.get_raft_handle(&0)?; - spawn(async move { + ::AsyncRuntime::spawn(async move { let res = n0 .client_write(ClientRequest { client: "0".to_string(), @@ -81,7 +80,7 @@ async fn append_sees_higher_vote() -> Result<()> { tracing::debug!("--- client_write res: {:?}", res); }); - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; router .wait(&0, timeout()) diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index 1ae8c0c17..af0ecb187 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -5,11 +5,14 @@ use anyhow::Result; use maplit::btreeset; use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::Runtime; use crate::fixtures::RaftRouter; +type Instant = <::AsyncRuntime as AsyncRuntime>::Instant; + /// Enable heartbeat, heartbeat should be replicated. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn enable_heartbeat() -> Result<()> { @@ -30,8 +33,8 @@ async fn enable_heartbeat() -> Result<()> { node0.runtime_config().heartbeat(true); for _i in 0..3 { - let now = ::Instant::now(); - Runtime::sleep(Duration::from_millis(500)).await; + let now = Instant::now(); + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; for node_id in [1, 2, 3] { // no new log will be sent, . diff --git a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs index 0d1bae9ee..245b7ea9c 100644 --- a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs +++ b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs @@ -6,14 +6,17 @@ use anyhow::Result; use maplit::btreeset; use openraft::raft::VoteRequest; use openraft::testing::log_id; +use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; use openraft::Vote; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; -use crate::fixtures::runtime::Instant; use crate::fixtures::RaftRouter; +type Instant = <::AsyncRuntime as AsyncRuntime>::Instant; + /// If a follower receives heartbeat, it should reject vote request until leader lease expired. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn heartbeat_reject_vote() -> Result<()> { @@ -29,7 +32,7 @@ async fn heartbeat_reject_vote() -> Result<()> { let mut router = RaftRouter::new(config.clone()); let now = Instant::now(); - sleep(Duration::from_millis(1)).await; + ::AsyncRuntime::sleep(Duration::from_millis(1)).await; let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {3}).await?; @@ -45,7 +48,7 @@ async fn heartbeat_reject_vote() -> Result<()> { }); let now = Instant::now(); - sleep(Duration::from_millis(700)).await; + ::AsyncRuntime::sleep(Duration::from_millis(700)).await; let m = vote_modified_time.clone(); @@ -68,14 +71,14 @@ async fn heartbeat_reject_vote() -> Result<()> { tracing::info!(log_index, "--- ensures no more blank-log heartbeat is used"); { // TODO: this part can be removed when blank-log heartbeat is removed. - sleep(Duration::from_millis(1500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(1500)).await; router.wait(&1, timeout()).applied_index(Some(log_index), "no log is written").await?; } tracing::info!(log_index, "--- disable heartbeat, vote request will be granted"); { node0.runtime_config().heartbeat(false); - sleep(Duration::from_millis(1500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(1500)).await; router.wait(&1, timeout()).applied_index(Some(log_index), "no log is written").await?; diff --git a/tests/tests/client_api/t11_client_reads.rs b/tests/tests/client_api/t11_client_reads.rs index 2bce9fd00..64b169227 100644 --- a/tests/tests/client_api/t11_client_reads.rs +++ b/tests/tests/client_api/t11_client_reads.rs @@ -6,13 +6,14 @@ use anyhow::Result; use maplit::btreeset; use openraft::error::NetworkError; use openraft::error::RPCError; +use openraft::AsyncRuntime; use openraft::Config; use openraft::LogIdOptionExt; use openraft::RPCTypes; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; -use crate::fixtures::runtime::spawn; use crate::fixtures::RPCRequest; use crate::fixtures::RaftRouter; @@ -116,7 +117,7 @@ async fn get_read_log_id() -> Result<()> { router.set_rpc_pre_hook(RPCTypes::AppendEntries, block_to_n0); // Expire current leader - sleep(Duration::from_millis(200)).await; + ::AsyncRuntime::sleep(Duration::from_millis(200)).await; tracing::info!("--- let node 1 to become leader, append a blank log"); let n1 = router.get_raft_handle(&1).unwrap(); @@ -164,7 +165,7 @@ async fn get_read_log_id() -> Result<()> { router.set_rpc_pre_hook(RPCTypes::AppendEntries, block_to_n0); let r = router.clone(); - spawn(async move { + ::AsyncRuntime::spawn(async move { // This will block for ever let _x = r.client_request_many(1, "foo", 1).await; }); diff --git a/tests/tests/client_api/t51_write_when_leader_quit.rs b/tests/tests/client_api/t51_write_when_leader_quit.rs index c617ca94d..fcdeb3515 100644 --- a/tests/tests/client_api/t51_write_when_leader_quit.rs +++ b/tests/tests/client_api/t51_write_when_leader_quit.rs @@ -8,15 +8,16 @@ use openraft::error::ForwardToLeader; use openraft::error::RaftError; use openraft::raft::AppendEntriesRequest; use openraft::testing::log_id; +use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::runtime::oneshot; -use crate::fixtures::runtime::sleep; -use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; /// Client write will receive a [`ForwardToLeader`] error because of log reversion, when leader @@ -50,14 +51,14 @@ async fn write_when_leader_quit_and_log_revert() -> Result<()> { tracing::info!(log_index, "--- write a log in another task"); { let n0 = router.get_raft_handle(&0)?; - spawn(async move { + ::AsyncRuntime::spawn(async move { let res = n0.client_write(ClientRequest::make_request("cli", 1)).await; tx.send(res).unwrap(); }); } // wait for log to be appended on leader, and response channel is installed. - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; tracing::info!(log_index, "--- force node 0 to give up leadership"); { @@ -124,14 +125,14 @@ async fn write_when_leader_switched() -> Result<()> { tracing::info!(log_index, "--- write a log in another task"); { let n0 = router.get_raft_handle(&0)?; - spawn(async move { + ::AsyncRuntime::spawn(async move { let res = n0.client_write(ClientRequest::make_request("cli", 1)).await; tx.send(res).unwrap(); }); } // wait for log to be appended on leader, and response channel is installed. - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; tracing::info!(log_index, "--- force node 0 to give up leadership, inform it to commit"); { diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 31dbbd066..cdbeccd75 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -48,6 +48,7 @@ use openraft::raft::VoteResponse; use openraft::storage::Adaptor; use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; +use openraft::AsyncRuntime; use openraft::Config; use openraft::LogId; use openraft::LogIdOptionExt; @@ -71,9 +72,6 @@ use openraft_memstore::TypeConfig; use openraft_memstore::TypeConfig as MemConfig; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; -use runtime::sleep; -use runtime::Instant; -use runtime::Runtime; use tracing_appender::non_blocking::WorkerGuard; use crate::fixtures::logging::init_file_logging; @@ -316,7 +314,7 @@ impl TypedRaftRouter { let r = rand::random::() % send_delay; let timeout = Duration::from_millis(r); - sleep(timeout).await; + ::AsyncRuntime::sleep(timeout).await; } pub fn set_append_entries_quota(&mut self, quota: Option) { @@ -626,7 +624,11 @@ impl TypedRaftRouter { Ok(rst) } - pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { + pub fn wait( + &self, + node_id: &MemNodeId, + timeout: Option, + ) -> Wait::AsyncRuntime> { let node = { let rt = self.nodes.lock().unwrap(); rt.get(node_id).expect("target node not found in routing table").clone().0 @@ -769,7 +771,11 @@ impl TypedRaftRouter { /// Send external request to the particular node. pub async fn with_raft_state(&self, target: MemNodeId, func: F) -> Result> where - F: FnOnce(&RaftState) -> V + Send + 'static, + F: FnOnce( + &RaftState::AsyncRuntime as AsyncRuntime>::Instant>, + ) -> V + + Send + + 'static, V: Send + 'static, { let r = self.get_raft_handle(&target).unwrap(); @@ -777,7 +783,11 @@ impl TypedRaftRouter { } /// Send external request to the particular node. - pub fn external_request) + Send + 'static>( + pub fn external_request< + F: FnOnce(&RaftState::AsyncRuntime as AsyncRuntime>::Instant>) + + Send + + 'static, + >( &self, target: MemNodeId, req: F, diff --git a/tests/tests/fixtures/runtime.rs b/tests/tests/fixtures/runtime.rs index f0edf2d34..fd7b28f18 100644 --- a/tests/tests/fixtures/runtime.rs +++ b/tests/tests/fixtures/runtime.rs @@ -1,12 +1,4 @@ #![allow(unused_imports)] #[cfg(feature = "monoio")] pub use local_sync::oneshot; -#[cfg(feature = "monoio")] pub use monoio::spawn; -#[cfg(feature = "monoio")] pub use monoio::time::sleep; -#[cfg(feature = "monoio")] pub use openraft::monoio::MonoioInstant as Instant; -#[cfg(feature = "monoio")] pub use openraft::monoio::MonoioRuntime as Runtime; -#[cfg(not(feature = "monoio"))] pub use openraft::TokioInstant as Instant; -#[cfg(not(feature = "monoio"))] pub use openraft::TokioRuntime as Runtime; -#[cfg(not(feature = "monoio"))] pub use tokio::spawn; #[cfg(not(feature = "monoio"))] pub use tokio::sync::oneshot; -#[cfg(not(feature = "monoio"))] pub use tokio::time::sleep; diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index 4f82f74c7..d505e2f14 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -8,16 +8,17 @@ use openraft::error::ChangeMembershipError; use openraft::error::ClientWriteError; use openraft::error::InProgress; use openraft::storage::RaftLogReaderExt; +use openraft::AsyncRuntime; use openraft::ChangeMembers; use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; use openraft::Membership; +use openraft::RaftTypeConfig; use openraft::StorageHelper; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; -use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] @@ -148,7 +149,7 @@ async fn add_learner_non_blocking() -> Result<()> { if n1_repl.is_none() { tracing::info!("--- no replication attempt is made, sleep and retry: {}-th attempt", i); - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; continue; } assert_eq!(Some(&None), n1_repl, "no replication state to the learner is reported"); @@ -213,13 +214,13 @@ async fn add_learner_when_previous_membership_not_committed() -> Result<()> { router.set_network_error(1, true); let node = router.get_raft_handle(&0)?; - spawn(async move { + ::AsyncRuntime::spawn(async move { let res = node.change_membership([0, 1], false).await; tracing::info!("do not expect res: {:?}", res); unreachable!("do not expect any res"); }); - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; } tracing::info!(log_index, "--- add new node node-1, in non blocking mode"); diff --git a/tests/tests/membership/t30_commit_joint_config.rs b/tests/tests/membership/t30_commit_joint_config.rs index d86ae8d01..1ca2087d1 100644 --- a/tests/tests/membership/t30_commit_joint_config.rs +++ b/tests/tests/membership/t30_commit_joint_config.rs @@ -4,11 +4,13 @@ use std::time::Duration; use anyhow::Result; use futures::stream::StreamExt; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; use openraft::LogIdOptionExt; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; /// A leader must wait for learner to commit member-change from [0] to [0,1,2]. @@ -65,7 +67,7 @@ async fn commit_joint_config_during_0_to_012() -> Result<()> { tracing::info!(log_index, "--- changing cluster config, should timeout"); - spawn({ + ::AsyncRuntime::spawn({ let router = router.clone(); async move { let node = router.get_raft_handle(&0).unwrap(); @@ -125,7 +127,7 @@ async fn commit_joint_config_during_012_to_234() -> Result<()> { { let router = router.clone(); // this is expected to be blocked since 3 and 4 are isolated. - spawn( + ::AsyncRuntime::spawn( async move { let node = router.get_raft_handle(&0)?; node.change_membership([2, 3, 4], false).await?; diff --git a/tests/tests/membership/t30_elect_with_new_config.rs b/tests/tests/membership/t30_elect_with_new_config.rs index 08d7b950d..836260f86 100644 --- a/tests/tests/membership/t30_elect_with_new_config.rs +++ b/tests/tests/membership/t30_elect_with_new_config.rs @@ -3,11 +3,13 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; use openraft::LogIdOptionExt; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Dynamic membership test. @@ -41,7 +43,7 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> { router.set_network_error(0, true); // Wait for leader lease to expire - sleep(Duration::from_millis(700)).await; + ::AsyncRuntime::sleep(Duration::from_millis(700)).await; // Let node-1 become leader. let node_1 = router.get_raft_handle(&1)?; diff --git a/tests/tests/membership/t31_remove_leader.rs b/tests/tests/membership/t31_remove_leader.rs index f638393f2..2408f569f 100644 --- a/tests/tests/membership/t31_remove_leader.rs +++ b/tests/tests/membership/t31_remove_leader.rs @@ -4,15 +4,17 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::error::ClientWriteError; +use openraft::AsyncRuntime; use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; +use openraft::RaftTypeConfig; use openraft::ServerState; use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Change membership from {0,1} to {1,2,3}. @@ -159,7 +161,7 @@ async fn remove_leader_and_convert_to_learner() -> Result<()> { tracing::info!(log_index, "--- wait 1 sec, old leader(non-voter) stays as a leader"); { - sleep(Duration::from_millis(1_000)).await; + ::AsyncRuntime::sleep(Duration::from_millis(1_000)).await; router .wait(&0, timeout()) diff --git a/tests/tests/metrics/t30_leader_metrics.rs b/tests/tests/metrics/t30_leader_metrics.rs index d48120d9e..8894d5d90 100644 --- a/tests/tests/metrics/t30_leader_metrics.rs +++ b/tests/tests/metrics/t30_leader_metrics.rs @@ -6,15 +6,17 @@ use futures::stream::StreamExt; use maplit::btreemap; use maplit::btreeset; use openraft::testing::log_id; +use openraft::AsyncRuntime; use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; +use openraft::RaftTypeConfig; use openraft::ServerState; +use openraft_memstore::TypeConfig; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Cluster leader_metrics test. @@ -156,7 +158,7 @@ async fn leader_metrics() -> Result<()> { tracing::info!(log_index, "--- let node-1 to elect to take leadership from node-0"); { // Let the leader lease expire - sleep(Duration::from_millis(700)).await; + ::AsyncRuntime::sleep(Duration::from_millis(700)).await; n1.trigger().elect().await?; n1.wait(timeout()).state(ServerState::Leader, "node-1 becomes leader").await?; diff --git a/tests/tests/replication/t10_append_entries_partial_success.rs b/tests/tests/replication/t10_append_entries_partial_success.rs index ecf3e6019..a1cf3a96f 100644 --- a/tests/tests/replication/t10_append_entries_partial_success.rs +++ b/tests/tests/replication/t10_append_entries_partial_success.rs @@ -3,10 +3,12 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::spawn; use crate::fixtures::RaftRouter; /// RaftNetwork::append_entries can return a partial success. @@ -34,7 +36,7 @@ async fn append_entries_partial_success() -> Result<()> { router.set_append_entries_quota(Some(quota)); let r = router.clone(); - spawn(async move { + ::AsyncRuntime::spawn(async move { // client request will be blocked due to limited quota=2 r.client_request_many(0, "0", n as usize).await.unwrap(); }); diff --git a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs index f350aebd5..31fd553a2 100644 --- a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs +++ b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs @@ -3,11 +3,13 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; use openraft::ServerState; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// A restarted unreachable node should recover correctly, and catch up with the leader: @@ -44,7 +46,7 @@ async fn append_entries_backoff_rejoin() -> Result<()> { tracing::info!(log_index, "--- elect node-1"); { // Timeout leader lease otherwise vote-request will be rejected by node-2 - sleep(Duration::from_millis(1_000)).await; + ::AsyncRuntime::sleep(Duration::from_millis(1_000)).await; n1.trigger().elect().await?; n1.wait(timeout()).state(ServerState::Leader, "node-1 elect").await?; diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs index 3dd84b185..5f4faaea9 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs @@ -11,11 +11,12 @@ use openraft::testing::blank_ent; use openraft::testing::log_id; use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; use openraft::Vote; use openraft_memstore::BlockOperation; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::Runtime; use crate::fixtures::RaftRouter; /// When building a snapshot, append-entries request should not be blocked. @@ -48,7 +49,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { follower.trigger().snapshot().await?; tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started"); - Runtime::sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; let res = router .wait(&1, Some(Duration::from_millis(500))) @@ -72,7 +73,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { let mut cli = router.new_client(1, &()).await; let option = RPCOption::new(Duration::from_millis(1_000)); let fu = cli.append_entries(rpc, option); - let fu = Runtime::timeout(Duration::from_millis(500), fu); + let fu = ::AsyncRuntime::timeout(Duration::from_millis(500), fu); let resp = fu.await??; assert!(resp.is_success()); } diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs index ce418a18b..3e8045049 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs @@ -11,11 +11,12 @@ use openraft::testing::blank_ent; use openraft::testing::log_id; use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; use openraft::Vote; use openraft_memstore::BlockOperation; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::Runtime; use crate::fixtures::RaftRouter; /// When building a snapshot, applying-entries request should not be blocked. @@ -52,7 +53,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { follower.trigger().snapshot().await?; tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started"); - Runtime::sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; let res = router .wait(&1, Some(Duration::from_millis(500))) @@ -80,7 +81,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { let option = RPCOption::new(Duration::from_millis(1_000)); let fu = cli.append_entries(rpc, option); - let fu = Runtime::timeout(Duration::from_millis(500), fu); + let fu = ::AsyncRuntime::timeout(Duration::from_millis(500), fu); let resp = fu.await??; assert!(resp.is_success()); diff --git a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs index 223b33b6b..ede886247 100644 --- a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs @@ -3,13 +3,15 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; use openraft::RaftLogReader; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Leader logs should be deleted upto snapshot.last_log_id-max_in_snapshot_log_to_keep after @@ -51,7 +53,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { let (mut sto0, mut _sm0) = router.get_storage_handle(&0)?; // Wait for purge to complete. - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; let logs = sto0.try_get_log_entries(..).await?; assert_eq!(max_keep as usize, logs.len()); @@ -78,7 +80,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { // There may be a cached append-entries request that already loads log 10..15 from the store, // just before building snapshot. - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; tracing::info!( log_index, diff --git a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs index 1c227821a..57b115790 100644 --- a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs +++ b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs @@ -3,13 +3,15 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::CommittedLeaderId; use openraft::Config; use openraft::LogId; use openraft::RaftLogReader; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::runtime::sleep; use crate::fixtures::RaftRouter; /// Replication blocks purge, but it should not purge for ever. @@ -52,7 +54,7 @@ async fn replication_does_not_block_purge() -> Result<()> { .snapshot(LogId::new(CommittedLeaderId::new(1, 0), log_index), "built snapshot") .await?; - sleep(Duration::from_millis(500)).await; + ::AsyncRuntime::sleep(Duration::from_millis(500)).await; let (mut sto0, mut _sm0) = router.get_storage_handle(&0)?; let logs = sto0.try_get_log_entries(..).await?; From ee4577e7a84f15b087b786d8fc6d319b99ce093a Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Tue, 20 Feb 2024 17:08:01 +0100 Subject: [PATCH 12/17] Test: enable shutdown tests and ignore those which fail Signed-off-by: Anthony Griffon --- tests/tests/life_cycle/t11_shutdown.rs | 3 +-- .../membership/t12_concurrent_write_and_add_learner.rs | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/tests/life_cycle/t11_shutdown.rs b/tests/tests/life_cycle/t11_shutdown.rs index 4df8777ea..297ddf163 100644 --- a/tests/tests/life_cycle/t11_shutdown.rs +++ b/tests/tests/life_cycle/t11_shutdown.rs @@ -1,4 +1,3 @@ -/* use std::sync::Arc; use anyhow::Result; @@ -39,6 +38,7 @@ async fn shutdown() -> Result<()> { /// A panicked RaftCore should also return a proper error the next time accessing the `Raft`. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +#[cfg_attr(feature = "monoio", ignore)] async fn return_error_after_panic() -> Result<()> { let config = Arc::new( Config { @@ -109,4 +109,3 @@ async fn return_error_after_shutdown() -> Result<()> { Ok(()) } -*/ diff --git a/tests/tests/membership/t12_concurrent_write_and_add_learner.rs b/tests/tests/membership/t12_concurrent_write_and_add_learner.rs index 869e26b7f..08e0c8cf6 100644 --- a/tests/tests/membership/t12_concurrent_write_and_add_learner.rs +++ b/tests/tests/membership/t12_concurrent_write_and_add_learner.rs @@ -1,14 +1,16 @@ -/* use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; use openraft::LogIdOptionExt; +use openraft::RaftTypeConfig; use openraft::ServerState; use openraft::Vote; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -100,7 +102,7 @@ async fn concurrent_write_and_add_learner() -> Result<()> { let r = router.clone(); let handle = { - tokio::spawn( + ::AsyncRuntime::spawn( async move { r.add_learner(leader, 3).await.unwrap(); Ok::<(), anyhow::Error>(()) @@ -163,4 +165,3 @@ async fn wait_log(router: &RaftRouter, node_ids: &BTreeSet, want_log: u64) fn timeout() -> Option { Some(Duration::from_millis(500)) } -*/ From cac30f906a88fa5aa87076eee782c17958b17702 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Wed, 21 Feb 2024 17:42:18 +0100 Subject: [PATCH 13/17] Fixup: Review comments Signed-off-by: Anthony Griffon --- README.md | 3 ++- openraft/src/async_runtime/mod.rs | 2 +- openraft/src/lib.rs | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 61c2cb09b..6cd86a6e9 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,8 @@

Openraft

- Advanced Raft in 🦀 Rust using Tokio or Monoio. Please ⭐ on github! + Advanced Raft in 🦀 Rust using Tokio or +Monoio. Please ⭐ on github!

diff --git a/openraft/src/async_runtime/mod.rs b/openraft/src/async_runtime/mod.rs index 23155012d..5e73ec92a 100644 --- a/openraft/src/async_runtime/mod.rs +++ b/openraft/src/async_runtime/mod.rs @@ -7,7 +7,7 @@ use crate::Instant; use crate::OptionalSend; use crate::OptionalSync; -pub mod tokio; +#[cfg(not(feature = "monoio"))] pub mod tokio; #[cfg(feature = "monoio")] pub mod monoio; diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index b71813192..e61a7ea54 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -77,7 +77,9 @@ pub use type_config::RaftTypeConfig; #[cfg(feature = "monoio")] pub use crate::async_runtime::monoio::MonoioInstant; #[cfg(feature = "monoio")] pub use crate::async_runtime::monoio::MonoioRuntime; +#[cfg(not(feature = "monoio"))] pub use crate::async_runtime::tokio::TokioInstant; +#[cfg(not(feature = "monoio"))] pub use crate::async_runtime::tokio::TokioRuntime; pub use crate::async_runtime::AsyncRuntime; pub use crate::change_members::ChangeMembers; From b8a6e921835b3559fd67f8a6506e0de50813d9b9 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 22 Feb 2024 21:18:55 +0100 Subject: [PATCH 14/17] Fix: Monoio yield when running the main loop due to an issue with the runtime Signed-off-by: Anthony Griffon --- openraft/src/core/raft_core.rs | 5 +++++ tests/tests/membership/t11_add_learner.rs | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 921a08fb7..04d3c0341 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -926,6 +926,11 @@ where let raft_msg_processed = self.process_raft_msg(balancer.raft_msg()).await?; let notify_processed = self.process_notify(balancer.notify()).await?; + // HACK: To force a yield when using monoio, it seems if we are not doing this we got + // an issue + #[cfg(feature = "monoio")] + monoio::time::sleep(Duration::from_millis(0)).await; + // If one of the channel consumed all its budget, re-balance the budget ratio. #[allow(clippy::collapsible_else_if)] diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index d505e2f14..238edaadc 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -196,7 +196,6 @@ async fn add_learner_with_set_nodes() -> Result<()> { /// Because adding learner is also a change-membership operation, a new membership config log will /// let raft consider the previous membership config log as committed, which is actually not. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] -#[cfg_attr(feature = "monoio", ignore)] // Crashing the future is causing a whole crash with monoio async fn add_learner_when_previous_membership_not_committed() -> Result<()> { let config = Arc::new( Config { From f9cad898e1315ae108af1e7098572533d1c9c35e Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 4 Mar 2024 09:21:35 +0100 Subject: [PATCH 15/17] Refactor: Rebase & update --- Cargo.toml | 1 + openraft/Cargo.toml | 6 +- openraft/src/async_runtime.rs | 269 --------------------------- openraft/src/async_runtime/mod.rs | 36 +++- openraft/src/async_runtime/monoio.rs | 29 ++- openraft/src/async_runtime/tokio.rs | 34 +++- 6 files changed, 98 insertions(+), 277 deletions(-) delete mode 100644 openraft/src/async_runtime.rs diff --git a/Cargo.toml b/Cargo.toml index de830b539..d1d9fcc99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ clap = { version = "4.1.11", features = ["derive", "env"] } derive_more = { version="0.99.9" } futures = "0.3" lazy_static = "1.4.0" +local-sync = "0.1" maplit = "1.0.2" monoio = "0.2.2" pretty_assertions = "1.0.0" diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 0044a37b9..908236b46 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -27,6 +27,7 @@ serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } monoio = { workspace = true, optional = true } +local-sync = { workspace = true, optional = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } @@ -79,8 +80,9 @@ compat = [] compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"] compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"] -monoio = ["dep:monoio", "singlethreaded"] -monoio-sync = ["dep:monoio", "monoio/sync"] +# Turn on monoio Runtime with singlethreaded monoio tasks +monoio = ["dep:monoio", "singlethreaded", "dep:local-sync"] +# monoio-sync = ["dep:monoio", "monoio/sync"] # Allows an application to implement a custom the v2 storage API. # See `openraft::storage::v2` for more details. diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs deleted file mode 100644 index 67e6ff114..000000000 --- a/openraft/src/async_runtime.rs +++ /dev/null @@ -1,269 +0,0 @@ -use std::fmt::Debug; -use std::fmt::Display; -use std::future::Future; -use std::time::Duration; - -use crate::Instant; -use crate::OptionalSend; -use crate::OptionalSync; -use crate::TokioInstant; - -/// A trait defining interfaces with an asynchronous runtime. -/// -/// The intention of this trait is to allow an application using this crate to bind an asynchronous -/// runtime that suits it the best. -/// -/// Some additional related functions are also exposed by this trait. -/// -/// ## Note -/// -/// The default asynchronous runtime is `tokio`. -pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + OptionalSync + 'static { - /// The error type of [`Self::JoinHandle`]. - type JoinError: Debug + Display + OptionalSend; - - /// The return type of [`Self::spawn`]. - type JoinHandle: Future> - + OptionalSend - + OptionalSync - + Unpin; - - /// The type that enables the user to sleep in an asynchronous runtime. - type Sleep: Future + OptionalSend + OptionalSync; - - /// A measurement of a monotonically non-decreasing clock. - type Instant: Instant; - - /// The timeout error type. - type TimeoutError: Debug + Display + OptionalSend; - - /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user - /// to await the outcome of a [`Future`]. - type Timeout + OptionalSend>: Future> + OptionalSend; - - /// Type of a thread-local random number generator. - type ThreadLocalRng: rand::Rng; - - /// Type of a `oneshot` sender. - type OneshotSender: AsyncOneshotSendExt + OptionalSend + OptionalSync + Debug + Sized; - - /// Type of a `oneshot` receiver error. - type OneshotReceiverError: std::error::Error + OptionalSend; - - /// Type of a `oneshot` receiver. - type OneshotReceiver: OptionalSend - + OptionalSync - + Future> - + Unpin; - - /// Spawn a new task. - fn spawn(future: T) -> Self::JoinHandle - where - T: Future + OptionalSend + 'static, - T::Output: OptionalSend + 'static; - - /// Wait until `duration` has elapsed. - fn sleep(duration: Duration) -> Self::Sleep; - - /// Wait until `deadline` is reached. - fn sleep_until(deadline: Self::Instant) -> Self::Sleep; - - /// Require a [`Future`] to complete before the specified duration has elapsed. - fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout; - - /// Require a [`Future`] to complete before the specified instant in time. - fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout; - - /// Check if the [`Self::JoinError`] is `panic`. - fn is_panic(join_error: &Self::JoinError) -> bool; - - /// Get the random number generator to use for generating random numbers. - /// - /// # Note - /// - /// This is a per-thread instance, which cannot be shared across threads or - /// sent to another thread. - fn thread_rng() -> Self::ThreadLocalRng; - - /// Creates a new one-shot channel for sending single values. - /// - /// The function returns separate "send" and "receive" handles. The `Sender` - /// handle is used by the producer to send the value. The `Receiver` handle is - /// used by the consumer to receive the value. - /// - /// Each handle can be used on separate tasks. - fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) - where T: OptionalSend; -} - -/// `Tokio` is the default asynchronous executor. -#[derive(Debug, Default, PartialEq, Eq)] -pub struct TokioRuntime; - -pub struct TokioOneShotSender(pub tokio::sync::oneshot::Sender); - -impl AsyncRuntime for TokioRuntime { - type JoinError = tokio::task::JoinError; - type JoinHandle = tokio::task::JoinHandle; - type Sleep = tokio::time::Sleep; - type Instant = TokioInstant; - type TimeoutError = tokio::time::error::Elapsed; - type Timeout + OptionalSend> = tokio::time::Timeout; - type ThreadLocalRng = rand::rngs::ThreadRng; - type OneshotSender = TokioOneShotSender; - type OneshotReceiver = tokio::sync::oneshot::Receiver; - type OneshotReceiverError = tokio::sync::oneshot::error::RecvError; - - #[inline] - fn spawn(future: T) -> Self::JoinHandle - where - T: Future + OptionalSend + 'static, - T::Output: OptionalSend + 'static, - { - #[cfg(feature = "singlethreaded")] - { - tokio::task::spawn_local(future) - } - #[cfg(not(feature = "singlethreaded"))] - { - tokio::task::spawn(future) - } - } - - #[inline] - fn sleep(duration: Duration) -> Self::Sleep { - tokio::time::sleep(duration) - } - - #[inline] - fn sleep_until(deadline: Self::Instant) -> Self::Sleep { - tokio::time::sleep_until(deadline) - } - - #[inline] - fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { - tokio::time::timeout(duration, future) - } - - #[inline] - fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { - tokio::time::timeout_at(deadline, future) - } - - #[inline] - fn is_panic(join_error: &Self::JoinError) -> bool { - join_error.is_panic() - } - - #[inline] - fn thread_rng() -> Self::ThreadLocalRng { - rand::thread_rng() - } - - #[inline] - fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) - where T: OptionalSend { - let (tx, rx) = tokio::sync::oneshot::channel(); - (TokioOneShotSender(tx), rx) - } -} - -pub trait AsyncOneshotSendExt: Unpin { - /// Attempts to send a value on this channel, returning it back if it could - /// not be sent. - /// - /// This method consumes `self` as only one value may ever be sent on a `oneshot` - /// channel. It is not marked async because sending a message to an `oneshot` - /// channel never requires any form of waiting. Because of this, the `send` - /// method can be used in both synchronous and asynchronous code without - /// problems. - fn send(self, t: T) -> Result<(), T>; -} - -impl AsyncOneshotSendExt for TokioOneShotSender { - #[inline] - fn send(self, t: T) -> Result<(), T> { - self.0.send(t) - } -} - -impl Debug for TokioOneShotSender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_tuple("TokioSendWrapper").finish() - } -} - -#[cfg(feature = "monoio")] -pub mod monoio { - use std::fmt::Debug; - use std::future::Future; - use std::time::Duration; - - use crate::AsyncRuntime; - use crate::OptionalSend; - - #[derive(Debug, Default)] - pub struct MonoioRuntime; - - pub type MonoioInstant = monoio::time::Instant; - - impl crate::Instant for monoio::time::Instant { - #[inline] - fn now() -> Self { - monoio::time::Instant::now() - } - } - - impl AsyncRuntime for MonoioRuntime { - type JoinError = crate::error::Infallible; - type JoinHandle = monoio::task::JoinHandle>; - type Sleep = monoio::time::Sleep; - type Instant = MonoioInstant; - type TimeoutError = monoio::time::error::Elapsed; - type Timeout + OptionalSend> = monoio::time::Timeout; - type ThreadLocalRng = rand::rngs::ThreadRng; - - #[inline] - fn spawn(future: T) -> Self::JoinHandle - where - T: Future + OptionalSend + 'static, - T::Output: OptionalSend + 'static, - { - monoio::spawn(async move { Ok(future.await) }) - } - - #[inline] - fn sleep(duration: Duration) -> Self::Sleep { - monoio::time::sleep(duration) - } - - #[inline] - fn sleep_until(deadline: Self::Instant) -> Self::Sleep { - monoio::time::sleep_until(deadline) - } - - #[inline] - fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { - monoio::time::timeout(duration, future) - } - - #[inline] - fn timeout_at + OptionalSend>( - deadline: Self::Instant, - future: F, - ) -> Self::Timeout { - monoio::time::timeout_at(deadline, future) - } - - #[inline] - fn is_panic(_join_error: &Self::JoinError) -> bool { - // A monoio task shouldn't panic or it would bubble the panic in case of a join - false - } - - #[inline] - fn thread_rng() -> Self::ThreadLocalRng { - rand::thread_rng() - } - } -} diff --git a/openraft/src/async_runtime/mod.rs b/openraft/src/async_runtime/mod.rs index 5e73ec92a..42e059253 100644 --- a/openraft/src/async_runtime/mod.rs +++ b/openraft/src/async_runtime/mod.rs @@ -21,7 +21,7 @@ use crate::OptionalSync; /// ## Note /// /// The default asynchronous runtime is `tokio`. -pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static { +pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + OptionalSync + 'static { /// The error type of [`Self::JoinHandle`]. type JoinError: Debug + Display + OptionalSend; @@ -47,6 +47,18 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static /// Type of a thread-local random number generator. type ThreadLocalRng: rand::Rng; + /// Type of a `oneshot` sender. + type OneshotSender: AsyncOneshotSendExt + OptionalSend + OptionalSync + Debug + Sized; + + /// Type of a `oneshot` receiver error. + type OneshotReceiverError: std::error::Error + OptionalSend; + + /// Type of a `oneshot` receiver. + type OneshotReceiver: OptionalSend + + OptionalSync + + Future> + + Unpin; + /// Spawn a new task. fn spawn(future: T) -> Self::JoinHandle where @@ -75,4 +87,26 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static /// This is a per-thread instance, which cannot be shared across threads or /// sent to another thread. fn thread_rng() -> Self::ThreadLocalRng; + + /// Creates a new one-shot channel for sending single values. + /// + /// The function returns separate "send" and "receive" handles. The `Sender` + /// handle is used by the producer to send the value. The `Receiver` handle is + /// used by the consumer to receive the value. + /// + /// Each handle can be used on separate tasks. + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where T: OptionalSend; +} + +pub trait AsyncOneshotSendExt: Unpin { + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// This method consumes `self` as only one value may ever be sent on a `oneshot` + /// channel. It is not marked async because sending a message to an `oneshot` + /// channel never requires any form of waiting. Because of this, the `send` + /// method can be used in both synchronous and asynchronous code without + /// problems. + fn send(self, t: T) -> Result<(), T>; } diff --git a/openraft/src/async_runtime/monoio.rs b/openraft/src/async_runtime/monoio.rs index 7284c28f6..28d458794 100644 --- a/openraft/src/async_runtime/monoio.rs +++ b/openraft/src/async_runtime/monoio.rs @@ -5,7 +5,9 @@ use std::time::Duration; use crate::AsyncRuntime; use crate::OptionalSend; -#[derive(Debug, Default)] +use super::AsyncOneshotSendExt; + +#[derive(Debug, Default, PartialEq, Eq)] pub struct MonoioRuntime; pub type MonoioInstant = monoio::time::Instant; @@ -17,6 +19,8 @@ impl crate::Instant for monoio::time::Instant { } } +pub struct MonoioOneshotSender(pub local_sync::oneshot::Sender); + impl AsyncRuntime for MonoioRuntime { type JoinError = crate::error::Infallible; type JoinHandle = monoio::task::JoinHandle>; @@ -25,6 +29,9 @@ impl AsyncRuntime for MonoioRuntime { type TimeoutError = monoio::time::error::Elapsed; type Timeout + OptionalSend> = monoio::time::Timeout; type ThreadLocalRng = rand::rngs::ThreadRng; + type OneshotSender = MonoioOneshotSender; + type OneshotReceiver = local_sync::oneshot::Receiver; + type OneshotReceiverError = local_sync::oneshot::error::RecvError; #[inline] fn spawn(future: T) -> Self::JoinHandle @@ -65,4 +72,24 @@ impl AsyncRuntime for MonoioRuntime { fn thread_rng() -> Self::ThreadLocalRng { rand::thread_rng() } + + #[inline] + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where T: OptionalSend { + let (tx, rx) = local_sync::oneshot::channel(); + (MonoioOneshotSender(tx), rx) + } +} + +impl AsyncOneshotSendExt for MonoioOneshotSender { + #[inline] + fn send(self, t: T) -> Result<(), T> { + self.0.send(t) + } +} + +impl Debug for MonoioOneshotSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("MonoioSendWrapper").finish() + } } diff --git a/openraft/src/async_runtime/tokio.rs b/openraft/src/async_runtime/tokio.rs index 3814155b9..14b97d112 100644 --- a/openraft/src/async_runtime/tokio.rs +++ b/openraft/src/async_runtime/tokio.rs @@ -2,14 +2,11 @@ use std::fmt::Debug; use std::future::Future; use std::time::Duration; +use crate::async_runtime::AsyncOneshotSendExt; use crate::AsyncRuntime; use crate::Instant; use crate::OptionalSend; -/// `Tokio` is the default asynchronous executor. -#[derive(Debug, Default)] -pub struct TokioRuntime; - pub type TokioInstant = tokio::time::Instant; impl Instant for tokio::time::Instant { @@ -19,6 +16,12 @@ impl Instant for tokio::time::Instant { } } +/// `Tokio` is the default asynchronous executor. +#[derive(Debug, Default, PartialEq, Eq)] +pub struct TokioRuntime; + +pub struct TokioOneShotSender(pub tokio::sync::oneshot::Sender); + impl AsyncRuntime for TokioRuntime { type JoinError = tokio::task::JoinError; type JoinHandle = tokio::task::JoinHandle; @@ -27,6 +30,9 @@ impl AsyncRuntime for TokioRuntime { type TimeoutError = tokio::time::error::Elapsed; type Timeout + OptionalSend> = tokio::time::Timeout; type ThreadLocalRng = rand::rngs::ThreadRng; + type OneshotSender = TokioOneShotSender; + type OneshotReceiver = tokio::sync::oneshot::Receiver; + type OneshotReceiverError = tokio::sync::oneshot::error::RecvError; #[inline] fn spawn(future: T) -> Self::JoinHandle @@ -73,4 +79,24 @@ impl AsyncRuntime for TokioRuntime { fn thread_rng() -> Self::ThreadLocalRng { rand::thread_rng() } + + #[inline] + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where T: OptionalSend { + let (tx, rx) = tokio::sync::oneshot::channel(); + (TokioOneShotSender(tx), rx) + } +} + +impl AsyncOneshotSendExt for TokioOneShotSender { + #[inline] + fn send(self, t: T) -> Result<(), T> { + self.0.send(t) + } +} + +impl Debug for TokioOneShotSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("TokioSendWrapper").finish() + } } From 41194cbcfb89c84d334d864a51958b56081f098d Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 4 Mar 2024 09:24:28 +0100 Subject: [PATCH 16/17] Chore: fmt --- openraft/src/async_runtime/monoio.rs | 3 +-- tests/tests/fixtures/mod.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/openraft/src/async_runtime/monoio.rs b/openraft/src/async_runtime/monoio.rs index 28d458794..c8ce99335 100644 --- a/openraft/src/async_runtime/monoio.rs +++ b/openraft/src/async_runtime/monoio.rs @@ -2,11 +2,10 @@ use std::fmt::Debug; use std::future::Future; use std::time::Duration; +use super::AsyncOneshotSendExt; use crate::AsyncRuntime; use crate::OptionalSend; -use super::AsyncOneshotSendExt; - #[derive(Debug, Default, PartialEq, Eq)] pub struct MonoioRuntime; diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index cdbeccd75..8729f3fa1 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -33,9 +33,9 @@ use openraft::error::RaftError; use openraft::error::RemoteError; use openraft::error::Unreachable; use openraft::metrics::Wait; -use openraft::network::RPCOption; #[cfg(feature = "monoio")] use openraft::monoio::MonoioInstant; #[cfg(feature = "monoio")] use openraft::monoio::MonoioRuntime; +use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; From 427c9b1e964a0a69c60b4db0e19d911e03f58bfd Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Mon, 4 Mar 2024 09:37:18 +0100 Subject: [PATCH 17/17] Refactor: use 'AsyncRuntime::spawn' instead of tokio::spawn --- openraft/Cargo.toml | 1 - .../tests/append_entries/t90_issue_216_stale_last_log_id.rs | 5 ++++- tests/tests/fixtures/mod.rs | 2 -- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 908236b46..cb82585ce 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -82,7 +82,6 @@ compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"] # Turn on monoio Runtime with singlethreaded monoio tasks monoio = ["dep:monoio", "singlethreaded", "dep:local-sync"] -# monoio-sync = ["dep:monoio", "monoio/sync"] # Allows an application to implement a custom the v2 storage API. # See `openraft::storage::v2` for more details. diff --git a/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs b/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs index c4d68a4b2..6dcded067 100644 --- a/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs +++ b/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs @@ -3,7 +3,10 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::AsyncRuntime; use openraft::Config; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -41,7 +44,7 @@ async fn stale_last_log_id() -> Result<()> { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); for i in 0..n_threads { - tokio::spawn({ + ::AsyncRuntime::spawn({ let router = router.clone(); let tx = tx.clone(); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 8729f3fa1..ea29a1337 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -33,8 +33,6 @@ use openraft::error::RaftError; use openraft::error::RemoteError; use openraft::error::Unreachable; use openraft::metrics::Wait; -#[cfg(feature = "monoio")] use openraft::monoio::MonoioInstant; -#[cfg(feature = "monoio")] use openraft::monoio::MonoioRuntime; use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory;