Skip to content

Commit

Permalink
Fix: if the application does not persist snapshot, build a snapshot w…
Browse files Browse the repository at this point in the history
…hen starting up
  • Loading branch information
drmingdrmer committed Apr 15, 2023
1 parent bbfe4f4 commit 04e4060
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where

tracing::debug!("{}: received command: {:?}", func_name!(), cmd);

let done = match cmd.payload {
let command_result = match cmd.payload {
CommandPayload::BuildSnapshot => {
let resp = self.build_snapshot().await?;
CommandResult::new(cmd.command_id, Ok(Response::BuildSnapshot(resp)))
Expand Down Expand Up @@ -182,7 +182,7 @@ where
}
};

let _ = self.resp_tx.send(RaftMsg::StateMachine { command_result: done });
let _ = self.resp_tx.send(RaftMsg::StateMachine { command_result });

(cmd.respond)();
}
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/storage/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ where
}
}

// TODO(1): make it private because only tests need it.
// rewrite memstore with separated log-store and state machine.
/// Get a write lock of the underlying storage.
pub async fn storage_mut(&self) -> RwLockWriteGuard<S> {
self.storage.write().await
Expand Down
19 changes: 18 additions & 1 deletion openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::LogIdOptionExt;
use crate::MembershipState;
use crate::RaftSnapshotBuilder;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::StorageError;
Expand Down Expand Up @@ -76,7 +77,23 @@ where
// TODO: `flushed` is not set.
let io_state = IOState::new(LogIOId::default(), last_applied);

let snapshot_meta = self.state_machine.get_current_snapshot().await?.map(|x| x.meta).unwrap_or_default();
let snapshot = self.state_machine.get_current_snapshot().await?;

// If there is no snapshot but some logs have already been purged, the snapshot was not
// persisted by the application; rebuild it here so that replication can use it.
let snapshot = match snapshot {
None => {
if last_purged_log_id.is_some() {
let mut b = self.state_machine.get_snapshot_builder().await;
let s = b.build_snapshot().await?;
Some(s)
} else {
None
}
}
s @ Some(_) => s,
};
let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default();

let now = Instant::now();

Expand Down
1 change: 1 addition & 0 deletions tests/tests/snapshot/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod fixtures;

mod t20_api_install_snapshot;
mod t20_startup_snapshot;
mod t20_trigger_snapshot;
mod t23_snapshot_chunk_size;
mod t24_snapshot_when_lacking_log;
Expand Down
55 changes: 55 additions & 0 deletions tests/tests/snapshot/t20_startup_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::storage::RaftLogStorage;
use openraft::Config;
use openraft::SnapshotPolicy;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::log_id;
use crate::fixtures::RaftRouter;

/// On startup, if there is no snapshot but some logs have already been purged, the node should
/// build a snapshot at once.
///
/// Steps: initialize a cluster containing only node 0, write logs up to index
/// `snapshot_threshold - 1`, remove node 0 and purge its log store up to `purge_upto`, then
/// restart node 0 on the same stores and wait for a snapshot at the last log index.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn startup_build_snapshot() -> anyhow::Result<()> {
    let snapshot_threshold = 10;
    // Index up to which the log store is purged while the node is removed from the router.
    let purge_upto = 5;

    let config = Arc::new(
        Config {
            enable_heartbeat: false,
            snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
            max_in_snapshot_log_to_keep: 0,
            ..Default::default()
        }
        .validate()?,
    );

    let mut router = RaftRouter::new(config.clone());

    tracing::info!("--- initializing cluster");
    let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;

    tracing::info!("--- send client requests");
    {
        // Stop one entry short of the threshold.
        // NOTE(review): presumably so that the snapshot policy itself does not fire — confirm.
        router.client_request_many(0, "0", (snapshot_threshold - 1 - log_index) as usize).await?;
        log_index = snapshot_threshold - 1;
    }

    tracing::info!("--- shut down and purge to log index: {}", purge_upto);
    let (_, mut log_store, sm) = router.remove_node(0).unwrap();
    log_store.purge(log_id(1, 0, purge_upto)).await?;

    // The only node in this test is node 0, so the messages below name node-0.
    tracing::info!("--- restart, expect snapshot at index: {} for node-0", log_index);
    {
        router.new_raft_node_with_sto(0, log_store, sm).await;
        router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-0 snapshot").await?;
    }

    Ok(())
}

/// Upper bound used when waiting for the node to reach an expected state: one second.
fn timeout() -> Option<Duration> {
    let one_second = Duration::from_secs(1);
    Some(one_second)
}

0 comments on commit 04e4060

Please sign in to comment.