
change: SnapshotPointer no longer stores any info.
Using SnapshotPointer to store membership is a bad idea.
It makes proving consistency troublesome, e.g.:

- When `do_log_compaction()` is called concurrently (not possible for
  now, but it may become possible in the future), a correctness proof
  involving multiple components is a nightmare.

- Proving the consistency between `StateMachine.last_membership` and
  `SnapshotPointer.membership` is complicated.

What we actually need is (sketched below):
- at least one committed log is left in the log storage,
- and info in the purged logs must be present in the state machine,
  e.g. membership.
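A minimal sketch of that purge rule, assuming a BTreeMap-backed log like memstore's (the helper name is illustrative, not the crate's API):

use std::collections::BTreeMap;

/// Purge logs covered by a snapshot while keeping the last applied one.
///
/// `split_off(&last_applied)` retains keys >= last_applied, so the last
/// applied entry itself stays in the log and replication can still detect
/// a mismatched log; anything purged (e.g. membership) must already be
/// reflected in the state machine.
fn purge_applied_logs<T>(log: &mut BTreeMap<u64, T>, last_applied: u64) {
    *log = log.split_off(&last_applied);
}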
drmingdrmer committed Aug 24, 2021
1 parent daf2ed8 commit a18b98f
Showing 4 changed files with 35 additions and 31 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/core/client.rs
@@ -397,7 +397,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
                }
                EntryPayload::Blank => {}
                EntryPayload::Normal(_) => {}
-               EntryPayload::SnapshotPointer(_) => {}
+               EntryPayload::SnapshotPointer => {}
            }
        }

26 changes: 4 additions & 22 deletions async-raft/src/raft.rs
@@ -220,7 +220,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Raft<D, R, N, S>
        match res {
            Ok(v) => {
                if let Err(ref e) = v {
-                   tracing::error!("error Raft::client_write: {}", e);
+                   tracing::error!("error Raft::client_write: {:?}", e);
                }
                v
            }
@@ -515,10 +515,7 @@ impl<D: AppData> Entry<D> {
    pub fn new_snapshot_pointer(meta: &SnapshotMeta) -> Self {
        Entry {
            log_id: meta.last_log_id,
-           payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer {
-               id: meta.snapshot_id.clone(),
-               membership: meta.membership.clone(),
-           }),
+           payload: EntryPayload::SnapshotPointer,
        }
    }
}
@@ -534,7 +531,7 @@ pub enum EntryPayload<D: AppData> {
    /// A config change log entry.
    ConfigChange(EntryConfigChange),
    /// An entry which points to a snapshot.
-   SnapshotPointer(EntrySnapshotPointer),
+   SnapshotPointer,
}

impl<D: AppData> MessageSummary for EntryPayload<D> {
@@ -545,9 +542,7 @@ impl<D: AppData> MessageSummary for EntryPayload<D> {
            EntryPayload::ConfigChange(c) => {
                format!("config-change: {:?}", c.membership)
            }
-           EntryPayload::SnapshotPointer(sp) => {
-               format!("snapshot-pointer: {:?}", sp)
-           }
+           EntryPayload::SnapshotPointer => "snapshot-pointer".to_string(),
        }
    }
}
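With the payload reduced to a unit variant there is nothing left to format, so the summary becomes a constant string. For example (a sketch assuming the `summary()` method of the MessageSummary trait shown above, with memstore's ClientRequest as the app data type):

let payload: EntryPayload<ClientRequest> = EntryPayload::SnapshotPointer;
assert_eq!(payload.summary(), "snapshot-pointer");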
@@ -567,19 +562,6 @@ pub struct EntryConfigChange {
    pub membership: MembershipConfig,
}

-/// A log entry pointing to a snapshot.
-///
-/// This will only be present when read from storage. An entry of this type will never be
-/// transmitted from a leader during replication; instead, an `InstallSnapshotRequest`
-/// RPC will be sent.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub struct EntrySnapshotPointer {
-    /// The ID of the snapshot, which is application specific, and probably only meaningful to the storage layer.
-    pub id: String,
-    /// The cluster's membership config covered by this snapshot.
-    pub membership: MembershipConfig,
-}

//////////////////////////////////////////////////////////////////////////////////////////////////

/// The membership configuration of the cluster.
15 changes: 15 additions & 0 deletions async-raft/src/replication/mod.rs
@@ -25,6 +25,7 @@ use crate::config::SnapshotPolicy;
use crate::error::RaftResult;
use crate::raft::AppendEntriesRequest;
use crate::raft::Entry;
+use crate::raft::EntryPayload;
use crate::raft::InstallSnapshotRequest;
use crate::storage::Snapshot;
use crate::AppData;
@@ -684,6 +685,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
            }
        };

+       for entry in entries.iter() {
+           if let EntryPayload::SnapshotPointer = entry.payload {
+               self.replication_core.target_state = TargetReplState::Snapshotting;
+               return;
+           }
+       }

        // Prepend.
        self.replication_core.outbound_buffer.reverse();
        self.replication_core.outbound_buffer.extend(entries.into_iter().rev().map(OutboundEntry::Raw));
@@ -790,6 +798,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
            }
        };

+       for entry in entries.iter() {
+           if let EntryPayload::SnapshotPointer = entry.payload {
+               self.replication_core.target_state = TargetReplState::Snapshotting;
+               return;
+           }
+       }

        self.replication_core.outbound_buffer.extend(entries.into_iter().map(OutboundEntry::Raw));
    }
}
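Both guards added above implement the same rule: a SnapshotPointer among the fetched entries means the data it stands in for now lives only in the snapshot, so the replication stream stops sending logs and switches the target to snapshot-based sync. The same check could be written more compactly, as a sketch reusing the types from the diff above:

if entries.iter().any(|entry| matches!(entry.payload, EntryPayload::SnapshotPointer)) {
    // The purged log data is only recoverable from the snapshot.
    self.replication_core.target_state = TargetReplState::Snapshotting;
    return;
}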
23 changes: 15 additions & 8 deletions memstore/src/lib.rs
@@ -160,7 +160,6 @@ impl MemStore {
    {
        it.find_map(|entry| match &entry.payload {
            EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
-           EntryPayload::SnapshotPointer(snap) => Some(snap.membership.clone()),
            _ => None,
        })
    }
@@ -181,6 +180,13 @@ impl MemStore {
            }
        };

+       // Find the membership stored in the state machine.

+       let membership = match membership {
+           None => self.sm.read().await.last_membership.clone(),
+           Some(x) => Some(x),
+       };

        // Otherwise, create a default one.

        Ok(match membership {
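Taken together, the membership lookup now falls through three sources: the logs that survived purging, then the state machine, then a default config. A self-contained sketch of the fallback order (function and parameter names are illustrative, not the memstore API):

fn effective_membership(
    from_log: Option<MembershipConfig>,
    from_state_machine: Option<MembershipConfig>,
    default: MembershipConfig,
) -> MembershipConfig {
    // The state machine now covers memberships whose log entries were
    // purged, which is what the SnapshotPointer payload used to provide.
    from_log.or(from_state_machine).unwrap_or(default)
}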
@@ -247,6 +253,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

    #[tracing::instrument(level = "trace", skip(self))]
    async fn delete_logs_from(&self, start: u64, stop: Option<u64>) -> Result<()> {
+       // TODO(xp): never delete the last applied log.
        if stop.as_ref().map(|stop| &start > stop).unwrap_or(false) {
            tracing::error!("delete_logs_from: invalid request, start({}) > stop({:?})", start, stop);
            return Ok(());
@@ -289,7 +296,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

        match entry.payload {
            EntryPayload::Blank => res.push(ClientResponse(None)),
-           EntryPayload::SnapshotPointer(_) => res.push(ClientResponse(None)),
+           EntryPayload::SnapshotPointer => res.push(ClientResponse(None)),
            EntryPayload::Normal(ref norm) => {
                let data = &norm.data;
                if let Some((serial, r)) = sm.client_serial_responses.get(&data.client) {
@@ -335,7 +342,9 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
        {
            let mut log = self.log.write().await;
            let mut current_snapshot = self.current_snapshot.write().await;
-           *log = log.split_off(&(last_applied_log.index + 1));

+           // Leave at least one log, or replication can not find the mismatched log.
+           *log = log.split_off(&last_applied_log.index);

            let snapshot_id = format!("{}-{}-{}", last_applied_log.term, last_applied_log.index, snapshot_idx);

@@ -349,10 +358,6 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
                meta: meta.clone(),
                data: data.clone(),
            };
-           log.insert(
-               snapshot.meta.last_log_id.index,
-               Entry::new_snapshot_pointer(&snapshot.meta),
-           );

            *current_snapshot = Some(snapshot);
        } // Release log & snapshot write locks.
@@ -397,8 +402,10 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
        let mut log = self.log.write().await;

        // Remove logs that are included in the snapshot.
-       *log = log.split_off(&(meta.last_log_id.index + 1));
+       // Leave at least one log, or the replication can not find the mismatched log.
+       *log = log.split_off(&meta.last_log_id.index);

+       // In case there are no logs at all, a marker log needs to be added to indicate the last log id.
        log.insert(meta.last_log_id.index, Entry::new_snapshot_pointer(&meta));
    }
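The marker matters when an installed snapshot covers the follower's entire log: after the purge the log map can be empty, and re-inserting a pointer entry keeps a well-defined last log id for AppendEntries consistency checks. A toy illustration, with a BTreeMap standing in for the log store:

use std::collections::BTreeMap;

fn main() {
    let mut log: BTreeMap<u64, &str> = (1..=5).map(|i| (i, "entry")).collect();
    let snapshot_last_index = 8; // the snapshot covers more than the local log

    // split_off keeps keys >= 8: nothing remains, the log is empty.
    log = log.split_off(&snapshot_last_index);
    assert!(log.is_empty());

    // The marker re-establishes the last log id.
    log.insert(snapshot_last_index, "snapshot-pointer");
    assert_eq!(log.keys().next_back(), Some(&8));
}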

