From 21aa051363fd347084cc09c091e8df8b0c23ce29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 6 Dec 2022 13:56:33 +0800 Subject: [PATCH 1/5] Refactor: add does_replication_session_match() to assert replication session id matches --- openraft/src/core/raft_core.rs | 42 +++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 805ded37d..5f5385aa8 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1161,7 +1161,7 @@ impl, S: RaftStorage> RaftCore { - if self.does_vote_match(vote, "VoteResponse") { + if self.does_vote_match(&vote, "VoteResponse") { self.handle_vote_resp(resp, target).await?; } } @@ -1291,7 +1291,7 @@ impl, S: RaftStorage> RaftCore { - if self.does_vote_match(vote, "HigherVote") { + if self.does_vote_match(&vote, "HigherVote") { // Rejected vote change is ok. let _ = self.engine.handle_vote_change(&higher); self.run_engine_commands::>(&[]).await?; @@ -1303,17 +1303,15 @@ impl, S: RaftStorage> RaftCore { - if self.does_vote_match(session_id.vote, "UpdateReplicationMatched") { - // If membership changes, ignore the message. - // There is chance delayed message reports a wrong state. - if session_id.membership_log_id == self.engine.state.membership_state.effective.log_id { - self.handle_update_matched(target, result).await?; - } + // If vote or membership changes, ignore the message. + // There is a chance that a delayed message reports a wrong state. + if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") { + self.handle_update_matched(target, result).await?; } } RaftMsg::NeedsSnapshot { target: _, tx, vote } => { - if self.does_vote_match(vote, "NeedsSnapshot") { + if self.does_vote_match(&vote, "NeedsSnapshot") { self.handle_needs_snapshot(tx).await?; } } @@ -1382,8 +1380,8 @@ impl, S: RaftStorage> RaftCore, msg: impl Display) -> bool { - if vote != self.engine.state.vote { + fn does_vote_match(&self, vote: &Vote, msg: impl Display) -> bool { + if vote != &self.engine.state.vote { tracing::warn!( "vote changed: msg sent by: {:?}; curr: {}; ignore when ({})", vote, @@ -1395,6 +1393,28 @@ impl, S: RaftStorage> RaftCore, + msg: impl Display + Copy, + ) -> bool { + if !self.does_vote_match(&session_id.vote, msg) { + return false; + } + + if session_id.membership_log_id != self.engine.state.membership_state.effective.log_id { + tracing::warn!( + "membership_log_id changed: msg sent by: {:?}; curr: {}; ignore when ({})", + session_id.membership_log_id, + self.engine.state.membership_state.effective.log_id.summary(), + msg + ); + return false; + } + true + } /// A replication streams requesting for snapshot info. /// From d8d0892845eb578a184ec5af9a2ef91168d71567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 6 Dec 2022 23:47:11 +0800 Subject: [PATCH 2/5] Refactor: no need to wait for building a snapshot when lacking a log When `ReplicationCore` asks for a snapshot for replication, `RaftCore` can just give it the last built one. There is never a need to rebuild one, because a log won't be purged until a snapshot including it is built.
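The invariant behind this change can be shown with a tiny sketch. The types below are simplified stand-ins rather than the actual openraft signatures (no RaftTypeConfig generics, oneshot channels, or StorageError handling); they only illustrate why RaftCore can answer a snapshot request with the last built snapshot instead of waiting for a new build:

struct Snapshot {
    last_log_id: u64, // highest log index covered by this snapshot
}

struct Storage {
    current_snapshot: Option<Snapshot>,
}

impl Storage {
    fn get_current_snapshot(&self) -> Option<&Snapshot> {
        self.current_snapshot.as_ref()
    }
}

/// Invariant assumed by this patch: a log entry is purged only after a snapshot
/// covering it has been built. So when replication lacks a (purged) log, the last
/// built snapshot must already contain it; there is nothing to wait for.
fn handle_needs_snapshot(storage: &Storage) -> &Snapshot {
    match storage.get_current_snapshot() {
        Some(s) => s,
        None => unreachable!("a purged log implies a snapshot has already been built"),
    }
}

fn main() {
    let storage = Storage {
        current_snapshot: Some(Snapshot { last_log_id: 42 }),
    };
    assert_eq!(handle_needs_snapshot(&storage).last_log_id, 42);
}

This mirrors what the NeedsSnapshot handler in the diff below reduces to once the broadcast-based waiting is removed.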
--- openraft/src/core/raft_core.rs | 66 +++++------------------------ openraft/src/core/snapshot_state.rs | 4 -- 2 files changed, 11 insertions(+), 59 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 5f5385aa8..9093ffac7 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -15,7 +15,6 @@ use futures::StreamExt; use futures::TryFutureExt; use maplit::btreeset; use pin_utils::pin_mut; -use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; @@ -81,7 +80,6 @@ use crate::replication::ReplicationSessionId; use crate::replication::ReplicationStream; use crate::runtime::RaftRuntime; use crate::storage::RaftSnapshotBuilder; -use crate::storage::Snapshot; use crate::storage::StorageHelper; use crate::versioned::Updatable; use crate::versioned::Versioned; @@ -765,13 +763,9 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore match res { Ok(snapshot) => { let _ = tx_api.send(RaftMsg::BuildingSnapshotResult { - result: SnapshotResult::Ok(snapshot.meta.clone()), + result: SnapshotResult::Ok(snapshot.meta), }); - // This will always succeed. - let _ = chan_tx.send(snapshot.meta.last_log_id); } Err(err) => { tracing::error!({error=%err}, "error while generating snapshot"); @@ -1309,10 +1301,17 @@ impl, S: RaftStorage> RaftCore { + // TODO check session_id if self.does_vote_match(&vote, "NeedsSnapshot") { - self.handle_needs_snapshot(tx).await?; + let snapshot = self.storage.get_current_snapshot().await?; + + if let Some(snapshot) = snapshot { + let _ = tx.send(snapshot); + return Ok(()); + } + + unreachable!("A log is lacking, which means a snapshot is already built"); } } RaftMsg::ReplicationFatal => { @@ -1415,49 +1414,6 @@ impl, S: RaftStorage> RaftCore>, - ) -> Result<(), StorageError> { - // Check for existence of current snapshot. - let current_snapshot_opt = self.storage.get_current_snapshot().await?; - - if let Some(snapshot) = current_snapshot_opt { - let _ = tx.send(snapshot); - return Ok(()); - } - - // Check if snapshot creation is already in progress. If so, we spawn a task to await its - // completion (or cancellation), and respond to the replication stream. The repl stream - // will wait for the completion and will then send another request to fetch the finished snapshot. - // Else we just drop any other state and continue. Leaders never enter `Streaming` state. - if let SnapshotState::Snapshotting { sender, .. } = &self.snapshot_state { - let mut chan = sender.subscribe(); - tokio::spawn( - async move { - let _ = chan.recv().await; - // TODO(xp): send another ReplicaEvent::NeedSnapshot to raft core - drop(tx); - } - .instrument(tracing::debug_span!("spawn-recv-and-drop")), - ); - return Ok(()); - } - - // At this point, we just attempt to request a snapshot. Under normal circumstances, the - // leader will always be keeping up-to-date with its snapshotting, and the latest snapshot - // will always be found and this block will never even be executed. - // - // If this block is executed, and a snapshot is needed, the repl stream will submit another - // request here shortly, and will hit the above logic where it will await the snapshot completion. 
- self.trigger_snapshot_if_needed(false).await; Ok(()) } } #[async_trait::async_trait] diff --git a/openraft/src/core/snapshot_state.rs b/openraft/src/core/snapshot_state.rs index c8c05d560..7489412f7 100644 --- a/openraft/src/core/snapshot_state.rs +++ b/openraft/src/core/snapshot_state.rs @@ -1,8 +1,6 @@ use futures::future::AbortHandle; -use tokio::sync::broadcast; use crate::core::streaming_state::StreamingState; -use crate::LogId; use crate::Node; use crate::NodeId; use crate::RaftTypeConfig; @@ -17,8 +15,6 @@ pub(crate) enum SnapshotState { Snapshotting { /// A handle to abort the compaction process early if needed. abort_handle: AbortHandle, - /// A sender for notifying any other tasks of the completion of this compaction. - sender: broadcast::Sender>>, }, /// The Raft node is streaming in a snapshot from the leader. Streaming(StreamingState), From 1b0aee73af3d0b5ba0b8f31918b775be5605d619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 7 Dec 2022 09:16:20 +0800 Subject: [PATCH 3/5] Refactor: use the last snapshot when ReplicationCore asks When a ReplicationCore asks RaftCore for a snapshot, RaftCore just sends back the last built snapshot; it does not have to build a new one. ReplicationCore guarantees it asks for a snapshot only when the last snapshot fulfills its need, i.e., when it tries to replicate a purged log (which must be included in the last snapshot). --- openraft/src/core/raft_core.rs | 9 ++-- openraft/src/raft.rs | 8 ++-- openraft/src/replication/mod.rs | 80 ++++++++++----------------------- 3 files changed, 34 insertions(+), 63 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 9093ffac7..3f1cf03c0 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1301,9 +1301,12 @@ impl, S: RaftStorage> RaftCore { - // TODO check session_id - if self.does_vote_match(&vote, "NeedsSnapshot") { + RaftMsg::NeedsSnapshot { + target: _, + tx, + session_id, + } => { + if self.does_replication_session_match(&session_id, "NeedsSnapshot") { let snapshot = self.storage.get_current_snapshot().await?; if let Some(snapshot) = snapshot { diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index db9c803f1..5a66f8ef3 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -956,8 +956,8 @@ pub(crate) enum RaftMsg, S: RaftStor /// The response channel for delivering the snapshot data. tx: oneshot::Sender>, - /// Which ServerState sent this message - vote: Vote, + /// Which replication session sent this message + session_id: ReplicationSessionId, }, /// Some critical error has taken place, and Raft needs to shutdown. @@ -1040,7 +1040,9 @@ where ) } RaftMsg::NeedsSnapshot { - ref target, ref vote, .. + ref target, + session_id: ref vote, + .. } => { format!("NeedsSnapshot: target: {}, server_state_vote: {}", target, vote) } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 39a665ba0..aae2ba2cf 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -445,6 +445,7 @@ impl, S: RaftStorage> Replication /// snapshot is warranted.
#[tracing::instrument(level = "trace", skip(self))] pub(self) fn needs_snapshot(&self) -> bool { + // TODO needs_snapshot threshold should be greater than build-snapshot threshold match &self.config.snapshot_policy { SnapshotPolicy::LogsSinceLast(threshold) => { let c = self.committed.next_index(); @@ -620,68 +621,33 @@ impl, S: RaftStorage> Replication Ok(()) } - /// Wait for a response from the storage layer for the current snapshot. - /// - /// If an error comes up during processing, this routine should simple be called again after - /// issuing a new request to the storage layer. + /// Ask RaftCore for a snapshot #[tracing::instrument(level = "debug", skip(self))] async fn wait_for_snapshot( &mut self, ) -> Result, ReplicationError> { // Ask raft core for a snapshot. - // - If raft core has a ready snapshot, it sends back through tx. - // - Otherwise raft core starts a new task taking snapshot, and **close** `tx` when finished. Thus there has to - // be a loop. - - loop { - // channel to communicate with raft-core - let (tx, mut rx) = oneshot::channel(); - - // TODO(xp): handle sending error. If channel is closed, quite replication by returning - // ReplicationError::Closed. - let _ = self.raft_core_tx.send(RaftMsg::NeedsSnapshot { - target: self.target, - tx, - vote: self.session_id.vote, - }); - - let mut waiting_for_snapshot = true; - - // TODO(xp): use a watch channel to let the core to send one of the 3 event: - // heartbeat, new-log, or snapshot is ready. - while waiting_for_snapshot { - tokio::select! { - - event_opt = self.repl_rx.recv() => { - match event_opt { - Some(event) => { - self.process_raft_event(event); - self.try_drain_raft_rx().await?; - }, - None => { - tracing::info!("repl_rx is closed"); - return Err(ReplicationError::Closed); - } - } - }, - - res = &mut rx => { - match res { - Ok(snapshot) => { - return Ok(snapshot); - } - Err(_) => { - // TODO(xp): This channel is closed to notify an in progress snapshotting is completed. - // Start a new round to get the snapshot. - - tracing::info!("rx for waiting for snapshot is closed, may be snapshot is ready. re-send need-snapshot."); - waiting_for_snapshot = false; - }, - } - }, - } - } - } + // + // RaftCore must have a ready snapshot: + // When `ReplicationCore` asks for a snapshot for replication, `RaftCore` + // can just give it the last built one. There is never a need to rebuild + // one, because a log won't be purged until a snapshot including it is + // built. + + let (tx, mut rx) = oneshot::channel(); + + let _ = self.raft_core_tx.send(RaftMsg::NeedsSnapshot { + target: self.target, + tx, + session_id: self.session_id, + }); + + let snapshot = rx.await.map_err(|e| { + tracing::info!("error waiting for snapshot: {}", e); + ReplicationError::Closed + })?; + + Ok(snapshot) } #[tracing::instrument(level = "trace", skip(self, snapshot))] From bc1bfdf8bfd298e85e838854efdfc06e9836e82b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 7 Dec 2022 10:32:11 +0800 Subject: [PATCH 4/5] Refactor: ask for a snapshot for replication only when the last snapshot can fix the replication lag `replication_lag_threshold >= config.SnapshotPolicy.LogsSinceLast` should hold; ask for a snapshot only when a follower falls behind the `committed` index by at least `replication_lag_threshold`.
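A minimal sketch of the resulting decision rule (the names are illustrative, not the exact openraft items; the real check is ReplicationCore::needs_snapshot in the diff below, driven by Config::replication_lag_threshold and SnapshotPolicy::LogsSinceLast):

struct ReplicationConfig {
    /// A follower lagging by at least this many logs is considered lagging.
    replication_lag_threshold: u64,
    /// A snapshot is built after this many logs since the last one (SnapshotPolicy::LogsSinceLast).
    snapshot_logs_since_last: u64,
}

/// Replicate by snapshot only when the follower is so far behind that the last
/// built snapshot should already cover the logs it is missing (see the commit
/// message above).
fn needs_snapshot(cfg: &ReplicationConfig, committed_next: u64, matched_next: u64) -> bool {
    let distance = committed_next.saturating_sub(matched_next);
    distance >= cfg.replication_lag_threshold && distance > cfg.snapshot_logs_since_last
}

fn main() {
    // Defaults after this patch: both thresholds are 5000.
    let cfg = ReplicationConfig {
        replication_lag_threshold: 5000,
        snapshot_logs_since_last: 5000,
    };
    // Lagging by 4999 logs: keep replicating log entries.
    assert!(!needs_snapshot(&cfg, 10_000, 5_001));
    // Lagging by 5001 logs: the last snapshot can fix the lag; ask RaftCore for it.
    assert!(needs_snapshot(&cfg, 10_000, 4_999));
}

With both defaults at 5000, a snapshot is requested only once the lag exceeds the snapshot build threshold, so the snapshot that gets sent always moves the follower past the logs it is missing.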
--- openraft/src/config/config.rs | 9 ++++-- openraft/src/config/config_test.rs | 10 +++--- openraft/src/replication/mod.rs | 32 ++++++++++++-------- openraft/tests/membership/t10_add_learner.rs | 13 ++++---- 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/openraft/src/config/config.rs b/openraft/src/config/config.rs index bfcb0775a..dc2eba4ad 100644 --- a/openraft/src/config/config.rs +++ b/openraft/src/config/config.rs @@ -121,9 +121,12 @@ pub struct Config { /// The distance behind in log replication a follower must fall before it is considered lagging /// - /// Once a replication stream transition into line-rate state, the target node will be considered safe to join a - /// cluster. - #[clap(long, default_value = "1000")] + /// A follower that falls behind by at least this distance is replicated by snapshot. + /// A follower lagging less than this distance is replicated with log entries. + /// + /// This value should be greater than snapshot_policy.SnapshotPolicy.LogsSinceLast, otherwise transmitting a + /// snapshot may not fix the lag. + #[clap(long, default_value = "5000")] pub replication_lag_threshold: u64, /// The snapshot policy to use for a Raft node. diff --git a/openraft/src/config/config_test.rs b/openraft/src/config/config_test.rs index 57a8fbaf0..3e3207627 100644 --- a/openraft/src/config/config_test.rs +++ b/openraft/src/config/config_test.rs @@ -13,7 +13,7 @@ fn test_config_defaults() { assert_eq!(50, cfg.heartbeat_interval); assert_eq!(300, cfg.max_payload_entries); - assert_eq!(1000, cfg.replication_lag_threshold); + assert_eq!(5000, cfg.replication_lag_threshold); assert_eq!(3 * 1024 * 1024, cfg.snapshot_max_chunk_size); assert_eq!(SnapshotPolicy::LogsSinceLast(5000), cfg.snapshot_policy); @@ -57,8 +57,8 @@ fn test_build() -> anyhow::Result<()> { "--send-snapshot-timeout=199", "--install-snapshot-timeout=200", "--max-payload-entries=201", - "--replication-lag-threshold=202", - "--snapshot-policy=since_last:203", + "--snapshot-policy=since_last:202", + "--replication-lag-threshold=203", "--snapshot-max-chunk-size=204", "--max-in-snapshot-log-to-keep=205", "--purge-batch-size=207", @@ -71,8 +71,8 @@ fn test_build() -> anyhow::Result<()> { assert_eq!(199, config.send_snapshot_timeout); assert_eq!(200, config.install_snapshot_timeout); assert_eq!(201, config.max_payload_entries); - assert_eq!(202, config.replication_lag_threshold); - assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy); + assert_eq!(SnapshotPolicy::LogsSinceLast(202), config.snapshot_policy); + assert_eq!(203, config.replication_lag_threshold); assert_eq!(204, config.snapshot_max_chunk_size); assert_eq!(205, config.max_in_snapshot_log_to_keep); assert_eq!(207, config.purge_batch_size); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index aae2ba2cf..31682eddf 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -17,7 +17,6 @@ use tokio::time::Duration; use tracing_futures::Instrument; use crate::config::Config; -use crate::config::SnapshotPolicy; use crate::error::AppendEntriesError; use crate::error::CommittedAdvanceTooMany; use crate::error::HigherVote; @@ -44,6 +43,7 @@ use crate::RaftNetwork; use crate::RaftNetworkFactory; use crate::RaftStorage; use crate::RaftTypeConfig; +use crate::SnapshotPolicy; use crate::ToStorageResult; /// The handle to a spawned replication stream. @@ -445,18 +445,26 @@ impl, S: RaftStorage> Replication /// snapshot is warranted.
#[tracing::instrument(level = "trace", skip(self))] pub(self) fn needs_snapshot(&self) -> bool { - // TODO needs_snapshot threshold should be greater than build-snapshot threshold - match &self.config.snapshot_policy { - SnapshotPolicy::LogsSinceLast(threshold) => { - let c = self.committed.next_index(); - let m = self.matched.next_index(); + let c = self.committed.next_index(); + let m = self.matched.next_index(); + let distance = c.saturating_sub(m); - let needs_snap = c.saturating_sub(m) >= *threshold; + #[allow(clippy::infallible_destructuring_match)] + let snapshot_threshold = match self.config.snapshot_policy { + SnapshotPolicy::LogsSinceLast(n) => n, + }; - tracing::trace!("snapshot needed: {}", needs_snap); - needs_snap - } - } + let lagging_threshold = self.config.replication_lag_threshold; + let needs_snap = distance >= lagging_threshold && distance > snapshot_threshold; + + tracing::trace!( + "snapshot needed: {}, distance:{}; lagging_threshold:{}; snapshot_threshold:{}", + needs_snap, + distance, + lagging_threshold, + snapshot_threshold + ); + needs_snap } #[tracing::instrument(level = "trace", skip(self))] @@ -634,7 +642,7 @@ impl, S: RaftStorage> Replication // one. Because a log won't be purged until a snapshot including it is // built. - let (tx, mut rx) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); let _ = self.raft_core_tx.send(RaftMsg::NeedsSnapshot { target: self.target, diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs index 7a2f75189..c00f45ea5 100644 --- a/openraft/tests/membership/t10_add_learner.rs +++ b/openraft/tests/membership/t10_add_learner.rs @@ -43,17 +43,18 @@ async fn add_learner_basic() -> Result<()> { tracing::info!("--- add new node node-1"); { tracing::info!("--- write up to 1000 logs"); + { + router.client_request_many(0, "learner_add", 1000 - log_index as usize).await?; + log_index = 1000; - router.client_request_many(0, "learner_add", 1000 - log_index as usize).await?; - log_index = 1000; - - tracing::info!("--- write up to 1000 logs done"); - - router.wait_for_log(&btreeset! {0}, Some(log_index), timeout(), "write 1000 logs to leader").await?; + tracing::info!("--- write up to 1000 logs done"); + router.wait_for_log(&btreeset! {0}, Some(log_index), timeout(), "write 1000 logs to leader").await?; + } router.new_raft_node(1); router.add_learner(0, 1).await?; log_index += 1; + router.wait_for_log(&btreeset! 
{0,1}, Some(log_index), timeout(), "add learner").await?; tracing::info!("--- add_learner blocks until the replication catches up"); From d735280aba0f18f2fd1bd0f8b140239c17a79870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 7 Dec 2022 12:17:30 +0800 Subject: [PATCH 5/5] Refactor: rename ReplicationCore fields --- openraft/src/core/raft_core.rs | 14 ++++---- openraft/src/replication/mod.rs | 62 ++++++++++++++++++--------------- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 3f1cf03c0..dc744abfc 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -76,8 +76,8 @@ use crate::raft_types::LogIdOptionExt; use crate::raft_types::RaftLogId; use crate::replication::Replicate; use crate::replication::ReplicationCore; +use crate::replication::ReplicationHandle; use crate::replication::ReplicationSessionId; -use crate::replication::ReplicationStream; use crate::runtime::RaftRuntime; use crate::storage::RaftSnapshotBuilder; use crate::storage::StorageHelper; @@ -109,7 +109,7 @@ pub(crate) struct LeaderData { /// A mapping of node IDs the replication state of the target node. // TODO(xp): make it a field of RaftCore. it does not have to belong to leader. // It requires the Engine to emit correct add/remove replication commands - pub(super) nodes: BTreeMap>, + pub(super) nodes: BTreeMap>, /// The metrics of all replication streams pub(crate) replication_metrics: Versioned>, @@ -930,7 +930,7 @@ impl, S: RaftStorage> RaftCore, - ) -> Result, N::ConnectionError> { + ) -> Result, N::ConnectionError> { // Safe unwrap(): target must be in membership let target_node = self.engine.state.membership_state.effective.get_node(&target).unwrap(); @@ -966,10 +966,10 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftRuntime Command::ReplicateCommitted { committed } => { if let Some(l) = &self.leader_data { for node in l.nodes.values() { - let _ = node.repl_tx.send(Replicate::Committed(*committed)); + let _ = node.tx_repl.send(Replicate::Committed(*committed)); } } else { unreachable!("it has to be a leader!!!"); @@ -1509,7 +1509,7 @@ impl, S: RaftStorage> RaftRuntime Command::ReplicateEntries { upto } => { if let Some(l) = &self.leader_data { for node in l.nodes.values() { - let _ = node.repl_tx.send(Replicate::Entries(*upto)); + let _ = node.tx_repl.send(Replicate::Entries(*upto)); } } else { unreachable!("it has to be a leader!!!"); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 31682eddf..be1ba34b3 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -47,12 +47,12 @@ use crate::SnapshotPolicy; use crate::ToStorageResult; /// The handle to a spawned replication stream. -pub(crate) struct ReplicationStream { +pub(crate) struct ReplicationHandle { /// The spawn handle the `ReplicationCore` task. - pub handle: JoinHandle<()>, + pub(crate) join_handle: JoinHandle<()>, /// The channel used for communicating with the replication task. - pub repl_tx: mpsc::UnboundedSender>, + pub(crate) tx_repl: mpsc::UnboundedSender>, } /// A task responsible for sending replication events to a target follower in the Raft cluster. @@ -67,12 +67,12 @@ pub(crate) struct ReplicationCore, S /// Identifies which session this replication belongs to. session_id: ReplicationSessionId, - /// A channel for sending events to the Raft node. + /// A channel for sending events to the RaftCore. 
#[allow(clippy::type_complexity)] - raft_core_tx: mpsc::UnboundedSender>, + tx_raft_core: mpsc::UnboundedSender>, - /// A channel for receiving events from the Raft node. - repl_rx: mpsc::UnboundedReceiver>, + /// A channel for receiving events from the RaftCore. + rx_repl: mpsc::UnboundedReceiver>, /// The `RaftNetwork` interface. network: N::Network, @@ -107,7 +107,7 @@ pub(crate) struct ReplicationCore, S impl, S: RaftStorage> ReplicationCore { /// Spawn a new replication task for the target node. - #[tracing::instrument(level = "trace", skip(config, network, log_reader, raft_core_tx))] + #[tracing::instrument(level = "trace", skip_all,fields(target=display(target), session_id=display(session_id)))] #[allow(clippy::type_complexity)] #[allow(clippy::too_many_arguments)] pub(crate) fn spawn( @@ -118,11 +118,18 @@ impl, S: RaftStorage> Replication progress_entry: ProgressEntry, network: N::Network, log_reader: S::LogReader, - raft_core_tx: mpsc::UnboundedSender>, + tx_raft_core: mpsc::UnboundedSender>, span: tracing::Span, - ) -> ReplicationStream { + ) -> ReplicationHandle { + tracing::debug!( + session_id = display(&session_id), + target = display(&target), + committed = display(committed.summary()), + progress_entry = debug(&progress_entry), + "spawn replication" + ); // other component to ReplicationStream - let (repl_tx, repl_rx) = mpsc::unbounded_channel(); + let (tx_repl, rx_repl) = mpsc::unbounded_channel(); let this = Self { target, @@ -134,14 +141,14 @@ impl, S: RaftStorage> Replication committed, matched: progress_entry.matching, max_possible_matched_index: progress_entry.max_possible_matching(), - raft_core_tx, - repl_rx, + tx_raft_core, + rx_repl, need_to_replicate: true, }; - let handle = tokio::spawn(this.main().instrument(span)); + let join_handle = tokio::spawn(this.main().instrument(span)); - ReplicationStream { handle, repl_tx } + ReplicationHandle { join_handle, tx_repl } } #[tracing::instrument(level="debug", skip(self), fields(session=%self.session_id, target=display(self.target), cluster=%self.config.cluster_name))] @@ -168,7 +175,7 @@ impl, S: RaftStorage> Replication return; } ReplicationError::HigherVote(h) => { - let _ = self.raft_core_tx.send(RaftMsg::HigherVote { + let _ = self.tx_raft_core.send(RaftMsg::HigherVote { target: self.target, higher: h.higher, vote: self.session_id.vote, @@ -183,7 +190,7 @@ impl, S: RaftStorage> Replication } ReplicationError::StorageError(_err) => { // TODO: report this error - let _ = self.raft_core_tx.send(RaftMsg::ReplicationFatal); + let _ = self.tx_raft_core.send(RaftMsg::ReplicationFatal); return; } ReplicationError::NodeNotFound(err) => { @@ -314,7 +321,7 @@ impl, S: RaftStorage> Replication let repl_err = match err { RPCError::NodeNotFound(e) => ReplicationError::NodeNotFound(e), RPCError::Timeout(e) => { - let _ = self.raft_core_tx.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { target: self.target, result: Err(e.to_string()), session_id: self.session_id, @@ -322,7 +329,7 @@ impl, S: RaftStorage> Replication ReplicationError::Timeout(e) } RPCError::Network(e) => { - let _ = self.raft_core_tx.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { target: self.target, result: Err(e.to_string()), session_id: self.session_id, @@ -337,7 +344,7 @@ impl, S: RaftStorage> Replication Err(timeout_err) => { tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target"); - let _ = 
self.raft_core_tx.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { target: self.target, result: Err(timeout_err.to_string()), session_id: self.session_id, @@ -431,7 +438,7 @@ impl, S: RaftStorage> Replication tracing::debug!(target=%self.target, matched=?self.matched, "matched updated"); - let _ = self.raft_core_tx.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { target: self.target, // `self.matched < new_matched` implies new_matched can not be None. // Thus unwrap is safe. @@ -472,7 +479,7 @@ impl, S: RaftStorage> Replication tracing::debug!("try_drain_raft_rx"); for _i in 0..self.config.max_payload_entries { - let event_or_nothing = self.repl_rx.recv().now_or_never(); + let event_or_nothing = self.rx_repl.recv().now_or_never(); let ev_opt = match event_or_nothing { None => { // no event in self.repl_rx @@ -607,7 +614,7 @@ impl, S: RaftStorage> Replication continue; } - let event_or_none = self.repl_rx.recv().await; + let event_or_none = self.rx_repl.recv().await; match event_or_none { Some(event) => { self.process_raft_event(event); @@ -644,7 +651,7 @@ impl, S: RaftStorage> Replication let (tx, rx) = oneshot::channel(); - let _ = self.raft_core_tx.send(RaftMsg::NeedsSnapshot { + let _ = self.tx_raft_core.send(RaftMsg::NeedsSnapshot { target: self.target, tx, session_id: self.session_id, @@ -665,19 +672,16 @@ impl, S: RaftStorage> Replication ) -> Result<(), ReplicationError> { let err_x = || (ErrorSubject::Snapshot(snapshot.meta.signature()), ErrorVerb::Read); - let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(err_x)?; - let mut offset = 0; - + let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(err_x)?; let mut buf = Vec::with_capacity(self.config.snapshot_max_chunk_size as usize); loop { // Build the RPC. snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(err_x)?; - let n_read = snapshot.snapshot.read_buf(&mut buf).await.sto_res(err_x)?; - let done = (offset + n_read as u64) == end; // If bytes read == 0, then we're done. + let done = (offset + n_read as u64) == end; let req = InstallSnapshotRequest { vote: self.session_id.vote, meta: snapshot.meta.clone(),