Skip to content

Commit

Permalink
Change: rename ReplicationMetrics and methods in MetricsChangeFlags
Browse files Browse the repository at this point in the history
- Change: rename ReplicationMetrics to ReplicationTargetMetrics

- Change: rename LeaderMetrics to ReplicationMetrics

- Refactor: MetricsChangeFlags provides 3 methods:
    `set_data_changed()`: set change flag for raft log, state machine or snapshot.
    `set_replication_changed()`: set change flag for replication state.
    `set_cluster_changed()`: set change flag for leader, node role or membership config.

- Refactor: fix metrics flag usages for different state changes.

- Part of #229
  • Loading branch information
drmingdrmer committed Apr 5, 2022
1 parent 98ccd4c commit ffc8268
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 70 deletions.
11 changes: 7 additions & 4 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::MissingNodeInfo;
use crate::leader_metrics::RemoveTarget;
use crate::raft::AddLearnerResponse;
use crate::raft::ChangeMembers;
use crate::raft::ClientWriteResponse;
use crate::raft::EntryPayload;
use crate::raft::RaftRespTx;
use crate::raft_types::LogIdOptionExt;
use crate::replication_metrics::RemoveTarget;
use crate::versioned::Updatable;
use crate::LogId;
use crate::Membership;
Expand Down Expand Up @@ -275,7 +275,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
let payload = EntryPayload::Membership(mem.clone());
let entry = self.core.append_payload_to_log(payload).await?;

self.set_leader_metrics_changed();
self.core.metrics_flags.set_data_changed();

let cr_entry = ClientRequestEntry {
entry: Arc::new(entry),
Expand Down Expand Up @@ -326,7 +326,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.try_remove_replication(target);
}

self.set_leader_metrics_changed();
self.set_replication_metrics_changed();
}

/// Remove a replication if the membership that does not include it has committed.
Expand Down Expand Up @@ -356,7 +356,10 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
tracing::info!("removed replication to: {}", target);
self.nodes.remove(&target);

self.leader_metrics.update(RemoveTarget { target });
self.replication_metrics.update(RemoveTarget { target });
// TODO(xp): set_replication_metrics_changed() can be removed.
// Use self.replication_metrics.version to detect changes.
self.set_replication_metrics_changed();

true
}
Expand Down
7 changes: 4 additions & 3 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.metrics_flags.set_changed_other();
self.metrics_flags.set_cluster_changed();
}

// Caveat: [commit-index must not advance the last known consistent log](https://datafuselabs.github.io/openraft/replication.html#caveat-commit-index-must-not-advance-the-last-known-consistent-log)
Expand Down Expand Up @@ -334,7 +334,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.committed,
self.last_applied
);
self.metrics_flags.set_changed_other();
// TODO(xp): this should be moved to upper level.
self.metrics_flags.set_data_changed();
return Ok(());
}

Expand All @@ -354,7 +355,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.last_applied = Some(last_log_id);

self.trigger_log_compaction_if_needed(false).await;
self.metrics_flags.set_changed_other();
self.metrics_flags.set_data_changed();
Ok(())
}
}
8 changes: 4 additions & 4 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
pub(super) async fn commit_initial_leader_entry(&mut self) -> Result<(), StorageError<C::NodeId>> {
let entry = self.core.append_payload_to_log(EntryPayload::Blank).await?;

self.set_leader_metrics_changed();
self.core.metrics_flags.set_data_changed();

let cr_entry = ClientRequestEntry {
entry: Arc::new(entry),
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
tx: Some(tx),
};

self.set_leader_metrics_changed();
self.core.metrics_flags.set_data_changed();

self.replicate_client_request(entry).await?;
Ok(())
Expand Down Expand Up @@ -227,7 +227,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.core.committed = Some(log_id);
tracing::debug!(?self.core.committed, "update committed, no need to replicate");

self.set_leader_metrics_changed();
self.core.metrics_flags.set_data_changed();
self.client_request_post_commit(req).await?;
} else {
self.awaiting_committed.push(req);
Expand Down Expand Up @@ -358,7 +358,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

// TODO(xp): deal with partial apply.
self.core.last_applied = Some(*log_id);
self.set_leader_metrics_changed();
self.core.metrics_flags.set_data_changed();

// TODO(xp) merge this function to replication_to_state_machine?

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.metrics_flags.set_changed_other();
self.metrics_flags.set_data_changed();
}

// Compare current snapshot state with received RPC and handle as needed.
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.update_membership(membership);

self.snapshot_last_log_id = self.last_applied;
self.metrics_flags.set_changed_other();
self.metrics_flags.set_data_changed();

Ok(())
}
Expand Down
24 changes: 12 additions & 12 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::error::ExtractFatal;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::InitializeError;
use crate::leader_metrics::LeaderMetrics;
use crate::metrics::RaftMetrics;
use crate::raft::AddLearnerResponse;
use crate::raft::Entry;
Expand All @@ -52,6 +51,7 @@ use crate::raft::VoteResponse;
use crate::raft_types::LogIdOptionExt;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::replication_metrics::ReplicationMetrics;
use crate::storage::RaftSnapshotBuilder;
use crate::versioned::Versioned;
use crate::vote::Vote;
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<C: RaftTypeConfig> MessageSummary for EffectiveMembership<C> {

pub trait MetricsProvider<NID: NodeId> {
/// The default impl for the non-leader state
fn get_leader_metrics(&self) -> Option<&Versioned<LeaderMetrics<NID>>> {
fn get_leader_metrics(&self) -> Option<&Versioned<ReplicationMetrics<NID>>> {
None
}
}
Expand Down Expand Up @@ -302,7 +302,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Fetch the most recent snapshot in the system.
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
self.snapshot_last_log_id = Some(snapshot.meta.last_log_id);
self.metrics_flags.set_changed_other();
self.metrics_flags.set_data_changed();
}

let has_log = if self.last_log_id.is_some() {
Expand Down Expand Up @@ -395,7 +395,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&self, leader_metrics: Update<Option<Versioned<LeaderMetrics<C::NodeId>>>>) {
fn report_metrics(&self, leader_metrics: Update<Option<Versioned<ReplicationMetrics<C::NodeId>>>>) {
let leader_metrics = match leader_metrics {
Update::Update(v) => v,
Update::AsIs => self.tx_metrics.borrow().leader_metrics.clone(),
Expand Down Expand Up @@ -507,7 +507,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
fn update_snapshot_state(&mut self, update: SnapshotUpdate<C>) {
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.snapshot_last_log_id = Some(log_id);
self.metrics_flags.set_changed_other();
self.metrics_flags.set_data_changed();
}
// If snapshot state is anything other than streaming, then drop it.
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
Expand Down Expand Up @@ -784,7 +784,7 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
pub(super) nodes: BTreeMap<C::NodeId, ReplicationState<C>>,

/// The metrics about a leader
pub leader_metrics: Versioned<LeaderMetrics<C::NodeId>>,
pub replication_metrics: Versioned<ReplicationMetrics<C::NodeId>>,

/// The stream of events coming from replication streams.
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<C, S::SnapshotData>, Span)>,
Expand All @@ -799,8 +799,8 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsProvider<C::NodeId>
for LeaderState<'a, C, N, S>
{
fn get_leader_metrics(&self) -> Option<&Versioned<LeaderMetrics<C::NodeId>>> {
Some(&self.leader_metrics)
fn get_leader_metrics(&self) -> Option<&Versioned<ReplicationMetrics<C::NodeId>>> {
Some(&self.replication_metrics)
}
}

Expand All @@ -811,7 +811,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
Self {
core,
nodes: BTreeMap::new(),
leader_metrics: Versioned::new(LeaderMetrics::default()),
replication_metrics: Versioned::new(ReplicationMetrics::default()),
replication_tx,
replication_rx,
awaiting_committed: Vec::new(),
Expand Down Expand Up @@ -851,7 +851,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
pub(self) async fn leader_loop(mut self) -> Result<(), Fatal<C::NodeId>> {
// Report the leader metrics every time this node becomes the leader.
// If `report_metrics` is not called before the leader loop, the leader metrics may not be updated, because there may be no incoming event to trigger a report.
self.core.report_metrics(Update::Update(Some(self.leader_metrics.clone())));
self.core.report_metrics(Update::Update(Some(self.replication_metrics.clone())));

loop {
if !self.core.target_state.is_leader() {
Expand Down Expand Up @@ -935,8 +935,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

/// Report metrics with leader specific states.
pub fn set_leader_metrics_changed(&mut self) {
self.core.metrics_flags.set_changed_leader();
pub fn set_replication_metrics_changed(&mut self) {
self.core.metrics_flags.set_replication_changed();
}
}

Expand Down
15 changes: 7 additions & 8 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use crate::core::ReplicationState;
use crate::core::SnapshotState;
use crate::core::State;
use crate::error::AddLearnerError;
use crate::leader_metrics::UpdateMatchedLogId;
use crate::raft::AddLearnerResponse;
use crate::raft::RaftRespTx;
use crate::replication::RaftEvent;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::replication_metrics::UpdateMatchedLogId;
use crate::storage::Snapshot;
use crate::summary::MessageSummary;
use crate::versioned::Updatable;
Expand Down Expand Up @@ -130,11 +130,10 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
if self.try_remove_replication(target) {
// nothing to do
} else {
self.update_leader_metrics(target, matched);
self.update_replication_metrics(target, matched);
}

if Some(matched) <= self.core.committed {
self.set_leader_metrics_changed();
return Ok(());
}

Expand Down Expand Up @@ -172,19 +171,19 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.client_request_post_commit(request).await?;
}
}
}

// TODO(xp): does this update too frequently?
self.set_leader_metrics_changed();
self.core.metrics_flags.set_data_changed();
}

Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
fn update_leader_metrics(&mut self, target: C::NodeId, matched: LogId<C::NodeId>) {
fn update_replication_metrics(&mut self, target: C::NodeId, matched: LogId<C::NodeId>) {
tracing::debug!(%target, ?matched, "update_leader_metrics");

self.leader_metrics.update(UpdateMatchedLogId { target, matched });
self.replication_metrics.update(UpdateMatchedLogId { target, matched });
self.set_replication_metrics_changed();
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/leader_metrics_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::leader_metrics::LeaderMetrics;
use crate::leader_metrics::UpdateMatchedLogId;
use crate::replication_metrics::ReplicationMetrics;
use crate::replication_metrics::UpdateMatchedLogId;
use crate::versioned::Updatable;
use crate::versioned::Versioned;
use crate::LeaderId;
Expand All @@ -8,7 +8,7 @@ use crate::MessageSummary;

#[test]
fn test_versioned() -> anyhow::Result<()> {
let mut a = Versioned::new(LeaderMetrics::<u64> {
let mut a = Versioned::new(ReplicationMetrics::<u64> {
replication: Default::default(),
});

Expand Down Expand Up @@ -85,7 +85,7 @@ fn test_versioned() -> anyhow::Result<()> {

#[test]
fn test_versioned_methods() -> anyhow::Result<()> {
let mut a = Versioned::new(LeaderMetrics::<u64> {
let mut a = Versioned::new(ReplicationMetrics::<u64> {
replication: Default::default(),
});

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
mod config;
mod core;
mod defensive;
mod leader_metrics;
mod membership;
mod node;
mod raft_types;
mod replication;
mod replication_metrics;
mod storage_error;
mod store_ext;
mod store_wrapper;
Expand Down Expand Up @@ -64,7 +64,7 @@ pub use crate::raft_types::SnapshotId;
pub use crate::raft_types::SnapshotSegmentId;
pub use crate::raft_types::StateMachineChanges;
pub use crate::raft_types::Update;
pub use crate::replication::ReplicationMetrics;
pub use crate::replication::ReplicationTargetMetrics;
pub use crate::storage::RaftLogReader;
pub use crate::storage::RaftSnapshotBuilder;
pub use crate::storage::RaftStorage;
Expand Down
31 changes: 23 additions & 8 deletions openraft/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tokio::time::Instant;
use crate::core::EffectiveMembership;
use crate::core::State;
use crate::error::Fatal;
use crate::leader_metrics::LeaderMetrics;
use crate::raft_types::LogIdOptionExt;
use crate::replication_metrics::ReplicationMetrics;
use crate::versioned::Versioned;
use crate::LogId;
use crate::MessageSummary;
Expand All @@ -34,25 +34,40 @@ pub struct RaftMetrics<C: RaftTypeConfig> {

/// The ID of the Raft node.
pub id: C::NodeId,
/// The state of the Raft node.
pub state: State,

// ---
// --- data ---
// ---
/// The current term of the Raft node.
pub current_term: u64,

/// The index of the last log entry that has been appended to this Raft node's log.
pub last_log_index: Option<u64>,

/// The id of the last log entry that has been applied to this Raft node's state machine.
pub last_applied: Option<LogId<C::NodeId>>,
/// The current cluster leader.
pub current_leader: Option<C::NodeId>,
/// The current membership config of the cluster.
pub membership_config: Arc<EffectiveMembership<C>>,

/// The id of the last log included in snapshot.
/// If there is no snapshot, it is (0,0).
pub snapshot: Option<LogId<C::NodeId>>,

// ---
// --- cluster ---
// ---
/// The state of the Raft node.
pub state: State,

/// The current cluster leader.
pub current_leader: Option<C::NodeId>,

/// The current membership config of the cluster.
pub membership_config: Arc<EffectiveMembership<C>>,

// ---
// --- replication ---
// ---
/// The metrics about the leader. It is Some() only when this node is leader.
pub leader_metrics: Option<Versioned<LeaderMetrics<C::NodeId>>>,
pub leader_metrics: Option<Versioned<ReplicationMetrics<C::NodeId>>>,
}

impl<C: RaftTypeConfig> MessageSummary for RaftMetrics<C> {
Expand Down
Loading

0 comments on commit ffc8268

Please sign in to comment.