Skip to content

Commit

Permalink
Merge pull request #417 from drmingdrmer/5-metrics
Browse files Browse the repository at this point in the history
Change: `RaftMetrics` and `Wait` do not need type param `C`, but just a `NID: NodeId`
  • Loading branch information
mergify[bot] committed Jul 1, 2022
2 parents 9505ffb + d8a5d8b commit 54ebed1
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 53 deletions.
2 changes: 1 addition & 1 deletion example-raft-kv/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl ExampleClient {
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleTypeConfig>, RPCError<ExampleNodeId, Infallible>> {
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId>, RPCError<ExampleNodeId, Infallible>> {
self.do_send_rpc_to_leader("metrics", None::<&()>).await
}

Expand Down
3 changes: 1 addition & 2 deletions example-raft-kv/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use web::Json;

use crate::app::ExampleApp;
use crate::ExampleNodeId;
use crate::ExampleTypeConfig;

// --- Cluster management

Expand Down Expand Up @@ -63,6 +62,6 @@ pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
pub async fn metrics(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics<ExampleTypeConfig>, Infallible> = Ok(metrics);
let res: Result<RaftMetrics<ExampleNodeId>, Infallible> = Ok(metrics);
Ok(Json(res))
}
4 changes: 2 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
pub(crate) tx_api: mpsc::UnboundedSender<(RaftMsg<C, N, S>, Span)>,
pub(crate) rx_api: mpsc::UnboundedReceiver<(RaftMsg<C, N, S>, Span)>,

tx_metrics: watch::Sender<RaftMetrics<C>>,
tx_metrics: watch::Sender<RaftMetrics<C::NodeId>>,

pub(crate) rx_shutdown: oneshot::Receiver<()>,
}
Expand All @@ -119,7 +119,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
storage: S,
tx_api: mpsc::UnboundedSender<(RaftMsg<C, N, S>, Span)>,
rx_api: mpsc::UnboundedReceiver<(RaftMsg<C, N, S>, Span)>,
tx_metrics: watch::Sender<RaftMetrics<C>>,
tx_metrics: watch::Sender<RaftMetrics<C::NodeId>>,
rx_shutdown: oneshot::Receiver<()>,
) -> JoinHandle<Result<(), Fatal<C::NodeId>>> {
//
Expand Down
24 changes: 12 additions & 12 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ use crate::core::ServerState;
use crate::error::Fatal;
use crate::membership::EffectiveMembership;
use crate::metrics::ReplicationMetrics;
use crate::raft::RaftTypeConfig;
use crate::summary::MessageSummary;
use crate::versioned::Versioned;
use crate::LogId;
use crate::NodeId;

/// A set of metrics describing the current state of a Raft node.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftMetrics<C: RaftTypeConfig> {
pub running_state: Result<(), Fatal<C::NodeId>>,
pub struct RaftMetrics<NID: NodeId> {
pub running_state: Result<(), Fatal<NID>>,

/// The ID of the Raft node.
pub id: C::NodeId,
pub id: NID,

// ---
// --- data ---
Expand All @@ -28,11 +28,11 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub last_log_index: Option<u64>,

/// The last log index has been applied to this Raft node's state machine.
pub last_applied: Option<LogId<C::NodeId>>,
pub last_applied: Option<LogId<NID>>,

/// The id of the last log included in snapshot.
/// If there is no snapshot, it is (0,0).
pub snapshot: Option<LogId<C::NodeId>>,
pub snapshot: Option<LogId<NID>>,

// ---
// --- cluster ---
Expand All @@ -41,19 +41,19 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub state: ServerState,

/// The current cluster leader.
pub current_leader: Option<C::NodeId>,
pub current_leader: Option<NID>,

/// The current membership config of the cluster.
pub membership_config: Arc<EffectiveMembership<C::NodeId>>,
pub membership_config: Arc<EffectiveMembership<NID>>,

// ---
// --- replication ---
// ---
/// The metrics about the leader. It is Some() only when this node is leader.
pub replication: Option<Versioned<ReplicationMetrics<C::NodeId>>>,
pub replication: Option<Versioned<ReplicationMetrics<NID>>>,
}

impl<C: RaftTypeConfig> MessageSummary<RaftMetrics<C>> for RaftMetrics<C> {
impl<NID: NodeId> MessageSummary<RaftMetrics<NID>> for RaftMetrics<NID> {
fn summary(&self) -> String {
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{}",
self.id,
Expand All @@ -69,8 +69,8 @@ impl<C: RaftTypeConfig> MessageSummary<RaftMetrics<C>> for RaftMetrics<C> {
}
}

impl<C: RaftTypeConfig> RaftMetrics<C> {
pub fn new_initial(id: C::NodeId) -> Self {
impl<NID: NodeId> RaftMetrics<NID> {
pub fn new_initial(id: NID) -> Self {
Self {
running_state: Ok(()),
id,
Expand Down
30 changes: 13 additions & 17 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use tokio::time::Instant;

use crate::core::ServerState;
use crate::metrics::RaftMetrics;
use crate::raft::RaftTypeConfig;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::NodeId;

// Error variants related to metrics.
#[derive(Debug, thiserror::Error)]
Expand All @@ -22,16 +22,16 @@ pub enum WaitError {
}

/// Wait is a wrapper of RaftMetrics channel that impls several utils to wait for metrics to satisfy some condition.
pub struct Wait<C: RaftTypeConfig> {
pub struct Wait<NID: NodeId> {
pub timeout: Duration,
pub rx: watch::Receiver<RaftMetrics<C>>,
pub rx: watch::Receiver<RaftMetrics<NID>>,
}

impl<C: RaftTypeConfig> Wait<C> {
impl<NID: NodeId> Wait<NID> {
/// Wait for metrics to satisfy some condition or timeout.
#[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))]
pub async fn metrics<T>(&self, func: T, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError>
where T: Fn(&RaftMetrics<C>) -> bool + Send {
pub async fn metrics<T>(&self, func: T, msg: impl ToString) -> Result<RaftMetrics<NID>, WaitError>
where T: Fn(&RaftMetrics<NID>) -> bool + Send {
let timeout_at = Instant::now() + self.timeout;

let mut rx = self.rx.clone();
Expand Down Expand Up @@ -96,7 +96,7 @@ impl<C: RaftTypeConfig> Wait<C> {

/// Wait for `current_leader` to become `Some(leader_id)` until timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn current_leader(&self, leader_id: C::NodeId, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn current_leader(&self, leader_id: NID, msg: impl ToString) -> Result<RaftMetrics<NID>, WaitError> {
self.metrics(
|x| x.current_leader == Some(leader_id),
&format!("{} .current_leader -> {}", msg.to_string(), leader_id),
Expand All @@ -106,7 +106,7 @@ impl<C: RaftTypeConfig> Wait<C> {

/// Wait until applied exactly `want_log`(inclusive) logs or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log(&self, want_log_index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn log(&self, want_log_index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID>, WaitError> {
self.metrics(
|x| x.last_log_index == want_log_index,
&format!("{} .last_log_index -> {:?}", msg.to_string(), want_log_index),
Expand All @@ -122,7 +122,7 @@ impl<C: RaftTypeConfig> Wait<C> {

/// Wait until applied at least `want_log`(inclusive) logs or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_at_least(&self, want_log: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn log_at_least(&self, want_log: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID>, WaitError> {
self.metrics(
|x| x.last_log_index >= want_log,
&format!("{} .last_log_index >= {:?}", msg.to_string(), want_log),
Expand All @@ -138,7 +138,7 @@ impl<C: RaftTypeConfig> Wait<C> {

/// Wait for `state` to become `want_state` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result<RaftMetrics<NID>, WaitError> {
self.metrics(
|x| x.state == want_state,
&format!("{} .state -> {:?}", msg.to_string(), want_state),
Expand All @@ -150,9 +150,9 @@ impl<C: RaftTypeConfig> Wait<C> {
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn members(
&self,
want_members: BTreeSet<C::NodeId>,
want_members: BTreeSet<NID>,
msg: impl ToString,
) -> Result<RaftMetrics<C>, WaitError> {
) -> Result<RaftMetrics<NID>, WaitError> {
self.metrics(
|x| {
let got = x.membership_config.nodes().map(|(nid, _)| *nid).collect::<BTreeSet<_>>();
Expand All @@ -165,11 +165,7 @@ impl<C: RaftTypeConfig> Wait<C> {

/// Wait for `snapshot` to become `want_snapshot` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn snapshot(
&self,
want_snapshot: LogId<C::NodeId>,
msg: impl ToString,
) -> Result<RaftMetrics<C>, WaitError> {
pub async fn snapshot(&self, want_snapshot: LogId<NID>, msg: impl ToString) -> Result<RaftMetrics<NID>, WaitError> {
self.metrics(
|x| x.snapshot == Some(want_snapshot),
&format!("{} .snapshot -> {:?}", msg.to_string(), want_snapshot),
Expand Down
21 changes: 10 additions & 11 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@ use crate::membership::EffectiveMembership;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::raft_types::LogIdOptionExt;
use crate::testing::DummyConfig as Config;
use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::NodeId;
use crate::RaftMetrics;
use crate::RaftTypeConfig;

/// Test wait for different state changes
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait() -> anyhow::Result<()> {
{
// wait for leader
let (init, w, tx) = init_wait_test::<Config>();
let (init, w, tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
Expand All @@ -38,7 +37,7 @@ async fn test_wait() -> anyhow::Result<()> {

{
// wait for log
let (init, w, tx) = init_wait_test::<Config>();
let (init, w, tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
Expand Down Expand Up @@ -66,7 +65,7 @@ async fn test_wait() -> anyhow::Result<()> {

{
// wait for state
let (init, w, tx) = init_wait_test::<Config>();
let (init, w, tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
Expand All @@ -83,7 +82,7 @@ async fn test_wait() -> anyhow::Result<()> {

{
// wait for members
let (init, w, tx) = init_wait_test::<Config>();
let (init, w, tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
Expand All @@ -106,7 +105,7 @@ async fn test_wait() -> anyhow::Result<()> {

tracing::info!("--- wait for snapshot, Ok");
{
let (init, w, tx) = init_wait_test::<Config>();
let (init, w, tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
Expand All @@ -123,7 +122,7 @@ async fn test_wait() -> anyhow::Result<()> {

tracing::info!("--- wait for snapshot, only index matches");
{
let (init, w, tx) = init_wait_test::<Config>();
let (init, w, tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
Expand All @@ -148,7 +147,7 @@ async fn test_wait() -> anyhow::Result<()> {

{
// timeout
let (_init, w, _tx) = init_wait_test::<Config>();
let (_init, w, _tx) = init_wait_test::<u64>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(200)).await;
Expand All @@ -171,10 +170,10 @@ async fn test_wait() -> anyhow::Result<()> {

/// Build a initial state for testing of Wait:
/// Returns init metrics, Wait, and the tx to send an updated metrics.
fn init_wait_test<C: RaftTypeConfig>() -> (RaftMetrics<C>, Wait<C>, watch::Sender<RaftMetrics<C>>) {
fn init_wait_test<NID: NodeId>() -> (RaftMetrics<NID>, Wait<NID>, watch::Sender<RaftMetrics<NID>>) {
let init = RaftMetrics {
running_state: Ok(()),
id: C::NodeId::default(),
id: NID::default(),
state: ServerState::Learner,
current_term: 0,
last_log_index: None,
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ enum CoreState<NID: NodeId> {

struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
tx_api: mpsc::UnboundedSender<(RaftMsg<C, N, S>, Span)>,
rx_metrics: watch::Receiver<RaftMetrics<C>>,
rx_metrics: watch::Receiver<RaftMetrics<C::NodeId>>,
// TODO(xp): it does not need to be a async mutex.
#[allow(clippy::type_complexity)]
tx_shutdown: Mutex<Option<oneshot::Sender<()>>>,
Expand Down Expand Up @@ -542,7 +542,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
}

/// Get a handle to the metrics channel.
pub fn metrics(&self) -> watch::Receiver<RaftMetrics<C>> {
pub fn metrics(&self) -> watch::Receiver<RaftMetrics<C::NodeId>> {
self.inner.rx_metrics.clone()
}

Expand All @@ -563,7 +563,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
/// // wait for raft state to become a follower
/// r.wait(None).state(State::Follower, "state").await?;
/// ```
pub fn wait(&self, timeout: Option<Duration>) -> Wait<C> {
pub fn wait(&self, timeout: Option<Duration>) -> Wait<C::NodeId> {
let timeout = match timeout {
Some(t) => t,
None => Duration::from_millis(500),
Expand Down
10 changes: 5 additions & 5 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ where
}

/// Get a payload of the latest metrics from each node in the cluster.
pub fn latest_metrics(&self) -> Vec<RaftMetrics<C>> {
pub fn latest_metrics(&self) -> Vec<RaftMetrics<C::NodeId>> {
let rt = self.routing_table.lock().unwrap();
let mut metrics = vec![];
for node in rt.values() {
Expand All @@ -394,7 +394,7 @@ where
metrics
}

pub fn get_metrics(&self, node_id: &C::NodeId) -> Result<RaftMetrics<C>> {
pub fn get_metrics(&self, node_id: &C::NodeId) -> Result<RaftMetrics<C::NodeId>> {
let node = self.get_raft_handle(node_id)?;
let metrics = node.metrics().borrow().clone();
Ok(metrics)
Expand Down Expand Up @@ -426,16 +426,16 @@ where
func: T,
timeout: Option<Duration>,
msg: &str,
) -> Result<RaftMetrics<C>>
) -> Result<RaftMetrics<C::NodeId>>
where
T: Fn(&RaftMetrics<C>) -> bool + Send,
T: Fn(&RaftMetrics<C::NodeId>) -> bool + Send,
{
let wait = self.wait(node_id, timeout);
let rst = wait.metrics(func, format!("node-{} {}", node_id, msg)).await?;
Ok(rst)
}

pub fn wait(&self, node_id: &C::NodeId, timeout: Option<Duration>) -> Wait<C> {
pub fn wait(&self, node_id: &C::NodeId, timeout: Option<Duration>) -> Wait<C::NodeId> {
let node = {
let rt = self.routing_table.lock().unwrap();
rt.get(node_id).expect("target node not found in routing table").clone().0
Expand Down

0 comments on commit 54ebed1

Please sign in to comment.