From 84539cb03b95ad96875b58961b2d29d9268f2f41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 26 Apr 2023 08:54:52 +0800 Subject: [PATCH] Change: move snapshot type definition from storage traits to `RaftTypeConfig` Similar to `NodeId` or `Entry`, `SnapshotData` is also a data type that is specified by the application and needs to be defined in `RaftTypeConfig`, which is a collection of all application types. Public types changes: - Add `SnapshotData` to `RaftTypeConfig`: ```rust pub trait RaftTypeConfig { /// ... type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; } ``` - Remove associated type `SnapshotData` from `storage::RaftStorage`. - Remove associated type `SnapshotData` from `storage::v2::RaftStateMachine`. Corresponding API changes: - Change `storage::RaftSnapshotBuilder` to `RaftSnapshotBuilder` - Change `storage::Snapshot` to `storage::Snapshot` Upgrade tip: Update generic type parameter in application types to pass compilation. --- cluster_benchmark/tests/benchmark/network.rs | 2 +- cluster_benchmark/tests/benchmark/store.rs | 36 +++--- .../tests/benchmark/store/test.rs | 4 +- examples/raft-kv-memstore/src/lib.rs | 4 +- examples/raft-kv-memstore/src/store/mod.rs | 16 +-- examples/raft-kv-rocksdb/src/lib.rs | 10 +- examples/raft-kv-rocksdb/src/store.rs | 16 +-- memstore/src/lib.rs | 35 +++--- memstore/src/test.rs | 13 ++- openraft/src/compat/compat07.rs | 6 +- openraft/src/core/raft_core.rs | 25 ++--- openraft/src/core/sm/command.rs | 41 +++---- openraft/src/core/sm/mod.rs | 41 +++---- openraft/src/core/streaming_state.rs | 19 ++-- openraft/src/engine/testing.rs | 3 + openraft/src/raft.rs | 11 ++ openraft/src/replication/mod.rs | 103 ++++++------------ openraft/src/storage/adapter.rs | 9 +- openraft/src/storage/mod.rs | 38 ++----- openraft/src/storage/v2.rs | 19 +--- rocksstore-compat07/src/compatibility_test.rs | 2 +- rocksstore-compat07/src/lib.rs | 37 +++---- rocksstore-compat07/src/test.rs | 8 +- rocksstore/src/lib.rs | 35 +++--- rocksstore/src/test.rs | 8 +- sledstore/src/lib.rs | 35 +++--- sledstore/src/test.rs | 8 +- stores/rocksstore-v2/src/lib.rs | 39 ++++--- stores/rocksstore-v2/src/test.rs | 4 +- .../t10_conflict_with_empty_entries.rs | 6 +- .../append_entries/t11_append_conflicts.rs | 2 +- .../t11_append_entries_with_bigger_term.rs | 2 +- tests/tests/fixtures/mod.rs | 2 +- ...building_snapshot_does_not_block_append.rs | 2 +- 34 files changed, 286 insertions(+), 355 deletions(-) diff --git a/cluster_benchmark/tests/benchmark/network.rs b/cluster_benchmark/tests/benchmark/network.rs index cde7d3411..cc88fd7d3 100644 --- a/cluster_benchmark/tests/benchmark/network.rs +++ b/cluster_benchmark/tests/benchmark/network.rs @@ -23,10 +23,10 @@ use openraft::Raft; use openraft::RaftNetwork; use openraft::RaftNetworkFactory; -use crate::store::Config as MemConfig; use crate::store::LogStore; use crate::store::NodeId; use crate::store::StateMachineStore; +use crate::store::TypeConfig as MemConfig; pub type BenchRaft = Raft, Arc>; diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index 712c228f9..b3a3e79ae 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -20,6 +20,7 @@ use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; use openraft::RaftLogId; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -38,7 +39,8 @@ pub struct ClientResponse {} pub type NodeId = u64; openraft::declare_raft_types!( - pub Config: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (), Entry = Entry + pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (), + Entry = Entry, SnapshotData = Cursor> ); #[derive(Debug)] @@ -55,7 +57,7 @@ pub struct StateMachine { pub struct LogStore { vote: RwLock>>, - log: RwLock>>, + log: RwLock>>, last_purged_log_id: RwLock>>, } @@ -98,11 +100,11 @@ impl StateMachineStore { } #[async_trait] -impl RaftLogReader for Arc { +impl RaftLogReader for Arc { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, - ) -> Result>, StorageError> { + ) -> Result>, StorageError> { let mut entries = vec![]; { let log = self.log.read().await; @@ -114,7 +116,7 @@ impl RaftLogReader for Arc { Ok(entries) } - async fn get_log_state(&mut self) -> Result, StorageError> { + async fn get_log_state(&mut self) -> Result, StorageError> { let log = self.log.read().await; let last_serialized = log.iter().rev().next().map(|(_, ent)| ent); @@ -138,9 +140,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -190,7 +192,7 @@ impl RaftSnapshotBuilder>> for Arc { } #[async_trait] -impl RaftLogStorage for Arc { +impl RaftLogStorage for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut v = self.vote.write().await; @@ -225,7 +227,7 @@ impl RaftLogStorage for Arc { #[tracing::instrument(level = "trace", skip_all)] async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { { let mut log = self.log.write().await; log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry))); @@ -242,9 +244,7 @@ impl RaftLogStorage for Arc { } #[async_trait] -impl RaftStateMachine for Arc { - type SnapshotData = Cursor>; - +impl RaftStateMachine for Arc { async fn applied_state( &mut self, ) -> Result<(Option>, StoredMembership), StorageError> { @@ -253,7 +253,7 @@ impl RaftStateMachine for Arc { } async fn apply(&mut self, entries: I) -> Result, StorageError> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { let mut sm = self.sm.write().await; let it = entries.into_iter(); @@ -275,7 +275,9 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -283,7 +285,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { let new_snapshot = StoredSnapshot { meta: meta.clone(), @@ -305,9 +307,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { match &*self.current_snapshot.read().await { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/cluster_benchmark/tests/benchmark/store/test.rs b/cluster_benchmark/tests/benchmark/store/test.rs index c017afb7d..42f538168 100644 --- a/cluster_benchmark/tests/benchmark/store/test.rs +++ b/cluster_benchmark/tests/benchmark/store/test.rs @@ -5,14 +5,14 @@ use openraft::testing::StoreBuilder; use openraft::testing::Suite; use openraft::StorageError; -use crate::store::Config; use crate::store::LogStore; use crate::store::NodeId; use crate::store::StateMachineStore; +use crate::store::TypeConfig; struct Builder {} #[async_trait] -impl StoreBuilder, Arc> for Builder { +impl StoreBuilder, Arc> for Builder { async fn build(&self) -> Result<((), Arc, Arc), StorageError> { let log_store = LogStore::new_async().await; let sm = Arc::new(StateMachineStore::new()); diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index 177143956..d681983ad 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -1,6 +1,7 @@ #![allow(clippy::uninlined_format_args)] #![deny(unused_qualifications)] +use std::io::Cursor; use std::sync::Arc; use actix_web::middleware; @@ -29,7 +30,8 @@ pub type NodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. - pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, Entry = openraft::Entry + pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, + Entry = openraft::Entry, SnapshotData = Cursor> ); pub type LogStore = Adaptor>; diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index 873f09902..990077b30 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -15,6 +15,7 @@ use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -123,9 +124,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -176,7 +177,6 @@ impl RaftSnapshotBuilder>> for Arc { #[async_trait] impl RaftStorage for Arc { - type SnapshotData = Cursor>; type LogReader = Self; type SnapshotBuilder = Self; @@ -277,7 +277,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -285,7 +287,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -312,9 +314,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { match &*self.current_snapshot.read().await { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/examples/raft-kv-rocksdb/src/lib.rs b/examples/raft-kv-rocksdb/src/lib.rs index ab16d2b3b..0cf12d80b 100644 --- a/examples/raft-kv-rocksdb/src/lib.rs +++ b/examples/raft-kv-rocksdb/src/lib.rs @@ -2,6 +2,7 @@ #![deny(unused_qualifications)] use std::fmt::Display; +use std::io::Cursor; use std::path::Path; use std::sync::Arc; @@ -33,17 +34,14 @@ pub struct Node { impl Display for Node { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "ExampleNode {{ rpc_addr: {}, api_addr: {} }}", - self.rpc_addr, self.api_addr - ) + write!(f, "Node {{ rpc_addr: {}, api_addr: {} }}", self.rpc_addr, self.api_addr) } } openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. - pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, Entry = openraft::Entry + pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, + Entry = openraft::Entry, SnapshotData = Cursor> ); pub type LogStore = Adaptor>; diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 555f72a98..178ac9969 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -22,6 +22,7 @@ use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -363,9 +364,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -411,7 +412,6 @@ impl RaftSnapshotBuilder>> for Arc { #[async_trait] impl RaftStorage for Arc { - type SnapshotData = Cursor>; type LogReader = Self; type SnapshotBuilder = Self; @@ -508,7 +508,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -516,7 +518,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -541,9 +543,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { match Store::get_current_snapshot_(self)? { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index feb1e4e89..056edd82d 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -21,6 +21,7 @@ use openraft::EntryPayload; use openraft::LogId; use openraft::RaftLogId; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -72,7 +73,8 @@ pub type MemNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. - pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), Entry = Entry + pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), + Entry = Entry, SnapshotData = Cursor> ); /// The application snapshot type which the `MemStore` works with. @@ -184,11 +186,11 @@ impl Default for MemStore { } #[async_trait] -impl RaftLogReader for Arc { +impl RaftLogReader for Arc { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, - ) -> Result>, StorageError> { + ) -> Result>, StorageError> { let mut entries = vec![]; { let log = self.log.read().await; @@ -201,14 +203,15 @@ impl RaftLogReader for Arc { Ok(entries) } - async fn get_log_state(&mut self) -> Result, StorageError> { + async fn get_log_state(&mut self) -> Result, StorageError> { let log = self.log.read().await; let last_serialized = log.iter().rev().next().map(|(_, ent)| ent); let last = match last_serialized { None => None, Some(serialized) => { - let ent: Entry = serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?; + let ent: Entry = + serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?; Some(*ent.get_log_id()) } }; @@ -228,9 +231,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -289,9 +292,7 @@ impl RaftSnapshotBuilder>> for Arc { } #[async_trait] -impl RaftStorage for Arc { - type SnapshotData = Cursor>; - +impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { tracing::debug!(?vote, "save_vote"); @@ -352,7 +353,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn append_to_log(&mut self, entries: I) -> Result<(), StorageError> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { let mut log = self.log.write().await; for entry in entries { let s = @@ -365,7 +366,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine( &mut self, - entries: &[Entry], + entries: &[Entry], ) -> Result, StorageError> { let mut res = Vec::with_capacity(entries.len()); @@ -399,7 +400,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -407,7 +410,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -441,9 +444,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { match &*self.current_snapshot.read().await { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/memstore/src/test.rs b/memstore/src/test.rs index 60bf9ffbd..dd9b0943c 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -6,16 +6,23 @@ use openraft::testing::StoreBuilder; use openraft::testing::Suite; use openraft::StorageError; -use crate::Config; use crate::MemNodeId; use crate::MemStore; +use crate::TypeConfig; struct MemBuilder {} #[async_trait] -impl StoreBuilder>, Adaptor>> for MemBuilder { +impl StoreBuilder>, Adaptor>> for MemBuilder { async fn build( &self, - ) -> Result<((), Adaptor>, Adaptor>), StorageError> { + ) -> Result< + ( + (), + Adaptor>, + Adaptor>, + ), + StorageError, + > { let store = MemStore::new_async().await; let (log_store, sm) = Adaptor::new(store); Ok(((), log_store, sm)) diff --git a/openraft/src/compat/compat07.rs b/openraft/src/compat/compat07.rs index 17af01913..73e1d475e 100644 --- a/openraft/src/compat/compat07.rs +++ b/openraft/src/compat/compat07.rs @@ -414,6 +414,8 @@ pub mod testing { #[cfg(test)] mod tests { + use std::io::Cursor; + use maplit::btreemap; use maplit::btreeset; @@ -507,7 +509,9 @@ mod tests { } crate::declare_raft_types!( - pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, Entry = crate::Entry + pub TestingConfig: + D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, + Entry = crate::Entry, SnapshotData = Cursor> ); #[test] diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index ecbf688b5..3f79d55da 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use std::marker::PhantomData; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -11,8 +12,6 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use futures::TryFutureExt; use maplit::btreeset; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; use tokio::select; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -130,24 +129,20 @@ impl Debug for ApplyResult { /// Data for a Leader. /// /// It is created when RaftCore enters leader state, and will be dropped when it quits leader state. -pub(crate) struct LeaderData -where SD: AsyncRead + AsyncSeek + Send + Unpin + 'static -{ +pub(crate) struct LeaderData { /// Channels to send result back to client when logs are committed. pub(crate) client_resp_channels: BTreeMap>, /// A mapping of node IDs the replication state of the target node. // TODO(xp): make it a field of RaftCore. it does not have to belong to leader. // It requires the Engine to emit correct add/remove replication commands - pub(super) replications: BTreeMap>, + pub(super) replications: BTreeMap>, /// The time to send next heartbeat. pub(crate) next_heartbeat: Instant, } -impl LeaderData -where SD: AsyncRead + AsyncSeek + Send + Unpin + 'static -{ +impl LeaderData { pub(crate) fn new() -> Self { Self { client_resp_channels: Default::default(), @@ -181,11 +176,11 @@ where pub(crate) log_store: LS, /// A controlling handle to the [`RaftStateMachine`] worker. - pub(crate) sm_handle: sm::Handle, + pub(crate) sm_handle: sm::Handle, pub(crate) engine: Engine, - pub(crate) leader_data: Option>, + pub(crate) leader_data: Option>, #[allow(dead_code)] pub(crate) tx_api: mpsc::UnboundedSender>, @@ -201,6 +196,8 @@ where pub(crate) tx_metrics: watch::Sender>, pub(crate) span: Span, + + pub(crate) _p: PhantomData, } impl RaftCore @@ -804,7 +801,7 @@ where &mut self, target: C::NodeId, progress_entry: ProgressEntry, - ) -> ReplicationHandle { + ) -> ReplicationHandle { // Safe unwrap(): target must be in membership let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap(); @@ -813,7 +810,7 @@ where let session_id = ReplicationSessionId::new(*self.engine.state.vote_ref(), *membership_log_id); - ReplicationCore::::spawn( + ReplicationCore::::spawn( target, session_id, self.config.clone(), @@ -948,7 +945,7 @@ where #[allow(clippy::collapsible_else_if)] if notify_processed == balancer.notify() { - tracing::info!("there may be more Notify to process, increaase Notify ratio"); + tracing::info!("there may be more Notify to process, increase Notify ratio"); balancer.increase_notify(); } else { if raft_msg_processed == balancer.raft_msg() { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 315fdeb6d..c8fba808f 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -8,27 +8,22 @@ use tokio::sync::oneshot; use crate::display_ext::DisplaySlice; use crate::error::InstallSnapshotError; use crate::raft::InstallSnapshotRequest; -use crate::storage::RaftStateMachine; use crate::RaftTypeConfig; use crate::Snapshot; use crate::SnapshotMeta; -pub(crate) struct Command -where - C: RaftTypeConfig, - SM: RaftStateMachine, +pub(crate) struct Command +where C: RaftTypeConfig { pub(crate) seq: CommandSeq, - pub(crate) payload: CommandPayload, + pub(crate) payload: CommandPayload, /// Custom respond function to be called when the command is done. pub(crate) respond: Box, } -impl Debug for Command -where - C: RaftTypeConfig, - SM: RaftStateMachine, +impl Debug for Command +where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("StateMachineCommand") @@ -38,10 +33,8 @@ where } } -impl Command -where - C: RaftTypeConfig, - SM: RaftStateMachine, +impl Command +where C: RaftTypeConfig { /// Generate the next command seq with atomic increment. fn next_seq() -> CommandSeq { @@ -49,7 +42,7 @@ where SEQ.fetch_add(1, Ordering::Relaxed) } - pub(crate) fn new(payload: CommandPayload, respond: F) -> Self + pub(crate) fn new(payload: CommandPayload, respond: F) -> Self where F: FnOnce() + Send + 'static { Self { seq: Self::next_seq(), @@ -68,7 +61,7 @@ where Command::new(payload, || {}) } - pub(crate) fn get_snapshot(tx: oneshot::Sender>>) -> Self { + pub(crate) fn get_snapshot(tx: oneshot::Sender>>) -> Self { let payload = CommandPayload::GetSnapshot { tx }; Command::new(payload, || {}) } @@ -111,18 +104,14 @@ where pub(crate) type CommandSeq = u64; /// The payload of a state machine command. -pub(crate) enum CommandPayload -where - C: RaftTypeConfig, - SM: RaftStateMachine, +pub(crate) enum CommandPayload +where C: RaftTypeConfig { /// Instruct the state machine to create a snapshot based on its most recent view. BuildSnapshot, /// Get the latest built snapshot. - GetSnapshot { - tx: oneshot::Sender>>, - }, + GetSnapshot { tx: oneshot::Sender>> }, /// Receive a chunk of snapshot. /// @@ -146,10 +135,8 @@ where Apply { entries: Vec }, } -impl Debug for CommandPayload -where - C: RaftTypeConfig, - SM: RaftStateMachine, +impl Debug for CommandPayload +where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index 46ff719ed..e8e21b622 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -40,38 +40,32 @@ use crate::core::notify::Notify; // TODO: replace it with Snapshot /// Received snapshot from the leader. -struct Received> { +struct Received { snapshot_meta: SnapshotMeta, - data: Box, + data: Box, } -impl Received -where - C: RaftTypeConfig, - SM: RaftStateMachine, +impl Received +where C: RaftTypeConfig { - pub(crate) fn new(snapshot_meta: SnapshotMeta, data: Box) -> Self { + pub(crate) fn new(snapshot_meta: SnapshotMeta, data: Box) -> Self { Self { snapshot_meta, data } } } /// State machine worker handle for sending command to it. -pub(crate) struct Handle -where - C: RaftTypeConfig, - SM: RaftStateMachine, +pub(crate) struct Handle +where C: RaftTypeConfig { - cmd_tx: mpsc::UnboundedSender>, + cmd_tx: mpsc::UnboundedSender>, #[allow(dead_code)] join_handle: tokio::task::JoinHandle<()>, } -impl Handle -where - C: RaftTypeConfig, - SM: RaftStateMachine, +impl Handle +where C: RaftTypeConfig { - pub(crate) fn send(&mut self, cmd: Command) -> Result<(), mpsc::error::SendError>> { + pub(crate) fn send(&mut self, cmd: Command) -> Result<(), mpsc::error::SendError>> { tracing::debug!("sending command to state machine worker: {:?}", cmd); self.cmd_tx.send(cmd) } @@ -84,11 +78,11 @@ where { state_machine: SM, - streaming: Option>, + streaming: Option>, - received: Option>, + received: Option>, - cmd_rx: mpsc::UnboundedReceiver>, + cmd_rx: mpsc::UnboundedReceiver>, resp_tx: mpsc::UnboundedSender>, } @@ -99,7 +93,7 @@ where SM: RaftStateMachine, { /// Spawn a new state machine worker, return a controlling handle. - pub(crate) fn spawn(state_machine: SM, resp_tx: mpsc::UnboundedSender>) -> Handle { + pub(crate) fn spawn(state_machine: SM, resp_tx: mpsc::UnboundedSender>) -> Handle { let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let worker = Worker { @@ -244,10 +238,7 @@ where } #[tracing::instrument(level = "info", skip_all)] - async fn get_snapshot( - &mut self, - tx: oneshot::Sender>>, - ) -> Result<(), StorageError> { + async fn get_snapshot(&mut self, tx: oneshot::Sender>>) -> Result<(), StorageError> { tracing::info!("{}", func_name!()); let snapshot = self.state_machine.get_current_snapshot().await?; diff --git a/openraft/src/core/streaming_state.rs b/openraft/src/core/streaming_state.rs index 702a8813b..53528e086 100644 --- a/openraft/src/core/streaming_state.rs +++ b/openraft/src/core/streaming_state.rs @@ -1,8 +1,6 @@ use std::io::SeekFrom; -use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use crate::raft::InstallSnapshotRequest; @@ -13,7 +11,9 @@ use crate::SnapshotId; use crate::StorageError; /// The Raft node is streaming in a snapshot from the leader. -pub(crate) struct Streaming { +pub(crate) struct Streaming +where C: RaftTypeConfig +{ /// The offset of the last byte written to the snapshot. pub(crate) offset: u64, @@ -21,13 +21,13 @@ pub(crate) struct Streaming { pub(crate) snapshot_id: SnapshotId, /// A handle to the snapshot writer. - pub(crate) snapshot_data: Box, + pub(crate) snapshot_data: Box, } -impl Streaming -where SD: AsyncSeek + AsyncWrite + Unpin +impl Streaming +where C: RaftTypeConfig { - pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { + pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { Self { offset: 0, snapshot_id, @@ -36,10 +36,7 @@ where SD: AsyncSeek + AsyncWrite + Unpin } /// Receive a chunk of snapshot data. - pub(crate) async fn receive( - &mut self, - req: InstallSnapshotRequest, - ) -> Result> { + pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { // TODO: check id? // Always seek to the target offset if not an exact match. diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index ed81bb49b..a9ba2eef4 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -1,3 +1,5 @@ +use std::io::Cursor; + use crate::RaftTypeConfig; /// Trivial Raft type config for Engine related unit test. @@ -10,4 +12,5 @@ impl RaftTypeConfig for UTConfig { type NodeId = u64; type Node = (); type Entry = crate::Entry; + type SnapshotData = Cursor>; } diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index f1bc9aa7e..9e9f0f611 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -9,6 +9,9 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreemap; +use tokio::io::AsyncRead; +use tokio::io::AsyncSeek; +use tokio::io::AsyncWrite; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; @@ -94,6 +97,12 @@ pub trait RaftTypeConfig: /// Raft log entry, which can be built from an AppData. type Entry: RaftEntry + FromAppData; + + /// Snapshot data for exposing a snapshot for reading & writing. + /// + /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) + /// for details on where and how this is used. + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; } /// Define types for a Raft type configuration. @@ -280,6 +289,8 @@ where tx_metrics, span: core_span, + + _p: Default::default(), }; let core_handle = tokio::spawn(core.main(rx_shutdown).instrument(trace_span!("spawn").or_current())); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index a9461312d..473362cad 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -11,9 +11,7 @@ use anyerror::AnyError; use futures::future::FutureExt; pub(crate) use replication_session_id::ReplicationSessionId; pub(crate) use response::Response; -use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; use tokio::select; use tokio::sync::mpsc; @@ -40,13 +38,11 @@ use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotRequest; use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; -use crate::storage::RaftStateMachine; use crate::storage::Snapshot; use crate::ErrorSubject; use crate::ErrorVerb; use crate::LogId; use crate::MessageSummary; -use crate::Node; use crate::NodeId; use crate::RPCTypes; use crate::RaftNetwork; @@ -57,17 +53,14 @@ use crate::StorageIOError; use crate::ToStorageResult; /// The handle to a spawned replication stream. -pub(crate) struct ReplicationHandle -where - NID: NodeId, - N: Node, - S: AsyncRead + AsyncSeek + Send + Unpin + 'static, +pub(crate) struct ReplicationHandle +where C: RaftTypeConfig { /// The spawn handle the `ReplicationCore` task. pub(crate) join_handle: JoinHandle>, /// The channel used for communicating with the replication task. - pub(crate) tx_repl: mpsc::UnboundedSender>, + pub(crate) tx_repl: mpsc::UnboundedSender>, } /// A task responsible for sending replication events to a target follower in the Raft cluster. @@ -75,12 +68,11 @@ where /// NOTE: we do not stack replication requests to targets because this could result in /// out-of-order delivery. We always buffer until we receive a success response, then send the /// next payload from the buffer. -pub(crate) struct ReplicationCore +pub(crate) struct ReplicationCore where C: RaftTypeConfig, N: RaftNetworkFactory, LS: RaftLogStorage, - SM: RaftStateMachine, { /// The ID of the target Raft node which replication events are to be sent to. target: C::NodeId, @@ -93,7 +85,7 @@ where tx_raft_core: mpsc::UnboundedSender>, /// A channel for receiving events from the RaftCore. - rx_repl: mpsc::UnboundedReceiver>, + rx_repl: mpsc::UnboundedReceiver>, /// The `RaftNetwork` interface. network: N::Network, @@ -115,15 +107,14 @@ where matching: Option>, /// Next replication action to run. - next_action: Option>, + next_action: Option>, } -impl ReplicationCore +impl ReplicationCore where C: RaftTypeConfig, N: RaftNetworkFactory, LS: RaftLogStorage, - SM: RaftStateMachine, { /// Spawn a new replication task for the target node. #[tracing::instrument(level = "trace", skip_all,fields(target=display(target), session_id=display(session_id)))] @@ -139,7 +130,7 @@ where log_reader: LS::LogReader, tx_raft_core: mpsc::UnboundedSender>, span: tracing::Span, - ) -> ReplicationHandle { + ) -> ReplicationHandle { tracing::debug!( session_id = display(&session_id), target = display(&target), @@ -490,7 +481,7 @@ where } #[tracing::instrument(level = "trace", skip_all)] - pub fn process_event(&mut self, event: Replicate) { + pub fn process_event(&mut self, event: Replicate) { tracing::debug!(event=%event.summary(), "process_event"); match event { @@ -525,7 +516,7 @@ where async fn stream_snapshot( &mut self, id: u64, - rx: oneshot::Receiver>>, + rx: oneshot::Receiver>>, ) -> Result<(), ReplicationError> { tracing::info!(id = display(id), "{}", func_name!()); @@ -645,21 +636,15 @@ where /// It defines what data to send to a follower/learner and an id to identify who is sending this /// data. #[derive(Debug)] -pub(crate) struct Data -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +pub(crate) struct Data +where C: RaftTypeConfig { id: u64, - payload: Payload, + payload: Payload, } -impl MessageSummary> for Data -where - NID: NodeId, - N: Node, - S: AsyncRead + AsyncSeek + Send + Unpin + 'static, +impl MessageSummary> for Data +where C: RaftTypeConfig { fn summary(&self) -> String { match &self.payload { @@ -673,20 +658,17 @@ where } } -impl Data -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +impl Data +where C: RaftTypeConfig { - fn new_logs(id: u64, log_id_range: LogIdRange) -> Self { + fn new_logs(id: u64, log_id_range: LogIdRange) -> Self { Self { id, payload: Payload::Logs(log_id_range), } } - fn new_snapshot(id: u64, snapshot_rx: oneshot::Receiver>>) -> Self { + fn new_snapshot(id: u64, snapshot_rx: oneshot::Receiver>>) -> Self { Self { id, payload: Payload::Snapshot(snapshot_rx), @@ -697,21 +679,15 @@ where /// The data to replication. /// /// Either a series of logs or a snapshot. -pub(crate) enum Payload -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +pub(crate) enum Payload +where C: RaftTypeConfig { - Logs(LogIdRange), - Snapshot(oneshot::Receiver>>), + Logs(LogIdRange), + Snapshot(oneshot::Receiver>>), } -impl Debug for Payload -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +impl Debug for Payload +where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -733,42 +709,33 @@ pub(crate) enum ReplicationResult { } /// A replication request sent by RaftCore leader state to replication stream. -pub(crate) enum Replicate -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +pub(crate) enum Replicate +where C: RaftTypeConfig { /// Inform replication stream to forward the committed log id to followers/learners. - Committed(Option>), + Committed(Option>), /// Send an empty AppendEntries RPC as heartbeat. Heartbeat, /// Send a chunk of data, e.g., logs or snapshot. - Data(Data), + Data(Data), } -impl Replicate -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +impl Replicate +where C: RaftTypeConfig { - pub(crate) fn logs(id: u64, log_id_range: LogIdRange) -> Self { + pub(crate) fn logs(id: u64, log_id_range: LogIdRange) -> Self { Self::Data(Data::new_logs(id, log_id_range)) } - pub(crate) fn snapshot(id: u64, snapshot_rx: oneshot::Receiver>>) -> Self { + pub(crate) fn snapshot(id: u64, snapshot_rx: oneshot::Receiver>>) -> Self { Self::Data(Data::new_snapshot(id, snapshot_rx)) } } -impl MessageSummary> for Replicate -where - NID: NodeId, - N: Node, - S: AsyncRead + AsyncSeek + Send + Unpin + 'static, +impl MessageSummary> for Replicate +where C: RaftTypeConfig { fn summary(&self) -> String { match self { diff --git a/openraft/src/storage/adapter.rs b/openraft/src/storage/adapter.rs index 56e3a5003..f935812e5 100644 --- a/openraft/src/storage/adapter.rs +++ b/openraft/src/storage/adapter.rs @@ -166,7 +166,6 @@ where C: RaftTypeConfig, S: RaftStorage, { - type SnapshotData = S::SnapshotData; type SnapshotBuilder = S::SnapshotBuilder; async fn applied_state( @@ -185,21 +184,19 @@ where S::get_snapshot_builder(self.storage_mut().await.deref_mut()).await } - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { S::begin_receiving_snapshot(self.storage_mut().await.deref_mut()).await } async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box, ) -> Result<(), StorageError> { S::install_snapshot(self.storage_mut().await.deref_mut(), meta, snapshot).await } - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { S::get_current_snapshot(self.storage_mut().await.deref_mut()).await } } diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 0e1248866..922b7af93 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -15,9 +15,6 @@ use async_trait::async_trait; pub use helper::StorageHelper; pub use log_store_ext::RaftLogReaderExt; pub use snapshot_signature::SnapshotSignature; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; pub use v2::RaftLogStorage; pub use v2::RaftStateMachine; @@ -88,17 +85,14 @@ where /// The data associated with the current snapshot. #[derive(Debug)] -pub struct Snapshot -where - NID: NodeId, - N: Node, - SD: AsyncRead + AsyncSeek + Send + Unpin + 'static, +pub struct Snapshot +where C: RaftTypeConfig { /// metadata of a snapshot - pub meta: SnapshotMeta, + pub meta: SnapshotMeta, /// A read handle to the associated snapshot. - pub snapshot: Box, + pub snapshot: Box, } /// The state about logs. @@ -155,10 +149,8 @@ where C: RaftTypeConfig /// co-implemented with [`RaftStorage`] interface on the same cloneable object, if the underlying /// state machine is anyway synchronized. #[async_trait] -pub trait RaftSnapshotBuilder: Send + Sync + 'static -where - C: RaftTypeConfig, - SD: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static, +pub trait RaftSnapshotBuilder: Send + Sync + 'static +where C: RaftTypeConfig { /// Build snapshot /// @@ -169,7 +161,7 @@ where /// - Performing log compaction, e.g. merge log entries that operates on the same key, like a /// LSM-tree does, /// - or by fetching a snapshot from the state machine. - async fn build_snapshot(&mut self) -> Result, StorageError>; + async fn build_snapshot(&mut self) -> Result, StorageError>; // NOTES: // This interface is geared toward small file-based snapshots. However, not all snapshots can @@ -193,17 +185,11 @@ where pub trait RaftStorage: RaftLogReader + Send + Sync + 'static where C: RaftTypeConfig { - /// The storage engine's associated type used for exposing a snapshot for reading & writing. - /// - /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) - /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; - /// Log reader type. type LogReader: RaftLogReader; /// Snapshot builder type. - type SnapshotBuilder: RaftSnapshotBuilder; + type SnapshotBuilder: RaftSnapshotBuilder; // --- Vote @@ -317,7 +303,7 @@ where C: RaftTypeConfig /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html) /// for details on log compaction / snapshotting. - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError>; + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError>; /// Install a snapshot which has finished streaming from the leader. /// @@ -329,7 +315,7 @@ where C: RaftTypeConfig async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box, ) -> Result<(), StorageError>; /// Get a readable handle to the current snapshot, along with its metadata. @@ -343,7 +329,5 @@ where C: RaftTypeConfig /// /// A proper snapshot implementation will store the term, index and membership config as part /// of the snapshot, which should be decoded for creating this method's response data. - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError>; + async fn get_current_snapshot(&mut self) -> Result>, StorageError>; } diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index df1e1b965..2dea09285 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -3,9 +3,6 @@ //! logs, and [`RaftStateMachine`] is responsible for storing state machine and snapshot. use async_trait::async_trait; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; use crate::storage::callback::LogFlushed; use crate::storage::v2::sealed::Sealed; @@ -112,14 +109,8 @@ where C: RaftTypeConfig pub trait RaftStateMachine: Sealed + Send + Sync + 'static where C: RaftTypeConfig { - /// The associated type used for exposing a snapshot for reading & writing. - /// - /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) - /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; - /// Snapshot builder type. - type SnapshotBuilder: RaftSnapshotBuilder; + type SnapshotBuilder: RaftSnapshotBuilder; // TODO: This can be made into sync, provided all state machines will use atomic read or the // like. @@ -184,7 +175,7 @@ where C: RaftTypeConfig /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html) /// for details on snapshot streaming. - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError>; + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError>; /// Install a snapshot which has finished streaming from the leader. /// @@ -200,7 +191,7 @@ where C: RaftTypeConfig async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box, ) -> Result<(), StorageError>; /// Get a readable handle to the current snapshot. @@ -216,7 +207,5 @@ where C: RaftTypeConfig /// A proper snapshot implementation will store last-applied-log-id and the /// last-applied-membership config as part of the snapshot, which should be decoded for /// creating this method's response data. - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError>; + async fn get_current_snapshot(&mut self) -> Result>, StorageError>; } diff --git a/rocksstore-compat07/src/compatibility_test.rs b/rocksstore-compat07/src/compatibility_test.rs index 276e953c1..1ad102a09 100644 --- a/rocksstore-compat07/src/compatibility_test.rs +++ b/rocksstore-compat07/src/compatibility_test.rs @@ -28,7 +28,7 @@ impl compat::testing::StoreBuilder07 for Builder07 { #[async_trait::async_trait] impl compat::testing::StoreBuilder for BuilderLatest { - type C = crate::Config; + type C = crate::TypeConfig; type S = Arc; async fn build(&self, p: &Path) -> Arc { diff --git a/rocksstore-compat07/src/lib.rs b/rocksstore-compat07/src/lib.rs index a47339426..1b25559fc 100644 --- a/rocksstore-compat07/src/lib.rs +++ b/rocksstore-compat07/src/lib.rs @@ -46,6 +46,7 @@ use openraft::LogState; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::Snapshot; use openraft::SnapshotMeta; use openraft::StorageError; @@ -64,7 +65,8 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. - pub Config: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = EmptyNode, Entry = Entry + pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = EmptyNode, + Entry = Entry, SnapshotData = Cursor> ); #[derive(Serialize, Deserialize, Debug, Clone)] @@ -366,8 +368,8 @@ impl RocksStore { } #[async_trait] -impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { +impl RaftLogReader for Arc { + async fn get_log_state(&mut self) -> StorageResult> { let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); let last_log_id = match last { @@ -375,7 +377,7 @@ impl RaftLogReader for Arc { Some(res) => { let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; let ent = ent.upgrade(); Some(ent.log_id) } @@ -404,7 +406,7 @@ impl RaftLogReader for Arc { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, - ) -> StorageResult>> { + ) -> StorageResult>> { let start = match range.start_bound() { std::ops::Bound::Included(x) => id_to_bin(*x), std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), @@ -422,7 +424,7 @@ impl RaftLogReader for Arc { break; } - let entry = serde_json::from_slice::>(&val).map_err(read_logs_err)?; + let entry = serde_json::from_slice::>(&val).map_err(read_logs_err)?; let entry = entry.upgrade(); assert_eq!(id, entry.log_id.index); @@ -434,11 +436,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot( - &mut self, - ) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -483,8 +483,7 @@ impl RaftSnapshotBuilder>> for Arc { } #[async_trait] -impl RaftStorage for Arc { - type SnapshotData = Cursor>; +impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; @@ -511,7 +510,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn append_to_log(&mut self, entries: I) -> StorageResult<()> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { for entry in entries { let id = id_to_bin(entry.log_id.index); assert_eq!(bin_to_id(&id), entry.log_id.index); @@ -563,7 +562,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine( &mut self, - entries: &[Entry], + entries: &[Entry], ) -> Result, StorageError> { let mut res = Vec::with_capacity(entries.len()); @@ -595,7 +594,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -603,7 +604,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -635,9 +636,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { let curr_snap = self.get_meta_vec::()?; let bs = if let Some(x) = curr_snap { x diff --git a/rocksstore-compat07/src/test.rs b/rocksstore-compat07/src/test.rs index 3005006aa..378eb0d26 100644 --- a/rocksstore-compat07/src/test.rs +++ b/rocksstore-compat07/src/test.rs @@ -7,16 +7,16 @@ use openraft::testing::Suite; use openraft::StorageError; use tempfile::TempDir; -use crate::Config; use crate::RocksNodeId; use crate::RocksStore; +use crate::TypeConfig; -type LogStore = Adaptor>; -type StateMachine = Adaptor>; +type LogStore = Adaptor>; +type StateMachine = Adaptor>; struct RocksBuilder {} #[async_trait] -impl StoreBuilder for RocksBuilder { +impl StoreBuilder for RocksBuilder { async fn build(&self) -> Result<(TempDir, LogStore, StateMachine), StorageError> { let td = TempDir::new().expect("couldn't create temp dir"); let store = RocksStore::new(td.path()).await; diff --git a/rocksstore/src/lib.rs b/rocksstore/src/lib.rs index 62bd7c11a..b3dceba80 100644 --- a/rocksstore/src/lib.rs +++ b/rocksstore/src/lib.rs @@ -27,6 +27,7 @@ use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -44,7 +45,8 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. - pub Config: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, Entry = Entry + pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, + Entry = Entry, SnapshotData = Cursor> ); /** @@ -336,15 +338,15 @@ impl RocksStore { } #[async_trait] -impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { +impl RaftLogReader for Arc { + async fn get_log_state(&mut self) -> StorageResult> { let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); let last_log_id = match last { None => None, Some(res) => { let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; Some(ent.log_id) } }; @@ -365,7 +367,7 @@ impl RaftLogReader for Arc { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, - ) -> StorageResult>> { + ) -> StorageResult>> { let start = match range.start_bound() { std::ops::Bound::Included(x) => id_to_bin(*x), std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), @@ -394,11 +396,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot( - &mut self, - ) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -443,8 +443,7 @@ impl RaftSnapshotBuilder>> for Arc { } #[async_trait] -impl RaftStorage for Arc { - type SnapshotData = Cursor>; +impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; @@ -459,7 +458,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn append_to_log(&mut self, entries: I) -> StorageResult<()> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { for entry in entries { let id = id_to_bin(entry.log_id.index); assert_eq!(bin_to_id(&id), entry.log_id.index); @@ -511,7 +510,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine( &mut self, - entries: &[Entry], + entries: &[Entry], ) -> Result, StorageError> { let mut res = Vec::with_capacity(entries.len()); @@ -543,7 +542,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -551,7 +552,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -577,9 +578,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { let curr_snap = self.get_meta::()?; match curr_snap { diff --git a/rocksstore/src/test.rs b/rocksstore/src/test.rs index 6b959d556..679dbfcf8 100644 --- a/rocksstore/src/test.rs +++ b/rocksstore/src/test.rs @@ -7,16 +7,16 @@ use openraft::testing::Suite; use openraft::StorageError; use tempfile::TempDir; -use crate::Config; use crate::RocksNodeId; use crate::RocksStore; +use crate::TypeConfig; -type LogStore = Adaptor>; -type StateMachine = Adaptor>; +type LogStore = Adaptor>; +type StateMachine = Adaptor>; struct RocksBuilder {} #[async_trait] -impl StoreBuilder for RocksBuilder { +impl StoreBuilder for RocksBuilder { async fn build(&self) -> Result<(TempDir, LogStore, StateMachine), StorageError> { let td = TempDir::new().expect("couldn't create temp dir"); let store = RocksStore::new(td.path()).await; diff --git a/sledstore/src/lib.rs b/sledstore/src/lib.rs index 6948d80af..61551643b 100644 --- a/sledstore/src/lib.rs +++ b/sledstore/src/lib.rs @@ -26,6 +26,7 @@ use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -39,7 +40,8 @@ pub type ExampleNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. - pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, Entry = Entry + pub TypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, + Entry = Entry, SnapshotData = Cursor> ); /** @@ -382,8 +384,8 @@ impl SledStore { } #[async_trait] -impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { +impl RaftLogReader for Arc { + async fn get_log_state(&mut self) -> StorageResult> { let last_purged = self.get_last_purged_()?; let logs_tree = logs(&self.db); @@ -397,7 +399,7 @@ impl RaftLogReader for Arc { }); }; - let last_ent = serde_json::from_slice::>(&ent_ivec).map_err(read_logs_err)?; + let last_ent = serde_json::from_slice::>(&ent_ivec).map_err(read_logs_err)?; let last_log_id = Some(*last_ent.get_log_id()); let last_log_id = std::cmp::max(last_log_id, last_purged); @@ -410,7 +412,7 @@ impl RaftLogReader for Arc { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, - ) -> StorageResult>> { + ) -> StorageResult>> { let start_bound = range.start_bound(); let start = match start_bound { std::ops::Bound::Included(x) => id_to_bin(*x), @@ -440,11 +442,9 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder>> for Arc { +impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot( - &mut self, - ) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; let last_applied_log; let last_membership; @@ -489,8 +489,7 @@ impl RaftSnapshotBuilder>> for Arc } #[async_trait] -impl RaftStorage for Arc { - type SnapshotData = Cursor>; +impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; @@ -509,7 +508,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn append_to_log(&mut self, entries: I) -> StorageResult<()> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { let logs_tree = logs(&self.db); let mut batch = sled::Batch::default(); for entry in entries { @@ -576,7 +575,7 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine( &mut self, - entries: &[Entry], + entries: &[Entry], ) -> Result, StorageError> { let sm = self.state_machine.write().await; let state_machine = state_machine(&self.db); @@ -619,7 +618,9 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -627,7 +628,7 @@ impl RaftStorage for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -652,9 +653,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { match SledStore::get_current_snapshot_(self)? { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/sledstore/src/test.rs b/sledstore/src/test.rs index fb29630d1..eebc5cb26 100644 --- a/sledstore/src/test.rs +++ b/sledstore/src/test.rs @@ -8,8 +8,8 @@ use openraft::StorageError; use tempfile::TempDir; use crate::ExampleNodeId; -use crate::ExampleTypeConfig; use crate::SledStore; +use crate::TypeConfig; struct SledBuilder {} @@ -18,11 +18,11 @@ pub fn test_sled_store() -> Result<(), StorageError> { Suite::test_all(SledBuilder {}) } -type LogStore = Adaptor>; -type StateMachine = Adaptor>; +type LogStore = Adaptor>; +type StateMachine = Adaptor>; #[async_trait] -impl StoreBuilder for SledBuilder { +impl StoreBuilder for SledBuilder { async fn build(&self) -> Result<(TempDir, LogStore, StateMachine), StorageError> { let td = TempDir::new().expect("couldn't create temp dir"); diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs index 6d652f29c..6f22f42be 100644 --- a/stores/rocksstore-v2/src/lib.rs +++ b/stores/rocksstore-v2/src/lib.rs @@ -33,6 +33,7 @@ use openraft::LogId; use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -50,8 +51,9 @@ use serde::Serialize; pub type RocksNodeId = u64; openraft::declare_raft_types!( - /// Declare the type configuration for `MemStore`. - pub Config: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, Entry = Entry + /// Declare the type configuration. + pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, + Entry = Entry, SnapshotData = Cursor> ); /** @@ -229,15 +231,15 @@ impl RocksLogStore { } #[async_trait] -impl RaftLogReader for RocksLogStore { - async fn get_log_state(&mut self) -> StorageResult> { +impl RaftLogReader for RocksLogStore { + async fn get_log_state(&mut self) -> StorageResult> { let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); let last_log_id = match last { None => None, Some(res) => { let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; Some(ent.log_id) } }; @@ -258,7 +260,7 @@ impl RaftLogReader for RocksLogStore { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, - ) -> StorageResult>> { + ) -> StorageResult>> { let start = match range.start_bound() { std::ops::Bound::Included(x) => id_to_bin(*x), std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), @@ -287,11 +289,9 @@ impl RaftLogReader for RocksLogStore { } #[async_trait] -impl RaftSnapshotBuilder>> for RocksStateMachine { +impl RaftSnapshotBuilder for RocksStateMachine { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot( - &mut self, - ) -> Result>>, StorageError> { + async fn build_snapshot(&mut self) -> Result, StorageError> { let data; // Serialize the data of the state machine. @@ -335,7 +335,7 @@ impl RaftSnapshotBuilder>> for RocksStateMachine { } #[async_trait] -impl RaftLogStorage for RocksLogStore { +impl RaftLogStorage for RocksLogStore { type LogReader = Self; async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { @@ -358,7 +358,7 @@ impl RaftLogStorage for RocksLogStore { callback: LogFlushed, ) -> Result<(), StorageError> where - I: IntoIterator> + Send, + I: IntoIterator> + Send, { for entry in entries { let id = id_to_bin(entry.log_id.index); @@ -407,8 +407,7 @@ impl RaftLogStorage for RocksLogStore { } } #[async_trait] -impl RaftStateMachine for RocksStateMachine { - type SnapshotData = Cursor>; +impl RaftStateMachine for RocksStateMachine { type SnapshotBuilder = Self; async fn applied_state( @@ -418,7 +417,7 @@ impl RaftStateMachine for RocksStateMachine { } async fn apply(&mut self, entries: I) -> Result, StorageError> - where I: IntoIterator> + Send { + where I: IntoIterator> + Send { let entries_iter = entries.into_iter(); let mut res = Vec::with_capacity(entries_iter.size_hint().0); @@ -452,14 +451,16 @@ impl RaftStateMachine for RocksStateMachine { self.clone() } - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: Box<::SnapshotData>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, @@ -490,9 +491,7 @@ impl RaftStateMachine for RocksStateMachine { Ok(()) } - async fn get_current_snapshot( - &mut self, - ) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { let x = self .db .get_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot") diff --git a/stores/rocksstore-v2/src/test.rs b/stores/rocksstore-v2/src/test.rs index 4b9ae1ab1..aece3f9f5 100644 --- a/stores/rocksstore-v2/src/test.rs +++ b/stores/rocksstore-v2/src/test.rs @@ -4,14 +4,14 @@ use openraft::testing::Suite; use openraft::StorageError; use tempfile::TempDir; -use crate::Config; use crate::RocksLogStore; use crate::RocksNodeId; use crate::RocksStateMachine; +use crate::TypeConfig; struct RocksBuilder {} #[async_trait] -impl StoreBuilder for RocksBuilder { +impl StoreBuilder for RocksBuilder { async fn build(&self) -> Result<(TempDir, RocksLogStore, RocksStateMachine), StorageError> { let td = TempDir::new().expect("couldn't create temp dir"); let (log_store, sm) = crate::new(td.path()).await; diff --git a/tests/tests/append_entries/t10_conflict_with_empty_entries.rs b/tests/tests/append_entries/t10_conflict_with_empty_entries.rs index eaf4fa292..0652875ed 100644 --- a/tests/tests/append_entries/t10_conflict_with_empty_entries.rs +++ b/tests/tests/append_entries/t10_conflict_with_empty_entries.rs @@ -50,7 +50,7 @@ async fn conflict_with_empty_entries() -> Result<()> { // Expect conflict even if the message contains no entries. - let rpc = AppendEntriesRequest:: { + let rpc = AppendEntriesRequest:: { vote: Vote::new_committed(1, 1), prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)), entries: vec![], @@ -63,7 +63,7 @@ async fn conflict_with_empty_entries() -> Result<()> { // Feed logs - let rpc = AppendEntriesRequest:: { + let rpc = AppendEntriesRequest:: { vote: Vote::new_committed(1, 1), prev_log_id: None, entries: vec![blank_ent(0, 0, 0), blank_ent(1, 0, 1), Entry { @@ -83,7 +83,7 @@ async fn conflict_with_empty_entries() -> Result<()> { // Expect a conflict with prev_log_index == 3 - let rpc = AppendEntriesRequest:: { + let rpc = AppendEntriesRequest:: { vote: Vote::new_committed(1, 1), prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 3)), entries: vec![], diff --git a/tests/tests/append_entries/t11_append_conflicts.rs b/tests/tests/append_entries/t11_append_conflicts.rs index b9914b204..af3f68394 100644 --- a/tests/tests/append_entries/t11_append_conflicts.rs +++ b/tests/tests/append_entries/t11_append_conflicts.rs @@ -229,7 +229,7 @@ where { let logs = log_store.get_log_entries(..).await?; let skip = 0; - let want: Vec> = terms + let want: Vec> = terms .iter() .skip(skip) .enumerate() diff --git a/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs b/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs index ff9508392..cb44ca84d 100644 --- a/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs +++ b/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs @@ -44,7 +44,7 @@ async fn append_entries_with_bigger_term() -> Result<()> { .await?; // append entries with term 2 and leader_id, this MUST cause hard state changed in node 0 - let req = AppendEntriesRequest:: { + let req = AppendEntriesRequest:: { vote: Vote::new_committed(2, 1), prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)), entries: vec![], diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 5f722f3fa..49ba07660 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -53,10 +53,10 @@ use openraft::RaftState; use openraft::ServerState; use openraft_memstore::ClientRequest; use openraft_memstore::ClientResponse; -use openraft_memstore::Config as MemConfig; use openraft_memstore::IntoMemClientRequest; use openraft_memstore::MemNodeId; use openraft_memstore::MemStore; +use openraft_memstore::TypeConfig as MemConfig; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; use tracing_appender::non_blocking::WorkerGuard; diff --git a/tests/tests/log_compaction/t35_building_snapshot_does_not_block_append.rs b/tests/tests/log_compaction/t35_building_snapshot_does_not_block_append.rs index c831f9668..4a4d05be9 100644 --- a/tests/tests/log_compaction/t35_building_snapshot_does_not_block_append.rs +++ b/tests/tests/log_compaction/t35_building_snapshot_does_not_block_append.rs @@ -56,7 +56,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { tracing::info!("--- send append-entries request to the follower that is building snapshot"); { - let rpc = AppendEntriesRequest:: { + let rpc = AppendEntriesRequest:: { vote: Vote::new_committed(1, 0), prev_log_id: Some(log_id(1, 0, log_index)), entries: vec![blank_ent(1, 0, 15)],