Skip to content

Commit

Permalink
Feature: add Wait::purged() to wait for purged to become the expected
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed May 1, 2023
1 parent 31a181f commit 37e9648
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
11 changes: 11 additions & 0 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::watch;
use tokio::time::Instant;

use crate::core::ServerState;
use crate::display_ext::DisplayOption;
use crate::metrics::RaftMetrics;
use crate::node::Node;
use crate::LogId;
Expand Down Expand Up @@ -190,4 +191,14 @@ where
)
.await
}

/// Wait for `purged` to become `want` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn purged(&self, want: Option<LogId<NID>>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.metrics(
|x| x.purged == want,
&format!("{} .purged -> {}", msg.to_string(), DisplayOption(&want)),
)
.await
}
}
19 changes: 19 additions & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::core::ServerState;
use crate::log_id::LogIdOptionExt;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::testing::log_id;
use crate::vote::CommittedLeaderId;
use crate::LogId;
use crate::Membership;
Expand Down Expand Up @@ -170,6 +171,24 @@ async fn test_wait() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait_purged() -> anyhow::Result<()> {
let (init, w, tx) = init_wait_test::<u64, ()>();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.purged = Some(log_id(1, 2, 3));
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.purged(Some(log_id(1, 2, 3)), "purged").await?;
h.await?;
assert_eq!(Some(log_id(1, 2, 3)), got.purged);

Ok(())
}

pub(crate) type InitResult<NID, N> = (RaftMetrics<NID, N>, Wait<NID, N>, watch::Sender<RaftMetrics<NID, N>>);

/// Build a initial state for testing of Wait:
Expand Down

0 comments on commit 37e9648

Please sign in to comment.