diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 861ff441f..5bb118962 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -15,7 +15,6 @@ use std::sync::Mutex; use openraft::async_trait::async_trait; use openraft::raft::Entry; use openraft::raft::EntryPayload; -use openraft::raft::Membership; use openraft::storage::HardState; use openraft::storage::InitialState; use openraft::storage::Snapshot; @@ -157,74 +156,13 @@ impl RaftStorageDebug for MemStore { } } -impl MemStore { - fn find_first_membership_log<'a, T, D>(mut it: T) -> Option - where - T: 'a + Iterator>, - D: AppData, - { - it.find_map(|entry| match &entry.payload { - EntryPayload::Membership(cfg) => Some(EffectiveMembership { - log_id: entry.log_id, - membership: cfg.clone(), - }), - _ => None, - }) - } - - /// Go backwards through the log to find the most recent membership config <= `upto_index`. - #[tracing::instrument(level = "trace", skip(self))] - pub async fn get_membership_from_log(&self, upto_index: Option) -> Result { - let membership_in_log = { - let log = self.log.read().await; - - let reversed_logs = log.values().rev(); - match upto_index { - Some(upto) => { - let skipped = reversed_logs.skip_while(|entry| entry.log_id.index > upto); - Self::find_first_membership_log(skipped) - } - None => Self::find_first_membership_log(reversed_logs), - } - }; - - // Find membership stored in state machine. - - let (_, membership_in_sm) = self.last_applied_state().await?; - - let membership = - if membership_in_log.as_ref().map(|x| x.log_id.index) > membership_in_sm.as_ref().map(|x| x.log_id.index) { - membership_in_log - } else { - membership_in_sm - }; - - // Create a default one if both are None. 
- - Ok(match membership { - Some(x) => x, - None => EffectiveMembership { - log_id: LogId { term: 0, index: 0 }, - membership: Membership::new_initial(self.id), - }, - }) - } -} - #[async_trait] impl RaftStorage for MemStore { type SnapshotData = Cursor>; - #[tracing::instrument(level = "trace", skip(self))] - async fn get_membership_config(&self) -> Result { - self.get_membership_from_log(None).await - } - - #[tracing::instrument(level = "trace", skip(self))] async fn get_initial_state(&self) -> Result { - let membership = self.get_membership_config().await?; - let mut hs = self.hs.write().await; - match &mut *hs { + let hs = self.read_hard_state().await?; + match hs { Some(inner) => { // Search for two place and use the max one, // because when a state machine is installed there could be logs @@ -237,6 +175,9 @@ impl RaftStorage for MemStore { let last_log_id = max(last_in_log, last_applied); + let membership = self.get_membership().await?; + let membership = membership.unwrap_or_else(|| EffectiveMembership::new_initial(self.id)); + Ok(InitialState { last_log_id, last_applied, @@ -246,7 +187,7 @@ impl RaftStorage for MemStore { } None => { let new = InitialState::new_initial(self.id); - *hs = Some(new.hard_state.clone()); + self.save_hard_state(&new.hard_state).await?; Ok(new) } } diff --git a/memstore/src/test.rs b/memstore/src/test.rs index af0e57fdc..932e31285 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -4,6 +4,7 @@ use std::marker::PhantomData; use async_trait::async_trait; use maplit::btreeset; +use openraft::raft::Membership; use openraft::DefensiveCheck; use openraft::DefensiveError; use openraft::StoreExt; @@ -84,8 +85,10 @@ where B: StoreBuilder, { fn test_store(builder: &B) -> anyhow::Result<()> { - run_fut(Suite::get_membership_config_default(builder))?; - run_fut(Suite::get_membership_config_from_log_and_sm(builder))?; + run_fut(Suite::last_membership_in_log_initial(builder))?; + run_fut(Suite::last_membership_in_log(builder))?; + 
run_fut(Suite::get_membership_initial(builder))?; + run_fut(Suite::get_membership_from_log_and_sm(builder))?; run_fut(Suite::get_initial_state_default(builder))?; run_fut(Suite::get_initial_state_membership_from_log_and_sm(builder))?; run_fut(Suite::get_initial_state_with_state(builder))?; @@ -109,17 +112,101 @@ where Ok(()) } - pub async fn get_membership_config_default(builder: &B) -> anyhow::Result<()> { + pub async fn last_membership_in_log_initial(builder: &B) -> anyhow::Result<()> { let store = builder.build(NODE_ID).await; - let membership = store.get_membership_config().await?; + let membership = store.last_membership_in_log(0).await?; - assert_eq!(Membership::new_single(btreeset! {NODE_ID}), membership.membership,); + assert!(membership.is_none()); Ok(()) } - pub async fn get_membership_config_from_log_and_sm(builder: &B) -> anyhow::Result<()> { + pub async fn last_membership_in_log(builder: &B) -> anyhow::Result<()> { + let store = builder.build(NODE_ID).await; + + tracing::info!("--- no log, do not read membership from state machine"); + { + store + .apply_to_state_machine(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Membership(Membership::new_single(btreeset! {3,4,5})), + }, + ]) + .await?; + + let mem = store.last_membership_in_log(0).await?; + + assert!(mem.is_none()); + } + + tracing::info!("--- membership presents in log, smaller than last_applied, read from log"); + { + store + .append_to_log(&[&Entry { + log_id: (1, 1).into(), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), + }]) + .await?; + + let mem = store.last_membership_in_log(0).await?; + let mem = mem.unwrap(); + assert_eq!(Membership::new_single(btreeset! {1, 2, 3}), mem.membership,); + + let mem = store.last_membership_in_log(1).await?; + let mem = mem.unwrap(); + assert_eq!(Membership::new_single(btreeset! 
{1, 2, 3}), mem.membership,); + + let mem = store.last_membership_in_log(2).await?; + assert!(mem.is_none()); + } + + tracing::info!("--- membership presents in log and > sm.last_applied, read from log"); + { + store + .append_to_log(&[ + &Entry { + log_id: LogId { term: 1, index: 3 }, + payload: EntryPayload::Membership(Membership::new_single(btreeset! {7,8,9})), + }, + &Entry { + log_id: LogId { term: 1, index: 4 }, + payload: EntryPayload::Blank, + }, + ]) + .await?; + + let mem = store.last_membership_in_log(0).await?; + let mem = mem.unwrap(); + + assert_eq!(Membership::new_single(btreeset! {7,8,9},), mem.membership,); + } + + tracing::info!("--- membership presents in log and > sm.last_applied, read from log but since_index is greater than the last"); + { + let mem = store.last_membership_in_log(4).await?; + assert!(mem.is_none()); + } + + Ok(()) + } + + pub async fn get_membership_initial(builder: &B) -> anyhow::Result<()> { + let store = builder.build(NODE_ID).await; + + let membership = store.get_membership().await?; + + assert!(membership.is_none()); + + Ok(()) + } + + pub async fn get_membership_from_log_and_sm(builder: &B) -> anyhow::Result<()> { let store = builder.build(NODE_ID).await; tracing::info!("--- no log, read membership from state machine"); @@ -137,7 +224,8 @@ where ]) .await?; - let mem = store.get_membership_config().await?; + let mem = store.get_membership().await?; + let mem = mem.unwrap(); assert_eq!(Membership::new_single(btreeset! {3,4,5}), mem.membership,); } @@ -151,7 +239,9 @@ where }]) .await?; - let mem = store.get_membership_config().await?; + let mem = store.get_membership().await?; + + let mem = mem.unwrap(); assert_eq!(Membership::new_single(btreeset! {3, 4, 5}), mem.membership,); } @@ -161,13 +251,15 @@ where store .append_to_log(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! 
{7,8,9})), }]) .await?; - let mem = store.get_membership_config().await?; + let mem = store.get_membership().await?; + + let mem = mem.unwrap(); - assert_eq!(Membership::new_single(btreeset! {1,2,3},), mem.membership,); + assert_eq!(Membership::new_single(btreeset! {7,8,9},), mem.membership,); } Ok(()) @@ -933,7 +1025,7 @@ where ]) .await?; - let res = store.get_membership_config().await; + let res = store.get_membership().await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index ce7c1e0fe..6f7bdf086 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -54,6 +54,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if self.core.effective_membership.membership.all_nodes().len() == 1 { self.core.current_term += 1; self.core.voted_for = Some(self.core.id); + + // TODO(xp): it should always commit a initial log entry self.core.set_target_state(State::Leader); self.core.save_hard_state().await?; } else { diff --git a/openraft/src/core/append_entries.rs b/openraft/src/core/append_entries.rs index a7d9a2877..0942b22dd 100644 --- a/openraft/src/core/append_entries.rs +++ b/openraft/src/core/append_entries.rs @@ -109,7 +109,15 @@ impl, S: RaftStorage> Ra self.last_log_id = self.get_log_id(start - 1).await?; - let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?; + // TODO(xp): get_membership() should have a defensive check to ensure it always returns Some() if node is + // initialized. Because a node always commit a membership log as the first log entry. + let membership = self.storage.get_membership().await.map_err(|err| self.map_storage_error(err))?; + + // TODO(xp): This is a dirty patch: + // When a node starts in a single-node mode, it does not append an initial log + // but instead depends on storage.get_membership() to return a default one. 
+ // It would be better a node always append an initial log entry. + let membership = membership.unwrap_or_else(|| EffectiveMembership::new_initial(self.id)); self.update_membership(membership)?; diff --git a/openraft/src/core/install_snapshot.rs b/openraft/src/core/install_snapshot.rs index 8c4d769dc..81e37a7f4 100644 --- a/openraft/src/core/install_snapshot.rs +++ b/openraft/src/core/install_snapshot.rs @@ -226,9 +226,13 @@ impl, S: RaftStorage> Ra } // There could be unknown membership in the snapshot. - let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?; + let membership = self.storage.get_membership().await.map_err(|err| self.map_storage_error(err))?; tracing::debug!("storage membership: {:?}", membership); + assert!(membership.is_some()); + + let membership = membership.unwrap(); + self.update_membership(membership)?; self.snapshot_last_log_id = self.last_applied; diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 549c447b0..826d0dfd1 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -83,6 +83,15 @@ pub struct EffectiveMembership { pub membership: Membership, } +impl EffectiveMembership { + pub fn new_initial(node_id: u64) -> Self { + EffectiveMembership { + log_id: LogId::new(0, 0), + membership: Membership::new_initial(node_id), + } + } +} + impl MessageSummary for EffectiveMembership { fn summary(&self) -> String { format!("{{log_id:{} membership:{}}}", self.log_id, self.membership.summary()) @@ -1018,7 +1027,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } /// Run the learner loop. 
- #[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="non-voter"))] + #[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="learner"))] pub(self) async fn run(mut self) -> RaftResult<()> { self.core.report_metrics(Update::Update(None)); loop { diff --git a/openraft/src/storage.rs b/openraft/src/storage.rs index 79ca96bd3..d1919553e 100644 --- a/openraft/src/storage.rs +++ b/openraft/src/storage.rs @@ -12,6 +12,7 @@ use tokio::io::AsyncWrite; use crate::core::EffectiveMembership; use crate::raft::Entry; +use crate::raft::EntryPayload; use crate::raft::Membership; use crate::raft_types::SnapshotId; use crate::raft_types::StateMachineChanges; @@ -110,32 +111,68 @@ where /// for details on where and how this is used. type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; - /// Get the latest membership config found in the log or in state machine. - /// - /// This must always be implemented as a reverse search through the log to find the most - /// recent membership config to be appended to the log. - /// - /// If a snapshot pointer is encountered, then the membership config embedded in that snapshot - /// pointer should be used. - /// - /// If the system is pristine, then it should return the value of calling - /// `MembershipConfig::new_initial(node_id)`. It is required that the storage engine persist - /// the node's ID so that it is consistent across restarts. - /// - /// Errors returned from this method will cause Raft to go into shutdown. - async fn get_membership_config(&self) -> Result; + /// Returns the last membership config found in log or state machine. 
+    ///
+    /// A membership log entry that is newer than the membership already applied to the state machine takes
+    /// precedence; `None` is returned only when neither the log nor the state machine contains a membership.
+    async fn get_membership(&self) -> Result<Option<EffectiveMembership>, StorageError> {
+        let (_, sm_mem) = self.last_applied_state().await?;
+
+        let sm_mem_index = match &sm_mem {
+            None => 0,
+            Some(mem) => mem.log_id.index,
+        };
+
+        // Only log entries after the membership known to the state machine can supersede it.
+        let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?;
+
+        Ok(log_mem.or(sm_mem))
+    }
+
+    /// Get the latest membership config found in the log.
+    ///
+    /// This method should return the membership with the greatest log index which is `>=since_index`.
+    /// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied.
+    #[tracing::instrument(level = "trace", skip(self))]
+    async fn last_membership_in_log(&self, since_index: u64) -> Result<Option<EffectiveMembership>, StorageError> {
+        let last_log_id = self.last_id_in_log().await?;
+        let first_log_id = self.first_id_in_log().await?;
+
+        let first_log_id = match first_log_id {
+            // There is no log at all
+            None => return Ok(None),
+            Some(x) => x,
+        };
+
+        let mut end = last_log_id.index + 1;
+        let start = std::cmp::max(first_log_id.index, since_index);
+        let step = 64;
+
+        while start < end {
+            // Read at most `step` entries per round, walking backwards from `end` but never below `start`.
+            let chunk_start = std::cmp::max(start, end.saturating_sub(step));
+            let entries = self.try_get_log_entries(chunk_start..end).await?;
+
+            for ent in entries.iter().rev() {
+                if let EntryPayload::Membership(ref mem) = ent.payload {
+                    return Ok(Some(EffectiveMembership {
+                        log_id: ent.log_id,
+                        membership: mem.clone(),
+                    }));
+                }
+            }
+
+            end = chunk_start;
+        }
+
+        Ok(None)
+    }

     /// Get Raft's state information from storage.
     ///
     /// When the Raft node is first started, it will call this interface on the storage system to
     /// fetch the last known state from stable storage. If no such entry exists due to being the
     /// first time the node has come online, then `InitialState::new_initial` should be used.
- /// - /// **Pro tip:** the storage impl may need to look in a few different places to accurately - /// respond to this request: the last entry in the log for `last_log_index` & `last_log_term`; - /// the node's hard state record; and the index of the last log applied to the state machine. - /// - /// Errors returned from this method will cause Raft to go into shutdown. async fn get_initial_state(&self) -> Result; /// Save Raft's hard-state. diff --git a/openraft/src/store_ext.rs b/openraft/src/store_ext.rs index 9fc7f8466..ff64cd103 100644 --- a/openraft/src/store_ext.rs +++ b/openraft/src/store_ext.rs @@ -86,9 +86,9 @@ where type SnapshotData = T::SnapshotData; #[tracing::instrument(level = "trace", skip(self))] - async fn get_membership_config(&self) -> Result { + async fn last_membership_in_log(&self, since_index: u64) -> Result, StorageError> { self.defensive_no_dirty_log().await?; - self.inner().get_membership_config().await + self.inner().last_membership_in_log(since_index).await } #[tracing::instrument(level = "trace", skip(self))] diff --git a/openraft/tests/snapshot_overrides_membership.rs b/openraft/tests/snapshot_overrides_membership.rs index c8bade5c4..e2396b059 100644 --- a/openraft/tests/snapshot_overrides_membership.rs +++ b/openraft/tests/snapshot_overrides_membership.rs @@ -95,7 +95,10 @@ async fn snapshot_overrides_membership() -> Result<()> { tracing::info!("--- check that learner membership is affected"); { - let m = sto.get_membership_config().await?; + let m = sto.get_membership().await?; + + let m = m.unwrap(); + assert_eq!(Membership::new_single(btreeset! {2,3}), m.membership); } } @@ -121,7 +124,10 @@ async fn snapshot_overrides_membership() -> Result<()> { ) .await?; - let m = sto.get_membership_config().await?; + let m = sto.get_membership().await?; + + let m = m.unwrap(); + assert_eq!( Membership::new_single(btreeset! 
{0}), m.membership, diff --git a/openraft/tests/snapshot_uses_prev_snap_membership.rs b/openraft/tests/snapshot_uses_prev_snap_membership.rs index 00e5635f7..3359c9ec6 100644 --- a/openraft/tests/snapshot_uses_prev_snap_membership.rs +++ b/openraft/tests/snapshot_uses_prev_snap_membership.rs @@ -84,7 +84,10 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { println!("{}", logs.as_slice().summary()); assert_eq!(2, logs.len(), "only one applied log is kept"); } - let m = sto0.get_membership_config().await?; + let m = sto0.get_membership().await?; + + let m = m.unwrap(); + assert_eq!(Membership::new_single(btreeset! {0,1}), m.membership, "membership "); // TODO(xp): this assertion fails because when change-membership, a append-entries request does not update @@ -120,7 +123,10 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { let logs = sto0.get_log_entries(..).await?; assert_eq!(2, logs.len(), "only one applied log"); } - let m = sto0.get_membership_config().await?; + let m = sto0.get_membership().await?; + + let m = m.unwrap(); + assert_eq!(Membership::new_single(btreeset! {0,1}), m.membership, "membership "); }