
Refactor: Remove replication algorithm from ReplicationCore
Let Engine fully decide what to do; ReplicationCore becomes a passive executor.
Engine emits a replication request and lets ReplicationCore execute it.
It is not a strict one-request-one-reply model, though: for example, Engine
does not expect a reply for a broadcast-committed request.

Engine keeps track of every inflight replication request and updates the
`inflight` state when a reply is received from ReplicationCore.
A new type `Inflight` is introduced to describe the state of inflight
replication data, e.g., a series of log entries or a snapshot.
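
For orientation, here is a minimal sketch of the shape of `Inflight` as it is
used by this commit. The stand-in types below are assumptions made so the
sketch is self-contained; they are not the actual definitions.

    // Sketch only: variant names and fields mirror how `Inflight` is matched
    // in the raft_core.rs hunk further below; `LogId` and `LogIdRange` are
    // simplified stand-ins, not openraft's real types.
    #[derive(Debug, Clone, Copy, PartialEq)]
    struct LogId(u64);

    #[derive(Debug, Clone, Copy, PartialEq)]
    struct LogIdRange {
        prev_log_id: Option<LogId>,
        last_log_id: Option<LogId>,
    }

    #[derive(Debug, Clone, Copy, PartialEq)]
    enum Inflight {
        /// Nothing is being replicated to the target.
        None,
        /// A range of log entries is inflight; `id` correlates the request
        /// with the reply that ReplicationCore sends back.
        Logs { id: u64, log_id_range: LogIdRange },
        /// A snapshot covering logs up to `last_log_id` is inflight.
        Snapshot { id: u64, last_log_id: Option<LogId> },
    }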

ReplicationCore no longer needs to ask RaftCore for a snapshot.
Instead, when snapshot replication is needed, Engine just sends a snapshot
to ReplicationCore (see the `Command::ReplicateEnt` handling in the
raft_core.rs hunk below).

Introduce `ReplicationHandler` and `LogHandler` as containers for subsets of
Engine's functions, so that `Engine` does not grow too big.
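
A generic sketch of this handler pattern, with illustrative field and method
names rather than the actual `ReplicationHandler` definition:

    // Illustrative only: a handler borrows the parts of Engine it needs and
    // groups the related methods, so Engine itself stays small.
    struct EngineState { /* vote, log ids, per-target progress, ... */ }
    struct EngineOutput { /* commands queued for RaftCore to execute */ }

    struct ReplicationHandler<'x> {
        state: &'x mut EngineState,
        output: &'x mut EngineOutput,
    }

    impl<'x> ReplicationHandler<'x> {
        fn update_progress(&mut self /* , target, id, result */) {
            // Update the target's progress and enqueue any follow-up
            // commands into `self.output`.
        }
    }

    struct Engine {
        state: EngineState,
        output: EngineOutput,
    }

    impl Engine {
        // Engine hands out a short-lived handler over its own fields, as in
        // `self.engine.replication_handler().update_progress(...)` below.
        fn replication_handler(&mut self) -> ReplicationHandler<'_> {
            ReplicationHandler {
                state: &mut self.state,
                output: &mut self.output,
            }
        }
    }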

Add more `Validate` implementations, to assert internal consistency.
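
As an illustration of what "assert internal consistency" means here, a
hypothetical check over a progress entry follows; the real `Validate` trait
and the exact invariants are assumptions, not taken from this commit.

    // Hypothetical example of an internal-consistency assertion; openraft's
    // actual `Validate` trait and invariants may differ.
    struct ProgressEntry {
        matching: Option<u64>, // highest log index known to be replicated
        searching_end: u64,    // exclusive upper bound of the probing range
    }

    impl ProgressEntry {
        fn validate(&self) -> Result<(), String> {
            if let Some(m) = self.matching {
                if m >= self.searching_end {
                    return Err(format!(
                        "inconsistent: matching {} >= searching_end {}",
                        m, self.searching_end
                    ));
                }
            }
            Ok(())
        }
    }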

Add `ProgressEntry::next_send()` to calculate what entries or snapshot to send.

Remove `ProgressEntry.mid`; it can be calculated when needed.
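
A rough sketch of the decision `next_send()` makes; the signature, parameter
names, and return type below are assumptions for illustration, and only the
idea comes from this commit: send log entries while they are still in the
log, and fall back to a snapshot once they have been purged.

    // Illustrative sketch only; not the real ProgressEntry::next_send().
    enum NextSend {
        Logs { from: u64, to: u64 },
        Snapshot,
        Nothing,
    }

    fn next_send(next_index: u64, purged_next: u64, last_index: Option<u64>) -> NextSend {
        if next_index < purged_next {
            // The entries the target needs were already purged from the
            // local log; only a snapshot can bring it up to date.
            return NextSend::Snapshot;
        }
        match last_index {
            Some(last) if next_index <= last => NextSend::Logs { from: next_index, to: last },
            _ => NextSend::Nothing,
        }
    }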

- Fix: datafuselabs#650
drmingdrmer committed Jan 22, 2023
1 parent c3763de commit fa5f31c
Showing 28 changed files with 1,379 additions and 699 deletions.
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
@@ -280,7 +280,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "debug", skip_all)]
async fn purge_logs_upto(&mut self, log_id: LogId<MemNodeId>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

88 changes: 51 additions & 37 deletions openraft/src/core/raft_core.rs
@@ -14,6 +14,8 @@ use futures::StreamExt;
use futures::TryFutureExt;
use maplit::btreeset;
use pin_utils::pin_mut;
use tokio::io::AsyncRead;
use tokio::io::AsyncSeek;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
@@ -57,6 +59,7 @@ use crate::metrics::RaftMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::UpdateMatchedLogId;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::AddLearnerResponse;
@@ -76,6 +79,7 @@ use crate::raft_types::RaftLogId;
use crate::replication::Replicate;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
use crate::replication::ReplicationResult;
use crate::replication::ReplicationSessionId;
use crate::runtime::RaftRuntime;
use crate::storage::RaftSnapshotBuilder;
@@ -100,14 +104,16 @@ use crate::Vote;
/// Data for a Leader.
///
/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state.
pub(crate) struct LeaderData<C: RaftTypeConfig> {
pub(crate) struct LeaderData<C: RaftTypeConfig, SD>
where SD: AsyncRead + AsyncSeek + Send + Unpin + 'static
{
/// Channels to send result back to client when logs are committed.
pub(crate) client_resp_channels: BTreeMap<u64, ClientWriteTx<C, C::NodeId, C::Node>>,

/// A mapping of node IDs the replication state of the target node.
// TODO(xp): make it a field of RaftCore. it does not have to belong to leader.
// It requires the Engine to emit correct add/remove replication commands
pub(super) nodes: BTreeMap<C::NodeId, ReplicationHandle<C::NodeId>>,
pub(super) nodes: BTreeMap<C::NodeId, ReplicationHandle<C::NodeId, C::Node, SD>>,

/// The metrics of all replication streams
pub(crate) replication_metrics: Versioned<ReplicationMetrics<C::NodeId>>,
@@ -116,7 +122,9 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
pub(crate) next_heartbeat: Instant,
}

impl<C: RaftTypeConfig> LeaderData<C> {
impl<C: RaftTypeConfig, SD> LeaderData<C, SD>
where SD: AsyncRead + AsyncSeek + Send + Unpin + 'static
{
pub(crate) fn new() -> Self {
Self {
client_resp_channels: Default::default(),
@@ -145,7 +153,7 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<

pub(crate) engine: Engine<C::NodeId, C::Node>,

pub(crate) leader_data: Option<LeaderData<C>>,
pub(crate) leader_data: Option<LeaderData<C, S::SnapshotData>>,

/// The node's current snapshot state.
pub(crate) snapshot_state: SnapshotState<C, S::SnapshotData>,
@@ -606,7 +614,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&self, replication: Update<Option<Versioned<ReplicationMetrics<C::NodeId>>>>) {
let replication = match replication {
Update::Update(v) => v,
@@ -914,7 +922,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
&mut self,
target: C::NodeId,
progress_entry: ProgressEntry<C::NodeId>,
) -> Result<ReplicationHandle<C::NodeId>, N::ConnectionError> {
) -> Result<ReplicationHandle<C::NodeId, C::Node, S::SnapshotData>, N::ConnectionError> {
// Safe unwrap(): target must be in membership
let target_node = self.engine.state.membership_state.effective.get_node(&target).unwrap();

@@ -928,7 +936,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
session_id,
self.config.clone(),
self.engine.state.committed,
progress_entry,
progress_entry.matching,
network,
self.storage.get_log_reader().await,
self.tx_api.clone(),
@@ -1278,29 +1286,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

RaftMsg::UpdateReplicationProgress {
target,
id,
result,
session_id,
} => {
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
self.handle_update_matched(target, result).await?;
}
}
RaftMsg::NeedsSnapshot {
target: _,
tx,
session_id,
} => {
if self.does_replication_session_match(&session_id, "NeedsSnapshot") {
let snapshot = self.storage.get_current_snapshot().await?;

if let Some(snapshot) = snapshot {
let _ = tx.send(snapshot);
return Ok(());
}

unreachable!("A log is lacking, which means a snapshot is already built");
self.handle_replication_progress(target, id, result).await?;
}
}
RaftMsg::ReplicationFatal => {
@@ -1311,10 +1304,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

#[tracing::instrument(level = "debug", skip_all)]
async fn handle_update_matched(
async fn handle_replication_progress(
&mut self,
target: C::NodeId,
result: Result<ProgressEntry<C::NodeId>, String>,
id: u64,
result: Result<ReplicationResult<C::NodeId>, String>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(
target = display(target),
Expand All @@ -1332,14 +1326,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

let progress = match result {
Ok(p) => p,
Err(_err_str) => {
return Ok(());
}
};

self.engine.replication_handler().update_progress(target, Some(progress.matching.unwrap()));
self.engine.replication_handler().update_progress(target, id, result);
self.run_engine_commands::<Entry<C>>(&[]).await?;

Ok(())
@@ -1490,10 +1477,37 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
} => {
self.apply_to_state_machine(committed.next_index(), upto.index).await?;
}
Command::ReplicateEntries { upto } => {
Command::ReplicateEnt { req, target } => {
if let Some(l) = &self.leader_data {
for node in l.nodes.values() {
let _ = node.tx_repl.send(Replicate::Entries(*upto));
// TODO: consider remove the returned error from new_client().
// Node may not exist because `RaftNetworkFactory::new_client()` returns an error.
let node = &l.nodes.get(target);

if let Some(node) = node {
match req {
Inflight::None => {
unreachable!("Inflight::None");
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::Ent {
id: *id,
log_id_range: *log_id_range,
});
}
Inflight::Snapshot { id, last_log_id } => {
let snapshot = self.storage.get_current_snapshot().await?;
tracing::debug!("snapshot: {}", snapshot.as_ref().map(|x| &x.meta).summary());

if let Some(snapshot) = snapshot {
debug_assert_eq!(last_log_id, &snapshot.meta.last_log_id);
let _ = node.tx_repl.send(Replicate::Snapshot { id: *id, snapshot });
} else {
unreachable!("No snapshot");
}
}
}
} else {
// TODO(1): if no such node, return an RemoteError?
}
} else {
unreachable!("it has to be a leader!!!");
12 changes: 9 additions & 3 deletions openraft/src/engine/command.rs
@@ -2,6 +2,7 @@ use std::ops::Range;
use std::sync::Arc;

use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft::VoteRequest;
use crate::EffectiveMembership;
use crate::LogId;
@@ -51,9 +52,14 @@
upto: LogId<NID>,
},

/// Replicate entries upto log id `upto`, inclusive.
ReplicateEntries { upto: Option<LogId<NID>> },
/// Replicate entries to a target.
ReplicateEnt { target: NID, req: Inflight<NID> },

// /// Replicate a snapshot to a target.
// ReplicateSnapshot {
// target: NID,
// snapshot_last_log_id: Option<LogId<NID>>,
// },
/// Membership config changed, need to update replication streams.
UpdateMembership {
// TODO: not used yet.
@@ -125,7 +131,7 @@
Command::ReplicateCommitted { .. } => {}
Command::LeaderCommit { .. } => flags.set_data_changed(),
Command::FollowerCommit { .. } => flags.set_data_changed(),
Command::ReplicateEntries { .. } => {}
Command::ReplicateEnt { .. } => {}
Command::UpdateMembership { .. } => flags.set_cluster_changed(),
Command::UpdateReplicationStreams { .. } => flags.set_replication_changed(),
Command::UpdateReplicationMetrics { .. } => flags.set_replication_changed(),
25 changes: 7 additions & 18 deletions openraft/src/engine/elect_test.rs
@@ -32,6 +32,7 @@ fn m12() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.log_ids = LogIdList::new([LogId::new(LeaderId::new(0, 0), 0)]);
eng.state.enable_validate = false; // Disable validation for incomplete state
eng
}
@@ -73,28 +74,22 @@ fn test_elect() -> anyhow::Result<()> {
Command::AppendBlankLog {
log_id: LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index: 0,
index: 1,
},
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index: 0,
index: 1,
},),
},
Command::LeaderCommit {
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index: 0,
index: 1,
},
},
Command::ReplicateEntries {
upto: Some(LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index: 0,
},),
},
],
eng.output.commands
);
@@ -140,28 +135,22 @@ fn test_elect() -> anyhow::Result<()> {
Command::AppendBlankLog {
log_id: LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
index: 0,
index: 1,
},
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
index: 0,
index: 1,
},),
},
Command::LeaderCommit {
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
index: 0,
index: 1,
},
},
Command::ReplicateEntries {
upto: Some(LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
index: 0,
},),
},
],
eng.output.commands
);