Skip to content

Commit

Permalink
Change: add last_membership to SnapshotMeta
Browse files Browse the repository at this point in the history
Raft actually has two slots of logs: the membership log and the application log.

In order to simplify the design and implementation, raft stores these two slots of logs in one slot.

A log snapshot, should contain the information about these two slots:
- last applied business log: `last_applied`,
- and last applied membership log: `last_applied_membership`, which is not included in `SnapshotMeta` and should be added.

With `last_applied_membership`, the `Engine` will be able to handle the install-snapshot event without accessing the `RaftStorage`.

- Fix: #334
  • Loading branch information
drmingdrmer committed May 12, 2022
1 parent 26eb737 commit 1f645fe
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
6 changes: 5 additions & 1 deletion example-raft-kv/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
async fn build_snapshot(
&mut self,
) -> Result<Snapshot<ExampleTypeConfig, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>> {
let (data, last_applied_log);
let data;
let last_applied_log;
let last_membership;

{
// Serialize the data of the state machine.
Expand All @@ -141,6 +143,7 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;

last_applied_log = state_machine.last_applied_log;
last_membership = state_machine.last_membership.clone();
}

let last_applied_log = match last_applied_log {
Expand All @@ -163,6 +166,7 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor

let meta = SnapshotMeta {
last_log_id: last_applied_log,
last_membership,
snapshot_id,
};

Expand Down
6 changes: 5 additions & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ impl RaftLogReader<Config> for Arc<MemStore> {
impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<Config, Cursor<Vec<u8>>>, StorageError<MemNodeId>> {
let (data, last_applied_log);
let data;
let last_applied_log;
let last_membership;

{
// Serialize the data of the state machine.
Expand All @@ -197,6 +199,7 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;

last_applied_log = sm.last_applied_log;
last_membership = sm.last_membership.clone();
}

let last_applied_log = match last_applied_log {
Expand All @@ -221,6 +224,7 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {

let meta = SnapshotMeta {
last_log_id: last_applied_log,
last_membership,
snapshot_id,
};

Expand Down
3 changes: 3 additions & 0 deletions openraft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub struct SnapshotMeta<NID: NodeId> {
// Log entries upto which this snapshot includes, inclusive.
pub last_log_id: LogId<NID>,

// The last applied membership config.
pub last_membership: EffectiveMembership<NID>,

/// To identify a snapshot when transferring.
/// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes.
pub snapshot_id: SnapshotId,
Expand Down
67 changes: 67 additions & 0 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::NodeId;
use crate::RaftSnapshotBuilder;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::StorageError;
Expand Down Expand Up @@ -99,6 +100,8 @@ where
run_fut(Suite::last_applied_state(builder))?;
run_fut(Suite::delete_logs(builder))?;
run_fut(Suite::append_to_log(builder))?;
run_fut(Suite::snapshot_meta(builder))?;

// run_fut(Suite::apply_single(builder))?;
// run_fut(Suite::apply_multi(builder))?;

Expand Down Expand Up @@ -1019,6 +1022,59 @@ where
Ok(())
}

pub async fn snapshot_meta(builder: &B) -> Result<(), StorageError<C::NodeId>> {
let mut store = builder.build().await;

tracing::info!("--- just initialized");
{
store
.apply_to_state_machine(&[
//
&Entry {
log_id: log_id(0, 0),
payload: EntryPayload::Membership(Membership::new(vec![btreeset! {1,2}], None)),
},
])
.await?;

let mut b = store.get_snapshot_builder().await;
let snap = b.build_snapshot().await?;
let meta = snap.meta;
assert_eq!(log_id(0, 0), meta.last_log_id);
assert_eq!(Some(log_id(0, 0)), meta.last_membership.log_id);
assert_eq!(
Membership::new(vec![btreeset! {1,2}], None),
meta.last_membership.membership
);
}

tracing::info!("--- one app log, one membership log");
{
store
.apply_to_state_machine(&[
//
&blank(1, 1),
&Entry {
log_id: log_id(2, 2),
payload: EntryPayload::Membership(Membership::new(vec![btreeset! {3,4}], None)),
},
])
.await?;

let mut b = store.get_snapshot_builder().await;
let snap = b.build_snapshot().await?;
let meta = snap.meta;
assert_eq!(log_id(2, 2), meta.last_log_id);
assert_eq!(Some(log_id(2, 2)), meta.last_membership.log_id);
assert_eq!(
Membership::new(vec![btreeset! {3,4}], None),
meta.last_membership.membership
);
}

Ok(())
}

// pub async fn apply_single(builder: &B) -> Result<(), StorageError<C::NodeId>> {
// let mut store = builder.build().await;
//
Expand Down Expand Up @@ -1744,6 +1800,17 @@ where
}
}

fn log_id<NID: NodeId>(term: u64, index: u64) -> LogId<NID>
where NID: From<u64> {
LogId {
leader_id: LeaderId {
term,
node_id: NODE_ID.into(),
},
index,
}
}

/// Create a blank log entry for test
fn blank<C: RaftTypeConfig>(term: u64, index: u64) -> Entry<C>
where C::NodeId: From<u64> {
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
leader_id: LeaderId::new(1, 0),
index: 0,
},
last_membership: Default::default(),
},
offset: 0,
data: vec![1, 2, 3],
Expand Down

0 comments on commit 1f645fe

Please sign in to comment.