Skip to content

Commit

Permalink
Merge pull request #641 from drmingdrmer/20-snapshot-handler
Browse files Browse the repository at this point in the history
refactor: move snapshot handling from Engine to SnapshotHandler
  • Loading branch information
drmingdrmer committed Jan 8, 2023
2 parents 3ab40f6 + 1fcad1c commit e1a548c
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 140 deletions.
48 changes: 18 additions & 30 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use crate::core::ServerState;
use crate::engine::vote_handler::VoteHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::handler::vote_handler::VoteHandler;
use crate::engine::Command;
use crate::entry::RaftEntry;
use crate::error::InitializeError;
Expand Down Expand Up @@ -687,7 +688,7 @@ where
let mem_state = MembershipState { committed, effective };

if self.state.membership_state.effective != mem_state.effective {
self.push_command(Command::UpdateMembership {
self.output.push_command(Command::UpdateMembership {
membership: mem_state.effective.clone(),
})
}
Expand Down Expand Up @@ -873,7 +874,8 @@ where
// snapshot_last_log_id can not be None
let snap_last_log_id = snap_last_log_id.unwrap();

let updated = self.update_snapshot(meta.clone());
let mut snap_handler = self.snapshot_handler();
let updated = snap_handler.update_snapshot(meta.clone());
if !updated {
return;
}
Expand Down Expand Up @@ -914,7 +916,7 @@ where
// - Replace state machine with snapshot and replace the `current_snapshot` in the store.
// - Do not install, just replace the `current_snapshot` with a newer one. This command can be used for
// leader to synchronize its snapshot data.
self.push_command(Command::InstallSnapshot { snapshot_meta: meta });
self.output.push_command(Command::InstallSnapshot { snapshot_meta: meta });

// A local log that is <= snap_last_log_id can not conflict with the leader.
// But there will be a hole in the logs. Thus it's better remove all logs.
Expand All @@ -929,35 +931,14 @@ where
pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta<NID, N>) {
tracing::info!("finish_building_snapshot: {:?}", meta);

let updated = self.update_snapshot(meta);
let mut h = self.snapshot_handler();
let updated = h.update_snapshot(meta);
if !updated {
return;
}

self.purge_in_snapshot_log();
}

/// Update engine state when a new snapshot is built or installed.
///
/// Engine records only the metadata of a snapshot. Snapshot data is stored by RaftStorage implementation.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta<NID, N>) -> bool {
tracing::info!("update_snapshot: {:?}", meta);

if meta.last_log_id <= self.state.snapshot_meta.last_log_id {
tracing::info!(
"No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})",
self.state.snapshot_meta.last_log_id.summary(),
meta.last_log_id.summary()
);
return false;
}

self.state.snapshot_meta = meta;
self.output.metrics_flags.set_data_changed();

true
}
}

/// Supporting util
Expand Down Expand Up @@ -1178,9 +1159,9 @@ where
let is_leader = server_state == ServerState::Leader;

if !was_leader && is_leader {
self.push_command(Command::BecomeLeader);
self.output.push_command(Command::BecomeLeader);
} else if was_leader && !is_leader {
self.push_command(Command::QuitLeader);
self.output.push_command(Command::QuitLeader);
} else {
// nothing to do
}
Expand Down Expand Up @@ -1319,10 +1300,17 @@ where

// --- handlers ---

fn vote_handler(&mut self) -> VoteHandler<NID, N> {
pub(crate) fn vote_handler(&mut self) -> VoteHandler<NID, N> {
VoteHandler {
state: &mut self.state,
output: &mut self.output,
}
}

pub(crate) fn snapshot_handler(&mut self) -> SnapshotHandler<NID, N> {
SnapshotHandler {
state: &mut self.state,
output: &mut self.output,
}
}
}
2 changes: 2 additions & 0 deletions openraft/src/engine/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod snapshot_handler;
pub(crate) mod vote_handler;
157 changes: 157 additions & 0 deletions openraft/src/engine/handler/snapshot_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use crate::engine::engine_impl::EngineOutput;
use crate::raft_state::LogStateReader;
use crate::summary::MessageSummary;
use crate::Node;
use crate::NodeId;
use crate::RaftState;
use crate::SnapshotMeta;

/// Handle raft vote related operations
pub(crate) struct SnapshotHandler<'st, 'out, NID, N>
where
NID: NodeId,
N: Node,
{
pub(crate) state: &'st mut RaftState<NID, N>,
pub(crate) output: &'out mut EngineOutput<NID, N>,
}

impl<'st, 'out, NID, N> SnapshotHandler<'st, 'out, NID, N>
where
NID: NodeId,
N: Node,
{
/// Update engine state when a new snapshot is built or installed.
///
/// Engine records only the metadata of a snapshot. Snapshot data is stored by RaftStorage implementation.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta<NID, N>) -> bool {
tracing::info!("update_snapshot: {:?}", meta);

if meta.last_log_id <= self.state.snapshot_last_log_id().copied() {
tracing::info!(
"No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})",
self.state.snapshot_last_log_id().summary(),
meta.last_log_id.summary()
);
return false;
}

self.state.snapshot_meta = meta;
self.output.metrics_flags.set_data_changed();

true
}
}

#[cfg(test)]
mod tests {
use maplit::btreeset;
use pretty_assertions::assert_eq;

use crate::engine::Engine;
use crate::EffectiveMembership;
use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::MetricsChangeFlags;
use crate::SnapshotMeta;

fn log_id(term: u64, index: u64) -> LogId<u64> {
LogId::<u64> {
leader_id: LeaderId { term, node_id: 1 },
index,
}
}

fn m12() -> Membership<u64, ()> {
Membership::<u64, ()>::new(vec![btreeset! {1,2}], None)
}

fn m1234() -> Membership<u64, ()> {
Membership::<u64, ()>::new(vec![btreeset! {1,2,3,4}], None)
}

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()> { ..Default::default() };

eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
};
eng
}

#[test]
fn test_update_snapshot_no_update() -> anyhow::Result<()> {
// snapshot will not be updated because of equal or less `last_log_id`.
let mut eng = eng();

let got = eng.snapshot_handler().update_snapshot(SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
});

assert_eq!(false, got);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.state.snapshot_meta
);

assert_eq!(
MetricsChangeFlags {
replication: false,
local_data: false,
cluster: false,
},
eng.output.metrics_flags
);

assert_eq!(0, eng.output.commands.len());

Ok(())
}

#[test]
fn test_update_snapshot_updated() -> anyhow::Result<()> {
// snapshot will be updated to a new one with greater `last_log_id`.
let mut eng = eng();

let got = eng.snapshot_handler().update_snapshot(SnapshotMeta {
last_log_id: Some(log_id(2, 3)),
last_membership: EffectiveMembership::new(Some(log_id(2, 2)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
});

assert_eq!(true, got);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(2, 3)),
last_membership: EffectiveMembership::new(Some(log_id(2, 2)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.state.snapshot_meta
);

assert_eq!(
MetricsChangeFlags {
replication: false,
local_data: true,
cluster: false,
},
eng.output.metrics_flags
);

assert_eq!(0, eng.output.commands.len());

Ok(())
}
}
File renamed without changes.
3 changes: 1 addition & 2 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

mod command;
mod engine_impl;
mod handler;
mod log_id_list;
mod vote_handler;

#[cfg(test)] mod calc_purge_upto_test;
#[cfg(test)] mod elect_test;
Expand All @@ -50,7 +50,6 @@ mod vote_handler;
#[cfg(test)] mod update_committed_membership_test;
#[cfg(test)] mod update_effective_membership_test;
#[cfg(test)] mod update_progress_test;
#[cfg(test)] mod update_snapshot_test;

pub(crate) use command::Command;
pub(crate) use engine_impl::Engine;
Expand Down
108 changes: 0 additions & 108 deletions openraft/src/engine/update_snapshot_test.rs

This file was deleted.

0 comments on commit e1a548c

Please sign in to comment.