diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index b64186450..b8668ef32 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -5,6 +5,7 @@ use tokio::sync::watch; use tokio::time::Instant; use crate::core::ServerState; +use crate::display_ext::DisplayOption; use crate::metrics::RaftMetrics; use crate::node::Node; use crate::LogId; @@ -190,4 +191,14 @@ where ) .await } + + /// Wait for `purged` to become `want` or timeout. + #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] + pub async fn purged(&self, want: Option>, msg: impl ToString) -> Result, WaitError> { + self.metrics( + |x| x.purged == want, + &format!("{} .purged -> {}", msg.to_string(), DisplayOption(&want)), + ) + .await + } } diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 010bd909c..b17c13546 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -10,6 +10,7 @@ use crate::core::ServerState; use crate::log_id::LogIdOptionExt; use crate::metrics::Wait; use crate::metrics::WaitError; +use crate::testing::log_id; use crate::vote::CommittedLeaderId; use crate::LogId; use crate::Membership; @@ -170,6 +171,24 @@ async fn test_wait() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn test_wait_purged() -> anyhow::Result<()> { + let (init, w, tx) = init_wait_test::(); + + let h = tokio::spawn(async move { + sleep(Duration::from_millis(10)).await; + let mut update = init.clone(); + update.purged = Some(log_id(1, 2, 3)); + let rst = tx.send(update); + assert!(rst.is_ok()); + }); + let got = w.purged(Some(log_id(1, 2, 3)), "purged").await?; + h.await?; + assert_eq!(Some(log_id(1, 2, 3)), got.purged); + + Ok(()) +} + pub(crate) type InitResult = (RaftMetrics, Wait, watch::Sender>); /// Build a initial state for testing of Wait: