From f7a4ebd9166fb9b404dae2b628c53c2178a3bcbc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?=
Date: Thu, 29 Jun 2023 16:27:52 +0800
Subject: [PATCH] Fix: restore replication progress when a leader starts up

As a leader, the replication progress to itself should be restored upon
startup.

And if this leader is the only node in the cluster, it should re-apply all of
the logs to the state machine at once.

- Fix: #883
---
 memstore/src/lib.rs                                |  6 ++
 openraft/src/engine/engine_impl.rs                 |  4 ++
 .../src/engine/handler/log_handler/mod.rs          |  2 +-
 .../engine/handler/replication_handler/mod.rs      | 15 +++++
 .../src/engine/handler/vote_handler/mod.rs         |  6 --
 openraft/src/internal_server_state.rs              |  2 +
 scripts/mprocs-check.yaml                          | 30 +++++++++
 tests/tests/life_cycle/main.rs                     |  1 +
 ...t50_single_leader_restart_re_apply_logs.rs      | 62 +++++++++++++++++++
 9 files changed, 121 insertions(+), 7 deletions(-)
 create mode 100755 scripts/mprocs-check.yaml
 create mode 100644 tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs

diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs
index 113195c3d..3e3eb7e98 100644
--- a/memstore/src/lib.rs
+++ b/memstore/src/lib.rs
@@ -168,6 +168,12 @@ impl MemStore {
         self.sm.write().await.clone()
     }
+    /// Clear the state machine for testing purposes.
+    pub async fn clear_state_machine(&self) {
+        let mut sm = self.sm.write().await;
+        *sm = MemStoreStateMachine::default();
+    }
+
     /// Block an operation for testing purposes.
     pub fn set_blocking(&self, block: BlockOperation, d: Duration) {
         self.block.lock().unwrap().insert(block, d);
     }
diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs
index 701246e18..245b017df 100644
--- a/openraft/src/engine/engine_impl.rs
+++ b/openraft/src/engine/engine_impl.rs
@@ -113,6 +113,10 @@ where C: RaftTypeConfig
 
             let mut rh = self.replication_handler();
             rh.rebuild_replication_streams();
+
+            // Restore the replication progress for the local log
+            rh.update_local_progress(rh.state.last_log_id().copied());
+
             rh.initiate_replication(SendNone::False);
 
             return;
diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs
index d3064c0b4..93eedf78a 100644
--- a/openraft/src/engine/handler/log_handler/mod.rs
+++ b/openraft/src/engine/handler/log_handler/mod.rs
@@ -12,7 +12,7 @@ use crate::RaftTypeConfig;
 #[cfg(test)] mod calc_purge_upto_test;
 #[cfg(test)] mod purge_log_test;
 
-/// Handle raft vote related operations
+/// Handle raft-log related operations
 pub(crate) struct LogHandler<'x, C>
 where C: RaftTypeConfig
 {
diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs
index 10f19b9dc..a3a05163e 100644
--- a/openraft/src/engine/handler/replication_handler/mod.rs
+++ b/openraft/src/engine/handler/replication_handler/mod.rs
@@ -199,6 +199,14 @@ where C: RaftTypeConfig
     /// accepted.
     #[tracing::instrument(level = "debug", skip_all)]
     pub(crate) fn update_matching(&mut self, node_id: C::NodeId, inflight_id: u64, log_id: Option<LogId<C::NodeId>>) {
+        tracing::debug!(
+            node_id = display(node_id),
+            inflight_id = display(inflight_id),
+            log_id = display(log_id.display()),
+            "{}",
+            func_name!()
+        );
+
         debug_assert!(log_id.is_some(), "a valid update can never set matching to None");
 
         // The value granted by a quorum may not yet be a committed.
@@ -447,6 +455,8 @@ where C: RaftTypeConfig
     /// Writing to local log store does not have to wait for a replication response from remote
     /// node. Thus it can just be done in a fast-path.
     pub(crate) fn update_local_progress(&mut self, upto: Option<LogId<C::NodeId>>) {
+        tracing::debug!(upto = display(upto.display()), "{}", func_name!());
+
         if upto.is_none() {
             return;
         }
@@ -455,6 +465,11 @@ where C: RaftTypeConfig
 
         // The leader may not be in membership anymore
         if let Some(prog_entry) = self.leader.progress.get_mut(&id) {
+            tracing::debug!(
+                self_matching = display(prog_entry.matching.display()),
+                "update progress"
+            );
+
             if prog_entry.matching >= upto {
                 return;
             }
diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs
index 7993e2242..31d530974 100644
--- a/openraft/src/engine/handler/vote_handler/mod.rs
+++ b/openraft/src/engine/handler/vote_handler/mod.rs
@@ -158,12 +158,6 @@ where C: RaftTypeConfig
             self.state.last_log_id().copied(),
         );
 
-        // TODO: the progress should be initialized when the leader is elected.
-        // TODO: we do not need to update the progress until the first blank log is appended.
-        // We can just ignore the result here:
-        // The `committed` will not be updated until a log of current term is granted by a quorum
-        let _ = leader.progress.update_with(&self.config.id, |v| v.matching = self.state.last_log_id().copied());
-
         // Do not update clock_progress, until the first blank log is committed.
         *self.internal_server_state = InternalServerState::Leading(leader);
 
diff --git a/openraft/src/internal_server_state.rs b/openraft/src/internal_server_state.rs
index d0fd40617..35535dfba 100644
--- a/openraft/src/internal_server_state.rs
+++ b/openraft/src/internal_server_state.rs
@@ -23,6 +23,8 @@ pub(crate) type LeaderQuorumSet<NID> = Joint<NID, Vec<NID>, Vec<Vec<NID>>>;
 #[derive(PartialEq, Eq)]
 #[allow(clippy::large_enum_variant)]
 // TODO(9): consider moving Leader to a Box
+// TODO(9): Make InternalServerState an Option, separate Leading(Proposer) role and
+// Following(Acceptor) role
 pub(crate) enum InternalServerState
 where
     NID: NodeId,
diff --git a/scripts/mprocs-check.yaml b/scripts/mprocs-check.yaml
new file mode 100755
index 000000000..0a99f25d0
--- /dev/null
+++ b/scripts/mprocs-check.yaml
@@ -0,0 +1,30 @@
+#!/usr/bin/env mprocs --config
+
+
+# Run local checks in parallel with mprocs
+#
+# Usage:
+#   mprocs --config ./scripts/mprocs-check.yaml
+#
+# Install:
+#   cargo install mprocs
+#
+# See: https://github.com/pvolok/mprocs
+
+
+procs:
+  test-lib:
+    cmd: ["cargo", "test", "--lib"]
+  it:
+    cmd: ["cargo", "test", "--test", "*"]
+  clippy:
+    cmd: ["cargo", "clippy", "--no-deps", "--all-targets", "--", "-D", "warnings"]
+
+  # # kept examples:
+  # xx:
+  #   shell: "nodemon server.js"
+  #   webpack: "webpack serve"
+  #   tests:
+  #     shell: "jest -w"
+  #     env:
+  #       NODE_ENV: test
diff --git a/tests/tests/life_cycle/main.rs b/tests/tests/life_cycle/main.rs
index 15de04d5e..56064993c 100644
--- a/tests/tests/life_cycle/main.rs
+++ b/tests/tests/life_cycle/main.rs
@@ -12,4 +12,5 @@ mod t10_initialization;
 mod t11_shutdown;
 mod t50_follower_restart_does_not_interrupt;
 mod t50_single_follower_restart;
+mod t50_single_leader_restart_re_apply_logs;
 mod t90_issue_607_single_restart;
diff --git a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs
new file mode 100644
index 000000000..a2697ef09
--- /dev/null
+++ b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs
@@ -0,0 +1,62 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use maplit::btreeset;
+use openraft::storage::RaftLogStorage;
+use openraft::Config;
+use openraft::ServerState;
+use openraft::Vote;
+
+use crate::fixtures::init_default_ut_tracing;
+use crate::fixtures::MemLogStore;
+use crate::fixtures::MemRaft;
+use crate::fixtures::MemStateMachine;
+use crate::fixtures::RaftRouter;
+
+/// A single leader should re-apply all logs upon startup,
+/// because it alone constitutes a quorum.
+#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
+async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {
+    let config = Arc::new(
+        Config {
+            enable_heartbeat: false,
+            ..Default::default()
+        }
+        .validate()?,
+    );
+
+    let mut router = RaftRouter::new(config.clone());
+
+    tracing::info!("--- bring up cluster of 1 node");
+    let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;
+
+    tracing::info!(log_index, "--- write 1 log");
+    {
+        log_index += router.client_request_many(0, "foo", 1).await?;
+    }
+
+    tracing::info!(log_index, "--- stop and restart node-0");
+    {
+        let (node, mut ls, sm): (MemRaft, MemLogStore, MemStateMachine) = router.remove_node(0).unwrap();
+        node.shutdown().await?;
+
+        // Clear the state machine; logs should be re-applied upon restart, because this node is the leader.
+        ls.storage().await.clear_state_machine().await;
+
+        tracing::info!(log_index, "--- restart node-0");
+
+        router.new_raft_node_with_sto(0, ls, sm).await;
+        router.wait(&0, timeout()).state(ServerState::Leader, "become leader upon restart").await?;
+    }
+
+    tracing::info!(log_index, "--- a single leader should re-apply all logs");
+    {
+        router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?;
+    }
+
+    Ok(())
+}
+
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(1_000))
+}
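
Note (illustration only, not part of the patch): the fix relies on the fact that in a single-voter cluster the leader by itself forms a quorum, so once its own replication progress is restored to its last log id, the committed index catches up immediately and every stored log can be re-applied to the state machine. The standalone Rust sketch below illustrates that reasoning with a simplified progress table; the names (`Progress`, `restore_local_progress`, `committed`) are hypothetical and are not openraft APIs.

// Illustrative sketch only: a minimal per-voter progress table showing why
// restoring the leader's own `matching` entry is enough for a single-node
// leader to re-commit (and thus re-apply) all of its logs on startup.
use std::collections::BTreeMap;

struct Progress {
    /// node id -> highest log index known to be matched on that node.
    matching: BTreeMap<u64, Option<u64>>,
}

impl Progress {
    fn new(voters: &[u64]) -> Self {
        Self {
            matching: voters.iter().map(|id| (*id, None)).collect(),
        }
    }

    /// On startup, a leader restores the progress of its own (local) log:
    /// everything already in the local store counts as matched by itself.
    fn restore_local_progress(&mut self, local_id: u64, last_log_index: Option<u64>) {
        if let Some(entry) = self.matching.get_mut(&local_id) {
            *entry = (*entry).max(last_log_index);
        }
    }

    /// The committed index is the highest value granted by a majority of voters.
    fn committed(&self) -> Option<u64> {
        let mut granted: Vec<Option<u64>> = self.matching.values().copied().collect();
        granted.sort_unstable_by(|a, b| b.cmp(a)); // descending
        // With n voters, the (n/2)-th entry (0-based) is matched by a majority.
        let majority = self.matching.len() / 2;
        granted.get(majority).copied().flatten()
    }
}

fn main() {
    // A cluster with a single voter: the leader alone is a quorum.
    let mut progress = Progress::new(&[0]);
    assert_eq!(progress.committed(), None);

    // Restoring the local progress immediately makes all local logs committed,
    // so the state machine can re-apply them right away.
    progress.restore_local_progress(0, Some(10));
    assert_eq!(progress.committed(), Some(10));
}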