diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs
index e19285c73..629c5777d 100644
--- a/async-raft/src/core/admin.rs
+++ b/async-raft/src/core/admin.rs
@@ -166,7 +166,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
         };
         let cr_entry = ClientRequestEntry::from_entry(entry, tx_joint);
         self.replicate_client_request(cr_entry).await;
-        self.core.report_metrics();
+        self.leader_report_metrics();
 
         // Setup channels for eventual response to the 2-phase config change.
         let (tx_cfg_change, rx_cfg_change) = oneshot::channel();
@@ -282,7 +282,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
         let entry = self.append_payload_to_log(payload.entry).await?;
         let cr_entry = ClientRequestEntry::from_entry(entry, tx_uniform);
         self.replicate_client_request(cr_entry).await;
-        self.core.report_metrics();
+        self.leader_report_metrics();
 
         // Setup channel for eventual commitment of the uniform consensus config.
         self.uniform_consensus_cb.push(rx_uniform); // Receiver for when the uniform consensus is committed.
@@ -325,13 +325,16 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
         tracing::debug!("membership: {:?}", self.core.membership);
         tracing::debug!("nodes_to_remove: {:?}", nodes_to_remove);
 
-        for node in nodes_to_remove {
-            tracing::debug!({ target = node }, "removing target node from replication pool");
-            if let Some(node) = self.nodes.remove(&node) {
+        for target in nodes_to_remove {
+            tracing::debug!(target, "removing target node from replication pool");
+            if let Some(node) = self.nodes.remove(&target) {
                 let _ = node.replstream.repl_tx.send(RaftEvent::Terminate);
+
+                // remove metrics entry
+                self.leader_metrics.replication.remove(&target);
             }
         }
-        self.core.report_metrics();
+        self.leader_report_metrics();
         Ok(())
     }
 }
diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs
index 7c9f9fe2e..4b055df22 100644
--- a/async-raft/src/core/append_entries.rs
+++ b/async-raft/src/core/append_entries.rs
@@ -11,6 +11,7 @@ use crate::AppData;
 use crate::AppDataResponse;
 use crate::RaftNetwork;
 use crate::RaftStorage;
+use crate::Update;
 
 impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
     /// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
@@ -66,7 +67,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
             self.append_log_entries(&msg.entries).await?;
             self.replicate_to_state_machine_if_needed(msg.entries).await;
             if report_metrics {
-                self.report_metrics();
+                self.report_metrics(Update::Ignore);
             }
             return Ok(AppendEntriesResponse {
                 term: self.current_term,
@@ -92,7 +93,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
             // specified index yet. Use the last known index & term as a conflict opt.
             None => {
                 if report_metrics {
-                    self.report_metrics();
+                    self.report_metrics(Update::Ignore);
                 }
                 return Ok(AppendEntriesResponse {
                     term: self.current_term,
@@ -143,7 +144,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
                 }),
             };
             if report_metrics {
-                self.report_metrics();
+                self.report_metrics(Update::Ignore);
             }
             return Ok(AppendEntriesResponse {
                 term: self.current_term,
@@ -159,7 +160,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
         self.append_log_entries(&msg.entries).await?;
         self.replicate_to_state_machine_if_needed(msg.entries).await;
         if report_metrics {
-            self.report_metrics();
+            self.report_metrics(Update::Ignore);
         }
         Ok(AppendEntriesResponse {
             term: self.current_term,
@@ -246,7 +247,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
         if entries.is_empty() {
             if let Some(index) = last_entry_seen {
                 self.last_applied = index;
-                self.report_metrics();
+                self.report_metrics(Update::Ignore);
             }
             return;
         }
diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs
index 496903964..d4886068c 100644
--- a/async-raft/src/core/client.rs
+++ b/async-raft/src/core/client.rs
@@ -95,7 +95,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
         self.core.last_log_term = self.core.current_term; // This only ever needs to be updated once per term.
         let cr_entry = ClientRequestEntry::from_entry(entry, tx_payload_committed);
         self.replicate_client_request(cr_entry).await;
-        self.core.report_metrics();
+        self.leader_report_metrics();
 
         // Setup any callbacks needed for responding to commitment of a pending config.
         if let Some(is_in_joint_consensus) = pending_config {
@@ -286,7 +286,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
         } else {
             // Else, there are no voting nodes for replication, so the payload is now committed.
             self.core.commit_index = entry_arc.index;
-            self.core.report_metrics();
+            self.leader_report_metrics();
             self.client_request_post_commit(req).await;
         }
 
@@ -332,7 +332,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
             }
             ClientOrInternalResponseTx::Internal(tx) => {
                 self.core.last_applied = req.entry.index;
-                self.core.report_metrics();
+                self.leader_report_metrics();
                 let _ = tx.send(Ok(req.entry.index));
             }
         }
@@ -395,7 +395,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
             }
         });
         self.core.last_applied = *index;
-        self.core.report_metrics();
+        self.leader_report_metrics();
         res
     }
 }
diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs
index 92171f539..f42f03eba 100644
--- a/async-raft/src/core/install_snapshot.rs
+++ b/async-raft/src/core/install_snapshot.rs
@@ -14,6 +14,7 @@ use crate::AppData;
 use crate::AppDataResponse;
 use crate::RaftNetwork;
 use crate::RaftStorage;
+use crate::Update;
 
 impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
     /// Invoked by leader to send chunks of a snapshot to a follower (§7).
@@ -56,7 +57,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
         }
 
         if report_metrics {
-            self.report_metrics();
+            self.report_metrics(Update::Ignore);
         }
 
         // Compare current snapshot state with received RPC and handle as needed.
diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs
index 251ebad5d..29e320b2b 100644
--- a/async-raft/src/core/mod.rs
+++ b/async-raft/src/core/mod.rs
@@ -36,6 +36,7 @@ use crate::error::ClientWriteError;
 use crate::error::InitializeError;
 use crate::error::RaftError;
 use crate::error::RaftResult;
+use crate::metrics::LeaderMetrics;
 use crate::metrics::RaftMetrics;
 use crate::raft::ChangeMembershipTx;
 use crate::raft::ClientReadResponseTx;
@@ -54,6 +55,7 @@ use crate::AppDataResponse;
 use crate::NodeId;
 use crate::RaftNetwork;
 use crate::RaftStorage;
+use crate::Update;
 
 /// The core type implementing the Raft protocol.
 pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
@@ -269,7 +271,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
     /// Report a metrics payload on the current state of the Raft node.
     #[tracing::instrument(level = "trace", skip(self))]
-    fn report_metrics(&mut self) {
+    fn report_metrics(&mut self, leader_metrics: Update<Option<&LeaderMetrics>>) {
+        let leader_metrics = match leader_metrics {
+            Update::Update(v) => v.cloned(),
+            Update::Ignore => self.tx_metrics.borrow().leader_metrics.clone(),
+        };
+
         let res = self.tx_metrics.send(RaftMetrics {
             id: self.id,
             state: self.target_state,
@@ -278,9 +285,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
             last_applied: self.last_applied,
             current_leader: self.current_leader,
             membership_config: self.membership.clone(),
+            leader_metrics,
         });
+
         if let Err(err) = res {
-            tracing::error!({error=%err, id=self.id}, "error reporting metrics");
+            tracing::error!(error=%err, id=self.id, "error reporting metrics");
         }
     }
@@ -463,7 +472,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S>
         if let Some(last_applied) = last_applied_opt {
             self.last_applied = last_applied;
         }
-        self.report_metrics();
+        self.report_metrics(Update::Ignore);
         self.trigger_log_compaction_if_needed();
         Ok(())
     }
@@ -592,6 +601,9 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>>
     /// A bool indicating if this node will be stepping down after committing the current config change.
     pub(super) is_stepping_down: bool,
 
+    /// The metrics about a leader.
+    pub leader_metrics: LeaderMetrics,
+
     /// The stream of events coming from replication streams.
     pub(super) replicationrx: mpsc::UnboundedReceiver<ReplicaEvent<S::Snapshot>>,
     /// The clonable sender channel for replication stream events.
@@ -623,6 +635,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
             nodes: BTreeMap::new(),
             non_voters: BTreeMap::new(),
             is_stepping_down: false,
+            leader_metrics: LeaderMetrics::default(),
             replicationtx,
             replicationrx,
             consensus_state,
@@ -653,7 +666,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
         self.core.last_heartbeat = None;
         self.core.next_election_timeout = None;
         self.core.update_current_leader(UpdateCurrentLeader::ThisNode);
-        self.core.report_metrics();
+        self.leader_report_metrics();
 
         // Per §8, commit an initial entry as part of becoming the cluster leader.
         self.commit_initial_leader_entry().await?;
@@ -726,6 +739,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
             }
         }
     }
+
+    /// Report metrics with leader-specific states.
+    #[tracing::instrument(level = "trace", skip(self))]
+    pub fn leader_report_metrics(&mut self) {
+        self.core.report_metrics(Update::Update(Some(&self.leader_metrics)));
+    }
 }
 
 /// A struct tracking the state of a replication stream from the perspective of the Raft actor.
@@ -834,7 +853,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> CandidateState<'a, D, R, N, S>
         self.core.voted_for = Some(self.core.id);
         self.core.update_current_leader(UpdateCurrentLeader::Unknown);
         self.core.save_hard_state().await?;
-        self.core.report_metrics();
+        self.core.report_metrics(Update::Update(None));
 
         // Send RPCs to all members in parallel.
         let mut pending_votes = self.spawn_parallel_vote_requests();
@@ -901,7 +920,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> FollowerState<'a, D, R, N, S>
     /// Run the follower loop.
     #[tracing::instrument(level="trace", skip(self), fields(id=self.core.id, raft_state="follower"))]
     pub(self) async fn run(self) -> RaftResult<()> {
-        self.core.report_metrics();
+        self.core.report_metrics(Update::Update(None));
         loop {
             if !self.core.target_state.is_follower() {
                 return Ok(());
@@ -962,7 +981,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> NonVoterState<'a, D, R, N, S>
     /// Run the non-voter loop.
     #[tracing::instrument(level="trace", skip(self), fields(id=self.core.id, raft_state="non-voter"))]
     pub(self) async fn run(mut self) -> RaftResult<()> {
-        self.core.report_metrics();
+        self.core.report_metrics(Update::Update(None));
         loop {
             if !self.core.target_state.is_non_voter() {
                 return Ok(());
diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs
index 929539e81..685560ba7 100644
--- a/async-raft/src/core/replication.rs
+++ b/async-raft/src/core/replication.rs
@@ -16,9 +16,11 @@ use crate::replication::ReplicationStream;
 use crate::storage::CurrentSnapshotData;
 use crate::AppData;
 use crate::AppDataResponse;
+use crate::LogId;
 use crate::NodeId;
 use crate::RaftNetwork;
 use crate::RaftStorage;
+use crate::ReplicationMetrics;
 
 impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
     /// Spawn a new replication stream returning its replication state handle.
@@ -148,13 +150,22 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
             return Ok(());
         }
 
+        self.update_leader_metrics(target, match_term, match_index);
+
         // Drop replication stream if needed.
+        // TODO(xp): is it possible to merge the two node remove routines?
+        //           here and that in handle_uniform_consensus_committed()
         if needs_removal {
             if let Some(node) = self.nodes.remove(&target) {
                 let _ = node.replstream.repl_tx.send(RaftEvent::Terminate);
+
+                // remove metrics entry
+                self.leader_metrics.replication.remove(&target);
             }
         }
 
+        // TODO(xp): simplify commit condition check
+
         // Determine the new commit index of the current membership config nodes.
         let indices_c0 = self.get_match_indexes(&self.core.membership.members);
         tracing::debug!("indices_c0: {:?}", indices_c0);
@@ -207,11 +218,23 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S>
                     self.client_request_post_commit(request).await;
                 }
             }
-            self.core.report_metrics();
         }
+
+        // TODO(xp): does this update too frequently?
+        self.leader_report_metrics();
         Ok(())
     }
 
+    #[tracing::instrument(level = "debug", skip(self))]
+    fn update_leader_metrics(&mut self, target: NodeId, match_term: u64, match_index: u64) {
+        self.leader_metrics.replication.insert(target, ReplicationMetrics {
+            matched: LogId {
+                term: match_term,
+                index: match_index,
+            },
+        });
+    }
+
     /// Extract the matching index/term of the replication state of specified nodes.
     fn get_match_indexes(&self, node_ids: &HashSet<NodeId>) -> Vec<(u64, u64)> {
         tracing::debug!("to get match indexes of nodes: {:?}", node_ids);
diff --git a/async-raft/src/lib.rs b/async-raft/src/lib.rs
index 3e026b569..e35265e3d 100644
--- a/async-raft/src/lib.rs
+++ b/async-raft/src/lib.rs
@@ -8,6 +8,7 @@ pub mod metrics;
 mod metrics_wait_test;
 pub mod network;
 pub mod raft;
+mod raft_types;
 mod replication;
 pub mod storage;
 
@@ -27,6 +28,9 @@ pub use crate::error::RaftError;
 pub use crate::metrics::RaftMetrics;
 pub use crate::network::RaftNetwork;
 pub use crate::raft::Raft;
+pub use crate::raft_types::LogId;
+pub use crate::raft_types::Update;
+pub use crate::replication::ReplicationMetrics;
 pub use crate::storage::RaftStorage;
 
 /// A Raft node's ID.
diff --git a/async-raft/src/metrics.rs b/async-raft/src/metrics.rs
index 057cdf947..eabb3184a 100644
--- a/async-raft/src/metrics.rs
+++ b/async-raft/src/metrics.rs
@@ -7,6 +7,7 @@
 //! Metrics are observed on a running Raft node via the `Raft::metrics()` method, which will
 //! return a stream of metrics.
 
+use std::collections::HashMap;
 use std::collections::HashSet;
 
 use serde::Deserialize;
@@ -19,6 +20,7 @@ use crate::core::State;
 use crate::raft::MembershipConfig;
 use crate::NodeId;
 use crate::RaftError;
+use crate::ReplicationMetrics;
 
 /// A set of metrics describing the current state of a Raft node.
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RaftMetrics {
@@ -37,6 +39,16 @@ pub struct RaftMetrics {
     pub current_leader: Option<NodeId>,
     /// The current membership config of the cluster.
     pub membership_config: MembershipConfig,
+
+    /// The metrics about the leader. It is `Some()` only when this node is the leader.
+    pub leader_metrics: Option<LeaderMetrics>,
+}
+
+/// The metrics about the leader. It is reported only when this node is the leader.
+#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
+pub struct LeaderMetrics {
+    /// Replication metrics of all known replication targets: voters and non-voters.
+    pub replication: HashMap<NodeId, ReplicationMetrics>,
 }
 
 impl RaftMetrics {
@@ -50,6 +62,7 @@ impl RaftMetrics {
             last_applied: 0,
             current_leader: None,
             membership_config,
+            leader_metrics: None,
         }
     }
 }
diff --git a/async-raft/src/metrics_wait_test.rs b/async-raft/src/metrics_wait_test.rs
index ec0914bd6..4a677c23d 100644
--- a/async-raft/src/metrics_wait_test.rs
+++ b/async-raft/src/metrics_wait_test.rs
@@ -136,6 +136,7 @@ fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender<RaftMetrics>) {
             members: Default::default(),
             members_after_consensus: None,
         },
+        leader_metrics: None,
     };
     let (tx, rx) = watch::channel(init.clone());
     let w = Wait {
diff --git a/async-raft/src/raft_types.rs b/async-raft/src/raft_types.rs
new file mode 100644
index 000000000..7185ae62e
--- /dev/null
+++ b/async-raft/src/raft_types.rs
@@ -0,0 +1,17 @@
+use serde::Deserialize;
+use serde::Serialize;
+
+/// The identity of a raft log.
+/// A term and an index identify a log globally.
+#[derive(Debug, Default, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
+pub struct LogId {
+    pub term: u64,
+    pub index: u64,
+}
+
+/// An update action with the option to update with some value or to just ignore this update.
+#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
+pub enum Update<T> {
+    Update(T),
+    Ignore,
+}
diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs
index 02dc6b230..e74e8c118 100644
--- a/async-raft/src/replication/mod.rs
+++ b/async-raft/src/replication/mod.rs
@@ -4,6 +4,8 @@ use std::io::SeekFrom;
 use std::sync::Arc;
 
 use futures::future::FutureExt;
+use serde::Deserialize;
+use serde::Serialize;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncReadExt;
 use tokio::io::AsyncSeek;
@@ -26,10 +28,16 @@ use crate::raft::InstallSnapshotRequest;
 use crate::storage::CurrentSnapshotData;
 use crate::AppData;
 use crate::AppDataResponse;
+use crate::LogId;
 use crate::NodeId;
 use crate::RaftNetwork;
 use crate::RaftStorage;
 
+#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct ReplicationMetrics {
+    pub matched: LogId,
+}
+
 /// The public handle to a spawned replication stream.
 pub(crate) struct ReplicationStream<D: AppData> {
     /// The spawn handle the `ReplicationCore` task.
diff --git a/async-raft/tests/leader_metrics.rs b/async-raft/tests/leader_metrics.rs
new file mode 100644
index 000000000..8c64f4a0b
--- /dev/null
+++ b/async-raft/tests/leader_metrics.rs
@@ -0,0 +1,180 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use async_raft::raft::VoteRequest;
+use async_raft::Config;
+use async_raft::LogId;
+use async_raft::RaftNetwork;
+use async_raft::ReplicationMetrics;
+use async_raft::State;
+use fixtures::RaftRouter;
+use futures::stream::StreamExt;
+use maplit::hashmap;
+use maplit::hashset;
+
+mod fixtures;
+
+/// Cluster leader_metrics test.
+///
+/// What does this test do?
+///
+/// - brings 5 nodes online: one leader and 4 non-voters.
+/// - adds the 4 non-voters as followers.
+/// - asserts that the leader was able to successfully commit logs and that the followers have successfully
+///   replicated the payload.
+/// - removes one follower: node-4.
+/// - asserts that node-4 becomes a non-voter and that the leader stops sending logs to it.
+///
+/// RUST_LOG=async_raft,memstore,leader_metrics=trace cargo test -p async-raft --test leader_metrics
+#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
+async fn leader_metrics() -> Result<()> {
+    fixtures::init_tracing();
+
+    let timeout = Some(Duration::from_millis(1000));
+    let all_members = hashset![0, 1, 2, 3, 4];
+    let left_members = hashset![0, 1, 2, 3];
+
+    // Setup test dependencies.
+    let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
+    let router = Arc::new(RaftRouter::new(config.clone()));
+    router.new_raft_node(0).await;
+
+    // Assert all nodes are in non-voter state & have no entries.
+    let mut want = 0;
+    router.wait_for_log(&hashset![0], want, timeout, "init").await?;
+    router.wait_for_state(&hashset![0], State::NonVoter, timeout, "init").await?;
+
+    router.assert_pristine_cluster().await;
+
+    tracing::info!("--- initializing cluster");
+
+    router.initialize_from_single_node(0).await?;
+    want += 1;
+
+    router.wait_for_log(&hashset![0], want, timeout, "init cluster").await?;
+    router.assert_stable_cluster(Some(1), Some(want)).await;
+
+    router
+        .wait_for_metrics(
+            &0,
+            |x| {
+                if let Some(ref q) = x.leader_metrics {
+                    q.replication.is_empty()
+                } else {
+                    false
+                }
+            },
+            timeout,
+            "no replication with 1 node cluster",
+        )
+        .await?;
+
+    // Sync some new nodes.
+    router.new_raft_node(1).await;
+    router.new_raft_node(2).await;
+    router.new_raft_node(3).await;
+    router.new_raft_node(4).await;
+
+    tracing::info!("--- adding 4 new nodes to cluster");
+
+    let mut new_nodes = futures::stream::FuturesUnordered::new();
+    new_nodes.push(router.add_non_voter(0, 1));
+    new_nodes.push(router.add_non_voter(0, 2));
+    new_nodes.push(router.add_non_voter(0, 3));
+    new_nodes.push(router.add_non_voter(0, 4));
+    while let Some(inner) = new_nodes.next().await {
+        inner?;
+    }
+
+    router.wait_for_log(&all_members, want, timeout, "add non-voter 1,2,3,4").await?;
+
+    tracing::info!("--- changing cluster config to 0,1,2,3,4");
+    router.change_membership(0, all_members.clone()).await?;
+    want += 2; // 2 member-change logs
+
+    router.wait_for_log(&all_members, want, timeout, "change members to 0,1,2,3,4").await?;
+
+    router.assert_stable_cluster(Some(1), Some(want)).await; // Still in term 1, so leader is still node 0.
+
+    let ww = ReplicationMetrics {
+        matched: LogId { term: 1, index: want },
+    };
+    let want_repl = hashmap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone(), 4=>ww.clone(), };
+    router
+        .wait_for_metrics(
+            &0,
+            |x| {
+                if let Some(ref q) = x.leader_metrics {
+                    q.replication == want_repl
+                } else {
+                    false
+                }
+            },
+            timeout,
+            "replication metrics to 4 nodes",
+        )
+        .await?;
+
+    // Send some requests.
+    router.client_request_many(0, "client", 10).await;
+    want += 10;
+
+    // Remove node 4.
+    tracing::info!("--- remove n{}", 4);
+    router.change_membership(0, left_members.clone()).await?;
+    want += 2; // two member-change logs
+
+    router
+        .wait_for_metrics(
+            &4,
+            |x| x.state == State::NonVoter,
+            timeout,
+            &format!("n{}.state -> {:?}", 4, State::NonVoter),
+        )
+        .await?;
+
+    router.wait_for_log(&left_members, want, timeout, "removed node 4").await?;
+
+    let ww = ReplicationMetrics {
+        matched: LogId { term: 1, index: want },
+    };
+    let want_repl = hashmap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone() };
+    router
+        .wait_for_metrics(
+            &0,
+            |x| {
+                if let Some(ref q) = x.leader_metrics {
+                    q.replication == want_repl
+                } else {
+                    false
+                }
+            },
+            timeout,
+            "replication metrics to 3 nodes",
+        )
+        .await?;
+
+    tracing::info!("--- take leadership away from node 0");
+    router
+        .vote(0, VoteRequest {
+            term: 100,
+            candidate_id: 100,
+            last_log_index: 100,
+            last_log_term: 10,
+        })
+        .await?;
+
+    router.wait_for_state(&hashset![0], State::Candidate, timeout, "node 0 to candidate").await?;
+
+    router
+        .wait_for_metrics(
+            &0,
+            |x| x.leader_metrics.is_none(),
+            timeout,
+            "node 0 should close all replication",
+        )
+        .await?;
+
+    Ok(())
+}
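
A note on the `Update<T>` plumbing above: `report_metrics` now takes `Update<Option<&LeaderMetrics>>`, where `Update::Ignore` re-reports whatever `leader_metrics` value was sent last, and `Update::Update(v)` replaces it (the leader passes `Some(..)`; candidates, followers, and non-voters pass `None`). A minimal stand-alone sketch of that match, assuming only the types added in this patch; `resolve_leader_metrics` is a hypothetical name, and `previous` stands in for `self.tx_metrics.borrow().leader_metrics`:

use async_raft::Update;

// Mirrors the match at the top of `RaftCore::report_metrics`.
fn resolve_leader_metrics<T: Clone>(update: Update<Option<&T>>, previous: Option<T>) -> Option<T> {
    match update {
        // A new value (possibly `None`) replaces whatever was reported before.
        Update::Update(v) => v.cloned(),
        // `Ignore` keeps the previously reported value untouched.
        Update::Ignore => previous,
    }
}

fn main() {
    // A leader reports its own metrics; other states report `None`.
    assert_eq!(resolve_leader_metrics(Update::Update(Some(&1u64)), None), Some(1));
    assert_eq!(resolve_leader_metrics::<u64>(Update::Update(None), Some(1)), None);
    // `Ignore` leaves the last report in place.
    assert_eq!(resolve_leader_metrics::<u64>(Update::Ignore, Some(1)), Some(1));
}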
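A subtlety of the new `LogId` type: the derived `PartialOrd` compares fields in declaration order, `term` before `index`, which happens to line up with Raft's log up-to-date-ness rule (§5.4.1). A minimal illustration, not part of the patch:

use async_raft::LogId;

fn main() {
    // A higher term wins regardless of index ...
    assert!(LogId { term: 1, index: 9 } < LogId { term: 2, index: 3 });
    // ... and with equal terms, the higher index is greater.
    assert!(LogId { term: 2, index: 3 } < LogId { term: 2, index: 4 });
}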
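Finally, how a consumer might observe the new `leader_metrics` field. This sketch assumes only the pre-existing `Raft::metrics()` API, which returns a `tokio::sync::watch::Receiver<RaftMetrics>`; `watch_leader_metrics` is a hypothetical helper, not part of this change:

use async_raft::{AppData, AppDataResponse, Raft, RaftNetwork, RaftStorage};

/// Log replication progress whenever this node is the leader.
async fn watch_leader_metrics<D, R, N, S>(raft: &Raft<D, R, N, S>) -> anyhow::Result<()>
where
    D: AppData,
    R: AppDataResponse,
    N: RaftNetwork<D>,
    S: RaftStorage<D, R>,
{
    let mut rx = raft.metrics();
    loop {
        rx.changed().await?; // wait for the next metrics report
        let metrics = rx.borrow().clone();
        match &metrics.leader_metrics {
            Some(lm) => {
                // One entry per replication target: voters and non-voters alike.
                for (target, repl) in &lm.replication {
                    tracing::info!(
                        "replicating to node {}: matched term={} index={}",
                        target,
                        repl.matched.term,
                        repl.matched.index
                    );
                }
            }
            // `None` means this node is currently not the leader.
            None => tracing::info!("node {} is not the leader", metrics.id),
        }
    }
}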