Skip to content

Commit

Permalink
turn wait_for_metrics a fallible func; re-organize import
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jun 1, 2021
1 parent 823ae49 commit 273e14a
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use tokio::sync::RwLock;
use tracing_subscriber::prelude::*;

use async_raft::{Config, NodeId, Raft, RaftMetrics, RaftNetwork, State};
use async_raft::async_trait::async_trait;
use async_raft::error::{ChangeConfigError, ClientReadError, ClientWriteError};
use async_raft::raft::ClientWriteRequest;
use async_raft::raft::MembershipConfig;
use async_raft::raft::{AppendEntriesRequest, AppendEntriesResponse};
use async_raft::raft::{InstallSnapshotRequest, InstallSnapshotResponse};
use async_raft::raft::{VoteRequest, VoteResponse};
use async_raft::raft::ClientWriteRequest;
use async_raft::raft::MembershipConfig;
use async_raft::storage::RaftStorage;
use async_raft::{Config, NodeId, Raft, RaftMetrics, RaftNetwork, State};
use memstore::{ClientRequest as MemClientRequest, ClientResponse as MemClientResponse, MemStore};
use tokio::sync::RwLock;
use tracing_subscriber::prelude::*;

/// A concrete Raft type used during testing.
pub type MemRaft = Raft<MemClientRequest, MemClientResponse, RaftRouter, MemStore>;
Expand Down Expand Up @@ -115,20 +116,22 @@ impl RaftRouter {
metrics
}


pub async fn get_sto(&self, node_id: &NodeId) -> Arc<MemStore> {
let sto = self.routing_table.read().await.get(node_id).unwrap().clone().1;
sto
/// Get a handle to the storage backend for the target node.
pub async fn get_storage_handle(&self, node_id: &NodeId) -> Result<Arc<MemStore>> {
let rt = self.routing_table.read().await;
let addr = rt.get(node_id).ok_or_else(||anyhow::anyhow!("could not find node {} in routing table", node_id))?;
let sto = addr.clone().1;
Ok(sto)
}

/// Wait for metrics until it satisfies some condition.
#[tracing::instrument(level = "info", skip(self, func))]
pub async fn wait_for_metrics<T>(&self, node_id:&NodeId, func: T, timeout: tokio::time::Duration, msg: &str) -> RaftMetrics
where
T: Fn(&RaftMetrics) -> bool,
pub async fn wait_for_metrics<T>(&self, node_id: &NodeId, func: T, timeout: tokio::time::Duration, msg: &str) -> Result<RaftMetrics>
where
T: Fn(&RaftMetrics) -> bool,
{

let rt = self.routing_table.read().await;
let node = rt.get(node_id).unwrap();
let node = rt.get(node_id).ok_or_else(|| anyhow::anyhow!("node {} not found", node_id))?;
let mut mrx = node.0.metrics().clone();

loop {
Expand All @@ -138,14 +141,14 @@ impl RaftRouter {

if func(&latest) {
tracing::info!("done wait for {:} metrics: {:?}", msg, latest);
return latest;
return Ok(latest);
}

let delay = tokio::time::sleep(timeout);

tokio::select! {
_ = delay => {
panic!("timeout wait for {} metrics: {:?}", msg, latest);
return Err(anyhow::anyhow!("timeout wait for {} latest metrics: {:?}", msg, latest));
}
changed = mrx.changed() => {
assert!(changed.is_ok());
Expand Down

0 comments on commit 273e14a

Please sign in to comment.