From f168696ba44c8af2a9106cfcde3ccb0d7c62d46a Mon Sep 17 00:00:00 2001 From: drdr xp Date: Mon, 23 Aug 2021 20:52:20 +0800 Subject: [PATCH] change: rename RaftStorage::Snapshot to RaftStorage::SnapsthoData --- async-raft/src/core/install_snapshot.rs | 4 ++-- async-raft/src/core/mod.rs | 6 +++--- async-raft/src/core/replication.rs | 4 ++-- async-raft/src/replication/mod.rs | 14 +++++++------- async-raft/src/storage.rs | 18 ++++++++++++------ memstore/src/lib.rs | 14 +++++++++----- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index e99dd5bac..747e78bfb 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -138,7 +138,7 @@ impl, S: RaftStorage> Ra &mut self, req: InstallSnapshotRequest, mut offset: u64, - mut snapshot: Box, + mut snapshot: Box, ) -> RaftResult { let id = req.meta.snapshot_id.clone(); @@ -176,7 +176,7 @@ impl, S: RaftStorage> Ra async fn finalize_snapshot_installation( &mut self, req: InstallSnapshotRequest, - mut snapshot: Box, + mut snapshot: Box, ) -> RaftResult<()> { snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?; diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index 53fa29bd4..398740a59 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -113,7 +113,7 @@ pub struct RaftCore, S: RaftSt last_log_id: LogId, /// The node's current snapshot state. - snapshot_state: Option>, + snapshot_state: Option>, /// The log id upto which the current snapshot includes, inclusive, if a snapshot exists. /// @@ -608,10 +608,10 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: Raf pub leader_metrics: LeaderMetrics, /// The stream of events coming from replication streams. - pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent, Span)>, + pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent, Span)>, /// The cloneable sender channel for replication stream events. - pub(super) replication_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, + pub(super) replication_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, /// A buffer of client requests which have been appended locally and are awaiting to be committed to the cluster. pub(super) awaiting_committed: Vec>, diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 4b667a2ad..9f0a5dfd7 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -49,7 +49,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Handle a replication event coming from one of the replication streams. #[tracing::instrument(level = "trace", skip(self, event))] - pub(super) async fn handle_replica_event(&mut self, event: ReplicaEvent) { + pub(super) async fn handle_replica_event(&mut self, event: ReplicaEvent) { let res = match event { ReplicaEvent::RateUpdate { target, is_line_rate } => self.handle_rate_update(target, is_line_rate).await, ReplicaEvent::RevertToFollower { target, term } => self.handle_revert_to_follower(target, term).await, @@ -277,7 +277,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage async fn handle_needs_snapshot( &mut self, _: NodeId, - tx: oneshot::Sender>, + tx: oneshot::Sender>, ) -> RaftResult<()> { // Ensure snapshotting is configured, else do nothing. let threshold = match &self.core.config.snapshot_policy { diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs index 898a0cfcd..623d02800 100644 --- a/async-raft/src/replication/mod.rs +++ b/async-raft/src/replication/mod.rs @@ -60,7 +60,7 @@ impl ReplicationStream { commit_index: u64, network: Arc, storage: Arc, - replication_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, + replication_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, ) -> Self { ReplicationCore::spawn( id, @@ -93,7 +93,7 @@ struct ReplicationCore, S: Raf term: u64, /// A channel for sending events to the Raft node. - raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, + raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, /// A channel for receiving events from the Raft node. repl_rx: mpsc::UnboundedReceiver<(RaftEvent, Span)>, @@ -179,7 +179,7 @@ impl, S: RaftStorage> Re commit_index: u64, network: Arc, storage: Arc, - raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, + raft_core_tx: mpsc::UnboundedSender<(ReplicaEvent, Span)>, ) -> ReplicationStream { // other component to ReplicationStream let (repl_tx, repl_rx) = mpsc::unbounded_channel(); @@ -801,8 +801,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage struct SnapshottingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> { /// An exclusive handle to the replication core. replication_core: &'a mut ReplicationCore, - snapshot: Option>, - snapshot_fetch_rx: Option>>, + snapshot: Option>, + snapshot_fetch_rx: Option>>, } impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> SnapshottingState<'a, D, R, N, S> { @@ -865,7 +865,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// If an error comes up during processing, this routine should simple be called again after /// issuing a new request to the storage layer. #[tracing::instrument(level = "trace", skip(self, rx))] - async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver>) { + async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver>) { loop { let span = tracing::debug_span!("FFF:wait_for_snapshot"); let _ent = span.enter(); @@ -898,7 +898,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData) -> RaftResult<()> { + async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData) -> RaftResult<()> { let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?; let mut offset = 0; diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 8a3383a7f..bd690d2b5 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -26,6 +26,8 @@ pub struct SnapshotMeta { /// The latest membership configuration covered by the snapshot. pub membership: MembershipConfig, + /// To identify a snapshot when transferring. + /// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes. pub snapshot_id: SnapshotId, } @@ -99,7 +101,8 @@ where /// /// See the [storage chapter of the guide](https://async-raft.github.io/async-raft/storage.html) /// for details on where and how this is used. - type Snapshot: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static; + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static; + /// The error type used to indicate to Raft that shutdown is needed when calling the /// `apply_entry_to_state_machine` method. /// @@ -211,7 +214,7 @@ where /// log covered by the snapshot. /// /// Errors returned from this method will be logged and retried. - async fn do_log_compaction(&self) -> Result>; + async fn do_log_compaction(&self) -> Result>; /// Create a new blank snapshot, returning a writable handle to the snapshot object. /// @@ -220,7 +223,7 @@ where /// for details on log compaction / snapshotting. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn create_snapshot(&self) -> Result>; + async fn create_snapshot(&self) -> Result>; /// Finalize the installation of a snapshot which has finished streaming from the cluster leader. /// @@ -239,7 +242,11 @@ where /// made to the snapshot. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn finalize_snapshot_installation(&self, meta: &SnapshotMeta, snapshot: Box) -> Result<()>; + async fn finalize_snapshot_installation( + &self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result<()>; /// Get a readable handle to the current snapshot, along with its metadata. /// @@ -254,7 +261,7 @@ where /// of the snapshot, which should be decoded for creating this method's response data. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn get_current_snapshot(&self) -> Result>>; + async fn get_current_snapshot(&self) -> Result>>; } /// APIs for debugging a store. @@ -266,4 +273,3 @@ pub trait RaftStorageDebug { /// Get the current hard state for testing purposes. async fn read_hard_state(&self) -> Option; } - diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index f2bd34e54..925f5cce4 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -192,7 +192,7 @@ impl MemStore { #[async_trait] impl RaftStorage for MemStore { - type Snapshot = Cursor>; + type SnapshotData = Cursor>; type ShutdownError = ShutdownError; #[tracing::instrument(level = "trace", skip(self))] @@ -344,7 +344,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn do_log_compaction(&self) -> Result> { + async fn do_log_compaction(&self) -> Result> { let (data, last_applied_log); let membership_config; { @@ -397,12 +397,16 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn create_snapshot(&self) -> Result> { + async fn create_snapshot(&self) -> Result> { Ok(Box::new(Cursor::new(Vec::new()))) } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn finalize_snapshot_installation(&self, meta: &SnapshotMeta, snapshot: Box) -> Result<()> { + async fn finalize_snapshot_installation( + &self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result<()> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -444,7 +448,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&self) -> Result>> { + async fn get_current_snapshot(&self) -> Result>> { match &*self.current_snapshot.read().await { Some(snapshot) => { // TODO(xp): try not to clone the entire data.