diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index caf71b5e0..b0fd7ead7 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -18,6 +18,7 @@ use crate::replication::RaftEvent; use crate::AppData; use crate::AppDataResponse; use crate::LogId; +use crate::MessageSummary; use crate::NodeId; use crate::RaftError; use crate::RaftNetwork; @@ -92,7 +93,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } - #[tracing::instrument(level = "trace", skip(self, tx))] + #[tracing::instrument(level = "debug", skip(self, tx))] pub(super) async fn change_membership(&mut self, members: BTreeSet, wait: bool, tx: ResponseTx) { // Ensure cluster will have at least one node. if members.is_empty() { @@ -177,7 +178,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } - #[tracing::instrument(level = "trace", skip(self, resp_tx), fields(id=self.core.id))] + #[tracing::instrument(level = "debug", skip(self, resp_tx), fields(id=self.core.id))] pub async fn append_membership_log( &mut self, mem: MembershipConfig, @@ -243,11 +244,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .nodes .iter_mut() .filter(|(id, _)| !membership.contains(id)) - .filter_map(|(idx, replstate)| { - if replstate.matched.index >= index { + .filter_map(|(idx, repl_state)| { + if repl_state.matched.index >= index { Some(*idx) } else { - replstate.remove_after_commit = Some(index); + repl_state.remove_after_commit = Some(index); None } }) @@ -261,10 +262,17 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage for target in nodes_to_remove { tracing::debug!(target, "removing target node from replication pool"); // TODO(xp): just drop the replication then the task will be terminated. - if let Some(node) = self.nodes.remove(&target) { - let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH"))); + let removed = self.nodes.remove(&target); + assert!(removed.is_some()); - // remove metrics entry + tracing::info!( + "handle_uniform_consensus_committed: removed replication node: {} {:?}", + target, + removed.as_ref().map(|x| (*x).summary()) + ); + + if let Some(node) = removed { + let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH"))); self.leader_metrics.replication.remove(&target); } } diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index d5e6259c5..bbb0e898e 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -22,7 +22,7 @@ impl, S: RaftStorage> Ra /// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2). /// /// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo. - #[tracing::instrument(level="trace", skip(self, msg), fields(msg=%msg.summary()))] + #[tracing::instrument(level="debug", skip(self, msg), fields(msg=%msg.summary()))] pub(super) async fn handle_append_entries_request( &mut self, msg: AppendEntriesRequest, @@ -30,11 +30,6 @@ impl, S: RaftStorage> Ra tracing::debug!(%self.last_log_id, %self.last_applied); let msg_entries = msg.entries.as_slice(); - let prev_log_id = msg.prev_log_id; - - if !msg_entries.is_empty() { - assert_eq!(prev_log_id.index + 1, msg_entries.first().unwrap().log_id.index); - } // If message's term is less than most recent term, then we do not honor the request. 
if msg.term < self.current_term { @@ -63,7 +58,8 @@ impl, S: RaftStorage> Ra // - R1 accepted this append_entries request but was not aware of that entry {2,3} is inconsistent to leader. // Then it will update commit_index to 3 and apply {2,3} - let valid_commit_index = msg_entries.last().map(|x| x.log_id.index).unwrap_or(prev_log_id.index); + // TODO(xp): cleanup commit index at sender side. + let valid_commit_index = msg_entries.last().map(|x| x.log_id).unwrap_or_else(|| msg.prev_log_id).index; let valid_commit_index = std::cmp::min(msg.leader_commit, valid_commit_index); tracing::debug!("start to check and update to latest term/leader"); @@ -99,118 +95,17 @@ impl, S: RaftStorage> Ra // +----------------+------------------------+ // ` 0 ` last_applied ` last_log_id - // Case 0: prev == 0 - if prev_log_id.index == u64::MIN { - if self.last_log_id.index == prev_log_id.index { - // Matches! Great! - tracing::debug!("append-entries case-0 OK: prev_log_id({}) == 0", prev_log_id,); - return self.append_apply_log_entries(prev_log_id.index + 1, msg_entries, valid_commit_index).await; - } else { - tracing::debug!( - "append-entries case-0: prev_log_id({}) is 0, conflict = last_log_id: {:?}", - prev_log_id, - self.last_log_id - ); - - return Ok(AppendEntriesResponse { - term: self.current_term, - success: false, - conflict_opt: Some(ConflictOpt { - log_id: self.last_log_id, - }), - }); - } - } - - // Case 1: 0 < prev < last_applied - if prev_log_id.index < self.last_applied.index { - tracing::debug!( - "append-entries case-1: prev_log_id({}) < last_applied({}), conflict = last_log_id: {:?}", - prev_log_id, - self.last_applied, - self.last_log_id - ); - - return Ok(AppendEntriesResponse { - term: self.current_term, - success: false, - conflict_opt: Some(ConflictOpt { - log_id: self.last_log_id, - }), - }); - } - - // Case 2: prev == last_applied - if prev_log_id.index == self.last_applied.index { - // The applied entries are also committed thus always be consistent with the leader. 
- - assert_eq!(prev_log_id, self.last_applied); - - tracing::debug!( - "append-entries case-2: prev_log_id({}) == last_applied({})", - prev_log_id, - self.last_applied, - ); - - return self.append_apply_log_entries(prev_log_id.index + 1, msg_entries, valid_commit_index).await; - } - - // Case 3, 4: last_applied < prev <= last_log_id - if prev_log_id.index <= self.last_log_id.index { - tracing::debug!( - "append-entries case-3,4: prev_log_id({}) <= last_log_id({})", - prev_log_id, - self.last_applied, - ); - - let local = self.get_log_id(prev_log_id.index).await?; - - if prev_log_id == local { - return self.append_apply_log_entries(prev_log_id.index + 1, msg_entries, valid_commit_index).await; - } else { - if prev_log_id.index <= self.last_log_id.index { - self.delete_logs(prev_log_id.index).await?; - } - - let log_id = self.get_older_log_id(prev_log_id).await?; - - tracing::debug!( - "append-entries case-3,4: prev_log_id({}) <= last_log_id({}), conflict = {}", - prev_log_id, - self.last_applied, - log_id - ); - - return Ok(AppendEntriesResponse { - term: self.current_term, - success: false, - conflict_opt: Some(ConflictOpt { log_id }), - }); - } - } - - // Case 5: prev > last_log_id - - tracing::debug!( - "append-entries case-5 advanced last_log_id: prev_log_id({}) > last_log_id({}), conflict = last_log_id: {:?}", - prev_log_id, - self.last_log_id, - self.last_log_id - ); - - assert!(prev_log_id.index > self.last_log_id.index); - - Ok(AppendEntriesResponse { - term: self.current_term, - success: false, - conflict_opt: Some(ConflictOpt { - log_id: self.last_log_id, - }), - }) + return self.append_apply_log_entries(&msg.prev_log_id, msg_entries, valid_commit_index).await; } #[tracing::instrument(level = "debug", skip(self))] async fn delete_logs(&mut self, start: u64) -> RaftResult<()> { + // TODO(xp): add a StorageAdapter to provide auxiliary APIs. + // e.g.: + // - extract and manage membership config. + // - keep track of last_log_id, first_log_id, + // RaftStorage should only provides the least basic APIs. + self.storage.delete_logs_from(start..).await.map_err(|err| self.map_storage_error(err))?; self.last_log_id = self.get_log_id(start - 1).await?; @@ -279,43 +174,29 @@ impl, S: RaftStorage> Ra /// If log 5 is committed by R1, and log 3 is not removed, R5 in future could become a new leader and overrides log /// 5 on R3. 
#[tracing::instrument(level="debug", skip(self, msg_entries), fields(msg_entries=%msg_entries.summary()))] - async fn delete_inconsistent_log<'s, 'e>( - &'s mut self, - index: u64, - msg_entries: &'e [Entry], - ) -> RaftResult<&'e [Entry]> { - let end = std::cmp::min(index + msg_entries.len() as u64, self.last_log_id.index + 1); - - if index == end { - return Ok(msg_entries); + async fn delete_inconsistent_log<'s, 'e>(&'s mut self, msg_entries: &'e [Entry]) -> RaftResult<()> { + // all msg_entries are inconsistent logs + + let l = msg_entries.len(); + if l == 0 { + return Ok(()); + } + + if msg_entries[0].log_id.index > self.last_log_id.index { + return Ok(()); } tracing::debug!( - "find and delete inconsistent log entries [{}, {}), last_log_id: {}, entries: {}", - index, - end, + "delete inconsistent log entries [{}, {}), last_log_id: {}, entries: {}", + msg_entries[0].log_id, + msg_entries[l - 1].log_id, self.last_log_id, msg_entries.summary() ); - let entries = self.storage.get_log_entries(index..end).await.map_err(|err| self.map_storage_error(err))?; - - for (i, ent) in entries.iter().enumerate() { - assert_eq!(msg_entries[i].log_id.index, ent.log_id.index); + self.delete_logs(msg_entries[0].log_id.index).await?; - if ent.log_id.term != msg_entries[i].log_id.term { - tracing::debug!( - "delete inconsistent log entries from: {}-th msg.entries: {}", - i, - ent.log_id - ); - - self.delete_logs(ent.log_id.index).await?; - - return Ok(&msg_entries[i..]); - } - } - Ok(&[]) + Ok(()) } /// Walks at most 50 entries backward to get an entry as the `prev_log_id` for next append-entries request. @@ -340,18 +221,56 @@ impl, S: RaftStorage> Ra Ok(log_id) } + /// Append logs only when the first entry(prev_log_id) matches local store + /// This way we keeps the log continuity. #[tracing::instrument(level="debug", skip(self, entries), fields(entries=%entries.summary()))] async fn append_apply_log_entries( &mut self, - index: u64, + prev_log_id: &LogId, entries: &[Entry], commit_index: u64, ) -> RaftResult { + let matching = self.does_log_id_match(prev_log_id).await?; + tracing::debug!( + "check prev_log_id {} match: commit_index: {}, matches: {}", + prev_log_id, + self.commit_index, + matching, + ); + + if !matching { + // prev_log_id mismatches, the logs [prev_log_id.index, +oo) are all inconsistent and should be removed + if prev_log_id.index <= self.last_log_id.index { + tracing::debug!(%prev_log_id, "delete inconsistent log since prev_log_id"); + self.delete_logs(prev_log_id.index).await?; + } + + return Ok(AppendEntriesResponse { + term: self.current_term, + success: false, + conflict_opt: Some(ConflictOpt { log_id: *prev_log_id }), + }); + } + + // The entries left are all inconsistent log or absent + let (n_matching, entries) = self.skip_matching_entries(entries).await?; + + tracing::debug!( + commit_index = self.commit_index, + n_matching, + entries = %entries.summary(), + "skip matching entries", + ); + // Before appending, if an entry overrides an inconsistent one, the entries after it must be deleted first. - let entries = self.delete_inconsistent_log(index, entries).await?; + // Raft requires log ids are in total order by (term,index). + // Otherwise the log id with max index makes committed entry invisible in election. + self.delete_inconsistent_log(entries).await?; self.append_log_entries(entries).await?; + // commit index must not > last_log_id.index + // This is guaranteed by caller. 
self.commit_index = commit_index; self.replicate_to_state_machine_if_needed().await?; @@ -365,6 +284,68 @@ impl, S: RaftStorage> Ra }) } + /// Returns the number of entries that match local storage by comparing log_id, + /// and the unmatched entries. + /// + /// Entries in the request that match local ones do not need to be appended again. + /// Filter them out. + pub async fn skip_matching_entries<'s, 'e>( + &'s self, + entries: &'e [Entry], + ) -> RaftResult<(usize, &'e [Entry])> { + let l = entries.len(); + + for i in 0..l { + let log_id = entries[i].log_id; + let index = log_id.index; + + if index <= self.commit_index { + continue; + } + + // TODO(xp): this is a naive impl. Batch loading entries from storage. + let log = self.storage.try_get_log_entry(index).await.map_err(|err| RaftError::RaftStorage(err.into()))?; + + if let Some(local) = log { + if local.log_id == log_id { + continue; + } + } + + return Ok((i, &entries[i..])); + } + + Ok((l, &[])) + } + + /// Return true if the local store contains the log id. + /// + /// This is how we check that the entries in an append-entries request are consecutive with the local logs. + /// Raft only accepts consecutive logs to be appended. + pub async fn does_log_id_match(&self, remote_log_id: &LogId) -> RaftResult { + let index = remote_log_id.index; + + // Committed entries are always safe and are consistent with a valid leader. + if index <= self.commit_index { + return Ok(true); + } + + let log = self.storage.try_get_log_entry(index).await.map_err(|err| RaftError::RaftStorage(err.into()))?; + tracing::debug!( + "check log id matching: local: {:?} remote: {}", + log.as_ref().map(|x| x.log_id), + remote_log_id + ); + + if let Some(local) = log { + if local.log_id == *remote_log_id { + return Ok(true); + } + } + + Ok(false) + } + /// Append the given entries to the log. /// /// Configuration changes are also detected and applied here. See `configuration changes` @@ -387,6 +368,10 @@ impl, S: RaftStorage> Ra }) .last(); + // TODO(xp): apply only when last_conf_change is newer than the current one. + // For now this is guaranteed by `delete_logs()`, which updates the membership config when deleting logs, + // and by `skip_matching_entries()`, which does not re-append existent log entries. + // This task should be done by StorageAdaptor. if let Some(conf) = last_conf_change { tracing::debug!({membership=?conf}, "applying new membership config received from leader"); self.update_membership(conf)?; diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index 52ddcf321..95e46aa2f 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -245,7 +245,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } /// Transform the given payload into an entry, assign an index and term, and append the entry to the log.
- #[tracing::instrument(level = "trace", skip(self, payload))] + #[tracing::instrument(level = "debug", skip(self, payload))] pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload) -> RaftResult> { let entry = Entry { log_id: LogId { @@ -255,6 +255,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage payload, }; self.core.storage.append_to_log(&[&entry]).await.map_err(|err| self.core.map_storage_error(err))?; + + tracing::debug!("append log: {}", entry.summary()); self.core.last_log_id.index = entry.log_id.index; Ok(entry) @@ -265,7 +267,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// NOTE WELL: this routine does not wait for the request to actually finish replication, it /// merely beings the process. Once the request is committed to the cluster, its response will /// be generated asynchronously. - #[tracing::instrument(level = "trace", skip(self, req), fields(req=%req.summary()))] + #[tracing::instrument(level = "debug", skip(self, req), fields(req=%req.summary()))] pub(super) async fn replicate_client_request(&mut self, req: ClientRequestEntry) { // Replicate the request if there are other cluster members. The client response will be // returned elsewhere after the entry has been committed to the cluster. diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index e51282236..d9fe206ab 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -219,6 +219,9 @@ impl, S: RaftStorage> Ra // snapshot is installed self.last_applied = last_applied; + if self.commit_index < self.last_applied.index { + self.commit_index = self.last_applied.index; + } if self.last_log_id < self.last_applied { self.last_log_id = self.last_applied; } diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index 8923428e5..b09a4b7b5 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -191,7 +191,7 @@ impl, S: RaftStorage> Ra tx_metrics, rx_shutdown, }; - tokio::spawn(this.main().instrument(tracing::debug_span!("spawn"))) + tokio::spawn(this.main()) } /// The main loop of the Raft protocol. @@ -273,7 +273,7 @@ impl, S: RaftStorage> Ra } /// Report a metrics payload on the current state of the Raft node. - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = "debug", skip(self))] fn report_metrics(&mut self, leader_metrics: Update>) { let leader_metrics = match leader_metrics { Update::Update(v) => v.cloned(), @@ -546,8 +546,13 @@ where S: RaftStorage, { // TODO(xp): periodically batch delete - let x = last_applied.index.saturating_sub(max_keep); - sto.delete_logs_from(..=x).await + let x = last_applied.index + 1; + let x = x.saturating_sub(max_keep); + if x > 0 { + sto.delete_logs_from(..x).await + } else { + Ok(()) + } } /// An enum describing the way the current leader property is to be updated. @@ -758,7 +763,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } /// Report metrics with leader specific states. 
- #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = "debug", skip(self))] pub fn leader_report_metrics(&mut self) { self.core.report_metrics(Update::Update(Some(&self.leader_metrics))); } @@ -774,6 +779,15 @@ struct ReplicationState { pub tx: Option, } +impl MessageSummary for ReplicationState { + fn summary(&self) -> String { + format!( + "matched: {}, remove_after_commit: {:?}", + self.matched, self.remove_after_commit + ) + } +} + impl ReplicationState where D: AppData { diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 3b2a01c25..ee3d750dc 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -116,15 +116,28 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } } else { - panic!("repliation is removed: {}", target); - // no such node - // return Ok(()); + // TODO(xp): this should be get rid of. + // handle_update_mactched_and_rate() and send_append_entries() runs async-ly. + // There is chance another update-matched event is sent just before the replication node is + // removed. + // It is not a bug. + // panic!("replication is removed: {}", target); + + return Ok(()); } // TODO(xp): use Vec<_> to replace the two membership configs. // Drop replication stream if needed. if needs_removal { - if let Some(node) = self.nodes.remove(&target) { + let removed = self.nodes.remove(&target); + tracing::info!( + "handle_update_matched_and_rate: removed replication node: {} {:?}", + target, + removed.as_ref().map(|x| x.summary()) + ); + + if let Some(node) = removed { + // TODO(xp): do not need to send, just close. let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH"))); self.leader_metrics.replication.remove(&target); } diff --git a/async-raft/src/error.rs b/async-raft/src/error.rs index 3d0210661..f1ff8738a 100644 --- a/async-raft/src/error.rs +++ b/async-raft/src/error.rs @@ -2,12 +2,14 @@ use std::collections::BTreeSet; use std::fmt; +use std::time::Duration; use crate::raft::MembershipConfig; use crate::raft_types::SnapshotSegmentId; use crate::AppData; use crate::LogId; use crate::NodeId; +use crate::StorageError; /// A result type where the error variant is always a `RaftError`. pub type RaftResult = std::result::Result; @@ -36,6 +38,55 @@ pub enum RaftError { ShuttingDown, } +/// Error variants related to the Replication. 
+#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +#[allow(clippy::large_enum_variant)] +pub enum ReplicationError { + #[error("seen a higher term: {higher} GT mine: {mine}")] + HigherTerm { higher: u64, mine: u64 }, + + #[error("Replication is closed")] + Closed, + + #[error("{0}")] + LackEntry(#[from] LackEntry), + + #[error("leader committed index {committed_index} advances target log index {target_index} too many")] + CommittedAdvanceTooMany { committed_index: u64, target_index: u64 }, + + // TODO(xp): two sub type: StorageError / TransportError + // TODO(xp): a sub error for just send_append_entries() + #[error("{0}")] + StorageError(#[from] StorageError), + + #[error(transparent)] + IO { + #[backtrace] + #[from] + source: std::io::Error, + }, + + #[error("timeout after {timeout:?} to replicate {id}->{target}")] + Timeout { + id: NodeId, + target: NodeId, + timeout: Duration, + }, + + #[error(transparent)] + Network { + #[backtrace] + source: anyhow::Error, + }, +} + +#[derive(Debug, thiserror::Error)] +#[error("store has no log at: {index}")] +pub struct LackEntry { + pub index: u64, +} + impl From for RaftError { fn from(src: tokio::io::Error) -> Self { RaftError::RaftStorage(src.into()) diff --git a/async-raft/src/lib.rs b/async-raft/src/lib.rs index ce8accf40..442f12d1c 100644 --- a/async-raft/src/lib.rs +++ b/async-raft/src/lib.rs @@ -29,6 +29,7 @@ pub use crate::error::ClientWriteError; pub use crate::error::ConfigError; pub use crate::error::InitializeError; pub use crate::error::RaftError; +pub use crate::error::ReplicationError; pub use crate::metrics::RaftMetrics; pub use crate::network::RaftNetwork; pub use crate::raft::Raft; diff --git a/async-raft/src/metrics.rs b/async-raft/src/metrics.rs index 3e8e091fc..a1443696f 100644 --- a/async-raft/src/metrics.rs +++ b/async-raft/src/metrics.rs @@ -15,6 +15,7 @@ use serde::Serialize; use thiserror::Error; use tokio::sync::watch; use tokio::time::Duration; +use tokio::time::Instant; use crate::core::ActiveMembership; use crate::core::State; @@ -98,6 +99,8 @@ impl Wait { #[tracing::instrument(level = "debug", skip(self, func), fields(msg=msg.to_string().as_str()))] pub async fn metrics(&self, func: T, msg: impl ToString) -> Result where T: Fn(&RaftMetrics) -> bool + Send { + let timeout_at = Instant::now() + self.timeout; + let mut rx = self.rx.clone(); loop { let latest = rx.borrow().clone(); @@ -109,7 +112,17 @@ impl Wait { return Ok(latest); } - let delay = tokio::time::sleep(self.timeout); + let now = Instant::now(); + if now >= timeout_at { + return Err(WaitError::Timeout( + self.timeout, + format!("{} latest: {:?}", msg.to_string(), latest), + )); + } + + let sleep_time = timeout_at - now; + tracing::debug!(?sleep_time, "wait timeout"); + let delay = tokio::time::sleep(sleep_time); tokio::select! { _ = delay => { diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index c0620aa5a..f96ad3246 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -87,7 +87,7 @@ impl, S: RaftStorage> Ra /// ### `storage` /// An implementation of the `RaftStorage` trait which will be used by Raft for data storage. /// See the docs on the `RaftStorage` trait for more details. 
- #[tracing::instrument(level="trace", skip(config, network, storage), fields(cluster=%config.cluster_name))] + #[tracing::instrument(level="debug", skip(config, network, storage), fields(cluster=%config.cluster_name))] pub fn new(id: NodeId, config: Arc, network: Arc, storage: Arc) -> Self { let (tx_api, rx_api) = mpsc::unbounded_channel(); let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id)); @@ -529,10 +529,10 @@ pub(crate) enum RaftMsg { pub struct AppendEntriesRequest { /// The leader's current term. pub term: u64, + /// The leader's ID. Useful in redirecting clients. pub leader_id: u64, - /// The log entry immediately preceding the new entries. pub prev_log_id: LogId, /// The new log entries to store. @@ -541,6 +541,7 @@ pub struct AppendEntriesRequest { /// are batched for efficiency. #[serde(bound = "D: AppData")] pub entries: Vec>, + /// The leader's commit index. pub leader_commit: u64, } diff --git a/async-raft/src/raft_types.rs b/async-raft/src/raft_types.rs index ad483461d..eea09b351 100644 --- a/async-raft/src/raft_types.rs +++ b/async-raft/src/raft_types.rs @@ -24,6 +24,12 @@ impl Display for LogId { } } +impl LogId { + pub fn new(term: u64, index: u64) -> Self { + LogId { term, index } + } +} + // Everytime a snapshot is created, it is assigned with a globally unique id. pub type SnapshotId = String; diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs index 236dc067a..449175995 100644 --- a/async-raft/src/replication/mod.rs +++ b/async-raft/src/replication/mod.rs @@ -21,7 +21,7 @@ use tracing::Span; use crate::config::Config; use crate::config::SnapshotPolicy; -use crate::error::RaftResult; +use crate::error::LackEntry; use crate::raft::AppendEntriesRequest; use crate::raft::Entry; use crate::raft::InstallSnapshotRequest; @@ -31,9 +31,9 @@ use crate::AppDataResponse; use crate::LogId; use crate::MessageSummary; use crate::NodeId; -use crate::RaftError; use crate::RaftNetwork; use crate::RaftStorage; +use crate::ReplicationError; #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ReplicationMetrics { @@ -110,9 +110,10 @@ struct ReplicationCore, S: Raf ////////////////////////////////////////////////////////////////////////// // Dynamic Fields //////////////////////////////////////////////////////// /// The target state of this replication stream. - target_state: TargetReplState, + target_repl_state: TargetReplState, /// The index of the log entry to most recently be appended to the log by the leader. + /// TODO(xp): remove this last_log_index: u64, /// The index of the highest log entry which is known to be committed in the cluster. commit_index: u64, @@ -126,6 +127,9 @@ struct ReplicationCore, S: Raf /// replication proceeds. matched: LogId, + // The last possible matching entry on a follower. + max_possible_matched_index: u64, + /// The heartbeat interval for ensuring that heartbeats are always delivered in a timely fashion. 
heartbeat: Interval, @@ -159,10 +163,11 @@ impl, S: RaftStorage> Re storage, config, marker_r: std::marker::PhantomData, - target_state: TargetReplState::LineRate, + target_repl_state: TargetReplState::LineRate, last_log_index: last_log.index, commit_index, matched: LogId { term: 0, index: 0 }, + max_possible_matched_index: last_log.index, raft_core_tx, repl_rx, heartbeat: interval(heartbeat_timeout), @@ -179,16 +184,61 @@ impl, S: RaftStorage> Re #[tracing::instrument(level="trace", skip(self), fields(id=self.id, target=self.target, cluster=%self.config.cluster_name))] async fn main(mut self) { - // Perform an initial heartbeat. - self.send_append_entries().await; - - // Proceed to the replication stream's inner loop. loop { - match &self.target_state { + // If it returns Ok(), always go back to LineRate state. + let res = match &self.target_repl_state { TargetReplState::LineRate => self.line_rate_loop().await, TargetReplState::Snapshotting => self.replicate_snapshot().await, TargetReplState::Shutdown => return, - } + }; + + let err = match res { + Ok(_) => { + self.set_target_repl_state(TargetReplState::LineRate); + continue; + } + Err(err) => err, + }; + + tracing::warn!(error=%err, "error replication to target={}", self.target); + + match err { + ReplicationError::Closed => { + self.set_target_repl_state(TargetReplState::Shutdown); + } + ReplicationError::HigherTerm { higher, mine: _ } => { + let _ = self.raft_core_tx.send(( + ReplicaEvent::RevertToFollower { + target: self.target, + term: higher, + }, + tracing::debug_span!("CH"), + )); + return; + } + ReplicationError::IO { .. } => { + tracing::error!(error=%err, "error replication to target={}", self.target); + // TODO(xp): tell core to quit? + return; + } + ReplicationError::LackEntry(_) => { + self.set_target_repl_state(TargetReplState::Snapshotting); + } + ReplicationError::CommittedAdvanceTooMany { .. } => { + self.set_target_repl_state(TargetReplState::Snapshotting); + } + ReplicationError::StorageError(_err) => { + self.set_target_repl_state(TargetReplState::Shutdown); + let _ = self.raft_core_tx.send((ReplicaEvent::Shutdown, tracing::debug_span!("CH"))); + return; + } + ReplicationError::Timeout { .. } => { + // nothing to do + } + ReplicationError::Network { .. } => { + // nothing to do + } + }; } } @@ -196,171 +246,167 @@ impl, S: RaftStorage> Re /// /// This request will timeout if no response is received within the /// configured heartbeat interval. - #[tracing::instrument(level = "trace", skip(self))] - async fn send_append_entries(&mut self) { - let start = self.matched.index + 1; - let end = self.last_log_index + 1; + #[tracing::instrument(level = "debug", skip(self))] + async fn send_append_entries(&mut self) -> Result<(), ReplicationError> { + // find the mid position aligning to 8 + let diff = self.max_possible_matched_index - self.matched.index; + let mut prev_index = self.matched.index + diff / 16 * 8; - let chunk_size = std::cmp::min(self.config.max_payload_entries, end - start); - let end = start + chunk_size; + // TODO(xp): make this part a job of StorageAdaptor. + let (prev_log_id, logs) = loop { + // It is last_applied_id or the id of the first present log. 
+ let first_log_id = self.storage.first_known_log_id().await?; - let logs; - if chunk_size == 0 { - // just a heartbeat - logs = vec![]; - } else { - let res = self.load_log_entries(start, end).await; + self.check_consecutive(first_log_id.index)?; - let logs_opt = match res { - Ok(x) => x, - Err(err) => { - tracing::warn!(error=%err, "storage.get_log_entries, [{}, {})", start,start+chunk_size as u64); - self.set_target_state(TargetReplState::Shutdown); - let _ = self.raft_core_tx.send((ReplicaEvent::Shutdown, tracing::debug_span!("CH"))); - return; + if prev_index < first_log_id.index { + prev_index = first_log_id.index; + } + + let start = prev_index + 1; + let end = std::cmp::min(start + self.config.max_payload_entries, self.last_log_index + 1); + + tracing::debug!( + "load entries: matched: {}, send_prev_log_index: {} first_log: {} prev_index: {}, end: {}", + self.matched, + self.max_possible_matched_index, + first_log_id, + prev_index, + end, + ); + + assert!(end - prev_index > 0); + + let prev_log_id = if prev_index == first_log_id.index { + first_log_id + } else { + let first = self.storage.try_get_log_entry(prev_index).await?; + match first { + Some(f) => f.log_id, + None => { + tracing::info!("can not load first entry: at {}, retry loading logs", prev_index); + continue; + } } }; - logs = match logs_opt { - // state changed to snapshot - None => return, - Some(x) => x, + let logs = if start == end { + vec![] + } else { + let logs = self.storage.try_get_log_entries(start..end).await?; + if !logs.is_empty() && logs[0].log_id.index > prev_log_id.index + 1 { + // There is still chance the first log is removed. + // log entry is just deleted after fetching first_log_id. + // Without consecutive logs, we have to retry loading. + continue; + } + + logs }; - } - let last_log_id = logs.last().map(|last| last.log_id); + break (prev_log_id, logs); + }; + + let last_log_id = if logs.is_empty() { + prev_log_id + } else { + logs[logs.len() - 1].log_id + }; // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { term: self.term, leader_id: self.id, - prev_log_id: self.matched, + prev_log_id, leader_commit: self.commit_index, entries: logs, }; // Send the payload. tracing::debug!( + payload=%payload.summary(), "start sending append_entries, timeout: {:?}", self.config.heartbeat_interval ); - let res = timeout( - Duration::from_millis(self.config.heartbeat_interval), - self.network.send_append_entries(self.target, payload), - ) - .await; + let the_timeout = Duration::from_millis(self.config.heartbeat_interval); + let res = timeout(the_timeout, self.network.send_append_entries(self.target, payload)).await; - let res = match res { - Ok(outer_res) => match outer_res { + let append_resp = match res { + Ok(append_res) => match append_res { Ok(res) => res, Err(err) => { tracing::warn!(error=%err, "error sending AppendEntries RPC to target"); - return; + return Err(ReplicationError::Network { source: err }); } }, - Err(err) => { - tracing::warn!(error=%err, "timeout while sending AppendEntries RPC to target"); - return; + Err(timeout_err) => { + tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target"); + return Err(ReplicationError::Timeout { + id: self.id, + target: self.target, + timeout: the_timeout, + }); } }; - tracing::debug!("append_entries last: {:?}", last_log_id); + tracing::debug!(%last_log_id, "append_entries resp: {:?}", append_resp); // Handle success conditions. 
- if res.success { - tracing::debug!("append_entries success: last: {:?}", last_log_id); - // If this was a proper replication event (last index & term were provided), then update state. - if let Some(log_id) = last_log_id { - self.matched = log_id; - self.update_matched(); - } - return; + if append_resp.success { + self.matched = last_log_id; + // TODO(xp): if matched does not change, do not bother the core. + self.update_max_possible_matched_index(last_log_id.index); + self.update_matched(); + + return Ok(()); } // Failed // Replication was not successful, if a newer term has been returned, revert to follower. - if res.term > self.term { - tracing::debug!({ res.term }, "append entries failed, reverting to follower"); - let _ = self.raft_core_tx.send(( - ReplicaEvent::RevertToFollower { - target: self.target, - term: res.term, - }, - tracing::debug_span!("CH"), - )); - self.set_target_state(TargetReplState::Shutdown); - return; + if append_resp.term > self.term { + tracing::debug!({ append_resp.term }, "append entries failed, reverting to follower"); + + return Err(ReplicationError::HigherTerm { + higher: append_resp.term, + mine: self.term, + }); } // Replication was not successful, handle conflict optimization record, else decrement `next_index`. - let mut conflict = match res.conflict_opt { - None => { - panic!("append_entries failed but without a reason: {:?}", res); - } - Some(x) => x, - }; + let conflict = append_resp.conflict_opt.unwrap(); - tracing::debug!(?conflict, res.term, "append entries failed, handling conflict opt"); + tracing::debug!( + ?conflict, + append_resp.term, + "append entries failed, handling conflict opt" + ); - // If conflict index is 0, we will not be able to fetch that index from storage because - // it will never exist. So instead, we just return, and accept the conflict data. - if conflict.log_id.index == 0 { - self.matched = LogId { term: 0, index: 0 }; - self.update_matched(); + assert_eq!( + conflict.log_id, prev_log_id, + "if conflict, it is always the prev_log_id" + ); - return; - } + // Continue to find the matching log id on follower. + self.max_possible_matched_index = conflict.log_id.index - 1; + Ok(()) + } - // The follower has more log and set the conflict.log_id to its last_log_id if: - // - req.prev_log_id.index is 0 - // - req.prev_log_id.index is applied, in which case the follower does not know if the prev_log_id is valid. - // - // In such case, fake a conflict log_id that never matches the log term in local store, in order to not - // update_matched(). - if conflict.log_id.index > self.last_log_index { - conflict.log_id = LogId { - term: 0, - index: self.last_log_index, - }; + /// max_possible_matched_index is the least index for `prev_log_id` to form a consecutive log sequence + fn check_consecutive(&self, first_log_index: u64) -> Result<(), ReplicationError> { + if first_log_index > self.max_possible_matched_index { + return Err(ReplicationError::LackEntry(LackEntry { + index: self.max_possible_matched_index, + })); } - // Fetch the entry at conflict index and use the term specified there. 
- let ent = self.storage.try_get_log_entry(conflict.log_id.index).await; - let ent = match ent { - Ok(x) => x, - Err(err) => { - tracing::error!(error=?err, "error fetching log entry due to returned AppendEntries RPC conflict_opt"); - self.set_target_state(TargetReplState::Shutdown); - let _ = self.raft_core_tx.send((ReplicaEvent::Shutdown, tracing::debug_span!("CH"))); - return; - } - }; - - let ent = match ent { - Some(x) => x, - None => { - // This condition would only ever be reached if the log has been removed due to - // log compaction (barring critical storage failure), so transition to snapshotting. - self.set_target_state(TargetReplState::Snapshotting); - return; - } - }; - - let term = ent.log_id.term; - - // Next time try sending from the log at conflict.log_id.index. - self.matched = ent.log_id; - - if term == conflict.log_id.term { - self.update_matched(); - } + Ok(()) } #[tracing::instrument(level = "debug", skip(self))] - fn set_target_state(&mut self, state: TargetReplState) { - self.target_state = state; + fn set_target_repl_state(&mut self, state: TargetReplState) { + self.target_repl_state = state; } #[tracing::instrument(level = "debug", skip(self))] @@ -391,47 +437,55 @@ impl, S: RaftStorage> Re } } - /// Fully drain the channel coming in from the Raft node. - pub(self) fn drain_raft_rx(&mut self, first: RaftEvent, span: Span) { - let mut event_opt = Some((first, span)); - let mut iters = 0; - loop { - // Just ensure we don't get stuck draining a REALLY hot replication feed. - if iters > self.config.max_payload_entries { - return; - } + #[tracing::instrument(level = "debug", skip(self))] + pub async fn try_drain_raft_rx(&mut self) -> Result<(), ReplicationError> { + for _i in 0..self.config.max_payload_entries { + let ev = self.repl_rx.recv().now_or_never(); + let ev = match ev { + None => { + // no event in self.repl_rx + return Ok(()); + } + Some(x) => x, + }; - // Unpack the event opt, else return if we don't have one to process. - let (event, span) = match event_opt.take() { - Some(event) => event, - None => return, + let ev_and_span = match ev { + None => { + // channel is closed, Leader quited. + return Err(ReplicationError::Closed); + } + Some(x) => x, }; - let _ent = span.enter(); + // TODO(xp): the span is not used. remove it from event. + self.process_raft_event(ev_and_span.0)?; + } - // Process the event. - match event { - RaftEvent::UpdateCommitIndex { commit_index } => { - self.commit_index = commit_index; - } + Ok(()) + } - RaftEvent::Replicate { entry, commit_index } => { - self.commit_index = commit_index; - self.last_log_index = entry.log_id.index; - } + #[tracing::instrument(level = "debug", skip(self), fields(event=%event.summary()))] + pub fn process_raft_event(&mut self, event: RaftEvent) -> Result<(), ReplicationError> { + match event { + RaftEvent::UpdateCommitIndex { commit_index } => { + self.commit_index = commit_index; + } - RaftEvent::Terminate => { - self.set_target_state(TargetReplState::Shutdown); - return; - } + RaftEvent::Replicate { entry, commit_index } => { + // TODO(xp): Message Replicate does not need to send an entry. + self.commit_index = commit_index; + self.last_log_index = entry.log_id.index; } - // Attempt to unpack the next event for the next loop iteration. - if let Some(event_span) = self.repl_rx.recv().now_or_never() { - event_opt = event_span; + RaftEvent::Terminate => { + tracing::debug!("received: RaftEvent::Terminate"); + // TODO(xp): just close the channel to shut replication down. 
+ self.set_target_repl_state(TargetReplState::Shutdown); + return Err(ReplicationError::Closed); } - iters += 1; } + + Ok(()) } } @@ -448,6 +502,7 @@ enum TargetReplState { Shutdown, } +// TODO(xp): remove Replicate /// An event from the Raft node. pub(crate) enum RaftEvent { Replicate { @@ -467,6 +522,20 @@ pub(crate) enum RaftEvent { Terminate, } +impl MessageSummary for RaftEvent { + fn summary(&self) -> String { + match self { + RaftEvent::Replicate { entry: _, commit_index } => { + format!("Replicate: commit_index: {}", commit_index) + } + RaftEvent::UpdateCommitIndex { commit_index } => { + format!("UpdateCommitIndex: commit_index: {}", commit_index) + } + RaftEvent::Terminate => "Terminate".to_string(), + } + } +} + /// An event coming from a replication stream. pub(crate) enum ReplicaEvent where S: AsyncRead + AsyncSeek + Send + Unpin + 'static @@ -517,26 +586,46 @@ impl MessageSummary for Repli } impl, S: RaftStorage> ReplicationCore { - #[tracing::instrument(level = "trace", skip(self), fields(state = "line-rate"))] - pub async fn line_rate_loop(&mut self) { + #[tracing::instrument(level = "debug", skip(self), fields(state = "line-rate"))] + pub async fn line_rate_loop(&mut self) -> Result<(), ReplicationError> { loop { - if self.target_state != TargetReplState::LineRate { - return; - } + loop { + tracing::debug!( + "current matched: {} send_prev_log_index: {}", + self.matched, + self.max_possible_matched_index + ); - if self.needs_snapshot() { - self.set_target_state(TargetReplState::Snapshotting); - return; - } + let res = self.send_append_entries().await; - if self.matched.index < self.last_log_index { - self.send_append_entries().await; + if let Err(err) = res { + tracing::error!(error=%err, "error replication to target={}", self.target); - if self.target_state != TargetReplState::LineRate { - return; + // For transport error, just keep retrying. + match err { + ReplicationError::Timeout { .. } => { + break; + } + ReplicationError::Network { .. } => { + break; + } + _ => { + return Err(err); + } + } } - continue; + if self.matched.index == self.max_possible_matched_index { + break; + } + } + + if self.needs_snapshot() { + return Err(ReplicationError::CommittedAdvanceTooMany { + // TODO(xp) fill them + committed_index: 0, + target_index: 0, + }); } let span = tracing::debug_span!("CHrx:LineRate"); @@ -544,14 +633,19 @@ impl, S: RaftStorage> Re tokio::select! { _ = self.heartbeat.tick() => { - self.send_append_entries().await; + tracing::debug!("heartbeat triggered"); + // continue } event_span = self.repl_rx.recv() => { match event_span { - Some((event, span)) => self.drain_raft_rx(event, span), + Some((event, _span)) => { + self.process_raft_event(event)?; + self.try_drain_raft_rx().await?; + }, None => { - self.set_target_state(TargetReplState::Shutdown); + tracing::debug!("received: RaftEvent::Terminate: closed"); + return Err(ReplicationError::Closed); }, } } @@ -559,63 +653,20 @@ impl, S: RaftStorage> Re } } - /// Ensure there are no gaps in the outbound buffer due to transition from lagging. - #[tracing::instrument(level = "debug", skip(self))] - async fn load_log_entries(&mut self, start: u64, stop: u64) -> Result>>, RaftError> { - // TODO(xp): get_log_entries() should return an error that is programmable readable. - // EntryNotFound - let entries = match self.storage.get_log_entries(start..stop).await { - Ok(entries) => entries, - Err(err) => { - // TODO non-EntryNotFound error should not shutdown raft core. 
- tracing::info!(error=%err, "loading log entries, switch to snapshot replication"); - self.set_target_state(TargetReplState::Snapshotting); - return Ok(None); - } - }; - - tracing::debug!(entries=%entries.as_slice().summary(), "load_log_entries"); - - let first = entries.first().map(|x| x.log_id.index); - if first != Some(start) { - tracing::info!( - "entry {} to replicate not found, first: {:?}, switch to snapshot replication", - start, - first - ); - self.set_target_state(TargetReplState::Snapshotting); - return Ok(None); - } - - Ok(Some(entries)) - } - - #[tracing::instrument(level = "trace", skip(self), fields(state = "snapshotting"))] - pub async fn replicate_snapshot(&mut self) { - let res = self.wait_for_snapshot().await; - - // TODO(xp): bad impl: res is error only when replication is closed. - // use specific error to describe these behavior. - - let snapshot = match res { - Ok(x) => x, - Err(e) => { - tracing::error!(error=%e, "replication shutting down"); - return; - } - }; + #[tracing::instrument(level = "debug", skip(self), fields(state = "snapshotting"))] + pub async fn replicate_snapshot(&mut self) -> Result<(), ReplicationError> { + let snapshot = self.wait_for_snapshot().await?; + self.stream_snapshot(snapshot).await?; - if let Err(err) = self.stream_snapshot(snapshot).await { - tracing::warn!(error=%err, "error streaming snapshot to target"); - } + Ok(()) } /// Wait for a response from the storage layer for the current snapshot. /// /// If an error comes up during processing, this routine should simple be called again after /// issuing a new request to the storage layer. - #[tracing::instrument(level = "trace", skip(self))] - async fn wait_for_snapshot(&mut self) -> Result, RaftError> { + #[tracing::instrument(level = "debug", skip(self))] + async fn wait_for_snapshot(&mut self) -> Result, ReplicationError> { // Ask raft core for a snapshot. // - If raft core has a ready snapshot, it sends back through tx. // - Otherwise raft core starts a new task taking snapshot, and **close** `tx` when finished. Thus there has to @@ -625,7 +676,8 @@ impl, S: RaftStorage> Re // channel to communicate with raft-core let (tx, mut rx) = oneshot::channel(); - // TODO(xp): handle sending error. + // TODO(xp): handle sending error. If channel is closed, quite replication by returning + // ReplicationError::Closed. let _ = self.raft_core_tx.send(( ReplicaEvent::NeedsSnapshot { target: self.target, @@ -636,19 +688,43 @@ impl, S: RaftStorage> Re let mut waiting_for_snapshot = true; + // TODO(xp): use a watch channel to let the core to send one of the 3 event: + // heartbeat, new-log, or snapshot is ready. while waiting_for_snapshot { tokio::select! { - _ = self.heartbeat.tick() => self.send_append_entries().await, + _ = self.heartbeat.tick() => { + // TODO(xp): just heartbeat: + let res = self.send_append_entries().await; + match res { + Ok(_) => { + // + }, + Err(err) => { + match err { + ReplicationError::StorageError(_) => { + return Err(err); + }, + ReplicationError::IO {..} => { + return Err(err); + } + _=> { + // nothing to do + } + } + } + } + }, event_span = self.repl_rx.recv() => { match event_span { - Some((event, span)) => self.drain_raft_rx(event, span), + Some((event, _span)) => { + self.process_raft_event(event)?; + self.try_drain_raft_rx().await? 
+ }, None => { - tracing::error!("repl_rx is closed"); - self.set_target_state(TargetReplState::Shutdown); - // TODO: make it two errors: ReplicationShutdown and CoreShutdown - return Err(RaftError::ShuttingDown); + tracing::info!("repl_rx is closed"); + return Err(ReplicationError::Closed); } } }, @@ -660,6 +736,7 @@ impl, S: RaftStorage> Re } Err(_) => { // TODO(xp): This channel is closed to notify an in progress snapshotting is completed. + // Start a new round to get the snapshot. tracing::info!("rx for waiting for snapshot is closed, may be snapshot is ready. re-send need-snapshot."); waiting_for_snapshot = false; @@ -672,7 +749,7 @@ impl, S: RaftStorage> Re } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn stream_snapshot(&mut self, mut snapshot: Snapshot) -> RaftResult<()> { + async fn stream_snapshot(&mut self, mut snapshot: Snapshot) -> Result<(), ReplicationError> { let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?; let mut offset = 0; @@ -726,31 +803,28 @@ impl, S: RaftStorage> Re // Handle response conditions. if res.term > self.term { - let _ = self.raft_core_tx.send(( - ReplicaEvent::RevertToFollower { - target: self.target, - term: res.term, - }, - tracing::debug_span!("CH"), - )); - self.set_target_state(TargetReplState::Shutdown); - return Ok(()); + return Err(ReplicationError::HigherTerm { + higher: res.term, + mine: self.term, + }); } // If we just sent the final chunk of the snapshot, then transition to lagging state. if done { - self.set_target_state(TargetReplState::LineRate); - tracing::debug!( "done install snapshot: snapshot last_log_id: {}, matched: {}", snapshot.meta.last_log_id, self.matched, ); + // TODO(xp): combine update_matched() and update_max_possible_matched_index()? if snapshot.meta.last_log_id > self.matched { self.matched = snapshot.meta.last_log_id; self.update_matched(); } + + self.update_max_possible_matched_index(snapshot.meta.last_log_id.index); + return Ok(()); } @@ -758,9 +832,14 @@ impl, S: RaftStorage> Re offset += n_read as u64; // Check raft channel to ensure we are staying up-to-date, then loop. - if let Some(Some((event, span))) = self.repl_rx.recv().now_or_never() { - self.drain_raft_rx(event, span); - } + self.try_drain_raft_rx().await?; + } + } + + #[tracing::instrument(level = "debug", skip(self))] + fn update_max_possible_matched_index(&mut self, i: u64) { + if self.max_possible_matched_index < i { + self.max_possible_matched_index = i; } } } diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index e07eb6352..93f60fca0 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -105,11 +105,13 @@ where D: AppData, R: AppDataResponse, { + // TODO(xp): simplify storage API + /// The storage engine's associated type used for exposing a snapshot for reading & writing. /// /// See the [storage chapter of the guide](https://async-raft.github.io/async-raft/storage.html) /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static; + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; /// Set if to turn on defensive check to unexpected input. /// E.g. discontinuous log appending. @@ -161,10 +163,25 @@ where range: RNG, ) -> Result>, StorageError>; + /// Get a series of log entries from storage. + /// + /// Entry not found is allowed + async fn try_get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RNG, + ) -> Result>, StorageError>; + /// Try to get an log entry. 
/// It does not return an error if in defensive mode and the log entry at `log_index` is not found. async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError>; + /// Returns the first log id in log. + /// + /// The impl should not consider the applied log id in state machine. + async fn first_id_in_log(&self) -> Result, StorageError>; + + async fn first_known_log_id(&self) -> Result; + /// Returns the last log id in log. /// /// The impl should not consider the applied log id in state machine. diff --git a/async-raft/src/storage_error.rs b/async-raft/src/storage_error.rs index 7f46dc2a2..d16e515d6 100644 --- a/async-raft/src/storage_error.rs +++ b/async-raft/src/storage_error.rs @@ -96,9 +96,13 @@ pub enum Violation { #[error("range is not half-open: start: {start:?}, end: {end:?}")] RangeNotHalfOpen { start: Bound, end: Bound }, + // TODO(xp): rename this to some input related error name. #[error("empty log vector")] LogsEmpty, + #[error("all logs are removed. It requires at least one log to track continuity")] + StoreLogsEmpty, + #[error("logs are not consecutive, prev: {prev}, next: {next}")] LogsNonConsecutive { prev: LogId, next: LogId }, diff --git a/async-raft/tests/append_conflicts.rs b/async-raft/tests/append_conflicts.rs index 74a37bd12..54c87af52 100644 --- a/async-raft/tests/append_conflicts.rs +++ b/async-raft/tests/append_conflicts.rs @@ -4,8 +4,6 @@ use anyhow::Result; use async_raft::raft::AppendEntriesRequest; use async_raft::raft::ConflictOpt; use async_raft::raft::Entry; -use async_raft::raft::EntryPayload; -use async_raft::AppData; use async_raft::Config; use async_raft::LogId; use async_raft::MessageSummary; @@ -16,6 +14,8 @@ use maplit::btreeset; use memstore::ClientRequest; use memstore::MemStore; +use crate::fixtures::ent; + #[macro_use] mod fixtures; @@ -42,14 +42,30 @@ async fn append_conflicts() -> Result<()> { router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; let (r0, sto0) = router.remove_node(0).await.unwrap(); - check_logs(&sto0, vec![]).await?; + check_logs(&sto0, vec![0]).await?; + + tracing::info!("--- case 0: prev_log_id.index == 0, no logs"); + + let req = AppendEntriesRequest { + term: 1, + leader_id: 0, + prev_log_id: LogId::new(0, 0), + entries: vec![], + leader_commit: 2, + }; + + let resp = r0.append_entries(req.clone()).await?; + assert!(resp.success); + assert_eq!(None, resp.conflict_opt); + + check_logs(&sto0, vec![0]).await?; tracing::info!("--- case 0: prev_log_id.index == 0, "); let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 0 }, + prev_log_id: LogId::new(0, 0), entries: vec![ent(1, 1), ent(1, 2), ent(1, 3), ent(1, 4)], // this set the last_applied to 2 leader_commit: 2, @@ -59,43 +75,39 @@ async fn append_conflicts() -> Result<()> { assert!(resp.success); assert_eq!(None, resp.conflict_opt); + check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?; + tracing::info!("--- case 0: prev_log_id.index == 0, last_log_id mismatch"); let resp = r0.append_entries(req.clone()).await?; - assert!(!resp.success); - assert_eq!( - Some(ConflictOpt { - log_id: LogId { term: 1, index: 4 } - }), - resp.conflict_opt - ); + assert!(resp.success); + assert_eq!(None, resp.conflict_opt); + + check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?; - // last_applied is 2 - tracing::info!("--- case 1: 0 < prev_log_id.index < last_applied.index"); + // committed index is 2 + tracing::info!("--- case 1: 0 < prev_log_id.index < commit_index"); let req = AppendEntriesRequest { term: 1, leader_id: 0, 
- prev_log_id: LogId { term: 1, index: 1 }, + prev_log_id: LogId::new(1, 1), entries: vec![ent(1, 2)], leader_commit: 2, }; let resp = r0.append_entries(req).await?; - assert!(!resp.success); - assert_eq!( - Some(ConflictOpt { - log_id: LogId { term: 1, index: 4 } - }), - resp.conflict_opt - ); + assert!(resp.success); + assert_eq!(None, resp.conflict_opt); + + check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?; tracing::info!("--- case 2: prev_log_id.index == last_applied, inconsistent log should be removed"); let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 2 }, + prev_log_id: LogId::new(1, 2), entries: vec![ent(2, 3)], // this set the last_applied to 2 leader_commit: 2, @@ -105,32 +117,34 @@ async fn append_conflicts() -> Result<()> { assert!(resp.success); assert_eq!(None, resp.conflict_opt); - check_logs(&sto0, vec![1, 1, 2]).await?; + check_logs(&sto0, vec![0, 1, 1, 2]).await?; // check last_log_id is updated: let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 2000 }, + prev_log_id: LogId::new(1, 2000), entries: vec![], leader_commit: 2, }; let resp = r0.append_entries(req).await?; + assert!(!resp.success); assert_eq!( Some(ConflictOpt { - log_id: LogId { term: 2, index: 3 } + log_id: LogId { term: 1, index: 2000 } }), resp.conflict_opt ); + check_logs(&sto0, vec![0, 1, 1, 2]).await?; + tracing::info!("--- case 3,4: prev_log_id.index <= last_log_id, prev_log_id mismatch, inconsistent log is removed"); - // store: 1 1 2 let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 3, index: 3 }, + prev_log_id: LogId::new(3, 3), entries: vec![], leader_commit: 2, }; @@ -140,19 +154,19 @@ async fn append_conflicts() -> Result<()> { // returns the id just before prev_log_id.index assert_eq!( Some(ConflictOpt { - log_id: LogId { term: 1, index: 2 } + log_id: LogId { term: 3, index: 3 } }), resp.conflict_opt ); - check_logs(&sto0, vec![1, 1]).await?; + check_logs(&sto0, vec![0, 1, 1]).await?; tracing::info!("--- case 3,4: prev_log_id.index <= last_log_id, prev_log_id matches, inconsistent log is removed"); // refill logs let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 2 }, + prev_log_id: LogId::new(1, 2), entries: vec![ent(2, 3), ent(2, 4), ent(2, 5)], leader_commit: 2, }; @@ -162,13 +176,13 @@ async fn append_conflicts() -> Result<()> { assert_eq!(None, resp.conflict_opt); // check prepared store - check_logs(&sto0, vec![1, 1, 2, 2, 2]).await?; + check_logs(&sto0, vec![0, 1, 1, 2, 2, 2]).await?; // prev_log_id matches let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 2, index: 3 }, + prev_log_id: LogId::new(2, 3), entries: vec![ent(3, 4)], leader_commit: 2, }; @@ -177,7 +191,7 @@ async fn append_conflicts() -> Result<()> { assert!(resp.success); assert_eq!(None, resp.conflict_opt); - check_logs(&sto0, vec![1, 1, 2, 3]).await?; + check_logs(&sto0, vec![0, 1, 1, 2, 3]).await?; tracing::info!("--- case 5: last_log_id.index < prev_log_id.index"); @@ -185,7 +199,7 @@ async fn append_conflicts() -> Result<()> { let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 200 }, + prev_log_id: LogId::new(1, 200), entries: vec![], leader_commit: 2, }; @@ -194,7 +208,7 @@ async fn append_conflicts() -> Result<()> { assert!(!resp.success); assert_eq!( Some(ConflictOpt { - log_id: LogId { term: 3, index: 4 } + log_id: LogId { term: 1, index: 200 } }), resp.conflict_opt ); @@ 
-202,19 +216,11 @@ async fn append_conflicts() -> Result<()> { Ok(()) } -/// Create a blonk log entry for test. -fn ent(term: u64, index: u64) -> Entry { - Entry { - log_id: LogId { term, index }, - payload: EntryPayload::Blank, - } -} - /// To check if logs is as expected. async fn check_logs(sto: &Arc, terms: Vec) -> Result<()> { let logs = sto.get_log_entries(..).await?; let want: Vec> = - terms.iter().enumerate().map(|(i, term)| ent(*term, (i + 1) as u64)).collect::>(); + terms.iter().enumerate().map(|(i, term)| ent(*term, (i) as u64)).collect::>(); assert_eq!(want.as_slice().summary(), logs.as_slice().summary()); diff --git a/async-raft/tests/append_updates_membership.rs b/async-raft/tests/append_updates_membership.rs index e0bb167d8..1e17d7b51 100644 --- a/async-raft/tests/append_updates_membership.rs +++ b/async-raft/tests/append_updates_membership.rs @@ -17,7 +17,7 @@ use maplit::btreeset; #[macro_use] mod fixtures; -/// append-entries should update membership correctlly when adding new logs and deleting +/// append-entries should update membership correctly when adding new logs and deleting /// inconsistent logs. /// /// - bring up a non-voter and send to it append_entries request. Check the membership updated. @@ -48,7 +48,7 @@ async fn append_updates_membership() -> Result<()> { let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 0 }, + prev_log_id: LogId::new(0, 0), entries: vec![ ent(1, 1), Entry { @@ -87,7 +87,7 @@ async fn append_updates_membership() -> Result<()> { let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: LogId { term: 1, index: 2 }, + prev_log_id: LogId::new(1, 2), entries: vec![ent(2, 3)], leader_commit: 0, }; diff --git a/async-raft/tests/clean_applied_logs.rs b/async-raft/tests/clean_applied_logs.rs index 0642aed8a..df2cd9ddf 100644 --- a/async-raft/tests/clean_applied_logs.rs +++ b/async-raft/tests/clean_applied_logs.rs @@ -6,13 +6,14 @@ use async_raft::Config; use async_raft::RaftStorage; use fixtures::RaftRouter; use maplit::btreeset; +use tokio::time::sleep; #[macro_use] mod fixtures; /// Logs should be deleted by raft after applying them, on leader and non-voter. /// -/// - assert logs are deleted on leader aftre applying them. +/// - assert logs are deleted on leader after applying them. /// - assert logs are deleted on replication target after installing a snapshot. /// /// RUST_LOG=async_raft,memstore,clean_applied_logs=trace cargo test -p async-raft --test clean_applied_logs @@ -33,7 +34,14 @@ async fn clean_applied_logs() -> Result<()> { let mut n_logs = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?; - router.client_request_many(0, "0", (10 - n_logs) as usize).await; + let count = (10 - n_logs) as usize; + for idx in 0..count { + router.client_request(0, "0", idx as u64).await; + // raft commit at once with a single leader cluster. + // If we send too fast, logs are removed before forwarding to non-voter. + // Then it triggers snapshot replication, which is not expected. + sleep(Duration::from_millis(50)).await; + } n_logs = 10; router.wait_for_log(&btreeset! 
{0,1}, n_logs, timeout(), "write upto 10 logs").await?; diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index 131195e95..301e58927 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -1,11 +1,13 @@ use std::sync::Arc; use anyhow::Result; +use async_raft::raft::AppendEntriesRequest; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; use async_raft::raft::MembershipConfig; use async_raft::Config; use async_raft::LogId; +use async_raft::RaftNetwork; use async_raft::RaftStorage; use async_raft::SnapshotPolicy; use async_raft::State; @@ -122,5 +124,23 @@ async fn compaction() -> Result<()> { ) .await?; + tracing::info!( + "--- send a heartbeat with prev_log_id to be some value <= last_applied to ensure the commit index is updated" + ); + { + let res = router + .send_append_entries(1, AppendEntriesRequest { + term: 1, + leader_id: 0, + prev_log_id: LogId::new(1, 2), + entries: vec![], + leader_commit: 0, + }) + .await?; + + assert!(res.success); + assert_eq!(None, res.conflict_opt); + } + Ok(()) } diff --git a/async-raft/tests/conflict_with_empty_entries.rs b/async-raft/tests/conflict_with_empty_entries.rs index 3e7cd053f..1e9452ab2 100644 --- a/async-raft/tests/conflict_with_empty_entries.rs +++ b/async-raft/tests/conflict_with_empty_entries.rs @@ -52,7 +52,7 @@ async fn conflict_with_empty_entries() -> Result<()> { let rpc = AppendEntriesRequest:: { term: 1, leader_id: 1, - prev_log_id: LogId { term: 1, index: 5 }, + prev_log_id: LogId::new(1, 5), entries: vec![], leader_commit: 5, }; @@ -63,7 +63,7 @@ async fn conflict_with_empty_entries() -> Result<()> { let c = resp.conflict_opt.unwrap(); assert_eq!( ConflictOpt { - log_id: LogId { term: 0, index: 0 } + log_id: LogId { term: 1, index: 5 } }, c ); @@ -73,7 +73,7 @@ async fn conflict_with_empty_entries() -> Result<()> { let rpc = AppendEntriesRequest:: { term: 1, leader_id: 1, - prev_log_id: LogId { term: 1, index: 0 }, + prev_log_id: LogId::new(0, 0), entries: vec![ Entry { log_id: (1, 1).into(), @@ -102,7 +102,7 @@ async fn conflict_with_empty_entries() -> Result<()> { let rpc = AppendEntriesRequest:: { term: 1, leader_id: 1, - prev_log_id: LogId { term: 1, index: 3 }, + prev_log_id: LogId::new(1, 3), entries: vec![], leader_commit: 5, }; @@ -113,7 +113,7 @@ async fn conflict_with_empty_entries() -> Result<()> { let c = resp.conflict_opt.unwrap(); assert_eq!( ConflictOpt { - log_id: LogId { term: 1, index: 2 } + log_id: LogId { term: 1, index: 3 } }, c ); diff --git a/async-raft/tests/fixtures/mod.rs b/async-raft/tests/fixtures/mod.rs index db7a4fa88..baded9564 100644 --- a/async-raft/tests/fixtures/mod.rs +++ b/async-raft/tests/fixtures/mod.rs @@ -21,6 +21,8 @@ use async_raft::metrics::Wait; use async_raft::raft::AppendEntriesRequest; use async_raft::raft::AppendEntriesResponse; use async_raft::raft::ClientWriteRequest; +use async_raft::raft::Entry; +use async_raft::raft::EntryPayload; use async_raft::raft::InstallSnapshotRequest; use async_raft::raft::InstallSnapshotResponse; use async_raft::raft::MembershipConfig; @@ -28,6 +30,7 @@ use async_raft::raft::RaftResponse; use async_raft::raft::VoteRequest; use async_raft::raft::VoteResponse; use async_raft::storage::RaftStorage; +use async_raft::AppData; use async_raft::Config; use async_raft::LogId; use async_raft::NodeId; @@ -155,6 +158,7 @@ impl RaftRouter { /// Create a cluster: 0 is the initial leader, others are voters non_voters /// NOTE: it create a single node cluster first, then change it to a multi-voter 
cluster. + #[tracing::instrument(level = "debug", skip(self))] pub async fn new_nodes_from_single( self: &Arc, node_ids: BTreeSet, @@ -168,8 +172,8 @@ impl RaftRouter { tracing::info!("--- wait for init node to ready"); - self.wait_for_log(&btreeset![0], want, None, "empty").await?; - self.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?; + self.wait_for_log(&btreeset![0], want, timeout(), "empty").await?; + self.wait_for_state(&btreeset![0], State::NonVoter, timeout(), "empty").await?; tracing::info!("--- initializing single node cluster: {}", 0); @@ -178,7 +182,7 @@ impl RaftRouter { tracing::info!("--- wait for init node to become leader"); - self.wait_for_log(&btreeset![0], want, None, "init").await?; + self.wait_for_log(&btreeset![0], want, timeout(), "init").await?; self.assert_stable_cluster(Some(1), Some(want)).await; for id in node_ids.iter() { @@ -197,7 +201,7 @@ impl RaftRouter { self.change_membership(0, node_ids.clone()).await?; want += 2; - self.wait_for_log(&node_ids, want, None, &format!("cluster of {:?}", node_ids)).await?; + self.wait_for_log(&node_ids, want, timeout(), &format!("cluster of {:?}", node_ids)).await?; } for id in non_voters { @@ -218,7 +222,7 @@ impl RaftRouter { pub async fn new_store(self: &Arc, id: u64) -> Arc { let defensive = env::var("RAFT_STORE_DEFENSIVE").ok(); - let sto = Arc::new(MemStore::new(id)); + let sto = Arc::new(MemStore::new(id).await); if let Some(d) = defensive { tracing::info!("RAFT_STORE_DEFENSIVE set store defensive to {}", d); @@ -241,6 +245,7 @@ impl RaftRouter { sto } + #[tracing::instrument(level = "debug", skip(self, sto))] pub async fn new_raft_node_with_sto(self: &Arc, id: NodeId, sto: Arc) { let node = Raft::new(id, self.config.clone(), self.clone(), sto.clone()); let mut rt = self.routing_table.write().await; @@ -760,3 +765,15 @@ impl From> for ValueTest { Self::Range(src) } } + +fn timeout() -> Option { + Some(Duration::from_millis(5000)) +} + +/// Create a blank log entry for test. +pub fn ent(term: u64, index: u64) -> Entry { + Entry { + log_id: LogId { term, index }, + payload: EntryPayload::Blank, + } +} diff --git a/async-raft/tests/members.rs b/async-raft/tests/members.rs index 857d372a7..4122cd186 100644 --- a/async-raft/tests/members.rs +++ b/async-raft/tests/members.rs @@ -115,7 +115,9 @@ async fn members_add_absent_non_voter_blocking() -> Result<()> { for node_id in 0..2 { let sto = router.get_storage_handle(&node_id).await?; let logs = sto.get_log_entries(..).await?; - assert_eq!(n_logs, logs.len() as u64); + assert_eq!(n_logs, logs[logs.len() - 1].log_id.index); + // 0-th log + assert_eq!(n_logs + 1, logs.len() as u64); } } diff --git a/async-raft/tests/non_voter_add.rs b/async-raft/tests/non_voter_add.rs index 4964d4785..d61f43d16 100644 --- a/async-raft/tests/non_voter_add.rs +++ b/async-raft/tests/non_voter_add.rs @@ -26,6 +26,7 @@ async fn non_voter_add_readd() -> Result<()> { let config = Arc::new( Config { replication_lag_threshold: 0, + max_applied_log_to_keep: 2000, // prevent snapshot ..Default::default() } .validate()?, @@ -56,7 +57,12 @@ async fn non_voter_add_readd() -> Result<()> { tracing::info!("--- add_non_voter blocks until the replication catches up"); let sto1 = router.get_storage_handle(&1).await?; - assert_eq!(n_logs, sto1.get_log_entries(..).await?.len() as u64); + + let logs = sto1.get_log_entries(..).await?; + + assert_eq!(n_logs, logs[logs.len() - 1].log_id.index); + // 0-th log + assert_eq!(n_logs + 1, logs.len() as u64); router.wait_for_log(&btreeset! 
{0,1}, n_logs, timeout(), "replication to non_voter").await?; } diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs index 9205b6e81..f29905fb3 100644 --- a/async-raft/tests/snapshot_overrides_membership.rs +++ b/async-raft/tests/snapshot_overrides_membership.rs @@ -97,7 +97,7 @@ async fn snapshot_overrides_membership() -> Result<()> { let req = AppendEntriesRequest { term: 1, leader_id: 0, - prev_log_id: Default::default(), + prev_log_id: LogId::new(0, 0), entries: vec![Entry { log_id: LogId { term: 1, index: 1 }, payload: EntryPayload::ConfigChange(EntryConfigChange { diff --git a/async-raft/tests/snapshot_uses_prev_snap_membership.rs b/async-raft/tests/snapshot_uses_prev_snap_membership.rs index dca0ed085..469d97640 100644 --- a/async-raft/tests/snapshot_uses_prev_snap_membership.rs +++ b/async-raft/tests/snapshot_uses_prev_snap_membership.rs @@ -16,6 +16,9 @@ use maplit::btreeset; mod fixtures; /// Test a second compaction should not lose membership. +/// To ensure this bug is fixed: +/// - A snapshot stores the membership config that is in effect at the time of compaction; +/// - but compaction does not extract the membership config from a Snapshot entry, only from a MembershipConfig entry. /// /// What does this test do? /// @@ -36,7 +39,9 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { let config = Arc::new( Config { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), - max_applied_log_to_keep: 1, + // Use 2: with 1, a compaction is triggered when replicating ent-1, + // because ent-0 has already been removed. + max_applied_log_to_keep: 2, ..Default::default() } .validate()?, @@ -79,7 +84,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { { let logs = sto0.get_log_entries(..).await?; println!("{}", logs.as_slice().summary()); - assert_eq!(1, logs.len(), "only one applied log is kept"); + assert_eq!(2, logs.len(), "only the last 2 applied logs are kept"); } let m = sto0.get_membership_config().await?; assert_eq!( @@ -122,7 +127,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { { { let logs = sto0.get_log_entries(..).await?; - assert_eq!(1, logs.len(), "only one applied log"); + assert_eq!(2, logs.len(), "only the last 2 applied logs"); } let m = sto0.get_membership_config().await?; assert_eq!( diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index eb92e14f6..6f67d029d 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -108,11 +108,20 @@ pub struct MemStore { impl MemStore { /// Create a new `MemStore` instance. - pub fn new(id: NodeId) -> Self { + pub async fn new(id: NodeId) -> Self { let log = RwLock::new(BTreeMap::new()); let sm = RwLock::new(MemStoreStateMachine::default()); let hs = RwLock::new(None); let current_snapshot = RwLock::new(None); + + { + let mut l = log.write().await; + l.insert(0, Entry { + log_id: LogId::default(), + payload: EntryPayload::Blank, + }); + } + Self { defensive: RwLock::new(true), id, @@ -359,7 +368,8 @@ impl MemStore { Ok(()) } - /// Requires a range must be at least half open: (-oo, n] or [n, +oo) + /// Requires that the range is at least half open: (-oo, n] or [n, +oo), + /// in order to keep the log contiguous. 
pub async fn defensive_half_open_range + Clone + Debug + Send>( &self, range: RNG, @@ -604,12 +614,42 @@ impl RaftStorage for MemStore { Ok(res) } + async fn try_get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RNG, + ) -> Result>, StorageError> { + self.defensive_nonempty_range(range.clone()).await?; + + let res = { + let log = self.log.read().await; + log.range(range.clone()).map(|(_, val)| val.clone()).collect::>() + }; + + Ok(res) + } + #[tracing::instrument(level = "trace", skip(self))] async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError> { let log = self.log.read().await; Ok(log.get(&log_index).cloned()) } + async fn first_id_in_log(&self) -> Result, StorageError> { + let log = self.log.read().await; + let first = log.iter().next().map(|(_, ent)| ent.log_id); + Ok(first) + } + + async fn first_known_log_id(&self) -> Result { + let first = self.first_id_in_log().await?; + if let Some(x) = first { + return Ok(x); + } + + let (last_applied, _) = self.last_applied_state().await?; + Ok(last_applied) + } + async fn last_id_in_log(&self) -> Result { let log = self.log.read().await; let last = log.iter().last().map(|(_, ent)| ent.log_id).unwrap_or_default(); @@ -621,7 +661,7 @@ impl RaftStorage for MemStore { Ok((sm.last_applied_log, sm.last_membership.clone())) } - #[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))] + #[tracing::instrument(level = "debug", skip(self, range), fields(range=?range))] async fn delete_logs_from + Clone + Debug + Send + Sync>( &self, range: R, @@ -629,11 +669,13 @@ impl RaftStorage for MemStore { self.defensive_nonempty_range(range.clone()).await?; self.defensive_half_open_range(range.clone()).await?; - let mut log = self.log.write().await; + { + let mut log = self.log.write().await; - let keys = log.range(range).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); + let keys = log.range(range).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } } Ok(()) diff --git a/memstore/src/test.rs b/memstore/src/test.rs index 2a88a63f7..e662a92b7 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -25,7 +25,7 @@ struct MemStoreBuilder {} #[async_trait] impl StoreBuilder for MemStoreBuilder { async fn new_store(&self, id: NodeId) -> MemStore { - let sto = MemStore::new(id); + let sto = MemStore::new(id).await; sto.defensive(false).await; sto } @@ -114,6 +114,9 @@ where run_fut(Suite::save_hard_state(builder))?; run_fut(Suite::get_log_entries(builder))?; run_fut(Suite::try_get_log_entry(builder))?; + run_fut(Suite::initial_logs(builder))?; + run_fut(Suite::first_known_log_id(builder))?; + run_fut(Suite::first_id_in_log(builder))?; run_fut(Suite::last_id_in_log(builder))?; run_fut(Suite::last_applied_state(builder))?; run_fut(Suite::delete_logs_from(builder))?; @@ -508,6 +511,8 @@ where let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; + store.delete_logs_from(0..=0).await?; + let ent = store.try_get_log_entry(3).await?; assert_eq!(Some(LogId { term: 1, index: 3 }), ent.map(|x| x.log_id)); @@ -520,6 +525,118 @@ where Ok(()) } + pub async fn initial_logs(builder: &B) -> anyhow::Result<()> { + let store = builder.new_store(NODE_ID).await; + + let ent = store.try_get_log_entry(0).await?.unwrap(); + assert_eq!( + LogId { term: 0, index: 0 }, + ent.log_id, + "store initialized with a log at 0" + ); + + tracing::info!("--- no logs, return None"); + { + store.delete_logs_from(..).await?; + + let ent = 
store.try_get_log_entry(0).await?; + assert!(ent.is_none()); + } + + Ok(()) + } + + pub async fn first_known_log_id(builder: &B) -> anyhow::Result<()> { + let store = builder.new_store(NODE_ID).await; + + let log_id = store.first_known_log_id().await?; + assert_eq!(LogId::new(0, 0), log_id, "store initialized with a log at 0"); + + tracing::info!("--- only logs"); + { + store + .append_to_log(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }, + ]) + .await?; + + store.delete_logs_from(0..2).await?; + + // NOTE: it assumes non applied logs always exist. + let log_id = store.first_known_log_id().await?; + assert_eq!(LogId::new(1, 2), log_id); + } + + tracing::info!("--- return applied_log_id only when there is no log at all"); + { + store + .apply_to_state_machine(&[&Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }]) + .await?; + + // NOTE: it assumes non applied logs always exist. + let log_id = store.first_known_log_id().await?; + assert_eq!(LogId { term: 1, index: 2 }, log_id); + + // When there is no logs, return applied_log_id + store.delete_logs_from(0..3).await?; + let log_id = store.first_known_log_id().await?; + assert_eq!(LogId { term: 1, index: 1 }, log_id); + } + + Ok(()) + } + + pub async fn first_id_in_log(builder: &B) -> anyhow::Result<()> { + let store = builder.new_store(NODE_ID).await; + + let log_id = store.first_id_in_log().await?; + assert_eq!(Some(LogId::new(0, 0)), log_id, "store initialized with a log at 0"); + + tracing::info!("--- only logs"); + { + store + .append_to_log(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }, + ]) + .await?; + + let log_id = store.first_id_in_log().await?; + assert_eq!(Some(LogId::new(0, 0)), log_id); + + store.delete_logs_from(0..1).await?; + + let log_id = store.first_id_in_log().await?; + assert_eq!(Some(LogId::new(1, 1)), log_id); + } + + tracing::info!("--- no logs, return default"); + { + store.delete_logs_from(..).await?; + + let log_id = store.first_id_in_log().await?; + assert_eq!(None, log_id); + } + + Ok(()) + } + pub async fn last_id_in_log(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; @@ -646,6 +763,8 @@ where let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; + store.delete_logs_from(..=0).await?; + store.delete_logs_from(1..4).await?; let logs = store.get_log_entries(0..100).await?; @@ -658,6 +777,8 @@ where let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; + store.delete_logs_from(..=0).await?; + store.delete_logs_from(1..1000).await?; let logs = store.get_log_entries(0..).await?; @@ -669,6 +790,8 @@ where let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; + store.delete_logs_from(..=0).await?; + store.delete_logs_from(1..).await?; let logs = store.get_log_entries(0..100).await?; @@ -682,6 +805,8 @@ where let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; + store.delete_logs_from(..=0).await?; + store .append_to_log(&[&Entry { log_id: (2, 10).into(), @@ -1102,6 +1227,8 @@ where let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; + store.delete_logs_from(..=0).await?; + store.get_log_entries(..).await?; 
store.get_log_entries(5..).await?; store.get_log_entries(..5).await?;
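
Note: the first_known_log_id() behavior exercised above can be summarized by the following standalone sketch. The names mirror the diff, but this is only an illustration under the stated assumptions, not the crate's actual code: prefer the first entry still present in the log (a freshly created store always holds a blank entry at index 0), and fall back to last_applied only when the log has been fully purged.

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct LogId {
    term: u64,
    index: u64,
}

/// Sketch of the fallback rule: the first id still in the log wins;
/// only an empty log falls back to the last applied id.
fn first_known_log_id(first_id_in_log: Option<LogId>, last_applied: LogId) -> LogId {
    first_id_in_log.unwrap_or(last_applied)
}

fn main() {
    // A fresh store contains the blank 0-th entry, so (term 0, index 0) is returned.
    assert_eq!(LogId::default(), first_known_log_id(Some(LogId::default()), LogId::default()));

    // After purging every log entry while last_applied is (1, 2), only the applied id is known.
    assert_eq!(LogId { term: 1, index: 2 }, first_known_log_id(None, LogId { term: 1, index: 2 }));
}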