Skip to content

Commit

Permalink
Change: ErrorSubject::Snapshot(SnapshotSignature)
Browse files Browse the repository at this point in the history
Change `ErrorSubject::Snapshot(SnapshotMeta)` to `ErrorSubject::Snapshot(SnapshotSignature)`.

`SnapshotSignature` is the same as `SnapshotMeta` except it does not include
`Membership` information.
This way errors do not have to depend on type `Node`, which is used in
`Membership` and it is a application specific type.

Then when a user-defined generic type `NodeData` is introduced, error
types do not need to change.

- Part of: #480
  • Loading branch information
drmingdrmer committed Aug 2, 2022
1 parent 984a9ba commit 565b692
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 17 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
let updated_state_machine: ExampleStateMachine =
serde_json::from_slice(&new_snapshot.data).map_err(|e| {
StorageIOError::new(
ErrorSubject::Snapshot(new_snapshot.meta.clone()),
ErrorSubject::Snapshot(new_snapshot.meta.signature()),
ErrorVerb::Read,
AnyError::new(&e),
)
Expand Down
8 changes: 6 additions & 2 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,11 @@ impl ExampleStore {
self.db
.put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice())
.map_err(|e| StorageError::IO {
source: StorageIOError::new(ErrorSubject::Snapshot(snap.meta), ErrorVerb::Write, AnyError::new(&e)),
source: StorageIOError::new(
ErrorSubject::Snapshot(snap.meta.signature()),
ErrorVerb::Write,
AnyError::new(&e),
),
})?;
Ok(())
}
Expand Down Expand Up @@ -538,7 +542,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
let updated_state_machine: SerializableExampleStateMachine = serde_json::from_slice(&new_snapshot.data)
.map_err(|e| {
StorageIOError::new(
ErrorSubject::Snapshot(new_snapshot.meta.clone()),
ErrorSubject::Snapshot(new_snapshot.meta.signature()),
ErrorVerb::Read,
AnyError::new(&e),
)
Expand Down
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
{
let new_sm: MemStoreStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| {
StorageIOError::new(
ErrorSubject::Snapshot(new_snapshot.meta.clone()),
ErrorSubject::Snapshot(new_snapshot.meta.signature()),
ErrorVerb::Read,
AnyError::new(&e),
)
Expand Down
15 changes: 9 additions & 6 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let mut snapshot = self.storage.begin_receiving_snapshot().await?;
snapshot.as_mut().write_all(&req.data).await.map_err(|e| StorageError::IO {
source: StorageIOError::new(
ErrorSubject::Snapshot(req.meta.clone()),
ErrorSubject::Snapshot(req.meta.signature()),
ErrorVerb::Write,
AnyError::new(&e),
),
Expand Down Expand Up @@ -156,7 +156,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
if let Err(err) = snapshot.as_mut().seek(SeekFrom::Start(req.offset)).await {
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
return Err(StorageError::from_io_error(
ErrorSubject::Snapshot(req.meta.clone()),
ErrorSubject::Snapshot(req.meta.signature()),
ErrorVerb::Seek,
err,
)
Expand All @@ -168,9 +168,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Write the next segment & update offset.
if let Err(err) = snapshot.as_mut().write_all(&req.data).await {
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
return Err(
StorageError::from_io_error(ErrorSubject::Snapshot(req.meta.clone()), ErrorVerb::Write, err).into(),
);
return Err(StorageError::from_io_error(
ErrorSubject::Snapshot(req.meta.signature()),
ErrorVerb::Write,
err,
)
.into());
}
offset += req.data.len() as u64;

Expand Down Expand Up @@ -198,7 +201,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

snapshot.as_mut().shutdown().await.map_err(|e| StorageError::IO {
source: StorageIOError::new(
ErrorSubject::Snapshot(req.meta.clone()),
ErrorSubject::Snapshot(req.meta.signature()),
ErrorVerb::Write,
AnyError::new(&e),
),
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
&mut self,
mut snapshot: Snapshot<C::NodeId, S::SnapshotData>,
) -> Result<(), ReplicationError<C::NodeId>> {
let err_x = || (ErrorSubject::Snapshot(snapshot.meta.clone()), ErrorVerb::Read);
let err_x = || (ErrorSubject::Snapshot(snapshot.meta.signature()), ErrorVerb::Read);

let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(err_x)?;

Expand Down
16 changes: 14 additions & 2 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! The Raft storage interface and data types.

mod helper;
mod snapshot_signature;
use std::fmt::Debug;
use std::ops::RangeBounds;

use async_trait::async_trait;
pub use helper::StorageHelper;
pub use snapshot_signature::SnapshotSignature;
use tokio::io::AsyncRead;
use tokio::io::AsyncSeek;
use tokio::io::AsyncWrite;
Expand All @@ -24,17 +26,27 @@ use crate::Vote;
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct SnapshotMeta<NID: NodeId> {
// Log entries upto which this snapshot includes, inclusive.
/// Log entries upto which this snapshot includes, inclusive.
pub last_log_id: LogId<NID>,

// The last applied membership config.
/// The last applied membership config.
pub last_membership: EffectiveMembership<NID>,

/// To identify a snapshot when transferring.
/// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes.
pub snapshot_id: SnapshotId,
}

impl<NID: NodeId> SnapshotMeta<NID> {
pub fn signature(&self) -> SnapshotSignature<NID> {
SnapshotSignature {
last_log_id: Some(self.last_log_id),
last_membership_log_id: self.last_membership.log_id,
snapshot_id: self.snapshot_id.clone(),
}
}
}

/// The data associated with the current snapshot.
#[derive(Debug)]
pub struct Snapshot<NID, S>
Expand Down
17 changes: 17 additions & 0 deletions openraft/src/storage/snapshot_signature.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::LogId;
use crate::NodeId;
use crate::SnapshotId;

/// A small piece of information for identifying a snapshot and error tracing.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct SnapshotSignature<NID: NodeId> {
/// Log entries upto which this snapshot includes, inclusive.
pub last_log_id: Option<LogId<NID>>,

/// The last applied membership log id.
pub last_membership_log_id: Option<LogId<NID>>,

/// To identify a snapshot when transferring.
pub snapshot_id: SnapshotId,
}
4 changes: 2 additions & 2 deletions openraft/src/storage_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::ops::Bound;

use anyerror::AnyError;

use crate::storage::SnapshotSignature;
use crate::LogId;
use crate::NodeId;
use crate::SnapshotMeta;
use crate::Vote;

/// Convert error to StorageError::IO();
Expand Down Expand Up @@ -86,7 +86,7 @@ pub enum ErrorSubject<NID: NodeId> {
StateMachine,

/// Error happened when operating snapshot.
Snapshot(SnapshotMeta<NID>),
Snapshot(SnapshotSignature<NID>),

None,
}
Expand Down
8 changes: 6 additions & 2 deletions rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ impl RocksStore {
self.db
.put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice())
.map_err(|e| StorageError::IO {
source: StorageIOError::new(ErrorSubject::Snapshot(snap.meta), ErrorVerb::Write, AnyError::new(&e)),
source: StorageIOError::new(
ErrorSubject::Snapshot(snap.meta.signature()),
ErrorVerb::Write,
AnyError::new(&e),
),
})?;
Ok(())
}
Expand Down Expand Up @@ -534,7 +538,7 @@ impl RaftStorage<Config> for Arc<RocksStore> {
let updated_state_machine: SerializableRocksStateMachine = serde_json::from_slice(&new_snapshot.data)
.map_err(|e| {
StorageIOError::new(
ErrorSubject::Snapshot(new_snapshot.meta.clone()),
ErrorSubject::Snapshot(new_snapshot.meta.signature()),
ErrorVerb::Read,
AnyError::new(&e),
)
Expand Down

0 comments on commit 565b692

Please sign in to comment.