From 4d1a03cf98dbfb0c4b3eec772e69be1965ad7ccf Mon Sep 17 00:00:00 2001
From: drdr xp
Date: Mon, 28 Jun 2021 10:26:32 +0800
Subject: [PATCH] refactor: rename vars to underscore style; use simpler
 tracing macro syntax to reduce false syntax errors in CLion; refine rustfmt
 config to use longer lines (120 chars)
---
 async-raft/Cargo.toml | 2 +-
 async-raft/src/config.rs | 29 +--
 async-raft/src/core/admin.rs | 45 +---
 async-raft/src/core/append_entries.rs | 24 +-
 async-raft/src/core/client.rs | 108 ++++-----
 async-raft/src/core/install_snapshot.rs | 58 ++---
 async-raft/src/core/mod.rs | 105 +++------
 async-raft/src/core/replication.rs | 156 ++++---------
 async-raft/src/core/vote.rs | 35 +--
 async-raft/src/error.rs | 7 +-
 async-raft/src/metrics.rs | 38 +---
 async-raft/src/metrics_wait_test.rs | 5 +-
 async-raft/src/network.rs | 12 +-
 async-raft/src/raft.rs | 106 ++-------
 async-raft/src/replication/mod.rs | 210 ++++++++----------
 async-raft/tests/add_remove_voter.rs | 16 +-
 async-raft/tests/client_reads.rs | 34 +--
 async-raft/tests/client_writes.rs | 26 +--
 async-raft/tests/compaction.rs | 34 +--
 .../concurrent_write_and_add_non_voter.rs | 9 +-
 .../tests/conflict_with_empty_entries.rs | 9 +-
 async-raft/tests/current_leader.rs | 25 +--
 async-raft/tests/dynamic_membership.rs | 52 +----
 async-raft/tests/fixtures/mod.rs | 96 ++------
 async-raft/tests/initialization.rs | 22 +-
 async-raft/tests/lagging_network_write.rs | 40 +---
 async-raft/tests/members_012_to_234.rs | 29 +--
 .../metrics_state_machine_consistency.rs | 24 +-
 async-raft/tests/metrics_wait.rs | 26 +--
 async-raft/tests/non_voter_restart.rs | 30 +--
 async-raft/tests/shutdown.rs | 37 +--
 async-raft/tests/singlenode.rs | 22 +-
 async-raft/tests/stepdown.rs | 48 +---
 memstore/src/lib.rs | 40 +---
 memstore/src/test.rs | 66 +-----
 rustfmt.toml | 11 +-
 36 files changed, 438 insertions(+), 1198 deletions(-)

diff --git a/async-raft/Cargo.toml b/async-raft/Cargo.toml
index 83d78d9df..7c0794048 100644
--- a/async-raft/Cargo.toml
+++ b/async-raft/Cargo.toml
@@ -23,7 +23,7 @@ rand = "0.8"
 serde = { version="1", features=["derive"] }
 thiserror = "1.0.20"
 tokio = { version="1.7", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
-tracing = "0.1"
+tracing = "0.1.26"
 tracing-futures = "0.2.4"

 [dev-dependencies]
diff --git a/async-raft/src/config.rs b/async-raft/src/config.rs
index 796492d31..0c0fc8ac0 100644
--- a/async-raft/src/config.rs
+++ b/async-raft/src/config.rs
@@ -201,35 +201,23 @@ impl ConfigBuilder {
     /// Validate the state of this builder and produce a new `Config` instance if valid.
     pub fn validate(self) -> Result<Config, ConfigError> {
         // Roll a random election time out based on the configured min & max or their respective defaults.
-        let election_timeout_min = self
-            .election_timeout_min
-            .unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);
-        let election_timeout_max = self
-            .election_timeout_max
-            .unwrap_or(DEFAULT_ELECTION_TIMEOUT_MAX);
+        let election_timeout_min = self.election_timeout_min.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);
+        let election_timeout_max = self.election_timeout_max.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MAX);
         if election_timeout_min >= election_timeout_max {
            return Err(ConfigError::InvalidElectionTimeoutMinMax);
         }
         // Get other values or their defaults.
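// Editorial usage sketch, not part of the original patch; the builder methods
// shown are the same ones exercised by the test at the bottom of this file:
//
//     let config = Config::build("cluster0".into())
//         .election_timeout_min(150)
//         .election_timeout_max(300)
//         .validate()?;
//
// Swapping the bounds, e.g. min=1000 with max=700, fails fast with
// ConfigError::InvalidElectionTimeoutMinMax before any node is started.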
- let heartbeat_interval = self - .heartbeat_interval - .unwrap_or(DEFAULT_HEARTBEAT_INTERVAL); + let heartbeat_interval = self.heartbeat_interval.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL); if election_timeout_min <= heartbeat_interval { return Err(ConfigError::InvalidElectionTimeoutMinMax); } - let max_payload_entries = self - .max_payload_entries - .unwrap_or(DEFAULT_MAX_PAYLOAD_ENTRIES); + let max_payload_entries = self.max_payload_entries.unwrap_or(DEFAULT_MAX_PAYLOAD_ENTRIES); if max_payload_entries == 0 { return Err(ConfigError::MaxPayloadEntriesTooSmall); } - let replication_lag_threshold = self - .replication_lag_threshold - .unwrap_or(DEFAULT_REPLICATION_LAG_THRESHOLD); + let replication_lag_threshold = self.replication_lag_threshold.unwrap_or(DEFAULT_REPLICATION_LAG_THRESHOLD); let snapshot_policy = self.snapshot_policy.unwrap_or_else(SnapshotPolicy::default); - let snapshot_max_chunk_size = self - .snapshot_max_chunk_size - .unwrap_or(DEFAULT_SNAPSHOT_CHUNKSIZE); + let snapshot_max_chunk_size = self.snapshot_max_chunk_size.unwrap_or(DEFAULT_SNAPSHOT_CHUNKSIZE); Ok(Config { cluster_name: self.cluster_name, election_timeout_min, @@ -287,10 +275,7 @@ mod tests { #[test] fn test_invalid_election_timeout_config_produces_expected_error() { - let res = Config::build("cluster0".into()) - .election_timeout_min(1000) - .election_timeout_max(700) - .validate(); + let res = Config::build("cluster0".into()).election_timeout_min(1000).election_timeout_max(700).validate(); assert!(res.is_err()); let err = res.unwrap_err(); assert_eq!(err, ConfigError::InvalidElectionTimeoutMinMax); diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index 8e5b3ea29..e19285c73 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -24,9 +24,7 @@ use crate::NodeId; use crate::RaftNetwork; use crate::RaftStorage; -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - NonVoterState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> NonVoterState<'a, D, R, N, S> { /// Handle the admin `init_with_config` command. #[tracing::instrument(level = "trace", skip(self))] pub(super) async fn handle_init_with_config( @@ -66,17 +64,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - LeaderState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LeaderState<'a, D, R, N, S> { /// Add a new node to the cluster as a non-voter, bringing it up-to-speed, and then responding /// on the given channel. #[tracing::instrument(level = "trace", skip(self, tx))] - pub(super) fn add_member( - &mut self, - target: NodeId, - tx: oneshot::Sender>, - ) { + pub(super) fn add_member(&mut self, target: NodeId, tx: oneshot::Sender>) { // Ensure the node doesn't already exist in the current config, in the set of new nodes // alreading being synced, or in the nodes being removed. if self.core.membership.members.contains(&target) @@ -105,11 +97,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } #[tracing::instrument(level = "trace", skip(self, tx))] - pub(super) async fn change_membership( - &mut self, - members: HashSet, - tx: ChangeMembershipTx, - ) { + pub(super) async fn change_membership(&mut self, members: HashSet, tx: ChangeMembershipTx) { // Ensure cluster will have at least one node. 
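// Editorial usage sketch, not part of the original patch; the handle name
// `raft` is assumed, while add_non_voter and change_membership are the public
// API defined in src/raft.rs:
//
//     raft.add_non_voter(4).await?;                        // pre-sync node 4
//     raft.change_membership(hashset![1, 2, 3, 4]).await?; // then promote it
//
// change_membership also stages any not-yet-synced targets as non-voters on
// its own (see the NonVoterSync handling below) before entering joint
// consensus.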
if members.is_empty() { let _ = tx.send(Err(ChangeConfigError::InoperableConfig)); @@ -155,11 +143,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If there are new nodes which need to sync, then we need to wait until they are synced. // Once they've finished, this routine will be called again to progress further. if !awaiting.is_empty() { - self.consensus_state = ConsensusState::NonVoterSync { - awaiting, - members, - tx, - }; + self.consensus_state = ConsensusState::NonVoterSync { awaiting, members, tx }; return; } @@ -167,9 +151,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if !members.contains(&self.core.id) { self.is_stepping_down = true; } - self.consensus_state = ConsensusState::Joint { - is_committed: false, - }; + self.consensus_state = ConsensusState::Joint { is_committed: false }; self.core.membership.members_after_consensus = Some(members); // Propagate the command as any other client request. @@ -309,16 +291,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Handle the commitment of a uniform consensus cluster configuration. #[tracing::instrument(level = "trace", skip(self))] - pub(super) async fn handle_uniform_consensus_committed( - &mut self, - index: u64, - ) -> Result<(), RaftError> { + pub(super) async fn handle_uniform_consensus_committed(&mut self, index: u64) -> Result<(), RaftError> { // Step down if needed. if self.is_stepping_down { tracing::debug!("raft node is stepping down"); self.core.set_target_state(State::NonVoter); - self.core - .update_current_leader(UpdateCurrentLeader::Unknown); + self.core.update_current_leader(UpdateCurrentLeader::Unknown); return Ok(()); } @@ -348,12 +326,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage tracing::debug!("nodes_to_remove: {:?}", nodes_to_remove); for node in nodes_to_remove { - tracing::debug!( - { target = node }, - "removing target node from replication pool" - ); + tracing::debug!({ target = node }, "removing target node from replication pool"); if let Some(node) = self.nodes.remove(&node) { - let _ = node.replstream.repltx.send(RaftEvent::Terminate); + let _ = node.replstream.repl_tx.send(RaftEvent::Terminate); } } self.core.report_metrics(); diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index 51589b258..7c9f9fe2e 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -60,8 +60,8 @@ impl, S: RaftStorage> Ra // If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local // log info, then replication is g2g. let msg_prev_index_is_min = msg.prev_log_index == u64::min_value(); - let msg_index_and_term_match = (msg.prev_log_index == self.last_log_index) - && (msg.prev_log_term == self.last_log_term); + let msg_index_and_term_match = + (msg.prev_log_index == self.last_log_index) && (msg.prev_log_term == self.last_log_term); if msg_prev_index_is_min || msg_index_and_term_match { self.append_log_entries(&msg.entries).await?; self.replicate_to_state_machine_if_needed(msg.entries).await; @@ -79,7 +79,8 @@ impl, S: RaftStorage> Ra //// Begin Log Consistency Check //// tracing::trace!("begin log consistency check"); - // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its result. + // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its + // result. 
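// Editorial sketch, not part of the original patch, with assumed values:
// suppose the leader sent prev_log_index=5, prev_log_term=2. If this
// follower's log also ends at (index=5, term=2), the fast path above has
// already appended the entries. If it ends at (index=5, term=1) instead, the
// lookup below finds the conflicting local entry and answers with a
// ConflictOpt hint, letting the leader back up next_index in one round trip
// instead of probing one entry per RPC.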
let entries = self .storage .get_log_entries(msg.prev_log_index, msg.prev_log_index + 1) @@ -113,11 +114,8 @@ impl, S: RaftStorage> Ra .delete_logs_from(target_entry.index + 1, None) .await .map_err(|err| self.map_fatal_storage_error(err))?; - let membership = self - .storage - .get_membership_config() - .await - .map_err(|err| self.map_fatal_storage_error(err))?; + let membership = + self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; self.update_membership(membership)?; } } @@ -134,10 +132,7 @@ impl, S: RaftStorage> Ra .get_log_entries(start, msg.prev_log_index) .await .map_err(|err| self.map_fatal_storage_error(err))?; - let opt = match old_entries - .iter() - .find(|entry| entry.term == msg.prev_log_term) - { + let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log_term) { Some(entry) => Some(ConflictOpt { term: entry.term, index: entry.index, @@ -193,10 +188,7 @@ impl, S: RaftStorage> Ra }; // Replicate entries to log (same as append, but in follower mode). - self.storage - .replicate_to_log(entries) - .await - .map_err(|err| self.map_fatal_storage_error(err))?; + self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?; if let Some(entry) = entries.last() { self.last_log_index = entry.index; self.last_log_term = entry.term; diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index 3b98906b9..496903964 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -40,10 +40,7 @@ pub(super) struct ClientRequestEntry { impl ClientRequestEntry { /// Create a new instance from the raw components of a client request. - pub(crate) fn from_entry>>( - entry: Entry, - tx: T, - ) -> Self { + pub(crate) fn from_entry>>(entry: Entry, tx: T) -> Self { Self { entry: Arc::new(entry), tx: tx.into(), @@ -58,9 +55,7 @@ pub enum ClientOrInternalResponseTx { Internal(oneshot::Sender>), } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - LeaderState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LeaderState<'a, D, R, N, S> { /// Commit the initial entry which new leaders are obligated to create when first coming to power, per §8. #[tracing::instrument(level = "trace", skip(self))] pub(super) async fn commit_initial_leader_entry(&mut self) -> RaftResult<()> { @@ -76,8 +71,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // we need to drive the commitment of the config change to the cluster. let mut pending_config = None; // The inner bool represents `is_in_joint_consensus`. if self.core.last_log_index > self.core.commit_index { - let (stale_logs_start, stale_logs_stop) = - (self.core.commit_index + 1, self.core.last_log_index + 1); + let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log_index + 1); pending_config = self .core .storage @@ -89,9 +83,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .rev() .filter_map(|entry| match &entry.payload { EntryPayload::ConfigChange(cfg) => Some(cfg.membership.is_in_joint_consensus()), - EntryPayload::SnapshotPointer(cfg) => { - Some(cfg.membership.is_in_joint_consensus()) - } + EntryPayload::SnapshotPointer(cfg) => Some(cfg.membership.is_in_joint_consensus()), _ => None, }) .next(); @@ -108,9 +100,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Setup any callbacks needed for responding to commitment of a pending config. 
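// Editorial note, not part of the original patch; the index values are
// illustrative. pending_config is Some(..) only when an uncommitted
// ConfigChange or SnapshotPointer entry sits between commit_index + 1 and
// last_log_index. With commit_index=10 and a joint-consensus ConfigChange at
// index 12, the receiver registered below resolves once the new blank entry
// commits, which necessarily commits entry 12 ahead of it.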
if let Some(is_in_joint_consensus) = pending_config { if is_in_joint_consensus { - self.joint_consensus_cb.push(rx_payload_committed); // Receiver for when the joint consensus is committed. + self.joint_consensus_cb.push(rx_payload_committed); // Receiver for when the joint consensus is + // committed. } else { - self.uniform_consensus_cb.push(rx_payload_committed); // Receiver for when the uniform consensus is committed. + self.uniform_consensus_cb.push(rx_payload_committed); // Receiver for when the uniform consensus is + // committed. } } Ok(()) @@ -142,11 +136,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let mut c1_needed = 0usize; if let Some(joint_members) = &self.core.membership.members_after_consensus { let len = joint_members.len(); // Will never be zero, as we don't allow it when proposing config changes. - c1_needed = if (len % 2) == 0 { - (len / 2) - 1 - } else { - len / 2 - }; + c1_needed = if (len % 2) == 0 { (len / 2) - 1 } else { len / 2 }; } // Increment confirmations for self, including post-joint-consensus config if applicable. @@ -187,10 +177,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage match timeout(ttl, network.append_entries(target, rpc)).await { Ok(Ok(data)) => Ok((target, data)), Ok(Err(err)) => Err((target, err)), - Err(_timeout) => Err(( - target, - anyhow!("timeout waiting for leadership confirmation"), - )), + Err(_timeout) => Err((target, anyhow!("timeout waiting for leadership confirmation"))), } }) .map_err(move |err| (*id, err)); @@ -239,9 +226,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If we've hit this location, then we've failed to gather needed confirmations due to // request failures. - let _ = tx.send(Err(ClientReadError::RaftError(RaftError::RaftNetwork( - anyhow!("too many requests failed, could not confirm leadership"), - )))); + let _ = tx.send(Err(ClientReadError::RaftError(RaftError::RaftNetwork(anyhow!( + "too many requests failed, could not confirm leadership" + ))))); } /// Handle client write requests. @@ -263,10 +250,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Transform the given payload into an entry, assign an index and term, and append the entry to the log. #[tracing::instrument(level = "trace", skip(self, payload))] - pub(super) async fn append_payload_to_log( - &mut self, - payload: EntryPayload, - ) -> RaftResult> { + pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload) -> RaftResult> { let entry = Entry { index: self.core.last_log_index + 1, term: self.core.current_term, @@ -294,7 +278,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if !self.nodes.is_empty() { self.awaiting_committed.push(req); for node in self.nodes.values() { - let _ = node.replstream.repltx.send(RaftEvent::Replicate { + let _ = node.replstream.repl_tx.send(RaftEvent::Replicate { entry: entry_arc.clone(), commit_index: self.core.commit_index, }); @@ -309,7 +293,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Replicate to non-voters. 
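// Editorial note, not part of the original patch: non-voters receive the same
// Replicate events below but never count toward the commit quorum; their
// streams exist so nodes added via add_non_voter can catch up with the
// leader's log before being promoted to voting members.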
if !self.non_voters.is_empty() { for node in self.non_voters.values() { - let _ = node.state.replstream.repltx.send(RaftEvent::Replicate { + let _ = node.state.replstream.repl_tx.send(RaftEvent::Replicate { entry: entry_arc.clone(), commit_index: self.core.commit_index, }); @@ -323,20 +307,19 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage match req.tx { ClientOrInternalResponseTx::Client(tx) => { match &req.entry.payload { - EntryPayload::Normal(inner) => match self - .apply_entry_to_state_machine(&req.entry.index, &inner.data) - .await - { - Ok(data) => { - let _ = tx.send(Ok(ClientWriteResponse { - index: req.entry.index, - data, - })); + EntryPayload::Normal(inner) => { + match self.apply_entry_to_state_machine(&req.entry.index, &inner.data).await { + Ok(data) => { + let _ = tx.send(Ok(ClientWriteResponse { + index: req.entry.index, + data, + })); + } + Err(err) => { + let _ = tx.send(Err(ClientWriteError::RaftError(err))); + } } - Err(err) => { - let _ = tx.send(Err(ClientWriteError::RaftError(err))); - } - }, + } _ => { // Why is this a bug, and why are we shutting down? This is because we can not easily // encode these constraints in the type system, and client requests should be the only @@ -360,11 +343,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Apply the given log entry to the state machine. #[tracing::instrument(level = "trace", skip(self, entry))] - pub(super) async fn apply_entry_to_state_machine( - &mut self, - index: &u64, - entry: &D, - ) -> RaftResult { + pub(super) async fn apply_entry_to_state_machine(&mut self, index: &u64, entry: &D) -> RaftResult { // First, we just ensure that we apply any outstanding up to, but not including, the index // of the given entry. We need to be able to return the data response from applying this // entry to the state machine. @@ -401,27 +380,20 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // no pending task to replicate entries to the state machine. This is edge case, and would only // happen once very early in a new leader's term. if !self.core.replicate_to_sm_handle.is_empty() { - if let Some(Ok(replicate_to_sm_result)) = self.core.replicate_to_sm_handle.next().await - { - self.core - .handle_replicate_to_sm_result(replicate_to_sm_result)?; + if let Some(Ok(replicate_to_sm_result)) = self.core.replicate_to_sm_handle.next().await { + self.core.handle_replicate_to_sm_result(replicate_to_sm_result)?; } } // Apply this entry to the state machine and return its data response. - let res = self - .core - .storage - .apply_entry_to_state_machine(index, entry) - .await - .map_err(|err| { - if err.downcast_ref::().is_some() { - // If this is an instance of the storage impl's shutdown error, then trigger shutdown. - self.core.map_fatal_storage_error(err) - } else { - // Else, we propagate normally. - RaftError::RaftStorage(err) - } - }); + let res = self.core.storage.apply_entry_to_state_machine(index, entry).await.map_err(|err| { + if err.downcast_ref::().is_some() { + // If this is an instance of the storage impl's shutdown error, then trigger shutdown. + self.core.map_fatal_storage_error(err) + } else { + // Else, we propagate normally. 
+ RaftError::RaftStorage(err) + } + }); self.core.last_applied = *index; self.core.report_metrics(); res diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index 0b5a7a475..92171f539 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -66,33 +66,22 @@ impl, S: RaftStorage> Ra handle.abort(); // Abort the current compaction in favor of installation from leader. Ok(self.begin_installing_snapshot(req).await?) } - Some(SnapshotState::Streaming { - snapshot, - id, - offset, - }) => Ok(self - .continue_installing_snapshot(req, offset, id, snapshot) - .await?), + Some(SnapshotState::Streaming { snapshot, id, offset }) => { + Ok(self.continue_installing_snapshot(req, offset, id, snapshot).await?) + } } } #[tracing::instrument(level = "trace", skip(self, req))] - async fn begin_installing_snapshot( - &mut self, - req: InstallSnapshotRequest, - ) -> RaftResult { + async fn begin_installing_snapshot(&mut self, req: InstallSnapshotRequest) -> RaftResult { // Create a new snapshot and begin writing its contents. - let (id, mut snapshot) = self - .storage - .create_snapshot() - .await - .map_err(|err| self.map_fatal_storage_error(err))?; + let (id, mut snapshot) = + self.storage.create_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?; snapshot.as_mut().write_all(&req.data).await?; // If this was a small snapshot, and it is already done, then finish up. if req.done { - self.finalize_snapshot_installation(req, id, snapshot) - .await?; + self.finalize_snapshot_installation(req, id, snapshot).await?; return Ok(InstallSnapshotResponse { term: self.current_term, }); @@ -120,11 +109,7 @@ impl, S: RaftStorage> Ra // Always seek to the target offset if not an exact match. if req.offset != offset { if let Err(err) = snapshot.as_mut().seek(SeekFrom::Start(req.offset)).await { - self.snapshot_state = Some(SnapshotState::Streaming { - offset, - id, - snapshot, - }); + self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot }); return Err(err.into()); } offset = req.offset; @@ -132,25 +117,16 @@ impl, S: RaftStorage> Ra // Write the next segment & update offset. if let Err(err) = snapshot.as_mut().write_all(&req.data).await { - self.snapshot_state = Some(SnapshotState::Streaming { - offset, - id, - snapshot, - }); + self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot }); return Err(err.into()); } offset += req.data.len() as u64; // If the snapshot stream is done, then finalize. 
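// Editorial sketch, not part of the original patch, with illustrative
// numbers: chunk 1 arrives with offset=0 and 1024 bytes of data (done=false),
// so the local offset advances to 1024; chunk 2 arrives with offset=1024 and
// 512 bytes (done=true) and takes the finalize branch below. A retransmitted
// chunk whose offset disagrees with the local one hits the seek above,
// rewinding the snapshot file before the write.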
if req.done { - self.finalize_snapshot_installation(req, id, snapshot) - .await?; + self.finalize_snapshot_installation(req, id, snapshot).await?; } else { - self.snapshot_state = Some(SnapshotState::Streaming { - offset, - id, - snapshot, - }); + self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot }); } Ok(InstallSnapshotResponse { term: self.current_term, @@ -167,11 +143,7 @@ impl, S: RaftStorage> Ra id: String, mut snapshot: Box, ) -> RaftResult<()> { - snapshot - .as_mut() - .shutdown() - .await - .map_err(|err| self.map_fatal_storage_error(err.into()))?; + snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?; let delete_through = if self.last_log_index > req.last_included_index { Some(req.last_included_index) } else { @@ -187,11 +159,7 @@ impl, S: RaftStorage> Ra ) .await .map_err(|err| self.map_fatal_storage_error(err))?; - let membership = self - .storage - .get_membership_config() - .await - .map_err(|err| self.map_fatal_storage_error(err))?; + let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; self.update_membership(membership)?; self.last_log_index = req.last_included_index; self.last_log_term = req.last_included_term; diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index d96320e19..251ebad5d 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -192,11 +192,7 @@ impl, S: RaftStorage> Ra #[tracing::instrument(level="trace", skip(self), fields(id=self.id, cluster=%self.config.cluster_name))] async fn main(mut self) -> RaftResult<()> { tracing::trace!("raft node is initializing"); - let state = self - .storage - .get_initial_state() - .await - .map_err(|err| self.map_fatal_storage_error(err))?; + let state = self.storage.get_initial_state().await.map_err(|err| self.map_fatal_storage_error(err))?; self.last_log_index = state.last_log_index; self.last_log_term = state.last_log_term; self.current_term = state.hard_state.current_term; @@ -209,11 +205,8 @@ impl, S: RaftStorage> Ra self.commit_index = 0; // Fetch the most recent snapshot in the system. - if let Some(snapshot) = self - .storage - .get_current_snapshot() - .await - .map_err(|err| self.map_fatal_storage_error(err))? + if let Some(snapshot) = + self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))? { self.snapshot_index = snapshot.index; } @@ -298,10 +291,7 @@ impl, S: RaftStorage> Ra current_term: self.current_term, voted_for: self.voted_for, }; - self.storage - .save_hard_state(&hs) - .await - .map_err(|err| self.map_fatal_storage_error(err)) + self.storage.save_hard_state(&hs).await.map_err(|err| self.map_fatal_storage_error(err)) } /// Update core's target state, ensuring all invariants are upheld. @@ -396,8 +386,7 @@ impl, S: RaftStorage> Ra self.membership = cfg; if !self.membership.contains(&self.id) { self.set_target_state(State::NonVoter); - } else if self.target_state == State::NonVoter && self.membership.members.contains(&self.id) - { + } else if self.target_state == State::NonVoter && self.membership.members.contains(&self.id) { // The node is a NonVoter and the new config has it configured as a normal member. // Transition to follower. self.set_target_state(State::Follower); @@ -429,11 +418,8 @@ impl, S: RaftStorage> Ra return; } // If we are below the threshold, then there is nothing to do. 
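// Editorial worked example, not part of the original patch, with assumed
// values: last_applied=900, snapshot_index=400, threshold=500 gives a diff of
// 500, and 500 < 500 is false, so the node is not below the threshold and
// compaction is triggered. checked_sub + unwrap_or(false) also keeps a node
// whose snapshot_index is ahead of last_applied from underflowing.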
- let is_below_threshold = self - .last_applied - .checked_sub(self.snapshot_index) - .map(|diff| diff < *threshold) - .unwrap_or(false); + let is_below_threshold = + self.last_applied.checked_sub(self.snapshot_index).map(|diff| diff < *threshold).unwrap_or(false); if is_below_threshold { return; } @@ -453,8 +439,7 @@ impl, S: RaftStorage> Ra match res { Ok(res) => match res { Ok(snapshot) => { - let _ = tx_compaction - .try_send(SnapshotUpdate::SnapshotComplete(snapshot.index)); + let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.index)); let _ = chan_tx.send(snapshot.index); // This will always succeed. } Err(err) => { @@ -473,10 +458,7 @@ impl, S: RaftStorage> Ra /// Handle the output of an async task replicating entries to the state machine. #[tracing::instrument(level = "trace", skip(self, res))] - pub(self) fn handle_replicate_to_sm_result( - &mut self, - res: anyhow::Result>, - ) -> RaftResult<()> { + pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result>) -> RaftResult<()> { let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?; if let Some(last_applied) = last_applied_opt { self.last_applied = last_applied; @@ -500,22 +482,17 @@ impl, S: RaftStorage> Ra /// Forward the given client write request to the leader. #[tracing::instrument(level = "trace", skip(self, req, tx))] - fn forward_client_write_request( - &self, - req: ClientWriteRequest, - tx: ClientWriteResponseTx, - ) { + fn forward_client_write_request(&self, req: ClientWriteRequest, tx: ClientWriteResponseTx) { match req.entry { EntryPayload::Normal(entry) => { - let _ = tx.send(Err(ClientWriteError::ForwardToLeader( - entry.data, - self.current_leader, - ))); + let _ = tx.send(Err(ClientWriteError::ForwardToLeader(entry.data, self.current_leader))); } _ => { // This is unreachable, and well controlled by the type system, but let's log an // error for good measure. - tracing::error!("unreachable branch hit within async-raft, attempting to forward a Raft internal entry"); + tracing::error!( + "unreachable branch hit within async-raft, attempting to forward a Raft internal entry" + ); } } } @@ -564,7 +541,6 @@ pub(self) enum SnapshotUpdate { SnapshotFailed, } -/////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////// /// All possible states of a Raft node. @@ -604,7 +580,6 @@ impl State { } } -/////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////// /// Volatile state specific to the Raft leader. @@ -634,15 +609,11 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: Raf pub(super) uniform_consensus_cb: FuturesOrdered>>, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - LeaderState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LeaderState<'a, D, R, N, S> { /// Create a new instance. pub(self) fn new(core: &'a mut RaftCore) -> Self { let consensus_state = if core.membership.is_in_joint_consensus() { - ConsensusState::Joint { - is_committed: false, - } + ConsensusState::Joint { is_committed: false } } else { ConsensusState::Uniform }; @@ -681,8 +652,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Setup state as leader. 
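// Editorial note, not part of the original patch: on entering the leader
// loop, the node clears its election bookkeeping (last_heartbeat and
// next_election_timeout below), advertises itself through
// UpdateCurrentLeader::ThisNode, and commits the §8 blank entry so it can
// safely establish the commit index for its own term.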
self.core.last_heartbeat = None; self.core.next_election_timeout = None; - self.core - .update_current_leader(UpdateCurrentLeader::ThisNode); + self.core.update_current_leader(UpdateCurrentLeader::ThisNode); self.core.report_metrics(); // Per §8, commit an initial entry as part of becoming the cluster leader. @@ -691,10 +661,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage loop { if !self.core.target_state.is_leader() { for node in self.nodes.values() { - let _ = node.replstream.repltx.send(RaftEvent::Terminate); + let _ = node.replstream.repl_tx.send(RaftEvent::Terminate); } for node in self.non_voters.values() { - let _ = node.state.replstream.repltx.send(RaftEvent::Terminate); + let _ = node.state.replstream.repl_tx.send(RaftEvent::Terminate); } return Ok(()); } @@ -801,7 +771,8 @@ pub enum ConsensusState { } impl ConsensusState { - /// Check the current state to determine if it is in joint consensus, and if it is safe to finalize the joint consensus. + /// Check the current state to determine if it is in joint consensus, and if it is safe to finalize the joint + /// consensus. /// /// The return value will be true if: /// 1. this object currently represents a joint consensus state. @@ -814,7 +785,6 @@ impl ConsensusState { } } -/////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////// /// Volatile state specific to a Raft node in candidate state. @@ -830,9 +800,7 @@ struct CandidateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: votes_needed_new: u64, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - CandidateState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> CandidateState<'a, D, R, N, S> { pub(self) fn new(core: &'a mut RaftCore) -> Self { Self { core, @@ -864,8 +832,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage self.core.update_next_election_timeout(false); // Generates a new rand value within range. self.core.current_term += 1; self.core.voted_for = Some(self.core.id); - self.core - .update_current_leader(UpdateCurrentLeader::Unknown); + self.core.update_current_leader(UpdateCurrentLeader::Unknown); self.core.save_hard_state().await?; self.core.report_metrics(); @@ -919,23 +886,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } -/////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////// /// Volatile state specific to a Raft node in follower state. 
-pub struct FollowerState< - 'a, - D: AppData, - R: AppDataResponse, - N: RaftNetwork, - S: RaftStorage, -> { +pub struct FollowerState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> { core: &'a mut RaftCore, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - FollowerState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> FollowerState<'a, D, R, N, S> { pub(self) fn new(core: &'a mut RaftCore) -> Self { Self { core } } @@ -989,23 +947,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } -/////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////// /// Volatile state specific to a Raft node in non-voter state. -pub struct NonVoterState< - 'a, - D: AppData, - R: AppDataResponse, - N: RaftNetwork, - S: RaftStorage, -> { +pub struct NonVoterState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> { core: &'a mut RaftCore, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - NonVoterState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> NonVoterState<'a, D, R, N, S> { pub(self) fn new(core: &'a mut RaftCore) -> Self { Self { core } } diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 2e28ab884..929539e81 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -20,9 +20,7 @@ use crate::NodeId; use crate::RaftNetwork; use crate::RaftStorage; -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - LeaderState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LeaderState<'a, D, R, N, S> { /// Spawn a new replication stream returning its replication state handle. #[tracing::instrument(level = "trace", skip(self))] pub(super) fn spawn_replication_stream(&self, target: NodeId) -> ReplicationState { @@ -50,24 +48,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "trace", skip(self, event))] pub(super) async fn handle_replica_event(&mut self, event: ReplicaEvent) { let res = match event { - ReplicaEvent::RateUpdate { - target, - is_line_rate, - } => self.handle_rate_update(target, is_line_rate).await, - ReplicaEvent::RevertToFollower { target, term } => { - self.handle_revert_to_follower(target, term).await - } + ReplicaEvent::RateUpdate { target, is_line_rate } => self.handle_rate_update(target, is_line_rate).await, + ReplicaEvent::RevertToFollower { target, term } => self.handle_revert_to_follower(target, term).await, ReplicaEvent::UpdateMatchIndex { target, match_index, match_term, - } => { - self.handle_update_match_index(target, match_index, match_term) - .await - } - ReplicaEvent::NeedsSnapshot { target, tx } => { - self.handle_needs_snapshot(target, tx).await - } + } => self.handle_update_match_index(target, match_index, match_term).await, + ReplicaEvent::NeedsSnapshot { target, tx } => self.handle_needs_snapshot(target, tx).await, ReplicaEvent::Shutdown => { self.core.set_target_state(State::Shutdown); return; @@ -107,11 +95,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage self.change_membership(members, tx).await; } else { // We are still awaiting additional nodes, so replace our original state. 
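// Editorial note, not part of the original patch: each
// RateUpdate { is_line_rate: true } from a syncing non-voter removes that
// target from `awaiting`; once the set drains, the stored
// change_membership(members, tx) call above is re-entered with every target
// caught up, and joint consensus can finally begin.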
- self.consensus_state = ConsensusState::NonVoterSync { - awaiting, - members, - tx, - }; + self.consensus_state = ConsensusState::NonVoterSync { awaiting, members, tx }; } } other => self.consensus_state = other, // Set the original value back to what it was. @@ -127,8 +111,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if term > self.core.current_term { self.core.update_current_term(term, None); self.core.save_hard_state().await?; - self.core - .update_current_leader(UpdateCurrentLeader::Unknown); + self.core.update_current_leader(UpdateCurrentLeader::Unknown); self.core.set_target_state(State::Follower); } Ok(()) @@ -136,12 +119,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Handle events from a replication stream which updates the target node's match index. #[tracing::instrument(level = "trace", skip(self))] - async fn handle_update_match_index( - &mut self, - target: NodeId, - match_index: u64, - match_term: u64, - ) -> RaftResult<()> { + async fn handle_update_match_index(&mut self, target: NodeId, match_index: u64, match_term: u64) -> RaftResult<()> { let mut found = false; if let Some(state) = self.non_voters.get_mut(&target) { @@ -173,7 +151,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Drop replication stream if needed. if needs_removal { if let Some(node) = self.nodes.remove(&target) { - let _ = node.replstream.repltx.send(RaftEvent::Terminate); + let _ = node.replstream.repl_tx.send(RaftEvent::Terminate); } } @@ -181,15 +159,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let indices_c0 = self.get_match_indexes(&self.core.membership.members); tracing::debug!("indices_c0: {:?}", indices_c0); - let commit_index_c0 = - calculate_new_commit_index(indices_c0, self.core.commit_index, self.core.current_term); + let commit_index_c0 = calculate_new_commit_index(indices_c0, self.core.commit_index, self.core.current_term); tracing::debug!("commit_index_c0: {}", commit_index_c0); tracing::debug!("c1: {:?}", self.core.membership.members_after_consensus); - tracing::debug!( - "follower nodes: {:?}", - self.nodes.keys().collect::>() - ); + tracing::debug!("follower nodes: {:?}", self.nodes.keys().collect::>()); // If we are in joint consensus, then calculate the new commit index of the new membership config nodes. let mut commit_index_c1 = commit_index_c0; // Defaults to just matching C0. @@ -197,35 +171,26 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let indices_c1 = self.get_match_indexes(members); tracing::debug!("indices_c1: {:?}", indices_c1); - commit_index_c1 = calculate_new_commit_index( - indices_c1, - self.core.commit_index, - self.core.current_term, - ); + commit_index_c1 = calculate_new_commit_index(indices_c1, self.core.commit_index, self.core.current_term); tracing::debug!("commit_index_c1: {}", commit_index_c1); } // Determine if we have a new commit index, accounting for joint consensus. // If a new commit index has been established, then update a few needed elements. - let has_new_commit_index = - commit_index_c0 > self.core.commit_index && commit_index_c1 > self.core.commit_index; + let has_new_commit_index = commit_index_c0 > self.core.commit_index && commit_index_c1 > self.core.commit_index; if has_new_commit_index { self.core.commit_index = std::cmp::min(commit_index_c0, commit_index_c1); // Update all replication streams based on new commit index. 
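// Editorial sketch, not part of the original patch: how the quorum median
// plays out. calculate_new_commit_index (defined below) sorts the
// (index, term) pairs and picks the element at offset (len + 1) / 2 - 1, so
// [(0,2), (5,2), (10,3), (15,3), (20,3)] with current commit 5 at leader
// term 3 yields 10 (the `basic_values` test below). During joint consensus,
// commit only advances when BOTH configs clear it: if commit_index_c1 is
// still stuck at the current commit index, has_new_commit_index stays false.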
for node in self.nodes.values() { - let _ = node.replstream.repltx.send(RaftEvent::UpdateCommitIndex { + let _ = node.replstream.repl_tx.send(RaftEvent::UpdateCommitIndex { commit_index: self.core.commit_index, }); } for node in self.non_voters.values() { - let _ = node - .state - .replstream - .repltx - .send(RaftEvent::UpdateCommitIndex { - commit_index: self.core.commit_index, - }); + let _ = node.state.replstream.repl_tx.send(RaftEvent::UpdateCommitIndex { + commit_index: self.core.commit_index, + }); } // Check if there are any pending requests which need to be processed. @@ -302,11 +267,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if let Some(snapshot) = current_snapshot_opt { // If snapshot exists, ensure its distance from the leader's last log index is <= half // of the configured snapshot threshold, else create a new snapshot. - if snapshot_is_within_half_of_threshold( - &snapshot.index, - &self.core.last_log_index, - &threshold, - ) { + if snapshot_is_within_half_of_threshold(&snapshot.index, &self.core.last_log_index, &threshold) { let _ = tx.send(snapshot); return Ok(()); } @@ -316,9 +277,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // completion (or cancellation), and respond to the replication stream. The repl stream // will wait for the completion and will then send anothe request to fetch the finished snapshot. // Else we just drop any other state and continue. Leaders never enter `Streaming` state. - if let Some(SnapshotState::Snapshotting { handle, sender }) = - self.core.snapshot_state.take() - { + if let Some(SnapshotState::Snapshotting { handle, sender }) = self.core.snapshot_state.take() { let mut chan = sender.subscribe(); tokio::spawn(async move { let _ = chan.recv().await; @@ -350,20 +309,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// /// NOTE: there are a few edge cases accounted for in this routine which will never practically /// be hit, but they are accounted for in the name of good measure. -fn calculate_new_commit_index( - mut entries: Vec<(u64, u64)>, - current_commit: u64, - leader_term: u64, -) -> u64 { +fn calculate_new_commit_index(mut entries: Vec<(u64, u64)>, current_commit: u64, leader_term: u64) -> u64 { if entries.is_empty() { return current_commit; } entries.sort_unstable_by(|a, b| a.0.cmp(&b.0)); let offset = (entries.len() + 1) / 2 - 1; - let new_val = entries - .get(offset) - .cloned() - .unwrap_or((current_commit, leader_term)); + let new_val = entries.get(offset).cloned().unwrap_or((current_commit, leader_term)); if new_val.0 > current_commit && new_val.1 == leader_term { new_val.0 } else { @@ -372,11 +324,7 @@ fn calculate_new_commit_index( } /// Check if the given snapshot data is within half of the configured threshold. -fn snapshot_is_within_half_of_threshold( - snapshot_last_index: &u64, - last_log_index: &u64, - threshold: &u64, -) -> bool { +fn snapshot_is_within_half_of_threshold(snapshot_last_index: &u64, last_log_index: &u64, threshold: &u64) -> bool { // Calculate distance from actor's last log index. 
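// Editorial worked example, not part of the original patch, with assumed
// values: snapshot_last_index=50, last_log_index=100, threshold=80. The
// distance computed below is 100 - 50 = 50, half of the threshold is 40, and
// 50 <= 40 is false, so handle_needs_snapshot above builds a fresh snapshot
// rather than streaming the stale one to the lagging follower.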
let distance_from_line = if snapshot_last_index > last_log_index { 0u64 @@ -387,7 +335,6 @@ fn snapshot_is_within_half_of_threshold( distance_from_line <= half_of_threshold } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// #[cfg(test)] @@ -404,11 +351,7 @@ mod tests { ({test=>$name:ident, snapshot_last_index=>$snapshot_last_index:expr, last_log_index=>$last_log:expr, threshold=>$thresh:expr, expected=>$exp:literal}) => { #[test] fn $name() { - let res = snapshot_is_within_half_of_threshold( - $snapshot_last_index, - $last_log, - $thresh, - ); + let res = snapshot_is_within_half_of_threshold($snapshot_last_index, $last_log, $thresh); assert_eq!(res, $exp) } }; @@ -441,40 +384,25 @@ mod tests { #[test] fn $name() { let mut entries = $entries; - let output = - calculate_new_commit_index(entries.clone(), $current, $leader_term); + let output = calculate_new_commit_index(entries.clone(), $current, $leader_term); entries.sort_unstable_by(|a, b| a.0.cmp(&b.0)); assert_eq!(output, $expected, "Sorted values: {:?}", entries); } }; } - test_calculate_new_commit_index!(basic_values, 10, 5, 3, vec![ - (20, 3), - (5, 2), - (0, 2), - (15, 3), - (10, 3) - ]); + test_calculate_new_commit_index!(basic_values, 10, 5, 3, vec![(20, 3), (5, 2), (0, 2), (15, 3), (10, 3)]); test_calculate_new_commit_index!(len_zero_should_return_current_commit, 20, 20, 10, vec![]); - test_calculate_new_commit_index!(len_one_where_greater_than_current, 100, 0, 3, vec![( - 100, 3 - )]); + test_calculate_new_commit_index!(len_one_where_greater_than_current, 100, 0, 3, vec![(100, 3)]); - test_calculate_new_commit_index!( - len_one_where_greater_than_current_but_smaller_term, - 0, - 0, - 3, - vec![(100, 2)] - ); - - test_calculate_new_commit_index!(len_one_where_less_than_current, 100, 100, 3, vec![( - 50, 3 + test_calculate_new_commit_index!(len_one_where_greater_than_current_but_smaller_term, 0, 0, 3, vec![( + 100, 2 )]); + test_calculate_new_commit_index!(len_one_where_less_than_current, 100, 100, 3, vec![(50, 3)]); + test_calculate_new_commit_index!(even_number_of_nodes, 0, 0, 3, vec![ (0, 3), (100, 3), @@ -494,20 +422,14 @@ mod tests { (100, 3) ]); - test_calculate_new_commit_index!( - majority_entries_wins_but_not_current_term, - 0, - 0, - 3, - vec![ - (0, 2), - (100, 2), - (0, 2), - (101, 3), - (0, 2), - (101, 3), - (101, 3) - ] - ); + test_calculate_new_commit_index!(majority_entries_wins_but_not_current_term, 0, 0, 3, vec![ + (0, 2), + (100, 2), + (0, 2), + (101, 3), + (0, 2), + (101, 3), + (101, 3) + ]); } } diff --git a/async-raft/src/core/vote.rs b/async-raft/src/core/vote.rs index 8174678dd..27b78eace 100644 --- a/async-raft/src/core/vote.rs +++ b/async-raft/src/core/vote.rs @@ -20,10 +20,7 @@ impl, S: RaftStorage> Ra /// /// See `receiver implementation: RequestVote RPC` in raft-essentials.md in this repo. #[tracing::instrument(level = "trace", skip(self, msg))] - pub(super) async fn handle_vote_request( - &mut self, - msg: VoteRequest, - ) -> RaftResult { + pub(super) async fn handle_vote_request(&mut self, msg: VoteRequest) -> RaftResult { // If candidate's current term is less than this nodes current term, reject. 
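// Editorial sketch, not part of the original patch: two rejection paths
// follow. A stale term is refused immediately below, and even a current-term
// candidate must pass the log up-to-date check, e.g. a candidate at
// (last_log_term=3, last_log_index=7) against a local log ending at (3, 9)
// fails because 7 >= 9 is false, so the vote is not granted.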
if msg.term < self.current_term { tracing::trace!({candidate=msg.candidate_id, self.current_term, rpc_term=msg.term}, "RequestVote RPC term is less than current term"); @@ -61,8 +58,8 @@ impl, S: RaftStorage> Ra // Check if candidate's log is at least as up-to-date as this node's. // If candidate's log is not at least as up-to-date as this node, then reject. - let client_is_uptodate = (msg.last_log_term >= self.last_log_term) - && (msg.last_log_index >= self.last_log_index); + let client_is_uptodate = + (msg.last_log_term >= self.last_log_term) && (msg.last_log_index >= self.last_log_index); if !client_is_uptodate { tracing::trace!( { candidate = msg.candidate_id }, @@ -105,21 +102,14 @@ impl, S: RaftStorage> Ra } } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - CandidateState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> CandidateState<'a, D, R, N, S> { /// Handle response from a vote request sent to a peer. #[tracing::instrument(level = "trace", skip(self, res, target))] - pub(super) async fn handle_vote_response( - &mut self, - res: VoteResponse, - target: NodeId, - ) -> RaftResult<()> { + pub(super) async fn handle_vote_response(&mut self, res: VoteResponse, target: NodeId) -> RaftResult<()> { // If peer's term is greater than current term, revert to follower state. if res.term > self.core.current_term { self.core.update_current_term(res.term, None); - self.core - .update_current_leader(UpdateCurrentLeader::Unknown); + self.core.update_current_leader(UpdateCurrentLeader::Unknown); self.core.set_target_state(State::Follower); self.core.save_hard_state().await?; tracing::trace!("reverting to follower state due to greater term observed in RequestVote RPC response"); @@ -144,12 +134,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage self.votes_granted_new += 1; } // If we've received enough votes from both config groups, then transition to leader state`. - if self.votes_granted_old >= self.votes_needed_old - && self.votes_granted_new >= self.votes_needed_new - { - tracing::trace!( - "transitioning to leader state as minimum number of votes have been received" - ); + if self.votes_granted_old >= self.votes_needed_old && self.votes_granted_new >= self.votes_needed_new { + tracing::trace!("transitioning to leader state as minimum number of votes have been received"); self.core.set_target_state(State::Leader); return Ok(()); } @@ -164,10 +150,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage pub(super) fn spawn_parallel_vote_requests(&self) -> mpsc::Receiver<(VoteResponse, NodeId)> { let all_members = self.core.membership.all_nodes(); let (tx, rx) = mpsc::channel(all_members.len()); - for member in all_members - .into_iter() - .filter(|member| member != &self.core.id) - { + for member in all_members.into_iter().filter(|member| member != &self.core.id) { let rpc = VoteRequest::new( self.core.current_term, self.core.id, diff --git a/async-raft/src/error.rs b/async-raft/src/error.rs index d4384d938..ae910ffc1 100644 --- a/async-raft/src/error.rs +++ b/async-raft/src/error.rs @@ -68,10 +68,9 @@ impl fmt::Debug for ClientWriteError { #[derive(Debug, Error, Eq, PartialEq)] #[non_exhaustive] pub enum ConfigError { - /// A configuration error indicating that the given values for election timeout min & max are invalid: max must be greater than min. 
- #[error( - "given values for election timeout min & max are invalid: max must be greater than min" - )] + /// A configuration error indicating that the given values for election timeout min & max are invalid: max must be + /// greater than min. + #[error("given values for election timeout min & max are invalid: max must be greater than min")] InvalidElectionTimeoutMinMax, /// The given value for max_payload_entries is too small, must be > 0. #[error("the given value for max_payload_entries is too small, must be > 0")] diff --git a/async-raft/src/metrics.rs b/async-raft/src/metrics.rs index 6f8d9d24a..057cdf947 100644 --- a/async-raft/src/metrics.rs +++ b/async-raft/src/metrics.rs @@ -79,20 +79,10 @@ impl Wait { loop { let latest = rx.borrow().clone(); - tracing::debug!( - "id={} wait {:} latest: {:?}", - latest.id, - msg.to_string(), - latest - ); + tracing::debug!("id={} wait {:} latest: {:?}", latest.id, msg.to_string(), latest); if func(&latest) { - tracing::debug!( - "id={} done wait {:} latest: {:?}", - latest.id, - msg.to_string(), - latest - ); + tracing::debug!("id={} done wait {:} latest: {:?}", latest.id, msg.to_string(), latest); return Ok(latest); } @@ -126,11 +116,7 @@ impl Wait { /// Wait for `current_leader` to become `Some(leader_id)` until timeout. #[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn current_leader( - &self, - leader_id: NodeId, - msg: impl ToString, - ) -> Result { + pub async fn current_leader(&self, leader_id: NodeId, msg: impl ToString) -> Result { self.metrics( |x| x.current_leader == Some(leader_id), &format!("{} .current_leader -> {}", msg.to_string(), leader_id), @@ -156,11 +142,7 @@ impl Wait { /// Wait for `state` to become `want_state` or timeout. #[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn state( - &self, - want_state: State, - msg: impl ToString, - ) -> Result { + pub async fn state(&self, want_state: State, msg: impl ToString) -> Result { self.metrics( |x| x.state == want_state, &format!("{} .state -> {:?}", msg.to_string(), want_state), @@ -170,18 +152,10 @@ impl Wait { /// Wait for `membership_config.members` to become expected node set or timeout. 
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn members( - &self, - want_members: HashSet, - msg: impl ToString, - ) -> Result { + pub async fn members(&self, want_members: HashSet, msg: impl ToString) -> Result { self.metrics( |x| x.membership_config.members == want_members, - &format!( - "{} .membership_config.members -> {:?}", - msg.to_string(), - want_members - ), + &format!("{} .membership_config.members -> {:?}", msg.to_string(), want_members), ) .await } diff --git a/async-raft/src/metrics_wait_test.rs b/async-raft/src/metrics_wait_test.rs index 6bacf2511..ec0914bd6 100644 --- a/async-raft/src/metrics_wait_test.rs +++ b/async-raft/src/metrics_wait_test.rs @@ -96,10 +96,7 @@ async fn test_wait() -> anyhow::Result<()> { let got = w.next_members(Some(hashset![1, 2]), "next_members").await?; h.await?; - assert_eq!( - Some(hashset![1, 2]), - got.membership_config.members_after_consensus - ); + assert_eq!(Some(hashset![1, 2]), got.membership_config.members_after_consensus); } { diff --git a/async-raft/src/network.rs b/async-raft/src/network.rs index b00e20c2d..a18ef76d8 100644 --- a/async-raft/src/network.rs +++ b/async-raft/src/network.rs @@ -21,18 +21,10 @@ pub trait RaftNetwork: Send + Sync + 'static where D: AppData { /// Send an AppendEntries RPC to the target Raft node (§5). - async fn append_entries( - &self, - target: NodeId, - rpc: AppendEntriesRequest, - ) -> Result; + async fn append_entries(&self, target: NodeId, rpc: AppendEntriesRequest) -> Result; /// Send an InstallSnapshot RPC to the target Raft node (§7). - async fn install_snapshot( - &self, - target: NodeId, - rpc: InstallSnapshotRequest, - ) -> Result; + async fn install_snapshot(&self, target: NodeId, rpc: InstallSnapshotRequest) -> Result; /// Send a RequestVote RPC to the target Raft node (§5). async fn vote(&self, target: NodeId, rpc: VoteRequest) -> Result; diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index 99817d29b..4e943838b 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -85,15 +85,7 @@ impl, S: RaftStorage> Ra let (tx_api, rx_api) = mpsc::unbounded_channel(); let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id)); let (tx_shutdown, rx_shutdown) = oneshot::channel(); - let raft_handle = RaftCore::spawn( - id, - config, - network, - storage, - rx_api, - tx_metrics, - rx_shutdown, - ); + let raft_handle = RaftCore::spawn(id, config, network, storage, rx_api, tx_metrics, rx_shutdown); let inner = RaftInner { tx_api, rx_metrics, @@ -102,9 +94,7 @@ impl, S: RaftStorage> Ra marker_n: std::marker::PhantomData, marker_s: std::marker::PhantomData, }; - Self { - inner: Arc::new(inner), - } + Self { inner: Arc::new(inner) } } /// Submit an AppendEntries RPC to this Raft node. @@ -112,18 +102,10 @@ impl, S: RaftStorage> Ra /// These RPCs are sent by the cluster leader to replicate log entries (§5.3), and are also /// used as heartbeats (§5.2). 
#[tracing::instrument(level = "debug", skip(self, rpc))] - pub async fn append_entries( - &self, - rpc: AppendEntriesRequest, - ) -> Result { + pub async fn append_entries(&self, rpc: AppendEntriesRequest) -> Result { let (tx, rx) = oneshot::channel(); - self.inner - .tx_api - .send(RaftMsg::AppendEntries { rpc, tx }) - .map_err(|_| RaftError::ShuttingDown)?; - rx.await - .map_err(|_| RaftError::ShuttingDown) - .and_then(|res| res) + self.inner.tx_api.send(RaftMsg::AppendEntries { rpc, tx }).map_err(|_| RaftError::ShuttingDown)?; + rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res) } /// Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node. @@ -132,13 +114,8 @@ impl, S: RaftStorage> Ra #[tracing::instrument(level = "debug", skip(self, rpc))] pub async fn vote(&self, rpc: VoteRequest) -> Result { let (tx, rx) = oneshot::channel(); - self.inner - .tx_api - .send(RaftMsg::RequestVote { rpc, tx }) - .map_err(|_| RaftError::ShuttingDown)?; - rx.await - .map_err(|_| RaftError::ShuttingDown) - .and_then(|res| res) + self.inner.tx_api.send(RaftMsg::RequestVote { rpc, tx }).map_err(|_| RaftError::ShuttingDown)?; + rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res) } /// Submit an InstallSnapshot RPC to this Raft node. @@ -146,18 +123,10 @@ impl, S: RaftStorage> Ra /// These RPCs are sent by the cluster leader in order to bring a new node or a slow node up-to-speed /// with the leader (§7). #[tracing::instrument(level = "debug", skip(self, rpc))] - pub async fn install_snapshot( - &self, - rpc: InstallSnapshotRequest, - ) -> Result { + pub async fn install_snapshot(&self, rpc: InstallSnapshotRequest) -> Result { let (tx, rx) = oneshot::channel(); - self.inner - .tx_api - .send(RaftMsg::InstallSnapshot { rpc, tx }) - .map_err(|_| RaftError::ShuttingDown)?; - rx.await - .map_err(|_| RaftError::ShuttingDown) - .and_then(|res| res) + self.inner.tx_api.send(RaftMsg::InstallSnapshot { rpc, tx }).map_err(|_| RaftError::ShuttingDown)?; + rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res) } /// Get the ID of the current leader from this Raft node. @@ -181,9 +150,7 @@ impl, S: RaftStorage> Ra .tx_api .send(RaftMsg::ClientReadRequest { tx }) .map_err(|_| ClientReadError::RaftError(RaftError::ShuttingDown))?; - rx.await - .map_err(|_| ClientReadError::RaftError(RaftError::ShuttingDown)) - .and_then(|res| res) + rx.await.map_err(|_| ClientReadError::RaftError(RaftError::ShuttingDown)).and_then(|res| res) } /// Submit a mutating client request to Raft to update the state of the system (§5.1). @@ -213,9 +180,7 @@ impl, S: RaftStorage> Ra .tx_api .send(RaftMsg::ClientWriteRequest { rpc, tx }) .map_err(|_| ClientWriteError::RaftError(RaftError::ShuttingDown))?; - rx.await - .map_err(|_| ClientWriteError::RaftError(RaftError::ShuttingDown)) - .and_then(|res| res) + rx.await.map_err(|_| ClientWriteError::RaftError(RaftError::ShuttingDown)).and_then(|res| res) } /// Initialize a pristine Raft node with the given config. 
@@ -249,13 +214,8 @@ impl, S: RaftStorage> Ra #[tracing::instrument(level = "debug", skip(self))] pub async fn initialize(&self, members: HashSet) -> Result<(), InitializeError> { let (tx, rx) = oneshot::channel(); - self.inner - .tx_api - .send(RaftMsg::Initialize { members, tx }) - .map_err(|_| RaftError::ShuttingDown)?; - rx.await - .map_err(|_| InitializeError::RaftError(RaftError::ShuttingDown)) - .and_then(|res| res) + self.inner.tx_api.send(RaftMsg::Initialize { members, tx }).map_err(|_| RaftError::ShuttingDown)?; + rx.await.map_err(|_| InitializeError::RaftError(RaftError::ShuttingDown)).and_then(|res| res) } /// Synchronize a new Raft node, bringing it up-to-speed (§6). @@ -273,13 +233,8 @@ impl, S: RaftStorage> Ra #[tracing::instrument(level = "debug", skip(self))] pub async fn add_non_voter(&self, id: NodeId) -> Result<(), ChangeConfigError> { let (tx, rx) = oneshot::channel(); - self.inner - .tx_api - .send(RaftMsg::AddNonVoter { id, tx }) - .map_err(|_| RaftError::ShuttingDown)?; - rx.await - .map_err(|_| ChangeConfigError::RaftError(RaftError::ShuttingDown)) - .and_then(|res| res) + self.inner.tx_api.send(RaftMsg::AddNonVoter { id, tx }).map_err(|_| RaftError::ShuttingDown)?; + rx.await.map_err(|_| ChangeConfigError::RaftError(RaftError::ShuttingDown)).and_then(|res| res) } /// Propose a cluster configuration change (§6). @@ -294,18 +249,13 @@ impl, S: RaftStorage> Ra /// If this Raft node is not the cluster leader, then the proposed configuration change will be /// rejected. #[tracing::instrument(level = "debug", skip(self))] - pub async fn change_membership( - &self, - members: HashSet, - ) -> Result<(), ChangeConfigError> { + pub async fn change_membership(&self, members: HashSet) -> Result<(), ChangeConfigError> { let (tx, rx) = oneshot::channel(); self.inner .tx_api .send(RaftMsg::ChangeMembership { members, tx }) .map_err(|_| RaftError::ShuttingDown)?; - rx.await - .map_err(|_| ChangeConfigError::RaftError(RaftError::ShuttingDown)) - .and_then(|res| res) + rx.await.map_err(|_| ChangeConfigError::RaftError(RaftError::ShuttingDown)).and_then(|res| res) } /// Get a handle to the metrics channel. @@ -329,7 +279,6 @@ impl, S: RaftStorage> Ra /// /// // wait for raft state to become a follower /// r.wait(None).state(State::Follower).await?; - /// /// ``` pub fn wait(&self, timeout: Option) -> Wait { let timeout = match timeout { @@ -354,9 +303,7 @@ impl, S: RaftStorage> Ra } } -impl, S: RaftStorage> Clone - for Raft -{ +impl, S: RaftStorage> Clone for Raft { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -364,8 +311,7 @@ impl, S: RaftStorage> Cl } } -pub(crate) type ClientWriteResponseTx = - oneshot::Sender, ClientWriteError>>; +pub(crate) type ClientWriteResponseTx = oneshot::Sender, ClientWriteError>>; pub(crate) type ClientReadResponseTx = oneshot::Sender>; pub(crate) type ChangeMembershipTx = oneshot::Sender>; @@ -404,7 +350,6 @@ pub(crate) enum RaftMsg { }, } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). @@ -481,12 +426,7 @@ impl Entry { /// ### membership /// The cluster membership config which is contained in the snapshot, which will always be the /// latest membership covered by the snapshot. 
- pub fn new_snapshot_pointer( - index: u64, - term: u64, - id: String, - membership: MembershipConfig, - ) -> Self { + pub fn new_snapshot_pointer(index: u64, term: u64, id: String, membership: MembershipConfig) -> Self { Entry { term, index, @@ -537,7 +477,6 @@ pub struct EntrySnapshotPointer { pub membership: MembershipConfig, } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// The membership configuration of the cluster. @@ -591,7 +530,6 @@ impl MembershipConfig { } } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// An RPC sent by candidates to gather votes (§5.2). @@ -628,7 +566,6 @@ pub struct VoteResponse { pub vote_granted: bool, } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). @@ -657,7 +594,6 @@ pub struct InstallSnapshotResponse { pub term: u64, } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// An application specific client request to update the state of the system (§5.1). diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs index d0dfc7b3e..02dc6b230 100644 --- a/async-raft/src/replication/mod.rs +++ b/async-raft/src/replication/mod.rs @@ -35,7 +35,7 @@ pub(crate) struct ReplicationStream { /// The spawn handle the `ReplicationCore` task. // pub handle: JoinHandle<()>, /// The channel used for communicating with the replication task. - pub repltx: mpsc::UnboundedSender>, + pub repl_tx: mpsc::UnboundedSender>, } impl ReplicationStream { @@ -82,9 +82,9 @@ struct ReplicationCore, S: Raf /// The current term, which will never change during the lifetime of this task. term: u64, /// A channel for sending events to the Raft node. - rafttx: mpsc::UnboundedSender>, + raft_tx: mpsc::UnboundedSender>, /// A channel for receiving events from the Raft node. - raftrx: mpsc::UnboundedReceiver>, + raft_rx: mpsc::UnboundedReceiver>, /// The `RaftNetwork` interface. network: Arc, /// The `RaftStorage` interface. @@ -151,9 +151,7 @@ struct ReplicationCore, S: Raf heartbeat_timeout: Duration, } -impl, S: RaftStorage> - ReplicationCore -{ +impl, S: RaftStorage> ReplicationCore { /// Spawn a new replication task for the target node. 
pub(self) fn spawn( id: NodeId, @@ -167,7 +165,7 @@ impl, S: RaftStorage> storage: Arc, rafttx: mpsc::UnboundedSender>, ) -> ReplicationStream { - let (raftrx_tx, raftrx) = mpsc::unbounded_channel(); + let (raft_tx, raft_rx) = mpsc::unbounded_channel(); let heartbeat_timeout = Duration::from_millis(config.heartbeat_interval); let max_payload_entries = config.max_payload_entries as usize; let this = Self { @@ -185,8 +183,8 @@ impl, S: RaftStorage> next_index: last_log_index + 1, match_index: last_log_index, match_term: last_log_term, - rafttx, - raftrx, + raft_tx: rafttx, + raft_rx, heartbeat: interval(heartbeat_timeout), heartbeat_timeout, replication_buffer: Vec::new(), @@ -195,7 +193,7 @@ impl, S: RaftStorage> let _handle = tokio::spawn(this.main()); ReplicationStream { // handle, - repltx: raftrx_tx, + repl_tx: raft_tx, } } @@ -230,11 +228,7 @@ impl, S: RaftStorage> } else { self.max_payload_entries }; - self.outbound_buffer.extend( - self.replication_buffer - .drain(..chunk_size) - .map(OutboundEntry::Arc), - ); + self.outbound_buffer.extend(self.replication_buffer.drain(..chunk_size).map(OutboundEntry::Arc)); } } @@ -245,18 +239,11 @@ impl, S: RaftStorage> prev_log_index: self.match_index, prev_log_term: self.match_term, leader_commit: self.commit_index, - entries: self - .outbound_buffer - .iter() - .map(|entry| entry.as_ref().clone()) - .collect(), + entries: self.outbound_buffer.iter().map(|entry| entry.as_ref().clone()).collect(), }; // Send the payload. - tracing::debug!( - "start sending append_entries, timeout: {:?}", - self.heartbeat_timeout - ); + tracing::debug!("start sending append_entries, timeout: {:?}", self.heartbeat_timeout); let res = match timeout( self.heartbeat_timeout, self.network.append_entries(self.target, payload), @@ -266,20 +253,19 @@ impl, S: RaftStorage> Ok(outer_res) => match outer_res { Ok(res) => res, Err(err) => { - tracing::error!({error=%err}, "error sending AppendEntries RPC to target"); + tracing::error!(error=%err, "error sending AppendEntries RPC to target"); return; } }, Err(err) => { - tracing::error!({error=%err}, "timeout while sending AppendEntries RPC to target"); + tracing::error!(error=%err, "timeout while sending AppendEntries RPC to target"); return; } }; - let last_index_and_term = self - .outbound_buffer - .last() - .map(|last| (last.as_ref().index, last.as_ref().term)); - self.outbound_buffer.clear(); // Once we've successfully sent a payload of entries, don't send them again. + let last_index_and_term = self.outbound_buffer.last().map(|last| (last.as_ref().index, last.as_ref().term)); + + // Once we've successfully sent a payload of entries, don't send them again. + self.outbound_buffer.clear(); tracing::debug!("append_entries last: {:?}", last_index_and_term); @@ -292,7 +278,7 @@ impl, S: RaftStorage> self.next_index = index + 1; // This should always be the next expected index. self.match_index = index; self.match_term = term; - let _ = self.rafttx.send(ReplicaEvent::UpdateMatchIndex { + let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex { target: self.target, match_index: index, match_term: term, @@ -316,7 +302,7 @@ impl, S: RaftStorage> // Replication was not successful, if a newer term has been returned, revert to follower. 
if res.term > self.term { tracing::trace!({ res.term }, "append entries failed, reverting to follower"); - let _ = self.rafttx.send(ReplicaEvent::RevertToFollower { + let _ = self.raft_tx.send(ReplicaEvent::RevertToFollower { target: self.target, term: res.term, }); @@ -326,7 +312,8 @@ impl, S: RaftStorage> // Replication was not successful, handle conflict optimization record, else decrement `next_index`. if let Some(conflict) = res.conflict_opt { - tracing::trace!({?conflict, res.term}, "append entries failed, handling conflict opt"); + tracing::trace!(?conflict, res.term, "append entries failed, handling conflict opt"); + // If the returned conflict opt index is greater than last_log_index, then this is a // logical error, and no action should be taken. This represents a replication failure. if conflict.index > self.last_log_index { @@ -340,7 +327,7 @@ impl, S: RaftStorage> // it will never exist. So instead, we just return, and accept the conflict data. if conflict.index == 0 { self.target_state = TargetReplState::Lagging; - let _ = self.rafttx.send(ReplicaEvent::UpdateMatchIndex { + let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex { target: self.target, match_index: self.match_index, match_term: self.match_term, @@ -362,7 +349,7 @@ impl, S: RaftStorage> // This condition would only ever be reached if the log has been removed due to // log compaction (barring critical storage failure), so transition to snapshotting. self.target_state = TargetReplState::Snapshotting; - let _ = self.rafttx.send(ReplicaEvent::UpdateMatchIndex { + let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex { target: self.target, match_index: self.match_index, match_term: self.match_term, @@ -370,15 +357,15 @@ impl, S: RaftStorage> return; } Err(err) => { - tracing::error!({error=%err}, "error fetching log entry due to returned AppendEntries RPC conflict_opt"); - let _ = self.rafttx.send(ReplicaEvent::Shutdown); + tracing::error!(error=%err, "error fetching log entry due to returned AppendEntries RPC conflict_opt"); + let _ = self.raft_tx.send(ReplicaEvent::Shutdown); self.target_state = TargetReplState::Shutdown; return; } }; // Check snapshot policy and handle conflict as needed. - let _ = self.rafttx.send(ReplicaEvent::UpdateMatchIndex { + let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex { target: self.target, match_index: self.match_index, match_term: self.match_term, @@ -405,11 +392,8 @@ impl, S: RaftStorage> pub(self) fn needs_snapshot(&self) -> bool { match &self.config.snapshot_policy { SnapshotPolicy::LogsSinceLast(threshold) => { - let needs_snap = self - .commit_index - .checked_sub(self.match_index) - .map(|diff| diff >= *threshold) - .unwrap_or(false); + let needs_snap = + self.commit_index.checked_sub(self.match_index).map(|diff| diff >= *threshold).unwrap_or(false); if needs_snap { tracing::trace!("snapshot needed"); true @@ -422,7 +406,7 @@ impl, S: RaftStorage> } /// Fully drain the channel coming in from the Raft node. - pub(self) fn drain_raftrx(&mut self, first: RaftEvent) { + pub(self) fn drain_raft_rx(&mut self, first: RaftEvent) { let mut event_opt = Some(first); let mut iters = 0; loop { @@ -430,33 +414,35 @@ impl, S: RaftStorage> if iters > self.config.max_payload_entries { return; } + // Unpack the event opt, else return if we don't have one to process. let event = match event_opt.take() { Some(event) => event, None => return, }; + // Process the event. 
match event { RaftEvent::UpdateCommitIndex { commit_index } => { self.commit_index = commit_index; } - RaftEvent::Replicate { - entry, - commit_index, - } => { + + RaftEvent::Replicate { entry, commit_index } => { self.commit_index = commit_index; self.last_log_index = entry.index; if self.target_state == TargetReplState::LineRate { self.replication_buffer.push(entry); } } + RaftEvent::Terminate => { self.target_state = TargetReplState::Shutdown; return; } } + // Attempt to unpack the next event for the next loop iteration. - if let Some(event) = self.raftrx.recv().now_or_never() { + if let Some(event) = self.raft_rx.recv().now_or_never() { event_opt = event; } iters += 1; @@ -481,7 +467,6 @@ impl AsRef> for OutboundEntry { } } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// The state of the replication stream. @@ -558,7 +543,6 @@ where S: AsyncRead + AsyncSeek + Send + Unpin + 'static Shutdown, } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// LineRate specific state. @@ -567,9 +551,7 @@ struct LineRateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: R core: &'a mut ReplicationCore, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - LineRateState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LineRateState<'a, D, R, N, S> { /// Create a new instance. pub fn new(core: &'a mut ReplicationCore) -> Self { Self { core } } @@ -581,7 +563,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage target: self.core.target, is_line_rate: true, }; - let _ = self.core.rafttx.send(event); + let _ = self.core.raft_tx.send(event); loop { if self.core.target_state != TargetReplState::LineRate { return; } @@ -593,17 +575,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .outbound_buffer .first() .map(|entry| entry.as_ref().index) - .or_else(|| { - self.core - .replication_buffer - .first() - .map(|entry| entry.index) - }); + .or_else(|| self.core.replication_buffer.first().map(|entry| entry.index)); // When converting to `LaggingState`, `outbound_buffer` and `replication_buffer` are cleared, // in which there may be uncommitted logs. // Thus when converting back to `LineRateState`, when these two buffers are empty, we - need to resend all unommitted logs. + need to resend all uncommitted logs. // Otherwise these logs have no chance to be replicated, unless a new log is written. let index = match next_buf_index { Some(i) => i, @@ -615,8 +592,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // skipped this replication stream during transition. In such cases, a single update from // storage will put this stream back on track. if self.core.next_index != index { - self.frontload_outbound_buffer(self.core.next_index, index) - .await; + self.frontload_outbound_buffer(self.core.next_index, index).await; if self.core.target_state != TargetReplState::LineRate { return; } @@ -627,8 +603,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage tokio::select!
{ _ = self.core.heartbeat.tick() => self.core.send_append_entries().await, - event = self.core.raftrx.recv() => match event { - Some(event) => self.core.drain_raftrx(event), + + event = self.core.raft_rx.recv() => match event { + Some(event) => self.core.drain_raft_rx(event), None => self.core.target_state = TargetReplState::Shutdown, } } @@ -641,8 +618,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let entries = match self.core.storage.get_log_entries(start, stop).await { Ok(entries) => entries, Err(err) => { - tracing::error!({error=%err}, "error while frontloading outbound buffer"); - let _ = self.core.rafttx.send(ReplicaEvent::Shutdown); + tracing::error!(error=%err, "error while frontloading outbound buffer"); + let _ = self.core.raft_tx.send(ReplicaEvent::Shutdown); return; } }; @@ -654,14 +631,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } // Prepend. self.core.outbound_buffer.reverse(); - self.core - .outbound_buffer - .extend(entries.into_iter().rev().map(OutboundEntry::Raw)); + self.core.outbound_buffer.extend(entries.into_iter().rev().map(OutboundEntry::Raw)); self.core.outbound_buffer.reverse(); } } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// Lagging specific state. @@ -670,9 +644,7 @@ struct LaggingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: Ra core: &'a mut ReplicationCore, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - LaggingState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LaggingState<'a, D, R, N, S> { /// Create a new instance. pub fn new(core: &'a mut ReplicationCore) -> Self { Self { core } @@ -684,7 +656,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage target: self.core.target, is_line_rate: false, }; - let _ = self.core.rafttx.send(event); + let _ = self.core.raft_tx.send(event); self.core.replication_buffer.clear(); self.core.outbound_buffer.clear(); loop { @@ -710,8 +682,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } // Check raft channel to ensure we are staying up-to-date, then loop. - if let Some(Some(event)) = self.core.raftrx.recv().now_or_never() { - self.core.drain_raftrx(event); + if let Some(Some(event)) = self.core.raft_rx.recv().now_or_never() { + self.core.drain_raft_rx(event); } } } @@ -728,9 +700,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If the send buffer is empty, we need to fill it. if self.core.outbound_buffer.is_empty() { // Determine an appropriate stop index for the storage fetch operation. Avoid underflow. - let distance_behind = self.core.commit_index - self.core.next_index; // Underflow is guarded against in the `is_up_to_speed` check in the outer loop. - let is_within_payload_distance = - distance_behind <= self.core.config.max_payload_entries; + // + // Logs in storage: + // 0 ... next_index ... commit_index ... last_uncommitted_index + + // Underflow is guarded against in the `is_up_to_speed` check in the outer loop. 
+ let distance_behind = self.core.commit_index - self.core.next_index; + + let is_within_payload_distance = distance_behind <= self.core.config.max_payload_entries; + let stop_idx = if is_within_payload_distance { // If we have caught up to the line index, then that means we will be running at // line rate after this payload is successfully replicated. @@ -742,16 +720,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Bringing the target up-to-date by fetching the largest possible payload of entries // from storage within permitted configuration & ensure no snapshot pointer was returned. - let entries = match self - .core - .storage - .get_log_entries(self.core.next_index, stop_idx) - .await - { + let entries = match self.core.storage.get_log_entries(self.core.next_index, stop_idx).await { Ok(entries) => entries, Err(err) => { - tracing::error!({error=%err}, "error fetching logs from storage"); - let _ = self.core.rafttx.send(ReplicaEvent::Shutdown); + tracing::error!(error=%err, "error fetching logs from storage"); + let _ = self.core.raft_tx.send(ReplicaEvent::Shutdown); self.core.target_state = TargetReplState::Shutdown; return; } @@ -762,33 +735,22 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage return; } } - self.core - .outbound_buffer - .extend(entries.into_iter().map(OutboundEntry::Raw)); + self.core.outbound_buffer.extend(entries.into_iter().map(OutboundEntry::Raw)); } } } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// Snapshotting specific state. -struct SnapshottingState< - 'a, - D: AppData, - R: AppDataResponse, - N: RaftNetwork, - S: RaftStorage, -> { +struct SnapshottingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> { /// An exclusive handle to the replication core. core: &'a mut ReplicationCore, snapshot: Option>, snapshot_fetch_rx: Option>>, } -impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> - SnapshottingState<'a, D, R, N, S> -{ +impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> SnapshottingState<'a, D, R, N, S> { /// Create a new instance. pub fn new(core: &'a mut ReplicationCore) -> Self { Self { @@ -804,7 +766,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage target: self.core.target, is_line_rate: false, }; - let _ = self.core.rafttx.send(event); + let _ = self.core.raft_tx.send(event); self.core.replication_buffer.clear(); self.core.outbound_buffer.clear(); @@ -816,7 +778,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If we don't have any of the components we need, fetch the current snapshot. if self.snapshot.is_none() && self.snapshot_fetch_rx.is_none() { let (tx, rx) = oneshot::channel(); - let _ = self.core.rafttx.send(ReplicaEvent::NeedsSnapshot { + let _ = self.core.raft_tx.send(ReplicaEvent::NeedsSnapshot { target: self.core.target, tx, }); @@ -833,7 +795,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If we have a snapshot to work with, then stream it. 
if let Some(snapshot) = self.snapshot.take() { if let Err(err) = self.stream_snapshot(snapshot).await { - tracing::error!({error=%err}, "error streaming snapshot to target"); + tracing::error!(error=%err, "error streaming snapshot to target"); } continue; } @@ -845,20 +807,19 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// If an error comes up during processing, this routine should simply be called again after /// issuing a new request to the storage layer. #[tracing::instrument(level = "trace", skip(self, rx))] - async fn wait_for_snapshot( - &mut self, - mut rx: oneshot::Receiver>, - ) { + async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver>) { loop { tokio::select! { _ = self.core.heartbeat.tick() => self.core.send_append_entries().await, + event = self.core.raft_rx.recv() => match event { Some(event) => self.core.drain_raft_rx(event), None => { self.core.target_state = TargetReplState::Shutdown; return; } }, + res = &mut rx => { match res { Ok(snapshot) => { @@ -873,10 +834,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn stream_snapshot( - &mut self, - mut snapshot: CurrentSnapshotData, - ) -> RaftResult<()> { + async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData) -> RaftResult<()> { let mut offset = 0; self.core.next_index = snapshot.index + 1; self.core.match_index = snapshot.index; @@ -899,7 +857,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage buf.clear(); // Send the RPC over to the target. - tracing::trace!({snapshot_size=req.data.len(), nread, req.done, req.offset}, "sending snapshot chunk"); + tracing::trace!( + snapshot_size = req.data.len(), + nread, + req.done, + req.offset, + "sending snapshot chunk" + ); let res = match timeout( self.core.heartbeat_timeout, self.core.network.install_snapshot(self.core.target, req), @@ -909,19 +873,19 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage Ok(outer_res) => match outer_res { Ok(res) => res, Err(err) => { - tracing::error!({error=%err}, "error sending InstallSnapshot RPC to target"); + tracing::error!(error=%err, "error sending InstallSnapshot RPC to target"); continue; } }, Err(err) => { - tracing::error!({error=%err}, "timeout while sending InstallSnapshot RPC to target"); + tracing::error!(error=%err, "timeout while sending InstallSnapshot RPC to target"); continue; } }; // Handle response conditions. if res.term > self.core.term { - let _ = self.core.rafttx.send(ReplicaEvent::RevertToFollower { + let _ = self.core.raft_tx.send(ReplicaEvent::RevertToFollower { target: self.core.target, term: res.term, }); @@ -939,8 +903,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage offset += nread as u64; // Check raft channel to ensure we are staying up-to-date, then loop. - if let Some(Some(event)) = self.core.raftrx.recv().now_or_never() { - self.core.drain_raftrx(event); + if let Some(Some(event)) = self.core.raft_rx.recv().now_or_never() { + self.core.drain_raft_rx(event); } } } diff --git a/async-raft/tests/add_remove_voter.rs b/async-raft/tests/add_remove_voter.rs index 97772b7b9..da5128525 100644 --- a/async-raft/tests/add_remove_voter.rs +++ b/async-raft/tests/add_remove_voter.rs @@ -17,8 +17,8 @@ mod fixtures; /// /// - brings 5 nodes online: one leader and 4 non-voters.
/// - add 4 non-voters as followers. -/// - asserts that the leader was able to successfully commit logs and that the followers has -/// successfully replicated the payload. +/// - asserts that the leader was able to successfully commit logs and that the followers have successfully replicated +/// the payload. /// - remove one follower: node-4 /// - asserts node-4 becomes non-voter and the leader stops sending logs to it. /// @@ -32,11 +32,7 @@ async fn add_remove_voter() -> Result<()> { let left_members = hashset![0, 1, 2, 3]; // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; @@ -127,11 +123,7 @@ async fn add_remove_voter() -> Result<()> { Ok(()) } -async fn wait_log( - router: std::sync::Arc, - node_ids: &HashSet, - want_log: u64, -) -> Result<()> { +async fn wait_log(router: std::sync::Arc, node_ids: &HashSet, want_log: u64) -> Result<()> { let timeout = Duration::from_millis(500); for i in node_ids.iter() { router diff --git a/async-raft/tests/client_reads.rs b/async-raft/tests/client_reads.rs index add6138a4..aeb95f61f 100644 --- a/async-raft/tests/client_reads.rs +++ b/async-raft/tests/client_reads.rs @@ -22,11 +22,7 @@ async fn client_reads() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; router.new_raft_node(1).await; @@ -35,12 +31,8 @@ async fn client_reads() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0, 1, 2], want, None, "empty node") - .await?; - router - .wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty node") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "empty node").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty node").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0, 1, 2], want, None, "init leader") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Get the ID of the leader, and assert that client_read succeeds.
let leader = router.leader().await.expect("leader not found"); assert_eq!(leader, 0, "expected leader to be node 0, got {}", leader); - router.client_read(leader).await.unwrap_or_else(|_| { - panic!( - "expected client_read to succeed for cluster leader {}", - leader - ) - }); - router - .client_read(1) - .await - .expect_err("expected client_read on follower node 1 to fail"); router - .client_read(2) + .client_read(leader) .await - .expect_err("expected client_read on follower node 2 to fail"); + .unwrap_or_else(|_| panic!("expected client_read to succeed for cluster leader {}", leader)); + router.client_read(1).await.expect_err("expected client_read on follower node 1 to fail"); + router.client_read(2).await.expect_err("expected client_read on follower node 2 to fail"); Ok(()) } diff --git a/async-raft/tests/client_writes.rs b/async-raft/tests/client_writes.rs index a2d6daa0a..b1344962a 100644 --- a/async-raft/tests/client_writes.rs +++ b/async-raft/tests/client_writes.rs @@ -24,11 +24,7 @@ async fn client_writes() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; router.new_raft_node(1).await; @@ -37,12 +33,8 @@ async fn client_writes() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0, 1, 2], want, None, "empty") - .await?; - router - .wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -50,12 +42,8 @@ async fn client_writes() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0, 1, 2], want, None, "leader init log") - .await?; - router - .wait_for_state(&hashset![0], State::Leader, None, "init") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "leader init log").await?; + router.wait_for_state(&hashset![0], State::Leader, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; @@ -71,9 +59,7 @@ async fn client_writes() -> Result<()> { while clients.next().await.is_some() {} want = 6001; - router - .wait_for_log(&hashset![0, 1, 2], want, None, "sync logs") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "sync logs").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // The extra 1 is from the leader's initial commit entry. router diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index d49b710be..c2ea36197 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -38,12 +38,8 @@ async fn compaction() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. 
- router - .wait_for_log(&hashset![0], want, None, "empty") - .await?; - router - .wait_for_state(&hashset![0], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0], want, None, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -51,18 +47,14 @@ async fn compaction() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0], want, None, "init leader") - .await?; + router.wait_for_log(&hashset![0], want, None, "init leader").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Send enough requests to the cluster that compaction on the node should be triggered. router.client_request_many(0, "0", 499).await; // Puts us exactly at the configured snapshot policy threshold. want += 499; - router - .wait_for_log(&hashset![0], want, None, "write") - .await?; + router.wait_for_log(&hashset![0], want, None, "write").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // TODO: add snapshot info into metrics. @@ -83,27 +75,17 @@ async fn compaction() -> Result<()> { // Add a new node and assert that it received the same snapshot. router.new_raft_node(1).await; - router - .add_non_voter(0, 1) - .await - .expect("failed to add new node as non-voter"); - router - .change_membership(0, hashset![0, 1]) - .await - .expect("failed to modify cluster membership"); + router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); + router.change_membership(0, hashset![0, 1]).await.expect("failed to modify cluster membership"); want += 2; // 2 member change logs - router - .wait_for_log(&hashset![0, 1], want, None, "add follower") - .await?; + router.wait_for_log(&hashset![0, 1], want, None, "add follower").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // We expect index to be 500 + 2 (joint & uniform config change entries). let expected_snap = Some((500.into(), 1, MembershipConfig { members: hashset![0u64], members_after_consensus: None, })); - router - .assert_storage_state(1, 502, None, 500, expected_snap) - .await; + router.assert_storage_state(1, 502, None, 500, expected_snap).await; // -------------------------------- ^^^^ this value is None because non-voters do not vote. Ok(()) diff --git a/async-raft/tests/concurrent_write_and_add_non_voter.rs b/async-raft/tests/concurrent_write_and_add_non_voter.rs index 46132c5b3..9aff36fb2 100644 --- a/async-raft/tests/concurrent_write_and_add_non_voter.rs +++ b/async-raft/tests/concurrent_write_and_add_non_voter.rs @@ -34,7 +34,8 @@ mod fixtures; /// - add another non-voter and at the same time write a log. /// - asserts that all of the leader, followers and the non-voter receives all logs. /// -/// RUST_LOG=async_raft,memstore,concurrent_write_and_add_non_voter=trace cargo test -p async-raft --test concurrent_write_and_add_non_voter +/// RUST_LOG=async_raft,memstore,concurrent_write_and_add_non_voter=trace cargo test -p async-raft --test +/// concurrent_write_and_add_non_voter #[tokio::test(flavor = "multi_thread", worker_threads = 6)] async fn concurrent_write_and_add_non_voter() -> Result<()> { fixtures::init_tracing(); @@ -43,11 +44,7 @@ async fn concurrent_write_and_add_non_voter() -> Result<()> { let candidates = hashset![0, 1, 2]; // Setup test dependencies. 
- let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; diff --git a/async-raft/tests/conflict_with_empty_entries.rs b/async-raft/tests/conflict_with_empty_entries.rs index 1dae17386..72d33e69c 100644 --- a/async-raft/tests/conflict_with_empty_entries.rs +++ b/async-raft/tests/conflict_with_empty_entries.rs @@ -32,17 +32,14 @@ mod fixtures; /// - send `append_logs` message with conflicting prev_log_index and empty `entries`. /// - asserts that a response with ConflictOpt set is returned. /// -/// RUST_LOG=async_raft,memstore,conflict_with_empty_entries=trace cargo test -p async-raft --test conflict_with_empty_entries +/// RUST_LOG=async_raft,memstore,conflict_with_empty_entries=trace cargo test -p async-raft --test +/// conflict_with_empty_entries #[tokio::test(flavor = "multi_thread", worker_threads = 6)] async fn conflict_with_empty_entries() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; diff --git a/async-raft/tests/current_leader.rs b/async-raft/tests/current_leader.rs index 10b3b9f8a..ddff785d5 100644 --- a/async-raft/tests/current_leader.rs +++ b/async-raft/tests/current_leader.rs @@ -21,11 +21,7 @@ async fn current_leader() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; router.new_raft_node(1).await; @@ -34,12 +30,8 @@ async fn current_leader() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0, 1, 2], want, None, "empty") - .await?; - router - .wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0, 1, 2], want, None, "init") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Get the ID of the leader, and assert that current_leader succeeds.
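The single-line collapses running through these tests are what rustfmt produces once the line limit is raised to 120 characters, as the commit message states. A minimal sketch of the relevant knob (an assumption for illustration; the actual rustfmt.toml hunk is outside this excerpt):

    # rustfmt.toml
    # Lets chains like Config::build(..).validate().expect(..) stay on one line.
    max_width = 120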
@@ -58,12 +48,7 @@ async fn current_leader() -> Result<()> { for i in 0..3 { let leader = router.current_leader(i).await; - assert_eq!( - leader, - Some(0), - "expected leader to be node 0, got {:?}", - leader - ); + assert_eq!(leader, Some(0), "expected leader to be node 0, got {:?}", leader); } Ok(()) diff --git a/async-raft/tests/dynamic_membership.rs b/async-raft/tests/dynamic_membership.rs index 72f5ec2d3..83041488c 100644 --- a/async-raft/tests/dynamic_membership.rs +++ b/async-raft/tests/dynamic_membership.rs @@ -27,23 +27,15 @@ async fn dynamic_membership() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0], want, None, "empty") - .await?; - router - .wait_for_state(&hashset![0], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0], want, None, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -51,9 +43,7 @@ async fn dynamic_membership() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0], want, None, "init") - .await?; + router.wait_for_log(&hashset![0], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Sync some new nodes. @@ -76,14 +66,7 @@ async fn dynamic_membership() -> Result<()> { router.change_membership(0, hashset![0, 1, 2, 3, 4]).await?; want += 2; - router - .wait_for_log( - &hashset![0, 1, 2, 3, 4], - want, - None, - "cluster of 5 candidates", - ) - .await?; + router.wait_for_log(&hashset![0, 1, 2, 3, 4], want, None, "cluster of 5 candidates").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Still in term 1, so leader is still node 0. // Isolate old leader and assert that a new leader takes over. @@ -108,14 +91,9 @@ async fn dynamic_membership() -> Result<()> { let applied = metrics.last_applied; let leader_id = metrics.current_leader; - router - .assert_stable_cluster(Some(term), Some(applied)) - .await; + router.assert_stable_cluster(Some(term), Some(applied)).await; let leader = router.leader().await.expect("expected new leader"); - assert!( - leader != 0, - "expected new leader to be different from the old leader" - ); + assert!(leader != 0, "expected new leader to be different from the old leader"); // Restore isolated node. 
router.restore_node(0).await; @@ -128,18 +106,10 @@ async fn dynamic_membership() -> Result<()> { ) .await?; - router - .assert_stable_cluster(Some(term), Some(applied)) - .await; - - let current_leader = router - .leader() - .await - .expect("expected to find current leader"); - assert_eq!( - leader, current_leader, - "expected cluster leadership to stay the same" - ); + router.assert_stable_cluster(Some(term), Some(applied)).await; + + let current_leader = router.leader().await.expect("expected to find current leader"); + assert_eq!(leader, current_leader, "expected cluster leadership to stay the same"); Ok(()) } diff --git a/async-raft/tests/fixtures/mod.rs b/async-raft/tests/fixtures/mod.rs index 0cd052e38..9cbc54ff3 100644 --- a/async-raft/tests/fixtures/mod.rs +++ b/async-raft/tests/fixtures/mod.rs @@ -47,11 +47,9 @@ pub fn init_tracing() { let subscriber = tracing_subscriber::Registry::default() .with(tracing_subscriber::EnvFilter::from_default_env()) .with(fmt_layer); - tracing::subscriber::set_global_default(subscriber) - .expect("error setting global tracing subscriber"); + tracing::subscriber::set_global_default(subscriber).expect("error setting global tracing subscriber"); } -////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// /// A type which emulates a network transport and implements the `RaftNetwork` trait. @@ -91,10 +89,7 @@ impl Builder { impl RaftRouter { pub fn builder(config: Arc) -> Builder { - Builder { - config, - send_delay: 0, - } + Builder { config, send_delay: 0 } } /// Create a new instance. @@ -178,9 +173,7 @@ impl RaftRouter { /// Get a handle to the storage backend for the target node. 
pub async fn get_storage_handle(&self, node_id: &NodeId) -> Result> { let rt = self.routing_table.read().await; - let addr = rt - .get(node_id) - .with_context(|| format!("could not find node {} in routing table", node_id))?; + let addr = rt.get(node_id).with_context(|| format!("could not find node {} in routing table", node_id))?; let sto = addr.clone().1; Ok(sto) } @@ -204,9 +197,7 @@ impl RaftRouter { pub async fn wait(&self, node_id: &NodeId, timeout: Option) -> Result { let rt = self.routing_table.read().await; - let node = rt - .get(node_id) - .with_context(|| format!("node {} not found", node_id))?; + let node = rt.get(node_id).with_context(|| format!("node {} not found", node_id))?; Ok(node.0.wait(timeout)) } @@ -264,36 +255,22 @@ impl RaftRouter { nodes.remove(&id); } - pub async fn add_non_voter( - &self, - leader: NodeId, - target: NodeId, - ) -> Result<(), ChangeConfigError> { + pub async fn add_non_voter(&self, leader: NodeId, target: NodeId) -> Result<(), ChangeConfigError> { let rt = self.routing_table.read().await; - let node = rt - .get(&leader) - .unwrap_or_else(|| panic!("node with ID {} does not exist", leader)); + let node = rt.get(&leader).unwrap_or_else(|| panic!("node with ID {} does not exist", leader)); node.0.add_non_voter(target).await } - pub async fn change_membership( - &self, - leader: NodeId, - members: HashSet, - ) -> Result<(), ChangeConfigError> { + pub async fn change_membership(&self, leader: NodeId, members: HashSet) -> Result<(), ChangeConfigError> { let rt = self.routing_table.read().await; - let node = rt - .get(&leader) - .unwrap_or_else(|| panic!("node with ID {} does not exist", leader)); + let node = rt.get(&leader).unwrap_or_else(|| panic!("node with ID {} does not exist", leader)); node.0.change_membership(members).await } /// Send a client read request to the target node. pub async fn client_read(&self, target: NodeId) -> Result<(), ClientReadError> { let rt = self.routing_table.read().await; - let node = rt - .get(&target) - .unwrap_or_else(|| panic!("node with ID {} does not exist", target)); + let node = rt.get(&target).unwrap_or_else(|| panic!("node with ID {} does not exist", target)); node.0.client_read().await } @@ -313,9 +290,7 @@ impl RaftRouter { /// Request the current leader from the target node. pub async fn current_leader(&self, target: NodeId) -> Option { let rt = self.routing_table.read().await; - let node = rt - .get(&target) - .unwrap_or_else(|| panic!("node with ID {} does not exist", target)); + let node = rt.get(&target).unwrap_or_else(|| panic!("node with ID {} does not exist", target)); node.0.current_leader().await } @@ -332,16 +307,10 @@ impl RaftRouter { req: MemClientRequest, ) -> std::result::Result> { let rt = self.routing_table.read().await; - let node = rt - .get(&target) - .unwrap_or_else(|| panic!("node '{}' does not exist in routing table", target)); - node.0 - .client_write(ClientWriteRequest::new(req)) - .await - .map(|res| res.data) + let node = rt.get(&target).unwrap_or_else(|| panic!("node '{}' does not exist in routing table", target)); + node.0.client_write(ClientWriteRequest::new(req)).await.map(|res| res.data) } - ////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////// /// Assert that the cluster is in a pristine state, with all nodes as non-voters. 
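The fixture methods above all funnel through the same lookup idiom: take a read guard on the shared routing table, then resolve the node or panic with context. A minimal, self-contained sketch of that shape (`String` stands in for the `(Raft, Store)` pair the real table holds):

    use std::collections::HashMap;
    use tokio::sync::RwLock;

    async fn lookup(rt: &RwLock<HashMap<u64, String>>, id: u64) -> String {
        // Read-lock the table; concurrent lookups may proceed in parallel.
        let table = rt.read().await;
        table.get(&id).unwrap_or_else(|| panic!("node with ID {} does not exist", id)).clone()
    }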
@@ -398,18 +367,11 @@ impl RaftRouter { /// If `expected_last_log` is `Some`, then all nodes will be tested to ensure that their last /// log index and last applied log match the given value. Else, the leader's last_log_index /// will be used for the assertion. - pub async fn assert_stable_cluster( - &self, - expected_term: Option, - expected_last_log: Option, - ) { + pub async fn assert_stable_cluster(&self, expected_term: Option, expected_last_log: Option) { let isolated = self.isolated_nodes.read().await; let nodes = self.latest_metrics().await; - let non_isolated_nodes: Vec<_> = nodes - .iter() - .filter(|node| !isolated.contains(&node.id)) - .collect(); + let non_isolated_nodes: Vec<_> = nodes.iter().filter(|node| !isolated.contains(&node.id)).collect(); let leader = nodes .iter() .filter(|node| !isolated.contains(&node.id)) @@ -461,12 +423,7 @@ impl RaftRouter { "node {} has last_log_index {}, expected {}", node.id, node.last_log_index, expected_last_log ); - let mut members = node - .membership_config - .members - .iter() - .cloned() - .collect::>(); + let mut members = node.membership_config.members.iter().cloned().collect::>(); members.sort_unstable(); assert_eq!( members, all_nodes, @@ -493,10 +450,7 @@ impl RaftRouter { let rt = self.routing_table.read().await; for (id, (_node, storage)) in rt.iter() { let log = storage.get_log().await; - let last_log = log - .keys() - .last() - .unwrap_or_else(|| panic!("no last log found for node {}", id)); + let last_log = log.keys().last().unwrap_or_else(|| panic!("no last log found for node {}", id)); assert_eq!( last_log, &expect_last_log, "expected node {} to have last_log {}, got {}", @@ -577,9 +531,7 @@ impl RaftNetwork for RaftRouter { let rt = self.routing_table.read().await; let isolated = self.isolated_nodes.read().await; - let addr = rt - .get(&target) - .expect("target node not found in routing table"); + let addr = rt.get(&target).expect("target node not found in routing table"); if isolated.contains(&target) || isolated.contains(&rpc.leader_id) { return Err(anyhow!("target node is isolated")); } @@ -590,18 +542,12 @@ impl RaftNetwork for RaftRouter { } /// Send an InstallSnapshot RPC to the target Raft node (§7). 
- async fn install_snapshot( - &self, - target: u64, - rpc: InstallSnapshotRequest, - ) -> Result { + async fn install_snapshot(&self, target: u64, rpc: InstallSnapshotRequest) -> Result { self.rand_send_delay().await; let rt = self.routing_table.read().await; let isolated = self.isolated_nodes.read().await; - let addr = rt - .get(&target) - .expect("target node not found in routing table"); + let addr = rt.get(&target).expect("target node not found in routing table"); if isolated.contains(&target) || isolated.contains(&rpc.leader_id) { return Err(anyhow!("target node is isolated")); } @@ -614,9 +560,7 @@ impl RaftNetwork for RaftRouter { let rt = self.routing_table.read().await; let isolated = self.isolated_nodes.read().await; - let addr = rt - .get(&target) - .expect("target node not found in routing table"); + let addr = rt.get(&target).expect("target node not found in routing table"); if isolated.contains(&target) || isolated.contains(&rpc.candidate_id) { return Err(anyhow!("target node is isolated")); } diff --git a/async-raft/tests/initialization.rs b/async-raft/tests/initialization.rs index d5d9b1e1d..337d6b76c 100644 --- a/async-raft/tests/initialization.rs +++ b/async-raft/tests/initialization.rs @@ -16,8 +16,8 @@ use maplit::hashset; /// - asserts that they remain in non-voter state with no activity (they should be completely passive). /// - initializes the cluster with membership config including all nodes. /// - asserts that the cluster was able to come online, elect a leader and maintain a stable state. -/// - asserts that the leader was able to successfully commit its initial payload and that all -/// followers have successfully replicated the payload. +/// - asserts that the leader was able to successfully commit its initial payload and that all followers have +/// successfully replicated the payload. /// /// RUST_LOG=async_raft,memstore,initialization=trace cargo test -p async-raft --test initialization #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -25,11 +25,7 @@ async fn initialization() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; router.new_raft_node(1).await; @@ -38,12 +34,8 @@ async fn initialization() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0, 1, 2], want, None, "empty") - .await?; - router - .wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. 
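The isolation checks in the `RaftNetwork` impl above all apply one gate: refuse delivery when either endpoint of the RPC sits in the isolated set. A minimal, self-contained sketch of that gate (u64 mirrors `NodeId`; `anyhow` is the error type these fixtures already use):

    use std::collections::HashSet;
    use anyhow::{anyhow, Result};

    fn deliverable(isolated: &HashSet<u64>, target: u64, sender: u64) -> Result<()> {
        // Drop the RPC if either side of the link is partitioned away.
        if isolated.contains(&target) || isolated.contains(&sender) {
            return Err(anyhow!("target node is isolated"));
        }
        Ok(())
    }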
@@ -51,9 +43,7 @@ async fn initialization() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0, 1, 2], want, None, "init") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; Ok(()) diff --git a/async-raft/tests/lagging_network_write.rs b/async-raft/tests/lagging_network_write.rs index 4e19421fd..19ac16591 100644 --- a/async-raft/tests/lagging_network_write.rs +++ b/async-raft/tests/lagging_network_write.rs @@ -39,12 +39,8 @@ async fn lagging_network_write() -> Result<()> { let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0], want, timeout, "empty") - .await?; - router - .wait_for_state(&hashset![0], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0], want, timeout, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. @@ -52,12 +48,8 @@ async fn lagging_network_write() -> Result<()> { router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0], want, timeout, "init") - .await?; - router - .wait_for_state(&hashset![0], State::Leader, None, "init") - .await?; + router.wait_for_log(&hashset![0], want, timeout, "init").await?; + router.wait_for_state(&hashset![0], State::Leader, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; // Sync some new nodes. @@ -67,33 +59,21 @@ async fn lagging_network_write() -> Result<()> { router.new_raft_node(2).await; router.add_non_voter(0, 2).await?; - router - .wait_for_log(&hashset![1, 2], want, timeout, "non-voter init") - .await?; + router.wait_for_log(&hashset![1, 2], want, timeout, "non-voter init").await?; router.client_request_many(0, "client", 1).await; want += 1; - router - .wait_for_log(&hashset![0, 1, 2], want, timeout, "write one log") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, timeout, "write one log").await?; router.change_membership(0, hashset![0, 1, 2]).await?; want += 2; - router - .wait_for_state(&hashset![0], State::Leader, None, "changed") - .await?; - router - .wait_for_state(&hashset![1, 2], State::Follower, None, "changed") - .await?; - router - .wait_for_log(&hashset![0, 1, 2], want, timeout, "3 candidates") - .await?; + router.wait_for_state(&hashset![0], State::Leader, None, "changed").await?; + router.wait_for_state(&hashset![1, 2], State::Follower, None, "changed").await?; + router.wait_for_log(&hashset![0, 1, 2], want, timeout, "3 candidates").await?; router.client_request_many(0, "client", 1).await; want += 1; - router - .wait_for_log(&hashset![0, 1, 2], want, timeout, "write 2nd log") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, timeout, "write 2nd log").await?; Ok(()) } diff --git a/async-raft/tests/members_012_to_234.rs b/async-raft/tests/members_012_to_234.rs index 151964251..4f5a8eade 100644 --- a/async-raft/tests/members_012_to_234.rs +++ b/async-raft/tests/members_012_to_234.rs @@ -10,7 +10,8 @@ use futures::stream::StreamExt; use maplit::hashset; /// Replace membership with another one with only one common node. 
-/// To reproduce the bug that new config does not actually examine the term/index of non-voter, but instead only examining the followers +/// To reproduce the bug that new config does not actually examine the term/index of non-voter, but instead only +/// examines the followers /// /// - bring a cluster of node 0,1,2 online. /// - isolate 3,4; change config to 2,3,4 @@ -22,23 +23,15 @@ async fn members_012_to_234() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies. - let config = Arc::new( - Config::build("test".into()) - .validate() - .expect("failed to build Raft config"), - ); + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; let mut want = 0; // Assert all nodes are in non-voter state & have no entries. - router - .wait_for_log(&hashset![0], want, None, "empty") - .await?; - router - .wait_for_state(&hashset![0], State::NonVoter, None, "empty") - .await?; + router.wait_for_log(&hashset![0], want, None, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; want += 1; - router - .wait_for_log(&hashset![0], want, None, "init") - .await?; + router.wait_for_log(&hashset![0], want, None, "init").await?; router.assert_stable_cluster(Some(1), Some(want)).await; tracing::info!("--- adding 4 new nodes to cluster"); @@ -76,9 +67,7 @@ async fn members_012_to_234() -> Result<()> { router.change_membership(0, hashset![0, 1, 2]).await?; want += 2; - router - .wait_for_log(&hashset![0, 1, 2], want, None, "cluster of 0,1,2") - .await?; + router.wait_for_log(&hashset![0, 1, 2], want, None, "cluster of 0,1,2").await?; tracing::info!("--- changing config to 2,3,4"); { @@ -91,9 +80,7 @@ } want += 1; - let wait_rst = router - .wait_for_log(&hashset![0], want, None, "cluster of joint") - .await; + let wait_rst = router.wait_for_log(&hashset![0], want, None, "cluster of joint").await; // the first step of joint should not pass because the new config can not constitute a quorum assert!(wait_rst.is_err()); diff --git a/async-raft/tests/metrics_state_machine_consistency.rs b/async-raft/tests/metrics_state_machine_consistency.rs index d7cb3aa62..5d84e60b1 100644 --- a/async-raft/tests/metrics_state_machine_consistency.rs +++ b/async-raft/tests/metrics_state_machine_consistency.rs @@ -16,17 +16,14 @@ mod fixtures; /// - write one log to the leader. /// - asserts that when metrics.last_applied is up to date, the state machine should be up to date too. /// -/// RUST_LOG=async_raft,memstore,metrics_state_machine_consistency=trace cargo test -p async-raft --test metrics_state_machine_consistency +/// RUST_LOG=async_raft,memstore,metrics_state_machine_consistency=trace cargo test -p async-raft --test +/// metrics_state_machine_consistency #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn metrics_state_machine_consistency() -> Result<()> { fixtures::init_tracing(); // Setup test dependencies.
- let config = Arc::new(
- Config::build("test".into())
- .validate()
- .expect("failed to build Raft config"),
- );
+ let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
 let router = Arc::new(RaftRouter::new(config.clone()));

 router.new_raft_node(0).await;
@@ -36,9 +33,7 @@ async fn metrics_state_machine_consistency() -> Result<()> {

 // Wait for node 0 to become leader.
 router.initialize_with(0, hashset![0]).await?;
- router
- .wait_for_state(&hashset![0], State::Leader, None, "init")
- .await?;
+ router.wait_for_state(&hashset![0], State::Leader, None, "init").await?;

 tracing::info!("--- add one non-voter");
 router.add_non_voter(0, 1).await?;
@@ -51,16 +46,9 @@ async fn metrics_state_machine_consistency() -> Result<()> {
 tracing::info!("--- wait for log to sync");
 let want = 2u64;
 for node_id in 0..2 {
- router
- .wait_for_log(&hashset![node_id], want, None, "write one log")
- .await?;
+ router.wait_for_log(&hashset![node_id], want, None, "write one log").await?;
 let sto = router.get_storage_handle(&node_id).await?;
- assert!(sto
- .get_state_machine()
- .await
- .client_status
- .get("foo")
- .is_some());
+ assert!(sto.get_state_machine().await.client_status.get("foo").is_some());
 }

 Ok(())
diff --git a/async-raft/tests/metrics_wait.rs b/async-raft/tests/metrics_wait.rs
index ccd642b01..a4cf8bf02 100644
--- a/async-raft/tests/metrics_wait.rs
+++ b/async-raft/tests/metrics_wait.rs
@@ -24,35 +24,19 @@ async fn metrics_wait() -> Result<()> {
 fixtures::init_tracing();

 // Setup test dependencies.
- let config = Arc::new(
- Config::build("test".into())
- .validate()
- .expect("failed to build Raft config"),
- );
+ let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
 let router = Arc::new(RaftRouter::new(config.clone()));

 let cluster = hashset![0];

 router.new_raft_node(0).await;
 router.initialize_with(0, cluster.clone()).await?;
- router
- .wait_for_state(&cluster, State::Leader, None, "init")
- .await?;
- router
- .wait(&0, None)
- .await?
- .current_leader(0, "become leader")
- .await?;
- router
- .wait_for_log(&cluster, 1, None, "initial log")
- .await?;
+ router.wait_for_state(&cluster, State::Leader, None, "init").await?;
+ router.wait(&0, None).await?.current_leader(0, "become leader").await?;
+ router.wait_for_log(&cluster, 1, None, "initial log").await?;

 tracing::info!("--- wait and timeout");

- let rst = router
- .wait(&0, Some(Duration::from_millis(200)))
- .await?
- .log(2, "timeout waiting for log 2")
- .await;
+ let rst = router.wait(&0, Some(Duration::from_millis(200))).await?.log(2, "timeout waiting for log 2").await;

 match rst {
 Ok(_) => {
diff --git a/async-raft/tests/non_voter_restart.rs b/async-raft/tests/non_voter_restart.rs
index e9963cbf6..538799712 100644
--- a/async-raft/tests/non_voter_restart.rs
+++ b/async-raft/tests/non_voter_restart.rs
@@ -20,8 +20,8 @@ mod fixtures;
 ///
 /// - brings 2 nodes online: one leader and one non-voter.
 /// - write one log to the leader.
-/// - asserts that the leader was able to successfully commit its initial payload and that the
-/// non-voter has successfully replicated the payload.
+/// - asserts that the leader was able to successfully commit its initial payload and that the non-voter has
+/// successfully replicated the payload.
 /// - shut down all nodes and restart the non-voter node.
 /// - asserts the non-voter stays in non-voter state.
 ///
@@ -31,11 +31,7 @@ async fn non_voter_restart() -> Result<()> {
 fixtures::init_tracing();

 // Setup test dependencies.
- let config = Arc::new(
- Config::build("test".into())
- .validate()
- .expect("failed to build Raft config"),
- );
+ let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
 let router = Arc::new(RaftRouter::new(config.clone()));

 router.new_raft_node(0).await;
@@ -44,12 +40,8 @@ async fn non_voter_restart() -> Result<()> {
 let mut want = 0;

 // Assert all nodes are in non-voter state & have no entries.
- router
- .wait_for_log(&hashset![0, 1], want, None, "empty")
- .await?;
- router
- .wait_for_state(&hashset![0, 1], State::NonVoter, None, "empty")
- .await?;
+ router.wait_for_log(&hashset![0, 1], want, None, "empty").await?;
+ router.wait_for_state(&hashset![0, 1], State::NonVoter, None, "empty").await?;
 router.assert_pristine_cluster().await;

 tracing::info!("--- initializing single node cluster");
@@ -61,9 +53,7 @@ async fn non_voter_restart() -> Result<()> {
 router.client_request(0, "foo", 1).await;
 want += 1;

- router
- .wait_for_log(&hashset![0, 1], want, None, "write one log")
- .await?;
+ router.wait_for_log(&hashset![0, 1], want, None, "write one log").await?;

 let (node0, _sto0) = router.remove_node(0).await.unwrap();
 assert_node_state(0, &node0, 1, 2, State::Leader);
@@ -81,13 +71,7 @@ async fn non_voter_restart() -> Result<()> {
 Ok(())
 }

-fn assert_node_state(
- id: NodeId,
- node: &MemRaft,
- expected_term: u64,
- expected_log: u64,
- state: State,
-) {
+fn assert_node_state(id: NodeId, node: &MemRaft, expected_term: u64, expected_log: u64, state: State) {
 let m = node.metrics().borrow().clone();
 tracing::info!("node {} metrics: {:?}", id, m);

diff --git a/async-raft/tests/shutdown.rs b/async-raft/tests/shutdown.rs
index aa09faa60..2e61ba7d9 100644
--- a/async-raft/tests/shutdown.rs
+++ b/async-raft/tests/shutdown.rs
@@ -14,8 +14,8 @@ use maplit::hashset;
 /// What does this test do?
 ///
 /// - this test builds upon the `initialization` test.
-/// - after the cluster has been initialize, it performs a shutdown routine
-/// on each node, asserting that the shutdown routine succeeded.
+/// - after the cluster has been initialized, it performs a shutdown routine on each node, asserting that the shutdown
+/// routine succeeded.
 ///
 /// RUST_LOG=async_raft,memstore,shutdown=trace cargo test -p async-raft --test shutdown
 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
 async fn initialization() -> Result<()> {
 fixtures::init_tracing();

 // Setup test dependencies.
- let config = Arc::new(
- Config::build("test".into())
- .validate()
- .expect("failed to build Raft config"),
- );
+ let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
 let router = Arc::new(RaftRouter::new(config.clone()));
 router.new_raft_node(0).await;
 router.new_raft_node(1).await;
 router.new_raft_node(2).await;
@@ -36,12 +32,8 @@ async fn initialization() -> Result<()> {
 let mut want = 0;

 // Assert all nodes are in non-voter state & have no entries.
- router
- .wait_for_log(&hashset![0, 1, 2], want, None, "empty")
- .await?;
- router
- .wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty")
- .await?;
+ router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?;
+ router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?;
 router.assert_pristine_cluster().await;

 // Initialize the cluster, then assert that a stable cluster was formed & held.
@@ -49,26 +41,15 @@ async fn initialization() -> Result<()> {
 router.initialize_from_single_node(0).await?;
 want += 1;

- router
- .wait_for_log(&hashset![0, 1, 2], want, None, "init")
- .await?;
+ router.wait_for_log(&hashset![0, 1, 2], want, None, "init").await?;
 router.assert_stable_cluster(Some(1), Some(1)).await;

 tracing::info!("--- performing node shutdowns");
- let (node0, _) = router
- .remove_node(0)
- .await
- .ok_or_else(|| anyhow!("failed to find node 0 in router"))?;
+ let (node0, _) = router.remove_node(0).await.ok_or_else(|| anyhow!("failed to find node 0 in router"))?;
 node0.shutdown().await?;
- let (node1, _) = router
- .remove_node(1)
- .await
- .ok_or_else(|| anyhow!("failed to find node 1 in router"))?;
+ let (node1, _) = router.remove_node(1).await.ok_or_else(|| anyhow!("failed to find node 1 in router"))?;
 node1.shutdown().await?;
- let (node2, _) = router
- .remove_node(2)
- .await
- .ok_or_else(|| anyhow!("failed to find node 2 in router"))?;
+ let (node2, _) = router.remove_node(2).await.ok_or_else(|| anyhow!("failed to find node 2 in router"))?;
 node2.shutdown().await?;

 Ok(())
diff --git a/async-raft/tests/singlenode.rs b/async-raft/tests/singlenode.rs
index a946139ca..9907939a7 100644
--- a/async-raft/tests/singlenode.rs
+++ b/async-raft/tests/singlenode.rs
@@ -24,23 +24,15 @@ async fn singlenode() -> Result<()> {
 fixtures::init_tracing();

 // Setup test dependencies.
- let config = Arc::new(
- Config::build("test".into())
- .validate()
- .expect("failed to build Raft config"),
- );
+ let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
 let router = Arc::new(RaftRouter::new(config.clone()));
 router.new_raft_node(0).await;

 let mut want = 0;

 // Assert all nodes are in non-voter state & have no entries.
- router
- .wait_for_log(&hashset![0], want, None, "empty")
- .await?;
- router
- .wait_for_state(&hashset![0], State::NonVoter, None, "empty")
- .await?;
+ router.wait_for_log(&hashset![0], want, None, "empty").await?;
+ router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?;
 router.assert_pristine_cluster().await;

 // Initialize the cluster, then assert that a stable cluster was formed & held.
@@ -48,17 +40,13 @@ async fn singlenode() -> Result<()> {
 router.initialize_from_single_node(0).await?;
 want += 1;

- router
- .wait_for_log(&hashset![0], want, None, "init")
- .await?;
+ router.wait_for_log(&hashset![0], want, None, "init").await?;
 router.assert_stable_cluster(Some(1), Some(1)).await;

 // Write some data to the single node cluster.
 router.client_request_many(0, "0", 1000).await;
 router.assert_stable_cluster(Some(1), Some(1001)).await;
- router
- .assert_storage_state(1, 1001, Some(0), 1001, None)
- .await;
+ router.assert_storage_state(1, 1001, Some(0), 1001, None).await;

 // Read some data from the single node cluster.
 router.client_read(0).await?;
diff --git a/async-raft/tests/stepdown.rs b/async-raft/tests/stepdown.rs
index 03125a1c2..5434af32b 100644
--- a/async-raft/tests/stepdown.rs
+++ b/async-raft/tests/stepdown.rs
@@ -16,8 +16,8 @@ use tokio::time::sleep;
 ///
 /// - create a stable 2-node cluster.
 /// - starts a config change which adds two new nodes and removes the leader.
-/// - the leader should commit the change to C0 & C1 with separate majorities and then stepdown
-/// after the config change is committed.
+/// - the leader should commit the change to C0 & C1 with separate majorities and then step down after the config
+/// change is committed.
 ///
 /// RUST_LOG=async_raft,memstore,stepdown=trace cargo test -p async-raft --test stepdown
 #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
@@ -25,11 +25,7 @@ async fn stepdown() -> Result<()> {
 fixtures::init_tracing();

 // Setup test dependencies.
- let config = Arc::new(
- Config::build("test".into())
- .validate()
- .expect("failed to build Raft config"),
- );
+ let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
 let router = Arc::new(RaftRouter::new(config.clone()));
 router.new_raft_node(0).await;
 router.new_raft_node(1).await;
@@ -37,12 +33,8 @@ async fn stepdown() -> Result<()> {
 let mut want = 0;

 // Assert all nodes are in non-voter state & have no entries.
- router
- .wait_for_log(&hashset![0, 1], want, None, "empty")
- .await?;
- router
- .wait_for_state(&hashset![0, 1], State::NonVoter, None, "empty")
- .await?;
+ router.wait_for_log(&hashset![0, 1], want, None, "empty").await?;
+ router.wait_for_state(&hashset![0, 1], State::NonVoter, None, "empty").await?;
 router.assert_pristine_cluster().await;

 // Initialize the cluster, then assert that a stable cluster was formed & held.
@@ -50,34 +42,20 @@ async fn stepdown() -> Result<()> {
 router.initialize_from_single_node(0).await?;
 want += 1;

- router
- .wait_for_log(&hashset![0, 1], want, None, "init")
- .await?;
+ router.wait_for_log(&hashset![0, 1], want, None, "init").await?;
 router.assert_stable_cluster(Some(1), Some(1)).await;

 // Submit a config change which adds two new nodes and removes the current leader.
- let orig_leader = router
- .leader()
- .await
- .expect("expected the cluster to have a leader");
+ let orig_leader = router.leader().await.expect("expected the cluster to have a leader");
 assert_eq!(0, orig_leader, "expected original leader to be node 0");
 router.new_raft_node(2).await;
 router.new_raft_node(3).await;
- router
- .change_membership(orig_leader, hashset![1, 2, 3])
- .await?;
+ router.change_membership(orig_leader, hashset![1, 2, 3]).await?;
 want += 2;

 for id in 0..4 {
 if id == orig_leader {
- router
- .wait_for_log(
- &hashset![id],
- want,
- None,
- "update membership: 1, 2, 3; old leader",
- )
- .await?;
+ router.wait_for_log(&hashset![id], want, None, "update membership: 1, 2, 3; old leader").await?;
 } else {
 // a new leader is elected and proposes a log
 router
@@ -147,12 +125,8 @@ async fn stepdown() -> Result<()> {
 tracing::info!("term: {}", metrics.current_term);
 tracing::info!("index: {}", metrics.last_log_index);
 assert!(metrics.current_term >= 2, "term incr when leader changes");
- router
- .assert_stable_cluster(Some(metrics.current_term), Some(want))
- .await;
- router
- .assert_storage_state(metrics.current_term, want, None, 0, None)
- .await;
+ router.assert_stable_cluster(Some(metrics.current_term), Some(want)).await;
+ router.assert_storage_state(metrics.current_term, want, None, 0, None).await;
 // ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders
 // and config change entries are never applied to the state machine.
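Editor's note on the `0` vs `4` comment above: the log in this test holds four entries, but none of them is a normal client write, so nothing reaches the state machine. A minimal sketch of that distinction, using a hypothetical `Payload` enum rather than async-raft's real entry types:

```rust
// Hypothetical mirror of the entry kinds involved; not async-raft's API.
enum Payload {
    Blank,          // appended by a freshly elected leader to commit its term
    ConfigChange,   // membership-change entries, handled by Raft itself
    Normal(String), // client data -- the only kind applied to the state machine
}

// Count the entries that would actually reach the state machine.
fn applied(log: &[Payload]) -> usize {
    log.iter().filter(|p| matches!(p, Payload::Normal(_))).count()
}

fn main() {
    // Roughly what this test produces: the initial membership entry, the
    // joint and uniform config-change entries, and the new leader's blank.
    let log = vec![
        Payload::ConfigChange,
        Payload::ConfigChange,
        Payload::ConfigChange,
        Payload::Blank,
    ];
    assert_eq!(applied(&log), 0); // hence `assert_storage_state(.., 0, ..)`
}
```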
diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index def9329d3..d137bf476 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -253,11 +253,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self, data))] - async fn apply_entry_to_state_machine( - &self, - index: &u64, - data: &ClientRequest, - ) -> Result { + async fn apply_entry_to_state_machine(&self, index: &u64, data: &ClientRequest) -> Result { let mut sm = self.sm.write().await; sm.last_applied_log = *index; if let Some((serial, res)) = sm.client_serial_responses.get(&data.client) { @@ -265,11 +261,8 @@ impl RaftStorage for MemStore { return Ok(ClientResponse(res.clone())); } } - let previous = sm - .client_status - .insert(data.client.clone(), data.status.clone()); - sm.client_serial_responses - .insert(data.client.clone(), (data.serial, previous.clone())); + let previous = sm.client_status.insert(data.client.clone(), data.status.clone()); + sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone())); Ok(ClientResponse(previous)) } @@ -283,11 +276,8 @@ impl RaftStorage for MemStore { continue; } } - let previous = sm - .client_status - .insert(data.client.clone(), data.status.clone()); - sm.client_serial_responses - .insert(data.client.clone(), (data.serial, previous.clone())); + let previous = sm.client_status.insert(data.client.clone(), data.status.clone()); + sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone())); } Ok(()) } @@ -329,12 +319,7 @@ impl RaftStorage for MemStore { *log = log.split_off(&last_applied_log); log.insert( last_applied_log, - Entry::new_snapshot_pointer( - last_applied_log, - term, - "".into(), - membership_config.clone(), - ), + Entry::new_snapshot_pointer(last_applied_log, term, "".into(), membership_config.clone()), ); let snapshot = MemStoreSnapshot { @@ -347,10 +332,7 @@ impl RaftStorage for MemStore { *current_snapshot = Some(snapshot); } // Release log & snapshot write locks. - tracing::trace!( - { snapshot_size = snapshot_bytes.len() }, - "log compaction complete" - ); + tracing::trace!({ snapshot_size = snapshot_bytes.len() }, "log compaction complete"); Ok(CurrentSnapshotData { term, index: last_applied_log, @@ -361,7 +343,8 @@ impl RaftStorage for MemStore { #[tracing::instrument(level = "trace", skip(self))] async fn create_snapshot(&self) -> Result<(String, Box)> { - Ok((String::from(""), Box::new(Cursor::new(Vec::new())))) // Snapshot IDs are insignificant to this storage engine. + Ok((String::from(""), Box::new(Cursor::new(Vec::new())))) // Snapshot IDs are insignificant to this storage + // engine. } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -400,10 +383,7 @@ impl RaftStorage for MemStore { } None => log.clear(), } - log.insert( - index, - Entry::new_snapshot_pointer(index, term, id, membership_config), - ); + log.insert(index, Entry::new_snapshot_pointer(index, term, id, membership_config)); } // Update the state machine. 
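Editor's note: the reflowed `apply_entry_to_state_machine` above still implements the same idempotency scheme: `client_serial_responses` remembers the last serial seen per client, so a retried request returns the stored response instead of being applied twice. A standalone sketch of that pattern (simplified types, not MemStore's actual fields):

```rust
use std::collections::HashMap;

/// Simplified stand-in for MemStore's state machine: a status per client,
/// plus the last (serial, previous_status) recorded for each client.
#[derive(Default)]
struct StateMachine {
    client_status: HashMap<String, String>,
    client_serial_responses: HashMap<String, (u64, Option<String>)>,
}

impl StateMachine {
    /// Apply a client request; a replayed serial returns the stored response
    /// without mutating state, which makes retries idempotent.
    fn apply(&mut self, client: &str, serial: u64, status: &str) -> Option<String> {
        if let Some((last_serial, last_response)) = self.client_serial_responses.get(client) {
            if *last_serial == serial {
                return last_response.clone();
            }
        }
        let previous = self.client_status.insert(client.to_string(), status.to_string());
        self.client_serial_responses.insert(client.to_string(), (serial, previous.clone()));
        previous
    }
}

fn main() {
    let mut sm = StateMachine::default();
    assert_eq!(sm.apply("c1", 0, "new"), None);               // first write
    assert_eq!(sm.apply("c1", 0, "new"), None);               // retry: replayed, not re-applied
    assert_eq!(sm.apply("c1", 1, "lit"), Some("new".into())); // next serial applies normally
}
```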
diff --git a/memstore/src/test.rs b/memstore/src/test.rs index 5aa977b65..8e8f46f3b 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -6,7 +6,6 @@ use super::*; const NODE_ID: u64 = 0; -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -55,7 +54,6 @@ async fn test_get_membership_config_with_previous_state() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -69,18 +67,9 @@ async fn test_get_initial_state_default() -> Result<()> { let initial = store.get_initial_state().await?; - assert_eq!( - initial.last_log_index, 0, - "unexpected default value for last log index" - ); - assert_eq!( - initial.last_log_term, 0, - "unexpected default value for last log term" - ); - assert_eq!( - initial.last_applied_log, 0, - "unexpected value for last applied log" - ); + assert_eq!(initial.last_log_index, 0, "unexpected default value for last log index"); + assert_eq!(initial.last_log_term, 0, "unexpected default value for last log term"); + assert_eq!(initial.last_applied_log, 0, "unexpected value for last applied log"); assert_eq!( initial.hard_state, expected_hs, "unexpected value for default hard state" @@ -112,26 +101,13 @@ async fn test_get_initial_state_with_previous_state() -> Result<()> { let initial = store.get_initial_state().await?; - assert_eq!( - initial.last_log_index, 1, - "unexpected default value for last log index" - ); - assert_eq!( - initial.last_log_term, 1, - "unexpected default value for last log term" - ); - assert_eq!( - initial.last_applied_log, 1, - "unexpected value for last applied log" - ); - assert_eq!( - initial.hard_state, hs, - "unexpected value for default hard state" - ); + assert_eq!(initial.last_log_index, 1, "unexpected default value for last log index"); + assert_eq!(initial.last_log_term, 1, "unexpected default value for last log term"); + assert_eq!(initial.last_applied_log, 1, "unexpected value for last applied log"); + assert_eq!(initial.hard_state, hs, "unexpected value for default hard state"); Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -153,7 +129,6 @@ async fn test_save_hard_state() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -180,7 +155,6 @@ async fn test_get_log_entries_returns_expected_entries() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -228,7 +202,6 @@ async fn test_delete_logs_from_deletes_only_target_logs() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -250,7 +223,6 @@ async fn test_append_entry_to_log() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -272,7 +244,6 @@ async fn test_replicate_to_log() -> 
Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -293,16 +264,11 @@ async fn test_apply_entry_to_state_machine() -> Result<()> { "expected last_applied_log to be 1, got {}", sm.last_applied_log ); - let client_serial = sm - .client_serial_responses - .get("0") - .expect("expected entry to exist in client_serial_responses"); + let client_serial = + sm.client_serial_responses.get("0").expect("expected entry to exist in client_serial_responses"); assert_eq!(client_serial.0, 0, "unexpected client serial response"); assert_eq!(client_serial.1, None, "unexpected client serial response"); - let client_status = sm - .client_status - .get("0") - .expect("expected entry to exist in client_status"); + let client_status = sm.client_status.get("0").expect("expected entry to exist in client_status"); assert_eq!( client_status, "lit", "expected client_status to be 'lit', got '{}'", @@ -311,7 +277,6 @@ async fn test_apply_entry_to_state_machine() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// #[tokio::test] @@ -358,14 +323,8 @@ async fn test_replicate_to_state_machine() -> Result<()> { .expect("expected entry to exist in client_serial_responses for client 2"); assert_eq!(client_serial2.0, 0, "unexpected client serial response"); assert_eq!(client_serial2.1, None, "unexpected client serial response"); - let client_status1 = sm - .client_status - .get("1") - .expect("expected entry to exist in client_status for client 1"); - let client_status2 = sm - .client_status - .get("2") - .expect("expected entry to exist in client_status for client 2"); + let client_status1 = sm.client_status.get("1").expect("expected entry to exist in client_status for client 1"); + let client_status2 = sm.client_status.get("2").expect("expected entry to exist in client_status for client 2"); assert_eq!( client_status1, "new", "expected client_status to be 'new', got '{}'", @@ -379,7 +338,6 @@ async fn test_replicate_to_state_machine() -> Result<()> { Ok(()) } -////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// fn default_store_with_logs() -> MemStore { diff --git a/rustfmt.toml b/rustfmt.toml index 777d945c3..7a13826dd 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,15 +1,12 @@ # unstable_features = true # edition = "2018" -# comment_width = 100 # fn_args_layout = "Compressed" -# max_width = 150 # use_small_heuristics = "Default" # use_try_shorthand = true # # pre-unstable -# chain_width = 75 # single_line_if_else_max_width = 75 # space_around_attr_eq = false # struct_lit_width = 50 @@ -25,7 +22,6 @@ # reorder_impl_items = true # struct_lit_single_line = true # use_field_init_shorthand = true -# wrap_comments = true reorder_imports = true @@ -35,3 +31,10 @@ where_single_line = true trailing_comma = "Vertical" report_fixme = "Unnumbered" overflow_delimited_expr = true +wrap_comments = true +comment_width = 120 +max_width = 120 + +# pre-unstable + +chain_width = 100
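Editor's note: these rustfmt settings are the reason most hunks in this patch collapse multi-line method chains onto one line. With `max_width = 120` and `chain_width = 100`, a chain stays on a single line until it exceeds 100 characters, and `wrap_comments = true` with `comment_width = 120` rewraps the long doc comments seen in the test headers. An illustrative toy (exact break points depend on rustfmt's heuristics; the default `chain_width` is around 60):

```rust
// Under the old defaults a chain of this length would be split after each
// method call:
//     let s = String::from("raft")
//         .repeat(2)
//         .to_uppercase()
//         .trim()
//         .to_string();
// With chain_width = 100 it fits on one line, matching the style of the
// `router.wait_for_log(..).await?` rewrites throughout this patch.
fn main() {
    let s = String::from("raft").repeat(2).to_uppercase().trim().to_string();
    println!("{}", s);
}
```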