Skip to content

Commit

Permalink
Feature: use a version to track metics change
Browse files Browse the repository at this point in the history
- Add `Versioned<D>` to track changes of an `Arc<D>`.

  In openraft, some frequently updated object such metrics are wrapped
  in an `Arc`, and some modification is made in place: by storing an
  `AtomicU64`.

  Thus we can not tell whether an `Arc<D>` is changed by comparing them
  with `==` any more.

  In order to determine whether to broadcast a metrics instance, we need
  an additional `version` to track the changes applied to the `Arc<D>`.
  These are all included in the `Versioned<D>`.

- Add trait `Update` to define variant update operation to apply to
  `Versioned<D>`.

  `Update` has to implement two methods:
  - `apply_in_place()` to apply a modification in place if possible
  - and `apply_mut()` if it has to apply modification to a cloned
    instance.

  `Update` will increment the `Versioned.version` by 1 after each
  update.

- Reimplement `LeaderMetrics` with `Versioned`.

- Change: type of `LeaderMetrics.replication` from HashMap to BTreeMap
  for easing test.

commit-id:2432f7a0
  • Loading branch information
drmingdrmer committed Mar 4, 2022
1 parent 518cc63 commit 966eb28
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 63 deletions.
8 changes: 5 additions & 3 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::NodeIdNotInNodes;
use crate::leader_metrics::RemoveTarget;
use crate::membership::EitherNodesOrIds;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::EntryPayload;
use crate::raft::RaftRespTx;
use crate::raft_types::LogIdOptionExt;
use crate::versioned::Updatable;
use crate::LogId;
use crate::Membership;
use crate::Node;
Expand Down Expand Up @@ -337,9 +339,9 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

tracing::info!("removed replication to: {}", target);
self.nodes.remove(&target);
let mut metrics_clone = self.leader_metrics.as_ref().clone();
metrics_clone.replication.remove(&target);
self.leader_metrics = Arc::new(metrics_clone);

self.leader_metrics.update(RemoveTarget { target });

true
}
}
9 changes: 5 additions & 4 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::error::ExtractFatal;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::InitializeError;
use crate::metrics::LeaderMetrics;
use crate::leader_metrics::LeaderMetrics;
use crate::metrics::RaftMetrics;
use crate::raft::AddLearnerResponse;
use crate::raft::Entry;
Expand All @@ -53,6 +53,7 @@ use crate::raft_types::LogIdOptionExt;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::storage::RaftSnapshotBuilder;
use crate::versioned::Versioned;
use crate::vote::Vote;
use crate::LeaderId;
use crate::LogId;
Expand Down Expand Up @@ -363,7 +364,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&mut self, leader_metrics: Update<Option<&Arc<LeaderMetrics<C>>>>) {
fn report_metrics(&mut self, leader_metrics: Update<Option<&Versioned<LeaderMetrics<C>>>>) {
let leader_metrics = match leader_metrics {
Update::Update(v) => v.cloned(),
Update::AsIs => self.tx_metrics.borrow().leader_metrics.clone(),
Expand Down Expand Up @@ -751,7 +752,7 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
pub(super) nodes: BTreeMap<C::NodeId, ReplicationState<C>>,

/// The metrics about a leader
pub leader_metrics: Arc<LeaderMetrics<C>>,
pub leader_metrics: Versioned<LeaderMetrics<C>>,

/// The stream of events coming from replication streams.
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<C, S::SnapshotData>, Span)>,
Expand All @@ -770,7 +771,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
Self {
core,
nodes: BTreeMap::new(),
leader_metrics: Arc::new(LeaderMetrics::default()),
leader_metrics: Versioned::new(LeaderMetrics::default()),
replication_tx,
replication_rx,
awaiting_committed: Vec::new(),
Expand Down
23 changes: 3 additions & 20 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use tokio::sync::oneshot;
use tracing_futures::Instrument;
Expand All @@ -12,19 +9,20 @@ use crate::core::ReplicationState;
use crate::core::SnapshotState;
use crate::core::State;
use crate::error::AddLearnerError;
use crate::leader_metrics::UpdateMatchedLogId;
use crate::raft::AddLearnerResponse;
use crate::raft::RaftRespTx;
use crate::replication::RaftEvent;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::storage::Snapshot;
use crate::summary::MessageSummary;
use crate::versioned::Updatable;
use crate::vote::Vote;
use crate::LogId;
use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::ReplicationMetrics;
use crate::StorageError;

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
Expand Down Expand Up @@ -181,23 +179,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
#[tracing::instrument(level = "trace", skip(self))]
fn update_leader_metrics(&mut self, target: C::NodeId, matched: LogId<C::NodeId>) {
tracing::debug!(%target, ?matched, "update_leader_metrics");
let (matched_leader_id, matched_index) = (matched.leader_id, matched.index);

if let Some(target_metrics) = self.leader_metrics.replication.get(&target) {
if target_metrics.matched_leader_id == matched_leader_id {
// we can update the metrics in-place
target_metrics.matched_index.store(matched_index, Ordering::Relaxed);
return;
}
}
// either the record does not exist or the leader ID is different
// create a new object with updated metrics
let mut metrics_clone = self.leader_metrics.as_ref().clone();
metrics_clone.replication.insert(target, ReplicationMetrics {
matched_leader_id,
matched_index: AtomicU64::new(matched_index),
});
self.leader_metrics = Arc::new(metrics_clone);
self.leader_metrics.update(UpdateMatchedLogId { target, matched });
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
81 changes: 81 additions & 0 deletions openraft/src/leader_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use serde::Deserialize;
use serde::Serialize;

use crate::versioned::Update;
use crate::versioned::UpdateError;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::ReplicationMetrics;

/// The metrics about the leader. It is Some() only when this node is leader.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LeaderMetrics<C: RaftTypeConfig> {
/// Replication metrics of all known replication target: voters and learners
pub replication: BTreeMap<C::NodeId, ReplicationMetrics<C>>,
}

impl<C: RaftTypeConfig> MessageSummary for LeaderMetrics<C> {
fn summary(&self) -> String {
let mut res = vec!["LeaderMetrics{".to_string()];
for (i, (k, v)) in self.replication.iter().enumerate() {
if i > 0 {
res.push(", ".to_string());
}
res.push(format!("{}:{}", k, v.summary()));
}

res.push("}".to_string());
res.join("")
}
}

/// Update one replication metrics in `LeaderMetrics.replication`.
pub struct UpdateMatchedLogId<NID: NodeId> {
pub target: NID,
pub matched: LogId<NID>,
}

impl<C: RaftTypeConfig> Update<LeaderMetrics<C>> for UpdateMatchedLogId<C::NodeId> {
/// If there is already a record for the target node. Just modify the atomic u64.
fn apply_in_place(&self, to: &Arc<LeaderMetrics<C>>) -> Result<(), UpdateError> {
let target_metrics = to.replication.get(&self.target).ok_or(UpdateError::CanNotUpdateInPlace)?;

if target_metrics.matched_leader_id == self.matched.leader_id {
target_metrics.matched_index.store(self.matched.index, Ordering::Relaxed);
return Ok(());
}

Err(UpdateError::CanNotUpdateInPlace)
}

/// To insert a new record always work.
fn apply_mut(&self, to: &mut LeaderMetrics<C>) {
to.replication.insert(self.target, ReplicationMetrics {
matched_leader_id: self.matched.leader_id,
matched_index: AtomicU64::new(self.matched.index),
});
}
}

/// Remove one replication metrics in `LeaderMetrics.replication`.
pub struct RemoveTarget<NID: NodeId> {
pub target: NID,
}

impl<C: RaftTypeConfig> Update<LeaderMetrics<C>> for RemoveTarget<C::NodeId> {
/// Removing can not be done in place
fn apply_in_place(&self, _to: &Arc<LeaderMetrics<C>>) -> Result<(), UpdateError> {
Err(UpdateError::CanNotUpdateInPlace)
}

fn apply_mut(&self, to: &mut LeaderMetrics<C>) {
to.replication.remove(&self.target);
}
}
104 changes: 104 additions & 0 deletions openraft/src/leader_metrics_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use crate::leader_metrics::LeaderMetrics;
use crate::leader_metrics::UpdateMatchedLogId;
use crate::testing::DummyConfig;
use crate::versioned::Updatable;
use crate::versioned::Versioned;
use crate::LeaderId;
use crate::LogId;
use crate::MessageSummary;

#[test]
fn test_versioned() -> anyhow::Result<()> {
let mut a = Versioned::new(LeaderMetrics::<DummyConfig> {
replication: Default::default(),
});

assert_eq!("{ver:0, LeaderMetrics{}}", a.summary());

// In place update

a.update(UpdateMatchedLogId {
target: 1,
matched: LogId::new(LeaderId::new(1, 2), 3),
});

assert_eq!("{ver:1, LeaderMetrics{1:1-2-3}}", a.summary());

let mut b1 = a.clone();

// Two instances reference the same data.
// In place update applies to both instance.

b1.update(UpdateMatchedLogId {
target: 1,
matched: LogId::new(LeaderId::new(1, 2), 5),
});
assert_eq!("{ver:1, LeaderMetrics{1:1-2-5}}", a.summary());
assert_eq!("{ver:2, LeaderMetrics{1:1-2-5}}", b1.summary());

// In place update is not possible.
// Fall back to cloned update

b1.update(UpdateMatchedLogId {
target: 2,
matched: LogId::new(LeaderId::new(1, 2), 5),
});
assert_eq!("{ver:1, LeaderMetrics{1:1-2-5}}", a.summary());
assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-5}}", b1.summary());

// a and b1 have the same content but not equal, because they reference different data.

a.update(UpdateMatchedLogId {
target: 1,
matched: LogId::new(LeaderId::new(1, 2), 5),
});
a.update(UpdateMatchedLogId {
target: 2,
matched: LogId::new(LeaderId::new(1, 2), 5),
});
assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-5}}", a.summary());
assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-5}}", b1.summary());
assert_ne!(a, b1);

// b2 reference the same data as b1.

let mut b2 = b1.clone();
b2.update(UpdateMatchedLogId {
target: 2,
matched: LogId::new(LeaderId::new(1, 2), 9),
});
assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b1.summary());
assert_eq!("{ver:4, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b2.summary());
assert_ne!(b1, b2);

// Two Versioned are equal only when they reference the same data and have the same version.

b1.update(UpdateMatchedLogId {
target: 2,
matched: LogId::new(LeaderId::new(1, 2), 9),
});
assert_eq!("{ver:4, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b1.summary());
assert_eq!("{ver:4, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b2.summary());
assert_eq!(b1, b2);

Ok(())
}

#[test]
fn test_versioned_methods() -> anyhow::Result<()> {
let mut a = Versioned::new(LeaderMetrics::<DummyConfig> {
replication: Default::default(),
});

a.update(UpdateMatchedLogId {
target: 1,
matched: LogId::new(LeaderId::new(1, 2), 3),
});

assert_eq!("{ver:1, LeaderMetrics{1:1-2-3}}", a.summary());

assert_eq!(1, a.version());
assert_eq!("LeaderMetrics{1:1-2-3}", a.data().summary());

Ok(())
}
4 changes: 4 additions & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod config;
mod core;
mod defensive;
mod leader_metrics;
mod membership;
mod node;
mod raft_types;
Expand All @@ -20,7 +21,10 @@ pub mod network;
pub mod raft;
pub mod storage;
pub mod testing;
pub mod versioned;

#[cfg(test)]
mod leader_metrics_test;
#[cfg(test)]
mod metrics_wait_test;

Expand Down
28 changes: 3 additions & 25 deletions openraft/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
//! return a stream of metrics.

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::sync::Arc;

use serde::Deserialize;
Expand All @@ -21,12 +20,13 @@ use tokio::time::Instant;
use crate::core::EffectiveMembership;
use crate::core::State;
use crate::error::Fatal;
use crate::leader_metrics::LeaderMetrics;
use crate::raft_types::LogIdOptionExt;
use crate::versioned::Versioned;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
use crate::RaftTypeConfig;
use crate::ReplicationMetrics;

/// A set of metrics describing the current state of a Raft node.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -53,7 +53,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub snapshot: Option<LogId<C::NodeId>>,

/// The metrics about the leader. It is Some() only when this node is leader.
pub leader_metrics: Option<Arc<LeaderMetrics<C>>>,
pub leader_metrics: Option<Versioned<LeaderMetrics<C>>>,
}

impl<C: RaftTypeConfig> MessageSummary for RaftMetrics<C> {
Expand All @@ -72,28 +72,6 @@ impl<C: RaftTypeConfig> MessageSummary for RaftMetrics<C> {
}
}

/// The metrics about the leader. It is Some() only when this node is leader.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LeaderMetrics<C: RaftTypeConfig> {
/// Replication metrics of all known replication target: voters and learners
pub replication: HashMap<C::NodeId, ReplicationMetrics<C>>,
}

impl<C: RaftTypeConfig> MessageSummary for LeaderMetrics<C> {
fn summary(&self) -> String {
let mut res = vec!["LeaderMetrics{".to_string()];
for (i, (k, v)) in self.replication.iter().enumerate() {
if i > 0 {
res.push(",".to_string());
}
res.push(format!("{}:{}", k, v.summary()));
}

res.push("}".to_string());
res.join("")
}
}

impl<C: RaftTypeConfig> RaftMetrics<C> {
pub(crate) fn new_initial(id: C::NodeId) -> Self {
let membership_config = Membership::new_initial(id);
Expand Down
Loading

0 comments on commit 966eb28

Please sign in to comment.