Skip to content

Commit

Permalink
change: rename RaftStorage::Snapshot to RaftStorage::SnapsthoData
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 23, 2021
1 parent eb82266 commit f168696
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 25 deletions.
4 changes: 2 additions & 2 deletions async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
&mut self,
req: InstallSnapshotRequest,
mut offset: u64,
mut snapshot: Box<S::Snapshot>,
mut snapshot: Box<S::SnapshotData>,
) -> RaftResult<InstallSnapshotResponse> {
let id = req.meta.snapshot_id.clone();

Expand Down Expand Up @@ -176,7 +176,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn finalize_snapshot_installation(
&mut self,
req: InstallSnapshotRequest,
mut snapshot: Box<S::Snapshot>,
mut snapshot: Box<S::SnapshotData>,
) -> RaftResult<()> {
snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?;

Expand Down
6 changes: 3 additions & 3 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
last_log_id: LogId,

/// The node's current snapshot state.
snapshot_state: Option<SnapshotState<S::Snapshot>>,
snapshot_state: Option<SnapshotState<S::SnapshotData>>,

/// The log id upto which the current snapshot includes, inclusive, if a snapshot exists.
///
Expand Down Expand Up @@ -608,10 +608,10 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf
pub leader_metrics: LeaderMetrics,

/// The stream of events coming from replication streams.
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<S::Snapshot>, Span)>,
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<S::SnapshotData>, Span)>,

/// The cloneable sender channel for replication stream events.
pub(super) replication_tx: mpsc::UnboundedSender<(ReplicaEvent<S::Snapshot>, Span)>,
pub(super) replication_tx: mpsc::UnboundedSender<(ReplicaEvent<S::SnapshotData>, Span)>,

/// A buffer of client requests which have been appended locally and are awaiting to be committed to the cluster.
pub(super) awaiting_committed: Vec<ClientRequestEntry<D, R>>,
Expand Down
4 changes: 2 additions & 2 deletions async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

/// Handle a replication event coming from one of the replication streams.
#[tracing::instrument(level = "trace", skip(self, event))]
pub(super) async fn handle_replica_event(&mut self, event: ReplicaEvent<S::Snapshot>) {
pub(super) async fn handle_replica_event(&mut self, event: ReplicaEvent<S::SnapshotData>) {
let res = match event {
ReplicaEvent::RateUpdate { target, is_line_rate } => self.handle_rate_update(target, is_line_rate).await,
ReplicaEvent::RevertToFollower { target, term } => self.handle_revert_to_follower(target, term).await,
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
async fn handle_needs_snapshot(
&mut self,
_: NodeId,
tx: oneshot::Sender<CurrentSnapshotData<S::Snapshot>>,
tx: oneshot::Sender<CurrentSnapshotData<S::SnapshotData>>,
) -> RaftResult<()> {
// Ensure snapshotting is configured, else do nothing.
let threshold = match &self.core.config.snapshot_policy {
Expand Down
14 changes: 7 additions & 7 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<D: AppData> ReplicationStream<D> {
commit_index: u64,
network: Arc<N>,
storage: Arc<S>,
replication_tx: mpsc::UnboundedSender<(ReplicaEvent<S::Snapshot>, Span)>,
replication_tx: mpsc::UnboundedSender<(ReplicaEvent<S::SnapshotData>, Span)>,
) -> Self {
ReplicationCore::spawn(
id,
Expand Down Expand Up @@ -93,7 +93,7 @@ struct ReplicationCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf
term: u64,

/// A channel for sending events to the Raft node.
raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent<S::Snapshot>, Span)>,
raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent<S::SnapshotData>, Span)>,

/// A channel for receiving events from the Raft node.
repl_rx: mpsc::UnboundedReceiver<(RaftEvent<D>, Span)>,
Expand Down Expand Up @@ -179,7 +179,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
commit_index: u64,
network: Arc<N>,
storage: Arc<S>,
raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent<S::Snapshot>, Span)>,
raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent<S::SnapshotData>, Span)>,
) -> ReplicationStream<D> {
// other component to ReplicationStream
let (repl_tx, repl_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -801,8 +801,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
struct SnapshottingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
/// An exclusive handle to the replication core.
replication_core: &'a mut ReplicationCore<D, R, N, S>,
snapshot: Option<CurrentSnapshotData<S::Snapshot>>,
snapshot_fetch_rx: Option<oneshot::Receiver<CurrentSnapshotData<S::Snapshot>>>,
snapshot: Option<CurrentSnapshotData<S::SnapshotData>>,
snapshot_fetch_rx: Option<oneshot::Receiver<CurrentSnapshotData<S::SnapshotData>>>,
}

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> SnapshottingState<'a, D, R, N, S> {
Expand Down Expand Up @@ -865,7 +865,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// If an error comes up during processing, this routine should simple be called again after
/// issuing a new request to the storage layer.
#[tracing::instrument(level = "trace", skip(self, rx))]
async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver<CurrentSnapshotData<S::Snapshot>>) {
async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver<CurrentSnapshotData<S::SnapshotData>>) {
loop {
let span = tracing::debug_span!("FFF:wait_for_snapshot");
let _ent = span.enter();
Expand Down Expand Up @@ -898,7 +898,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::SnapshotData>) -> RaftResult<()> {
let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?;

let mut offset = 0;
Expand Down
18 changes: 12 additions & 6 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct SnapshotMeta {
/// The latest membership configuration covered by the snapshot.
pub membership: MembershipConfig,

/// To identify a snapshot when transferring.
/// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes.
pub snapshot_id: SnapshotId,
}

Expand Down Expand Up @@ -99,7 +101,8 @@ where
///
/// See the [storage chapter of the guide](https://async-raft.github.io/async-raft/storage.html)
/// for details on where and how this is used.
type Snapshot: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static;
type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static;

/// The error type used to indicate to Raft that shutdown is needed when calling the
/// `apply_entry_to_state_machine` method.
///
Expand Down Expand Up @@ -211,7 +214,7 @@ where
/// log covered by the snapshot.
///
/// Errors returned from this method will be logged and retried.
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::Snapshot>>;
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::SnapshotData>>;

/// Create a new blank snapshot, returning a writable handle to the snapshot object.
///
Expand All @@ -220,7 +223,7 @@ where
/// for details on log compaction / snapshotting.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn create_snapshot(&self) -> Result<Box<Self::Snapshot>>;
async fn create_snapshot(&self) -> Result<Box<Self::SnapshotData>>;

/// Finalize the installation of a snapshot which has finished streaming from the cluster leader.
///
Expand All @@ -239,7 +242,11 @@ where
/// made to the snapshot.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn finalize_snapshot_installation(&self, meta: &SnapshotMeta, snapshot: Box<Self::Snapshot>) -> Result<()>;
async fn finalize_snapshot_installation(
&self,
meta: &SnapshotMeta,
snapshot: Box<Self::SnapshotData>,
) -> Result<()>;

/// Get a readable handle to the current snapshot, along with its metadata.
///
Expand All @@ -254,7 +261,7 @@ where
/// of the snapshot, which should be decoded for creating this method's response data.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn get_current_snapshot(&self) -> Result<Option<CurrentSnapshotData<Self::Snapshot>>>;
async fn get_current_snapshot(&self) -> Result<Option<CurrentSnapshotData<Self::SnapshotData>>>;
}

/// APIs for debugging a store.
Expand All @@ -266,4 +273,3 @@ pub trait RaftStorageDebug<SM> {
/// Get the current hard state for testing purposes.
async fn read_hard_state(&self) -> Option<HardState>;
}

14 changes: 9 additions & 5 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl MemStore {

#[async_trait]
impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
type Snapshot = Cursor<Vec<u8>>;
type SnapshotData = Cursor<Vec<u8>>;
type ShutdownError = ShutdownError;

#[tracing::instrument(level = "trace", skip(self))]
Expand Down Expand Up @@ -344,7 +344,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::Snapshot>> {
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::SnapshotData>> {
let (data, last_applied_log);
let membership_config;
{
Expand Down Expand Up @@ -397,12 +397,16 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn create_snapshot(&self) -> Result<Box<Self::Snapshot>> {
async fn create_snapshot(&self) -> Result<Box<Self::SnapshotData>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn finalize_snapshot_installation(&self, meta: &SnapshotMeta, snapshot: Box<Self::Snapshot>) -> Result<()> {
async fn finalize_snapshot_installation(
&self,
meta: &SnapshotMeta,
snapshot: Box<Self::SnapshotData>,
) -> Result<()> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
Expand Down Expand Up @@ -444,7 +448,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(&self) -> Result<Option<CurrentSnapshotData<Self::Snapshot>>> {
async fn get_current_snapshot(&self) -> Result<Option<CurrentSnapshotData<Self::SnapshotData>>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
// TODO(xp): try not to clone the entire data.
Expand Down

0 comments on commit f168696

Please sign in to comment.