feature: add metrics about leader
When reporting metrics, LeaderState now also reports metrics about
replication to the other nodes.

When the node switches to another state, LeaderState is destroyed along
with the cached replication metrics.

Other states report a `None` to the raft core to override the previous
metrics data.

At some points the raft core, without knowing the current state, just
reports metrics with `Update::Ignore`, to indicate that the replication
metrics should be left intact.
drmingdrmer committed Jul 1, 2021
1 parent 4d1a03c commit 32a67e2
Showing 12 changed files with 294 additions and 24 deletions.
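Before the hunks, a minimal sketch of the `Update` helper the commit message refers to, under the assumption that it is the usual two-variant enum; the committed definition lives in the new `raft_types` module, which is among the changed files but not shown below:

```rust
// Sketch only; the committed definition is in async-raft/src/raft_types.rs (not shown here).
/// Tells RaftCore how to treat a cached value when reporting metrics.
pub enum Update<T> {
    /// Replace the previously reported value with this one.
    Update(T),
    /// Leave the previously reported value untouched.
    Ignore,
}
```

With that shape, `LeaderState` passes `Update::Update(Some(&leader_metrics))`, candidates, followers and non-voters pass `Update::Update(None)` to clear the field, and state-agnostic paths in the core pass `Update::Ignore`, as the hunks below show.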
15 changes: 9 additions & 6 deletions async-raft/src/core/admin.rs
@@ -166,7 +166,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
};
let cr_entry = ClientRequestEntry::from_entry(entry, tx_joint);
self.replicate_client_request(cr_entry).await;
self.core.report_metrics();
self.leader_report_metrics();

// Setup channels for eventual response to the 2-phase config change.
let (tx_cfg_change, rx_cfg_change) = oneshot::channel();
@@ -282,7 +282,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let entry = self.append_payload_to_log(payload.entry).await?;
let cr_entry = ClientRequestEntry::from_entry(entry, tx_uniform);
self.replicate_client_request(cr_entry).await;
self.core.report_metrics();
self.leader_report_metrics();

// Setup channel for eventual commitment of the uniform consensus config.
self.uniform_consensus_cb.push(rx_uniform); // Receiver for when the uniform consensus is committed.
@@ -325,13 +325,16 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
tracing::debug!("membership: {:?}", self.core.membership);
tracing::debug!("nodes_to_remove: {:?}", nodes_to_remove);

for node in nodes_to_remove {
tracing::debug!({ target = node }, "removing target node from replication pool");
if let Some(node) = self.nodes.remove(&node) {
for target in nodes_to_remove {
tracing::debug!(target, "removing target node from replication pool");
if let Some(node) = self.nodes.remove(&target) {
let _ = node.replstream.repl_tx.send(RaftEvent::Terminate);

// remove metrics entry
self.leader_metrics.replication.remove(&target);
}
}
self.core.report_metrics();
self.leader_report_metrics();
Ok(())
}
}
11 changes: 6 additions & 5 deletions async-raft/src/core/append_entries.rs
@@ -11,6 +11,7 @@ use crate::AppData
use crate::AppDataResponse;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Update;

impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
@@ -66,7 +67,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
self.report_metrics(Update::Ignore);
}
return Ok(AppendEntriesResponse {
term: self.current_term,
@@ -92,7 +93,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// specified index yet. Use the last known index & term as a conflict opt.
None => {
if report_metrics {
self.report_metrics();
self.report_metrics(Update::Ignore);
}
return Ok(AppendEntriesResponse {
term: self.current_term,
@@ -143,7 +144,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}),
};
if report_metrics {
self.report_metrics();
self.report_metrics(Update::Ignore);
}
return Ok(AppendEntriesResponse {
term: self.current_term,
@@ -159,7 +160,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
self.report_metrics(Update::Ignore);
}
Ok(AppendEntriesResponse {
term: self.current_term,
@@ -246,7 +247,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if entries.is_empty() {
if let Some(index) = last_entry_seen {
self.last_applied = index;
self.report_metrics();
self.report_metrics(Update::Ignore);
}
return;
}
8 changes: 4 additions & 4 deletions async-raft/src/core/client.rs
@@ -95,7 +95,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.last_log_term = self.core.current_term; // This only ever needs to be updated once per term.
let cr_entry = ClientRequestEntry::from_entry(entry, tx_payload_committed);
self.replicate_client_request(cr_entry).await;
self.core.report_metrics();
self.leader_report_metrics();

// Setup any callbacks needed for responding to commitment of a pending config.
if let Some(is_in_joint_consensus) = pending_config {
@@ -286,7 +286,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
} else {
// Else, there are no voting nodes for replication, so the payload is now committed.
self.core.commit_index = entry_arc.index;
self.core.report_metrics();
self.leader_report_metrics();
self.client_request_post_commit(req).await;
}

@@ -332,7 +332,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
ClientOrInternalResponseTx::Internal(tx) => {
self.core.last_applied = req.entry.index;
self.core.report_metrics();
self.leader_report_metrics();
let _ = tx.send(Ok(req.entry.index));
}
}
@@ -395,7 +395,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
});
self.core.last_applied = *index;
self.core.report_metrics();
self.leader_report_metrics();
res
}
}
3 changes: 2 additions & 1 deletion async-raft/src/core/install_snapshot.rs
@@ -14,6 +14,7 @@ use crate::AppData
use crate::AppDataResponse;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Update;

impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
/// Invoked by leader to send chunks of a snapshot to a follower (§7).
@@ -56,7 +57,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

if report_metrics {
self.report_metrics();
self.report_metrics(Update::Ignore);
}

// Compare current snapshot state with received RPC and handle as needed.
33 changes: 26 additions & 7 deletions async-raft/src/core/mod.rs
@@ -36,6 +36,7 @@ use crate::error::ClientWriteError;
use crate::error::InitializeError;
use crate::error::RaftError;
use crate::error::RaftResult;
use crate::metrics::LeaderMetrics;
use crate::metrics::RaftMetrics;
use crate::raft::ChangeMembershipTx;
use crate::raft::ClientReadResponseTx;
@@ -54,6 +55,7 @@ use crate::AppDataResponse;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Update;

/// The core type implementing the Raft protocol.
pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
@@ -269,7 +271,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&mut self) {
fn report_metrics(&mut self, leader_metrics: Update<Option<&LeaderMetrics>>) {
let leader_metrics = match leader_metrics {
Update::Update(v) => v.cloned(),
Update::Ignore => self.tx_metrics.borrow().leader_metrics.clone(),
};

let res = self.tx_metrics.send(RaftMetrics {
id: self.id,
state: self.target_state,
@@ -278,9 +285,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_applied: self.last_applied,
current_leader: self.current_leader,
membership_config: self.membership.clone(),
leader_metrics,
});

if let Err(err) = res {
tracing::error!({error=%err, id=self.id}, "error reporting metrics");
tracing::error!(error=%err, id=self.id, "error reporting metrics");
}
}

@@ -463,7 +472,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if let Some(last_applied) = last_applied_opt {
self.last_applied = last_applied;
}
self.report_metrics();
self.report_metrics(Update::Ignore);
self.trigger_log_compaction_if_needed();
Ok(())
}
@@ -592,6 +601,9 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf
/// A bool indicating if this node will be stepping down after committing the current config change.
pub(super) is_stepping_down: bool,

/// The metrics about a leader
pub leader_metrics: LeaderMetrics,

/// The stream of events coming from replication streams.
pub(super) replicationrx: mpsc::UnboundedReceiver<ReplicaEvent<S::Snapshot>>,
/// The clonable sender channel for replication stream events.
@@ -623,6 +635,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
nodes: BTreeMap::new(),
non_voters: BTreeMap::new(),
is_stepping_down: false,
leader_metrics: LeaderMetrics::default(),
replicationtx,
replicationrx,
consensus_state,
@@ -653,7 +666,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.last_heartbeat = None;
self.core.next_election_timeout = None;
self.core.update_current_leader(UpdateCurrentLeader::ThisNode);
self.core.report_metrics();
self.leader_report_metrics();

// Per §8, commit an initial entry as part of becoming the cluster leader.
self.commit_initial_leader_entry().await?;
Expand Down Expand Up @@ -726,6 +739,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
}

/// Report metrics with leader specific states.
#[tracing::instrument(level = "trace", skip(self))]
pub fn leader_report_metrics(&mut self) {
self.core.report_metrics(Update::Update(Some(&self.leader_metrics)));
}
}

/// A struct tracking the state of a replication stream from the perspective of the Raft actor.
@@ -834,7 +853,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.voted_for = Some(self.core.id);
self.core.update_current_leader(UpdateCurrentLeader::Unknown);
self.core.save_hard_state().await?;
self.core.report_metrics();
self.core.report_metrics(Update::Update(None));

// Send RPCs to all members in parallel.
let mut pending_votes = self.spawn_parallel_vote_requests();
@@ -901,7 +920,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// Run the follower loop.
#[tracing::instrument(level="trace", skip(self), fields(id=self.core.id, raft_state="follower"))]
pub(self) async fn run(self) -> RaftResult<()> {
self.core.report_metrics();
self.core.report_metrics(Update::Update(None));
loop {
if !self.core.target_state.is_follower() {
return Ok(());
@@ -962,7 +981,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// Run the non-voter loop.
#[tracing::instrument(level="trace", skip(self), fields(id=self.core.id, raft_state="non-voter"))]
pub(self) async fn run(mut self) -> RaftResult<()> {
self.core.report_metrics();
self.core.report_metrics(Update::Update(None));
loop {
if !self.core.target_state.is_non_voter() {
return Ok(());
25 changes: 24 additions & 1 deletion async-raft/src/core/replication.rs
@@ -16,9 +16,11 @@ use crate::replication::ReplicationStream;
use crate::storage::CurrentSnapshotData;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::ReplicationMetrics;

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
/// Spawn a new replication stream returning its replication state handle.
@@ -148,13 +150,22 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return Ok(());
}

self.update_leader_metrics(target, match_term, match_index);

// Drop replication stream if needed.
// TODO(xp): is it possible to merge the two node remove routines?
// here and that in handle_uniform_consensus_committed()
if needs_removal {
if let Some(node) = self.nodes.remove(&target) {
let _ = node.replstream.repl_tx.send(RaftEvent::Terminate);

// remove metrics entry
self.leader_metrics.replication.remove(&target);
}
}

// TODO(xp): simplify commit condition check

// Determine the new commit index of the current membership config nodes.
let indices_c0 = self.get_match_indexes(&self.core.membership.members);
tracing::debug!("indices_c0: {:?}", indices_c0);
@@ -207,11 +218,23 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.client_request_post_commit(request).await;
}
}
self.core.report_metrics();
}

// TODO(xp): does this update too frequently?
self.leader_report_metrics();
Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
fn update_leader_metrics(&mut self, target: NodeId, match_term: u64, match_index: u64) {
self.leader_metrics.replication.insert(target, ReplicationMetrics {
matched: LogId {
term: match_term,
index: match_index,
},
});
}

/// Extract the matching index/term of the replication state of specified nodes.
fn get_match_indexes(&self, node_ids: &HashSet<NodeId>) -> Vec<(u64, u64)> {
tracing::debug!("to get match indexes of nodes: {:?}", node_ids);
4 changes: 4 additions & 0 deletions async-raft/src/lib.rs
@@ -8,6 +8,7 @@ pub mod metrics;
mod metrics_wait_test;
pub mod network;
pub mod raft;
mod raft_types;
mod replication;
pub mod storage;

@@ -27,6 +28,9 @@ pub use crate::error::RaftError;
pub use crate::metrics::RaftMetrics;
pub use crate::network::RaftNetwork;
pub use crate::raft::Raft;
pub use crate::raft_types::LogId;
pub use crate::raft_types::Update;
pub use crate::replication::ReplicationMetrics;
pub use crate::storage::RaftStorage;

/// A Raft node's ID.
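The `LogId` and `ReplicationMetrics` types re-exported above come from files in this commit that are not shown on this page. A sketch of what they plausibly look like, inferred from how `update_leader_metrics` builds them in core/replication.rs; the derives are assumptions chosen to match `LeaderMetrics`:

```rust
// Sketch inferred from usage in core/replication.rs; not the verbatim committed code.
use serde::{Deserialize, Serialize};

/// Identifies a log entry by the term it was proposed in and its index.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogId {
    pub term: u64,
    pub index: u64,
}

/// Replication progress of a single target, stored in LeaderMetrics::replication.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicationMetrics {
    /// The last log id known to be replicated (matched) on the target node.
    pub matched: LogId,
}
```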
13 changes: 13 additions & 0 deletions async-raft/src/metrics.rs
@@ -7,6 +7,7 @@
//! Metrics are observed on a running Raft node via the `Raft::metrics()` method, which will
//! return a stream of metrics.

use std::collections::HashMap;
use std::collections::HashSet;

use serde::Deserialize;
@@ -19,6 +20,7 @@ use crate::core::State;
use crate::raft::MembershipConfig;
use crate::NodeId;
use crate::RaftError;
use crate::ReplicationMetrics;

/// A set of metrics describing the current state of a Raft node.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -37,6 +39,16 @@ pub struct RaftMetrics {
pub current_leader: Option<NodeId>,
/// The current membership config of the cluster.
pub membership_config: MembershipConfig,

/// The metrics about the leader. It is Some() only when this node is leader.
pub leader_metrics: Option<LeaderMetrics>,
}

/// The metrics about the leader. It is Some() only when this node is leader.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LeaderMetrics {
/// Replication metrics of all known replication target: voters and non-voters
pub replication: HashMap<NodeId, ReplicationMetrics>,
}

impl RaftMetrics {
@@ -50,6 +62,7 @@ impl RaftMetrics {
last_applied: 0,
current_leader: None,
membership_config,
leader_metrics: None,
}
}
}
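For consumers of the metrics stream, leader-specific data now arrives alongside the existing fields. A hypothetical observer loop, assuming `Raft::metrics()` still returns a `tokio::sync::watch::Receiver<RaftMetrics>` as the module docs above describe:

```rust
// Hypothetical consumer; `raft` is an already-running Raft instance.
let mut metrics_rx = raft.metrics();
loop {
    // Wait for RaftCore to publish the next metrics update.
    if metrics_rx.changed().await.is_err() {
        break; // the Raft node has shut down
    }
    let m = metrics_rx.borrow().clone();
    match &m.leader_metrics {
        // Present only while this node is the leader.
        Some(leader) => {
            for (target, repl) in &leader.replication {
                println!("replication to node {}: matched at {:?}", target, repl.matched);
            }
        }
        // Followers, candidates and non-voters report None here.
        None => println!("node {} is not the leader", m.id),
    }
}
```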
1 change: 1 addition & 0 deletions async-raft/src/metrics_wait_test.rs
@@ -136,6 +136,7 @@ fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender<RaftMetrics>) {
members: Default::default(),
members_after_consensus: None,
},
leader_metrics: None,
};
let (tx, rx) = watch::channel(init.clone());
let w = Wait {
