diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 056edd82d..fea5a05c0 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -104,6 +104,7 @@ pub struct MemStoreStateMachine { #[derive(PartialOrd, Ord)] pub enum BlockOperation { BuildSnapshot, + PurgeLog, } /// An in-memory storage system implementing the `RaftStorage` trait. @@ -331,7 +332,12 @@ impl RaftStorage for Arc { #[tracing::instrument(level = "debug", skip_all)] async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("purge_log_upto: [{:?}, +oo)", log_id); + tracing::debug!("purge_log_upto: {:?}", log_id); + + if let Some(d) = self.get_blocking(&BlockOperation::PurgeLog) { + tracing::info!(?d, "block purging log"); + tokio::time::sleep(d).await; + } { let mut ld = self.last_purged_log_id.write().await; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index d478be607..19b18d29c 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -516,6 +516,7 @@ where last_log_index: self.engine.state.last_log_id().index(), last_applied: self.engine.state.io_applied().copied(), snapshot: self.engine.state.snapshot_meta.last_log_id, + purged: self.engine.state.io_purged().copied(), // --- cluster --- state: self.engine.state.server_state, @@ -1520,7 +1521,10 @@ where Command::SaveVote { vote } => { self.log_store.save_vote(&vote).await?; } - Command::PurgeLog { upto } => self.log_store.purge(upto).await?, + Command::PurgeLog { upto } => { + self.log_store.purge(upto).await?; + self.engine.state.io_state_mut().update_purged(Some(upto)); + } Command::DeleteConflictLog { since } => { self.log_store.truncate(since).await?; } diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 69e7bf527..2796e2218 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -38,6 +38,12 @@ where /// If there is no snapshot, it is (0,0). 
pub snapshot: Option>, + /// The last log id that has been purged from storage, inclusive. + /// + /// `purged` is also the first log id Openraft knows, although the corresponding log entry has + /// already been deleted. + pub purged: Option>, + // --- // --- cluster --- // --- @@ -64,7 +70,7 @@ where { // TODO: make this more readable fn summary(&self) -> String { - format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{{{}}}", + format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, purged:{}, replication:{{{}}}", self.id, self.state, self.current_term, @@ -73,6 +79,7 @@ where self.current_leader, self.membership_config.summary(), self.snapshot, + self.purged.summary(), self.replication.as_ref().map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, v.summary())).collect::>().join(",") }).unwrap_or_default(), @@ -89,13 +96,16 @@ where Self { running_state: Ok(()), id, - state: ServerState::Follower, + current_term: 0, last_log_index: None, last_applied: None, + snapshot: None, + purged: None, + + state: ServerState::Follower, current_leader: None, membership_config: Arc::new(StoredMembership::default()), - snapshot: None, replication: None, } } diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 6ed7bffa2..010bd909c 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -186,6 +186,8 @@ where current_term: 0, last_log_index: None, last_applied: None, + purged: None, + current_leader: None, membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! 
{}], None))), diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index 58b8e16eb..e7b48b724 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -121,6 +121,10 @@ impl LogStateReader for LogState { todo!() } + fn io_purged(&self) -> Option<&LogId> { + todo!() + } + fn snapshot_last_log_id(&self) -> Option<&LogId> { self.snap_last.as_ref() } diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index a67195595..f121a63ed 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -25,16 +25,25 @@ pub(crate) struct IOState { /// The last log id that has been flushed to storage. pub(crate) flushed: LogIOId, + /// The last log id that has been applied to state machine. pub(crate) applied: Option>, + + /// The last log id that has been purged from storage. + /// + /// [`RaftState::last_purged_log_id`](`crate::raft_state::RaftState::last_purged_log_id`) + /// is just the log id that is going to be purged, i.e., there is a `PurgeLog` command queued to + /// be executed, and it may not be the actually purged log id. 
+ pub(crate) purged: Option>, } impl IOState { - pub(crate) fn new(flushed: LogIOId, applied: Option>) -> Self { + pub(crate) fn new(flushed: LogIOId, applied: Option>, purged: Option>) -> Self { Self { building_snapshot: false, flushed, applied, + purged, } } pub(crate) fn update_applied(&mut self, log_id: Option>) { @@ -62,4 +71,12 @@ impl IOState { pub(crate) fn building_snapshot(&self) -> bool { self.building_snapshot } + + pub(crate) fn update_purged(&mut self, log_id: Option>) { + self.purged = log_id; + } + + pub(crate) fn purged(&self) -> Option<&LogId> { + self.purged.as_ref() + } } diff --git a/openraft/src/raft_state/log_state_reader.rs b/openraft/src/raft_state/log_state_reader.rs index d7b20266d..cf1e469e6 100644 --- a/openraft/src/raft_state/log_state_reader.rs +++ b/openraft/src/raft_state/log_state_reader.rs @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader { /// This is actually happened io-state which might fall behind committed log id. fn io_applied(&self) -> Option<&LogId>; + /// The last known purged log id, inclusive. + /// + /// This is the log id actually purged from storage. + fn io_purged(&self) -> Option<&LogId>; + /// Return the last log id the snapshot includes. fn snapshot_last_log_id(&self) -> Option<&LogId>; diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 3a4da552b..47501606f 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -115,6 +115,10 @@ where self.io_state.applied() } + fn io_purged(&self) -> Option<&LogId> { + self.io_state.purged() + } + fn snapshot_last_log_id(&self) -> Option<&LogId> { self.snapshot_meta.last_log_id.as_ref() } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 4a2ac80bb..6cc64581b 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -75,7 +75,7 @@ where let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; // TODO: `flushed` is not set. 
- let io_state = IOState::new(LogIOId::default(), last_applied); + let io_state = IOState::new(LogIOId::default(), last_applied, last_purged_log_id); let snapshot = self.state_machine.get_current_snapshot().await?; diff --git a/tests/tests/metrics/main.rs b/tests/tests/metrics/main.rs index e7d186382..f84e6ef06 100644 --- a/tests/tests/metrics/main.rs +++ b/tests/tests/metrics/main.rs @@ -9,6 +9,7 @@ mod fixtures; // The later tests may depend on the earlier ones. mod t10_current_leader; +mod t10_purged; mod t20_metrics_state_machine_consistency; mod t30_leader_metrics; mod t40_metrics_wait; diff --git a/tests/tests/metrics/t10_purged.rs b/tests/tests/metrics/t10_purged.rs new file mode 100644 index 000000000..bfecc230c --- /dev/null +++ b/tests/tests/metrics/t10_purged.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::testing::log_id; +use openraft::Config; +use openraft::RaftLogReader; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Metric `purged` should be the last purged log id. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn metrics_purged() -> Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + max_in_snapshot_log_to_keep: 0, + purge_batch_size: 1, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initialize cluster"); + let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! 
{}).await?; + + let n = 10; + tracing::info!(log_index, "--- write {} logs", n); + log_index += router.client_request_many(0, "foo", n).await?; + + tracing::info!(log_index, "--- trigger snapshot"); + { + let n0 = router.get_raft_handle(&0)?; + n0.trigger_snapshot().await?; + n0.wait(timeout()).snapshot(log_id(1, 0, log_index), "build snapshot").await?; + + tracing::info!(log_index, "--- metrics reports purged log id"); + n0.wait(timeout()) + .metrics( + |m| m.purged == Some(log_id(1, 0, log_index)), + "purged is reported to metrics", + ) + .await?; + + tracing::info!(log_index, "--- check storage at once to ensure purged log is removed"); + let (mut sto0, _sm0) = router.get_storage_handle(&0)?; + let state = sto0.get_log_state().await?; + assert_eq!(state.last_purged_log_id, Some(log_id(1, 0, log_index))); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +}