diff --git a/example-raft-kv/src/client.rs b/example-raft-kv/src/client.rs index 3c04df610..54e1085a2 100644 --- a/example-raft-kv/src/client.rs +++ b/example-raft-kv/src/client.rs @@ -114,7 +114,7 @@ impl ExampleClient { /// Metrics contains various information about the cluster, such as current leader, /// membership config, replication status etc. /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> Result, RPCError> { + pub async fn metrics(&self) -> Result, RPCError> { self.do_send_rpc_to_leader("metrics", None::<&()>).await } diff --git a/example-raft-kv/src/network/management.rs b/example-raft-kv/src/network/management.rs index 7002ffa52..bdb2f7c44 100644 --- a/example-raft-kv/src/network/management.rs +++ b/example-raft-kv/src/network/management.rs @@ -13,7 +13,6 @@ use web::Json; use crate::app::ExampleApp; use crate::ExampleNodeId; -use crate::ExampleTypeConfig; // --- Cluster management @@ -63,6 +62,6 @@ pub async fn init(app: Data) -> actix_web::Result { pub async fn metrics(app: Data) -> actix_web::Result { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result, Infallible> = Ok(metrics); Ok(Json(res)) } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index d50aaffe2..572c48cb9 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -106,7 +106,7 @@ pub struct RaftCore, S: RaftStorage< pub(crate) tx_api: mpsc::UnboundedSender<(RaftMsg, Span)>, pub(crate) rx_api: mpsc::UnboundedReceiver<(RaftMsg, Span)>, - tx_metrics: watch::Sender>, + tx_metrics: watch::Sender>, pub(crate) rx_shutdown: oneshot::Receiver<()>, } @@ -119,7 +119,7 @@ impl, S: RaftStorage> RaftCore, Span)>, rx_api: mpsc::UnboundedReceiver<(RaftMsg, Span)>, - tx_metrics: watch::Sender>, + tx_metrics: watch::Sender>, rx_shutdown: oneshot::Receiver<()>, ) -> JoinHandle>> { // diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index b42840e69..fbc2bc9e8 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -4,19 +4,19 @@ use crate::core::ServerState; use crate::error::Fatal; use crate::membership::EffectiveMembership; use crate::metrics::ReplicationMetrics; -use crate::raft::RaftTypeConfig; use crate::summary::MessageSummary; use crate::versioned::Versioned; use crate::LogId; +use crate::NodeId; /// A set of metrics describing the current state of a Raft node. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct RaftMetrics { - pub running_state: Result<(), Fatal>, +pub struct RaftMetrics { + pub running_state: Result<(), Fatal>, /// The ID of the Raft node. - pub id: C::NodeId, + pub id: NID, // --- // --- data --- @@ -28,11 +28,11 @@ pub struct RaftMetrics { pub last_log_index: Option, /// The last log index has been applied to this Raft node's state machine. - pub last_applied: Option>, + pub last_applied: Option>, /// The id of the last log included in snapshot. /// If there is no snapshot, it is (0,0). - pub snapshot: Option>, + pub snapshot: Option>, // --- // --- cluster --- @@ -41,19 +41,19 @@ pub struct RaftMetrics { pub state: ServerState, /// The current cluster leader. - pub current_leader: Option, + pub current_leader: Option, /// The current membership config of the cluster. - pub membership_config: Arc>, + pub membership_config: Arc>, // --- // --- replication --- // --- /// The metrics about the leader. It is Some() only when this node is leader. - pub replication: Option>>, + pub replication: Option>>, } -impl MessageSummary> for RaftMetrics { +impl MessageSummary> for RaftMetrics { fn summary(&self) -> String { format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{}", self.id, @@ -69,8 +69,8 @@ impl MessageSummary> for RaftMetrics { } } -impl RaftMetrics { - pub fn new_initial(id: C::NodeId) -> Self { +impl RaftMetrics { + pub fn new_initial(id: NID) -> Self { Self { running_state: Ok(()), id, diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 887faed04..983add9e0 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -6,10 +6,10 @@ use tokio::time::Instant; use crate::core::ServerState; use crate::metrics::RaftMetrics; -use crate::raft::RaftTypeConfig; use crate::LogId; use crate::LogIdOptionExt; use crate::MessageSummary; +use crate::NodeId; // Error variants related to metrics. #[derive(Debug, thiserror::Error)] @@ -22,16 +22,16 @@ pub enum WaitError { } /// Wait is a wrapper of RaftMetrics channel that impls several utils to wait for metrics to satisfy some condition. -pub struct Wait { +pub struct Wait { pub timeout: Duration, - pub rx: watch::Receiver>, + pub rx: watch::Receiver>, } -impl Wait { +impl Wait { /// Wait for metrics to satisfy some condition or timeout. #[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))] - pub async fn metrics(&self, func: T, msg: impl ToString) -> Result, WaitError> - where T: Fn(&RaftMetrics) -> bool + Send { + pub async fn metrics(&self, func: T, msg: impl ToString) -> Result, WaitError> + where T: Fn(&RaftMetrics) -> bool + Send { let timeout_at = Instant::now() + self.timeout; let mut rx = self.rx.clone(); @@ -96,7 +96,7 @@ impl Wait { /// Wait for `current_leader` to become `Some(leader_id)` until timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn current_leader(&self, leader_id: C::NodeId, msg: impl ToString) -> Result, WaitError> { + pub async fn current_leader(&self, leader_id: NID, msg: impl ToString) -> Result, WaitError> { self.metrics( |x| x.current_leader == Some(leader_id), &format!("{} .current_leader -> {}", msg.to_string(), leader_id), @@ -106,7 +106,7 @@ impl Wait { /// Wait until applied exactly `want_log`(inclusive) logs or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn log(&self, want_log_index: Option, msg: impl ToString) -> Result, WaitError> { + pub async fn log(&self, want_log_index: Option, msg: impl ToString) -> Result, WaitError> { self.metrics( |x| x.last_log_index == want_log_index, &format!("{} .last_log_index -> {:?}", msg.to_string(), want_log_index), @@ -122,7 +122,7 @@ impl Wait { /// Wait until applied at least `want_log`(inclusive) logs or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn log_at_least(&self, want_log: Option, msg: impl ToString) -> Result, WaitError> { + pub async fn log_at_least(&self, want_log: Option, msg: impl ToString) -> Result, WaitError> { self.metrics( |x| x.last_log_index >= want_log, &format!("{} .last_log_index >= {:?}", msg.to_string(), want_log), @@ -138,7 +138,7 @@ impl Wait { /// Wait for `state` to become `want_state` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result, WaitError> { + pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result, WaitError> { self.metrics( |x| x.state == want_state, &format!("{} .state -> {:?}", msg.to_string(), want_state), @@ -150,9 +150,9 @@ impl Wait { #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn members( &self, - want_members: BTreeSet, + want_members: BTreeSet, msg: impl ToString, - ) -> Result, WaitError> { + ) -> Result, WaitError> { self.metrics( |x| { let got = x.membership_config.nodes().map(|(nid, _)| *nid).collect::>(); @@ -165,11 +165,7 @@ impl Wait { /// Wait for `snapshot` to become `want_snapshot` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn snapshot( - &self, - want_snapshot: LogId, - msg: impl ToString, - ) -> Result, WaitError> { + pub async fn snapshot(&self, want_snapshot: LogId, msg: impl ToString) -> Result, WaitError> { self.metrics( |x| x.snapshot == Some(want_snapshot), &format!("{} .snapshot -> {:?}", msg.to_string(), want_snapshot), diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index d801f8e11..4c2f7e6f9 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -10,19 +10,18 @@ use crate::membership::EffectiveMembership; use crate::metrics::Wait; use crate::metrics::WaitError; use crate::raft_types::LogIdOptionExt; -use crate::testing::DummyConfig as Config; use crate::LeaderId; use crate::LogId; use crate::Membership; +use crate::NodeId; use crate::RaftMetrics; -use crate::RaftTypeConfig; /// Test wait for different state changes #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_wait() -> anyhow::Result<()> { { // wait for leader - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -38,7 +37,7 @@ async fn test_wait() -> anyhow::Result<()> { { // wait for log - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -66,7 +65,7 @@ async fn test_wait() -> anyhow::Result<()> { { // wait for state - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -83,7 +82,7 @@ async fn test_wait() -> anyhow::Result<()> { { // wait for members - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -106,7 +105,7 @@ async fn test_wait() -> anyhow::Result<()> { tracing::info!("--- wait for snapshot, Ok"); { - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -123,7 +122,7 @@ async fn test_wait() -> anyhow::Result<()> { tracing::info!("--- wait for snapshot, only index matches"); { - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -148,7 +147,7 @@ async fn test_wait() -> anyhow::Result<()> { { // timeout - let (_init, w, _tx) = init_wait_test::(); + let (_init, w, _tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(200)).await; @@ -171,10 +170,10 @@ async fn test_wait() -> anyhow::Result<()> { /// Build a initial state for testing of Wait: /// Returns init metrics, Wait, and the tx to send an updated metrics. -fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender>) { +fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender>) { let init = RaftMetrics { running_state: Ok(()), - id: C::NodeId::default(), + id: NID::default(), state: ServerState::Learner, current_term: 0, last_log_index: None, diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 3c450585f..96d52a461 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -119,7 +119,7 @@ enum CoreState { struct RaftInner, S: RaftStorage> { tx_api: mpsc::UnboundedSender<(RaftMsg, Span)>, - rx_metrics: watch::Receiver>, + rx_metrics: watch::Receiver>, // TODO(xp): it does not need to be a async mutex. #[allow(clippy::type_complexity)] tx_shutdown: Mutex>>, @@ -542,7 +542,7 @@ impl, S: RaftStorage> Raft watch::Receiver> { + pub fn metrics(&self) -> watch::Receiver> { self.inner.rx_metrics.clone() } @@ -563,7 +563,7 @@ impl, S: RaftStorage> Raft) -> Wait { + pub fn wait(&self, timeout: Option) -> Wait { let timeout = match timeout { Some(t) => t, None => Duration::from_millis(500), diff --git a/openraft/tests/fixtures/mod.rs b/openraft/tests/fixtures/mod.rs index fdc87aab0..0deb26741 100644 --- a/openraft/tests/fixtures/mod.rs +++ b/openraft/tests/fixtures/mod.rs @@ -385,7 +385,7 @@ where } /// Get a payload of the latest metrics from each node in the cluster. - pub fn latest_metrics(&self) -> Vec> { + pub fn latest_metrics(&self) -> Vec> { let rt = self.routing_table.lock().unwrap(); let mut metrics = vec![]; for node in rt.values() { @@ -394,7 +394,7 @@ where metrics } - pub fn get_metrics(&self, node_id: &C::NodeId) -> Result> { + pub fn get_metrics(&self, node_id: &C::NodeId) -> Result> { let node = self.get_raft_handle(node_id)?; let metrics = node.metrics().borrow().clone(); Ok(metrics) @@ -426,16 +426,16 @@ where func: T, timeout: Option, msg: &str, - ) -> Result> + ) -> Result> where - T: Fn(&RaftMetrics) -> bool + Send, + T: Fn(&RaftMetrics) -> bool + Send, { let wait = self.wait(node_id, timeout); let rst = wait.metrics(func, format!("node-{} {}", node_id, msg)).await?; Ok(rst) } - pub fn wait(&self, node_id: &C::NodeId, timeout: Option) -> Wait { + pub fn wait(&self, node_id: &C::NodeId, timeout: Option) -> Wait { let node = { let rt = self.routing_table.lock().unwrap(); rt.get(node_id).expect("target node not found in routing table").clone().0