Merge pull request #613 from drmingdrmer/50-main
simplify snapshot replication
drmingdrmer committed Dec 7, 2022
2 parents 42e2ff3 + d735280 commit 89c270d
Showing 7 changed files with 143 additions and 184 deletions.
9 changes: 6 additions & 3 deletions openraft/src/config/config.rs
@@ -121,9 +121,12 @@ pub struct Config {
 
     /// The distance behind in log replication a follower must fall before it is considered lagging
     ///
-    /// Once a replication stream transitions into line-rate state, the target node will be considered safe to join a
-    /// cluster.
-    #[clap(long, default_value = "1000")]
+    /// A follower that falls behind this index is replicated with a snapshot.
+    /// A follower within this index is replicated with log entries.
+    ///
+    /// This value should be greater than `snapshot_policy`'s `SnapshotPolicy::LogsSinceLast`; otherwise,
+    /// transmitting a snapshot may not fix the lag.
+    #[clap(long, default_value = "5000")]
     pub replication_lag_threshold: u64,
 
     /// The snapshot policy to use for a Raft node.
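Note on the new default: `replication_lag_threshold` now selects, per follower, between log replication and snapshot replication. The rule is a plain distance check; below is a minimal sketch of it, with illustrative names rather than openraft's internal API:

```rust
/// Sketch of the lag-based mode choice driven by `replication_lag_threshold`.
/// `choose_mode` is a hypothetical helper for illustration; openraft's real
/// decision lives in its replication engine and progress tracking.
enum ReplicationMode {
    /// Close enough: catch the follower up with log entries.
    LogEntries,
    /// Too far behind: transmit a snapshot instead.
    Snapshot,
}

fn choose_mode(leader_last_index: u64, follower_matched_index: u64, lag_threshold: u64) -> ReplicationMode {
    // Lag is how many log indexes the follower trails the leader by.
    let lag = leader_last_index.saturating_sub(follower_matched_index);
    if lag > lag_threshold {
        ReplicationMode::Snapshot
    } else {
        ReplicationMode::LogEntries
    }
}

fn main() {
    // A follower matched at 2_000 against a leader at 8_000 lags by 6_000,
    // which exceeds the new default threshold of 5_000.
    assert!(matches!(choose_mode(8_000, 2_000, 5_000), ReplicationMode::Snapshot));
}
```

The ordering constraint in the doc comment follows from this rule: a snapshot is only rebuilt every `LogsSinceLast` entries, so a freshly installed snapshot can itself trail the leader by up to that many entries. If the threshold were smaller, a follower could be classified as lagging again right after installing a snapshot, and another snapshot would not fix it. Raising the default from 1000 to 5000 brings it in line with the default `SnapshotPolicy::LogsSinceLast(5000)`.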
10 changes: 5 additions & 5 deletions openraft/src/config/config_test.rs
@@ -13,7 +13,7 @@ fn test_config_defaults() {
 
     assert_eq!(50, cfg.heartbeat_interval);
     assert_eq!(300, cfg.max_payload_entries);
-    assert_eq!(1000, cfg.replication_lag_threshold);
+    assert_eq!(5000, cfg.replication_lag_threshold);
 
     assert_eq!(3 * 1024 * 1024, cfg.snapshot_max_chunk_size);
     assert_eq!(SnapshotPolicy::LogsSinceLast(5000), cfg.snapshot_policy);
@@ -57,8 +57,8 @@ fn test_build() -> anyhow::Result<()> {
         "--send-snapshot-timeout=199",
         "--install-snapshot-timeout=200",
         "--max-payload-entries=201",
-        "--replication-lag-threshold=202",
-        "--snapshot-policy=since_last:203",
+        "--snapshot-policy=since_last:202",
+        "--replication-lag-threshold=203",
         "--snapshot-max-chunk-size=204",
         "--max-in-snapshot-log-to-keep=205",
         "--purge-batch-size=207",
@@ -71,8 +71,8 @@ fn test_build() -> anyhow::Result<()> {
     assert_eq!(199, config.send_snapshot_timeout);
     assert_eq!(200, config.install_snapshot_timeout);
     assert_eq!(201, config.max_payload_entries);
-    assert_eq!(202, config.replication_lag_threshold);
-    assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy);
+    assert_eq!(SnapshotPolicy::LogsSinceLast(202), config.snapshot_policy);
+    assert_eq!(203, config.replication_lag_threshold);
     assert_eq!(204, config.snapshot_max_chunk_size);
     assert_eq!(205, config.max_in_snapshot_log_to_keep);
     assert_eq!(207, config.purge_batch_size);
115 changes: 47 additions & 68 deletions openraft/src/core/raft_core.rs
@@ -15,7 +15,6 @@ use futures::StreamExt;
 use futures::TryFutureExt;
 use maplit::btreeset;
 use pin_utils::pin_mut;
-use tokio::sync::broadcast;
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
 use tokio::sync::watch;
@@ -77,11 +76,10 @@ use crate::raft_types::LogIdOptionExt;
 use crate::raft_types::RaftLogId;
 use crate::replication::Replicate;
 use crate::replication::ReplicationCore;
+use crate::replication::ReplicationHandle;
 use crate::replication::ReplicationSessionId;
-use crate::replication::ReplicationStream;
 use crate::runtime::RaftRuntime;
 use crate::storage::RaftSnapshotBuilder;
-use crate::storage::Snapshot;
 use crate::storage::StorageHelper;
 use crate::versioned::Updatable;
 use crate::versioned::Versioned;
@@ -111,7 +109,7 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
     /// A mapping of node IDs to the replication state of the target node.
     // TODO(xp): make it a field of RaftCore. It does not have to belong to the leader.
     // It requires the Engine to emit correct add/remove replication commands.
-    pub(super) nodes: BTreeMap<C::NodeId, ReplicationStream<C::NodeId>>,
+    pub(super) nodes: BTreeMap<C::NodeId, ReplicationHandle<C::NodeId>>,
 
     /// The metrics of all replication streams
     pub(crate) replication_metrics: Versioned<ReplicationMetrics<C::NodeId>>,
@@ -765,13 +763,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
         // At this point, we are clear to begin a new compaction process.
         let mut builder = self.storage.get_snapshot_builder().await;
         let (abort_handle, reg) = AbortHandle::new_pair();
-        let (chan_tx, _) = broadcast::channel(1);
         let tx_api = self.tx_api.clone();
 
-        self.snapshot_state = SnapshotState::Snapshotting {
-            abort_handle,
-            sender: chan_tx.clone(),
-        };
+        self.snapshot_state = SnapshotState::Snapshotting { abort_handle };
 
         tokio::spawn(
             async move {
@@ -781,10 +775,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
                     Ok(res) => match res {
                         Ok(snapshot) => {
                             let _ = tx_api.send(RaftMsg::BuildingSnapshotResult {
-                                result: SnapshotResult::Ok(snapshot.meta.clone()),
+                                result: SnapshotResult::Ok(snapshot.meta),
                             });
-                            // This will always succeed.
-                            let _ = chan_tx.send(snapshot.meta.last_log_id);
                         }
                         Err(err) => {
                             tracing::error!({error=%err}, "error while generating snapshot");
@@ -938,7 +930,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
         &mut self,
         target: C::NodeId,
         progress_entry: ProgressEntry<C::NodeId>,
-    ) -> Result<ReplicationStream<C::NodeId>, N::ConnectionError> {
+    ) -> Result<ReplicationHandle<C::NodeId>, N::ConnectionError> {
         // Safe unwrap(): target must be in membership
         let target_node = self.engine.state.membership_state.effective.get_node(&target).unwrap();
 
@@ -974,10 +966,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
         );
 
         for (target, s) in nodes {
-            let handle = s.handle;
+            let handle = s.join_handle;
 
             // Drop sender to notify the task to shutdown
-            drop(s.repl_tx);
+            drop(s.tx_repl);
 
             tracing::debug!("joining removed replication: {}", target);
             let _x = handle.await;
@@ -1161,7 +1153,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
                 let _ = tx.send(self.handle_vote_request(rpc).await.extract_fatal()?);
             }
             RaftMsg::VoteResponse { target, resp, vote } => {
-                if self.does_vote_match(vote, "VoteResponse") {
+                if self.does_vote_match(&vote, "VoteResponse") {
                     self.handle_vote_resp(resp, target).await?;
                 }
             }
@@ -1291,7 +1283,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
                 higher,
                 vote,
             } => {
-                if self.does_vote_match(vote, "HigherVote") {
+                if self.does_vote_match(&vote, "HigherVote") {
                     // Rejected vote change is ok.
                     let _ = self.engine.handle_vote_change(&higher);
                     self.run_engine_commands::<Entry<C>>(&[]).await?;
@@ -1303,18 +1295,26 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
                 result,
                 session_id,
             } => {
-                if self.does_vote_match(session_id.vote, "UpdateReplicationMatched") {
-                    // If membership changes, ignore the message.
-                    // There is a chance that a delayed message reports a wrong state.
-                    if session_id.membership_log_id == self.engine.state.membership_state.effective.log_id {
-                        self.handle_update_matched(target, result).await?;
-                    }
+                // If vote or membership changes, ignore the message.
+                // There is a chance that a delayed message reports a wrong state.
+                if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
+                    self.handle_update_matched(target, result).await?;
                 }
             }
-            RaftMsg::NeedsSnapshot { target: _, tx, vote } => {
-                if self.does_vote_match(vote, "NeedsSnapshot") {
-                    self.handle_needs_snapshot(tx).await?;
+            RaftMsg::NeedsSnapshot {
+                target: _,
+                tx,
+                session_id,
+            } => {
+                if self.does_replication_session_match(&session_id, "NeedsSnapshot") {
+                    let snapshot = self.storage.get_current_snapshot().await?;
+
+                    if let Some(snapshot) = snapshot {
+                        let _ = tx.send(snapshot);
+                        return Ok(());
+                    }
+
+                    unreachable!("A log is lacking, which means a snapshot is already built");
                 }
             }
             RaftMsg::ReplicationFatal => {
@@ -1382,8 +1382,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
 
     /// If a message is sent by a previous server state but is received by the current server state,
     /// it is a stale message and should just be ignored.
-    fn does_vote_match(&self, vote: Vote<C::NodeId>, msg: impl Display) -> bool {
-        if vote != self.engine.state.vote {
+    fn does_vote_match(&self, vote: &Vote<C::NodeId>, msg: impl Display) -> bool {
+        if vote != &self.engine.state.vote {
             tracing::warn!(
                 "vote changed: msg sent by: {:?}; curr: {}; ignore when ({})",
                 vote,
@@ -1395,48 +1395,27 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
             true
         }
     }
 
-    /// A replication stream requesting snapshot info.
-    ///
-    /// The snapshot has to include `must_include`.
-    #[tracing::instrument(level = "debug", skip(self, tx))]
-    async fn handle_needs_snapshot(
-        &mut self,
-        tx: oneshot::Sender<Snapshot<C::NodeId, C::Node, S::SnapshotData>>,
-    ) -> Result<(), StorageError<C::NodeId>> {
-        // Check for existence of current snapshot.
-        let current_snapshot_opt = self.storage.get_current_snapshot().await?;
-
-        if let Some(snapshot) = current_snapshot_opt {
-            let _ = tx.send(snapshot);
-            return Ok(());
-        }
-
-        // Check if snapshot creation is already in progress. If so, we spawn a task to await its
-        // completion (or cancellation), and respond to the replication stream. The repl stream
-        // will wait for the completion and will then send another request to fetch the finished snapshot.
-        // Else we just drop any other state and continue. Leaders never enter `Streaming` state.
-        if let SnapshotState::Snapshotting { sender, .. } = &self.snapshot_state {
-            let mut chan = sender.subscribe();
-            tokio::spawn(
-                async move {
-                    let _ = chan.recv().await;
-                    // TODO(xp): send another ReplicaEvent::NeedSnapshot to raft core
-                    drop(tx);
-                }
-                .instrument(tracing::debug_span!("spawn-recv-and-drop")),
-            );
-            return Ok(());
-        }
-
-        // At this point, we just attempt to request a snapshot. Under normal circumstances, the
-        // leader will always be keeping up-to-date with its snapshotting, and the latest snapshot
-        // will always be found and this block will never even be executed.
-        //
-        // If this block is executed, and a snapshot is needed, the repl stream will submit another
-        // request here shortly, and will hit the above logic where it will await the snapshot completion.
-        self.trigger_snapshot_if_needed(false).await;
-        Ok(())
+    /// If a message is sent by a previous replication session but is received by the current server state,
+    /// it is a stale message and should just be ignored.
+    fn does_replication_session_match(
+        &self,
+        session_id: &ReplicationSessionId<C::NodeId>,
+        msg: impl Display + Copy,
+    ) -> bool {
+        if !self.does_vote_match(&session_id.vote, msg) {
+            return false;
+        }
+
+        if session_id.membership_log_id != self.engine.state.membership_state.effective.log_id {
+            tracing::warn!(
+                "membership_log_id changed: msg sent by: {:?}; curr: {}; ignore when ({})",
+                session_id.membership_log_id,
+                self.engine.state.membership_state.effective.log_id.summary(),
+                msg
+            );
+            return false;
+        }
+
+        true
     }
 }

@@ -1509,7 +1488,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
             Command::ReplicateCommitted { committed } => {
                 if let Some(l) = &self.leader_data {
                     for node in l.nodes.values() {
-                        let _ = node.repl_tx.send(Replicate::Committed(*committed));
+                        let _ = node.tx_repl.send(Replicate::Committed(*committed));
                     }
                 } else {
                     unreachable!("it has to be a leader!!!");
@@ -1530,7 +1509,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
             Command::ReplicateEntries { upto } => {
                 if let Some(l) = &self.leader_data {
                     for node in l.nodes.values() {
-                        let _ = node.repl_tx.send(Replicate::Entries(*upto));
+                        let _ = node.tx_repl.send(Replicate::Entries(*upto));
                    }
                 } else {
                     unreachable!("it has to be a leader!!!");
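Note: the renames above (`repl_tx` → `tx_repl`, `handle` → `join_handle`) surround a standard tokio shutdown idiom in `remove_all_replication`: the replication task loops on an mpsc receiver, and dropping the last sender closes the channel, so the task exits on its own. A self-contained sketch of that idiom, with simplified types rather than openraft's actual `ReplicationCore`:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum Replicate {
    Committed(u64),
    Entries(u64),
}

/// Sketch: a replication task that runs until its command channel is closed.
async fn replication_task(mut rx: mpsc::UnboundedReceiver<Replicate>) {
    // `recv()` yields `None` once every sender has been dropped; that is the
    // shutdown signal, so no dedicated shutdown message is needed.
    while let Some(cmd) = rx.recv().await {
        println!("replicate: {:?}", cmd);
    }
    println!("all senders dropped; replication task exits");
}

#[tokio::main]
async fn main() {
    let (tx_repl, rx_repl) = mpsc::unbounded_channel();
    let join_handle = tokio::spawn(replication_task(rx_repl));

    let _ = tx_repl.send(Replicate::Committed(40));
    let _ = tx_repl.send(Replicate::Entries(42));

    // Dropping the sender tells the task to shut down, mirroring
    // `drop(s.tx_repl)` followed by `s.join_handle.await` above.
    drop(tx_repl);
    let _ = join_handle.await;
}
```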
4 changes: 0 additions & 4 deletions openraft/src/core/snapshot_state.rs
@@ -1,8 +1,6 @@
 use futures::future::AbortHandle;
-use tokio::sync::broadcast;
 
 use crate::core::streaming_state::StreamingState;
-use crate::LogId;
 use crate::Node;
 use crate::NodeId;
 use crate::RaftTypeConfig;
@@ -17,8 +15,6 @@ pub(crate) enum SnapshotState<C: RaftTypeConfig, SD> {
     Snapshotting {
         /// A handle to abort the compaction process early if needed.
         abort_handle: AbortHandle,
-        /// A sender for notifying any other tasks of the completion of this compaction.
-        sender: broadcast::Sender<Option<LogId<C::NodeId>>>,
     },
     /// The Raft node is streaming in a snapshot from the leader.
     Streaming(StreamingState<C, SD>),
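Note: with the broadcast sender gone, `Snapshotting` carries only the `AbortHandle`. That handle comes from the `futures` abortable-future pattern; here is a small sketch of how such a handle pairs with a spawned snapshot-build task (assumed wiring, simplified from `raft_core.rs` above):

```rust
use std::time::Duration;

use futures::future::{AbortHandle, Abortable, Aborted};

#[tokio::main]
async fn main() {
    // `new_pair` yields the handle plus a registration that wraps the future,
    // mirroring `let (abort_handle, reg) = AbortHandle::new_pair()` above.
    let (abort_handle, reg) = AbortHandle::new_pair();

    let building = Abortable::new(
        async {
            // Stand-in for a long-running snapshot build.
            tokio::time::sleep(Duration::from_secs(3600)).await;
            "snapshot"
        },
        reg,
    );
    let task = tokio::spawn(building);

    // Aborting the handle cancels the wrapped future early.
    abort_handle.abort();
    assert!(matches!(task.await.unwrap(), Err(Aborted)));
}
```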
8 changes: 5 additions & 3 deletions openraft/src/raft.rs
@@ -956,8 +956,8 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
         /// The response channel for delivering the snapshot data.
         tx: oneshot::Sender<Snapshot<C::NodeId, C::Node, S::SnapshotData>>,
 
-        /// Which ServerState sent this message
-        vote: Vote<C::NodeId>,
+        /// Which replication session sent this message
+        session_id: ReplicationSessionId<C::NodeId>,
     },
 
     /// Some critical error has taken place, and Raft needs to shut down.
@@ -1040,7 +1040,9 @@
                 )
             }
             RaftMsg::NeedsSnapshot {
-                ref target, ref vote, ..
+                ref target,
+                session_id: ref vote,
+                ..
             } => {
                 format!("NeedsSnapshot: target: {}, server_state_vote: {}", target, vote)
             }
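Note: `NeedsSnapshot` is a request/response message: the replication stream embeds a `oneshot::Sender`, and `RaftCore` answers on it, now gated by `session_id` instead of a bare vote. A trimmed sketch of that channel idiom, with a `String` standing in for the real `Snapshot<...>`:

```rust
use tokio::sync::{mpsc, oneshot};

/// Simplified stand-in for `RaftMsg::NeedsSnapshot`; the real message also
/// carries a `ReplicationSessionId` for the staleness check.
struct NeedsSnapshot {
    tx: oneshot::Sender<String>,
}

#[tokio::main]
async fn main() {
    let (tx_api, mut rx_api) = mpsc::unbounded_channel::<NeedsSnapshot>();

    // Core side: answer each request on its embedded oneshot sender.
    tokio::spawn(async move {
        while let Some(req) = rx_api.recv().await {
            let _ = req.tx.send("current snapshot".to_string());
        }
    });

    // Replication side: attach a oneshot, send the request, await the reply.
    let (tx, rx) = oneshot::channel();
    tx_api.send(NeedsSnapshot { tx }).expect("core is running");
    let snapshot = rx.await.expect("core replied");
    assert_eq!(snapshot, "current snapshot");
}
```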