Skip to content

Commit

Permalink
change: add CurrentSnapshotData.meta: SnapshotMeta, which is a contai…
Browse files Browse the repository at this point in the history
…ner of all meta data of a snapshot: last log id included, membership etc.
  • Loading branch information
drmingdrmer committed Jul 13, 2021
1 parent fc8e92a commit 7792ccc
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 39 deletions.
6 changes: 3 additions & 3 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if let Some(snapshot) =
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
{
self.snapshot_last_log_id = snapshot.last_log_id;
self.snapshot_last_log_id = snapshot.meta.last_log_id;
self.report_metrics(Update::Ignore);
}

Expand Down Expand Up @@ -452,8 +452,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
match res {
Ok(res) => match res {
Ok(snapshot) => {
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.last_log_id));
let _ = chan_tx.send(snapshot.last_log_id.index); // This will always succeed.
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id));
let _ = chan_tx.send(snapshot.meta.last_log_id.index); // This will always succeed.
}
Err(err) => {
tracing::error!({error=%err}, "error while generating snapshot");
Expand Down
2 changes: 1 addition & 1 deletion async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
if snapshot_is_within_half_of_threshold(
&snapshot.last_log_id.index,
&snapshot.meta.last_log_id.index,
&self.core.last_log_id.index,
&threshold,
) {
Expand Down
2 changes: 1 addition & 1 deletion async-raft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub mod raft;
mod raft_types;
mod replication;
pub mod storage;

pub use async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand All @@ -35,6 +34,7 @@ pub use crate::raft_types::SnapshotSegmentId;
pub use crate::raft_types::Update;
pub use crate::replication::ReplicationMetrics;
pub use crate::storage::RaftStorage;
pub use crate::storage::SnapshotMeta;

/// A Raft node's ID.
pub type NodeId = u64;
Expand Down
8 changes: 4 additions & 4 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,10 +829,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
let snapshot_id = snapshot.snapshot_id.clone();
let snapshot_id = snapshot.meta.snapshot_id.clone();
let mut offset = 0;
self.core.next_index = snapshot.last_log_id.index + 1;
self.core.matched = snapshot.last_log_id;
self.core.next_index = snapshot.meta.last_log_id.index + 1;
self.core.matched = snapshot.meta.last_log_id;
let mut buf = Vec::with_capacity(self.core.config.snapshot_max_chunk_size as usize);
loop {
// Build the RPC.
Expand All @@ -843,7 +843,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
term: self.core.term,
leader_id: self.core.id,
snapshot_id: snapshot_id.clone(),
last_log_id: snapshot.last_log_id,
last_log_id: snapshot.meta.last_log_id,
offset,
data: Vec::from(&buf[..nread]),
done,
Expand Down
14 changes: 10 additions & 4 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ use crate::AppDataResponse;
use crate::LogId;
use crate::NodeId;

/// The data associated with the current snapshot.
pub struct CurrentSnapshotData<S>
where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
{
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SnapshotMeta {
// Log entries upto which this snapshot includes, inclusive.
pub last_log_id: LogId,

/// The latest membership configuration covered by the snapshot.
pub membership: MembershipConfig,

pub snapshot_id: SnapshotId,
}

/// The data associated with the current snapshot.
pub struct CurrentSnapshotData<S>
where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
{
/// metadata of a snapshot
pub meta: SnapshotMeta,

/// A read handle to the associated snapshot.
pub snapshot: Box<S>,
Expand Down
16 changes: 8 additions & 8 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,27 +501,27 @@ impl RaftRouter {
.unwrap_or_else(|| panic!("no snapshot present for node {}", id));
match index_test {
ValueTest::Exact(index) => assert_eq!(
&snap.last_log_id.index, index,
&snap.meta.last_log_id.index, index,
"expected node {} to have snapshot with index {}, got {}",
id, index, snap.last_log_id.index
id, index, snap.meta.last_log_id.index
),
ValueTest::Range(range) => assert!(
range.contains(&snap.last_log_id.index),
range.contains(&snap.meta.last_log_id.index),
"expected node {} to have snapshot within range {:?}, got {}",
id,
range,
snap.last_log_id.index
snap.meta.last_log_id.index
),
}
assert_eq!(
&snap.last_log_id.term, term,
&snap.meta.last_log_id.term, term,
"expected node {} to have snapshot with term {}, got {}",
id, term, snap.last_log_id.term
id, term, snap.meta.last_log_id.term
);
assert_eq!(
&snap.membership, cfg,
&snap.meta.membership, cfg,
"expected node {} to have membership config {:?}, got {:?}",
id, cfg, snap.membership
id, cfg, snap.meta.membership
);
}
let sm = storage.get_state_machine().await;
Expand Down
33 changes: 15 additions & 18 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_raft::AppDataResponse;
use async_raft::LogId;
use async_raft::NodeId;
use async_raft::RaftStorage;
use async_raft::SnapshotId;
use async_raft::SnapshotMeta;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;
Expand Down Expand Up @@ -67,13 +67,8 @@ pub enum ShutdownError {
/// The application snapshot type which the `MemStore` works with.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MemStoreSnapshot {
/// The last log covered by this snapshot.
pub last_log_id: LogId,
pub meta: SnapshotMeta,

/// The last membership config included in this snapshot.
pub membership: MembershipConfig,

pub snapshot_id: SnapshotId,
/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
}
Expand Down Expand Up @@ -339,12 +334,14 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

snapshot_id = format!("{}-{}-{}", term, last_applied_log, snapshot_idx);
let snapshot = MemStoreSnapshot {
last_log_id: LogId {
term,
index: last_applied_log,
meta: SnapshotMeta {
last_log_id: LogId {
term,
index: last_applied_log,
},
snapshot_id: snapshot_id.clone(),
membership: membership_config.clone(),
},
snapshot_id: snapshot_id.clone(),
membership: membership_config.clone(),
data,
};
snapshot_bytes = serde_json::to_vec(&snapshot)?;
Expand All @@ -353,9 +350,11 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

tracing::trace!({ snapshot_size = snapshot_bytes.len() }, "log compaction complete");
Ok(CurrentSnapshotData {
last_log_id: (term, last_applied_log).into(),
membership: membership_config.clone(),
snapshot_id,
meta: SnapshotMeta {
last_log_id: (term, last_applied_log).into(),
membership: membership_config.clone(),
snapshot_id,
},
snapshot: Box::new(Cursor::new(snapshot_bytes)),
})
}
Expand Down Expand Up @@ -435,9 +434,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Some(snapshot) => {
let reader = serde_json::to_vec(&snapshot)?;
Ok(Some(CurrentSnapshotData {
last_log_id: snapshot.last_log_id,
membership: snapshot.membership.clone(),
snapshot_id: snapshot.snapshot_id.clone(),
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(reader)),
}))
}
Expand Down

0 comments on commit 7792ccc

Please sign in to comment.