Move state machine replication off of AppendEntries RPC path.
With this change, we are also caching entries which come from the leader
replication protocol. As entries come in, we append them to the log and
then cache them. When it is safe to apply entries to the state
machine, we take them directly from the in-memory cache instead of
going to disk.

Moreover, and most importantly, we are no longer blocking the
AppendEntries RPC handler with the state machine replication workflow.
There is a small amount of async task juggling to ensure that we never
end up with two writers attempting to write to the state machine at the
same time; our algorithm avoids this easily.

closes #12
closes #76
thedodd committed Nov 20, 2020
1 parent f5f9a41 commit 7db81f7
Showing 4 changed files with 158 additions and 31 deletions.
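
As orientation before the per-file diffs, the following is a rough, self-contained sketch of the pattern the commit message describes: cache the leader-sent entries in memory, then hand application off to a single background Tokio task tracked in a FuturesOrdered, so at most one writer ever touches the state machine. The Entry type, the field and function names, and the apply_to_state_machine helper are invented for illustration; they are not the crate's actual API.

use std::collections::BTreeMap;

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::task::JoinHandle;

// Hypothetical entry type, standing in for the crate's `Entry<D>`.
struct Entry {
    index: u64,
    data: Vec<u8>,
}

struct Core {
    commit_index: u64,
    last_applied: u64,
    // Entries received from the current leader, keyed by log index.
    entries_cache: BTreeMap<u64, Entry>,
    // Holds at most one in-flight state machine application task.
    replicate_to_sm_handle: FuturesOrdered<JoinHandle<Option<u64>>>,
}

impl Core {
    // Called from the AppendEntries handler: cache the incoming entries and, if it is
    // safe to do so, hand application off to a background task. Nothing here awaits
    // storage, so the RPC path is never blocked on the state machine.
    fn replicate_to_sm_if_needed(&mut self, entries: Vec<Entry>) {
        for entry in entries {
            self.entries_cache.insert(entry.index, entry);
        }
        // At most one writer to the state machine at a time.
        if !self.replicate_to_sm_handle.is_empty() {
            return;
        }
        if self.commit_index <= self.last_applied {
            return;
        }
        // Drain committed entries directly from the in-memory cache.
        let to_apply: Vec<Entry> = (self.last_applied + 1..=self.commit_index)
            .filter_map(|idx| self.entries_cache.remove(&idx))
            .collect();
        if let Some(last) = to_apply.last() {
            // Optimistic update; an application error shuts the node down anyway.
            self.last_applied = last.index;
        } else {
            return;
        }
        let handle = tokio::spawn(async move { apply_to_state_machine(&to_apply).await });
        self.replicate_to_sm_handle.push(handle);
    }

    // Called from the main control loop: harvest a finished application task, if any.
    async fn poll_apply_result(&mut self) {
        if let Some(Ok(Some(last_applied))) = self.replicate_to_sm_handle.next().await {
            self.last_applied = last_applied;
        }
    }
}

// Stand-in for the real storage call which applies entries to the state machine and
// returns the highest applied index.
async fn apply_to_state_machine(entries: &[Entry]) -> Option<u64> {
    entries.last().map(|e| e.index)
}

The real implementation in the diff below additionally handles the initial catch-up replication from disk, metrics reporting, and fatal-error shutdown.
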
120 changes: 94 additions & 26 deletions async-raft/src/core/append_entries.rs
@@ -52,7 +52,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if msg_prev_index_is_min || msg_index_and_term_match {
// If this is just a heartbeat, then respond.
if msg.entries.is_empty() {
self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
@@ -65,7 +65,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Else, append log entries.
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
@@ -156,7 +156,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::trace!("end log consistency check");

self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
@@ -198,20 +198,93 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
Ok(())
}

/// Replicate outstanding logs to the state machine if needed.
#[tracing::instrument(level = "trace", skip(self, report_metrics))]
async fn replicate_to_state_machine_if_needed(&mut self, report_metrics: &mut bool) -> RaftResult<()> {
if self.commit_index > self.last_applied {
// Fetch the series of entries which must be applied to the state machine, and apply them.
let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1;
let entries = self
.storage
.get_log_entries(self.last_applied + 1, stop)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
/// Replicate any outstanding entries to the state machine for which it is safe to do so.
///
/// Very importantly, this routine must not block the main task of the control loop, else it
/// may cause the Raft leader to time out its requests to this node.
#[tracing::instrument(level = "trace", skip(self))]
async fn replicate_to_state_machine_if_needed(&mut self, entries: Vec<Entry<D>>) {
// Update cache. Always.
for entry in entries {
self.entries_cache.insert(entry.index, entry);
}
// Perform initial replication to state machine if needed.
if !self.has_completed_initial_replication_to_sm {
// Optimistic update, as failures will cause shutdown.
self.has_completed_initial_replication_to_sm = true;
return self.initial_replicate_to_state_machine().await;
}
// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
return;
}
// If we don't have any new entries to replicate, then do nothing.
if self.commit_index <= self.last_applied {
return;
}
// If we have no cached entries, then do nothing.
let first_idx = match self.entries_cache.iter().next() {
Some((_, entry)) => entry.index,
None => return,
};

// Drain entries from the beginning of the cache up to commit index.
let mut last_entry_seen: Option<u64> = None;
let entries: Vec<_> = (first_idx..=self.commit_index)
.filter_map(|idx| {
if let Some(entry) = self.entries_cache.remove(&idx) {
last_entry_seen = Some(entry.index);
match entry.payload {
EntryPayload::Normal(inner) => Some((entry.index, inner.data)),
_ => None,
}
} else {
None
}
})
.collect();
// If we actually have some cached entries to apply, then we optimistically update, as
// `self.last_applied` is held in-memory only, and if an error does come up, then
// Raft will go into shutdown.
if let Some(index) = last_entry_seen {
self.last_applied = index;
self.report_metrics();
}
// If we have no data entries to apply, then do nothing.
if entries.is_empty() {
return;
}
// Create a new vector of references to the entries data ... might have to change this
// interface a bit before 1.0.
let storage = self.storage.clone();
let handle = tokio::spawn(async move {
let entries_refs: Vec<_> = entries.iter().map(|(k, v)| (k, v)).collect();
storage.replicate_to_state_machine(&entries_refs).await?;
Ok(None)
});
self.replicate_to_sm_handle.push(handle);
}

/// Perform an initial replication of outstanding entries to the state machine.
///
/// This will only be executed once when a Raft node first comes online.
#[tracing::instrument(level = "trace", skip(self))]
async fn initial_replicate_to_state_machine(&mut self) {
let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1;
let start = self.last_applied + 1;
let storage = self.storage.clone();

// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
return;
}

// Fetch the series of entries which must be applied to the state machine, then apply them.
let handle = tokio::spawn(async move {
let mut new_last_applied: Option<u64> = None;
let entries = storage.get_log_entries(start, stop).await?;
if let Some(entry) = entries.last() {
self.last_applied = entry.index;
*report_metrics = true;
new_last_applied = Some(entry.index);
}
let data_entries: Vec<_> = entries
.iter()
@@ -221,16 +294,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
})
.collect();
if data_entries.is_empty() {
return Ok(());
return Ok(new_last_applied);
}
self.storage
.replicate_to_state_machine(&data_entries)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;

// Request async compaction, if needed.
self.trigger_log_compaction_if_needed();
}
Ok(())
storage.replicate_to_state_machine(&data_entries).await?;
Ok(new_last_applied)
});
self.replicate_to_sm_handle.push(handle);
}
}
14 changes: 11 additions & 3 deletions async-raft/src/core/client.rs
@@ -56,7 +56,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// Check to see if we have any config change logs newer than our commit index. If so, then
// we need to drive the commitment of the config change to the cluster.
let mut pending_config = None; // The inner bool represents `is_in_join_consensus`.
let mut pending_config = None; // The inner bool represents `is_in_joint_consensus`.
if self.core.last_log_index > self.core.commit_index {
let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log_index + 1);
pending_config = self.core.storage.get_log_entries(stale_logs_start, stale_logs_stop).await
@@ -80,8 +80,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.report_metrics();

// Setup any callbacks needed for responding to commitment of a pending config.
if let Some(is_in_join_consensus) = pending_config {
if is_in_join_consensus {
if let Some(is_in_joint_consensus) = pending_config {
if is_in_joint_consensus {
self.joint_consensus_cb.push(rx_payload_committed); // Receiver for when the joint consensus is committed.
} else {
self.uniform_consensus_cb.push(rx_payload_committed); // Receiver for when the uniform consensus is committed.
@@ -349,6 +349,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}

// Before we can safely apply this entry to the state machine, we need to ensure there is
// no pending task to replicate entries to the state machine. This is an edge case, and would only
// happen once very early in a new leader's term.
if !self.core.replicate_to_sm_handle.is_empty() {
if let Some(Ok(replicate_to_sm_result)) = self.core.replicate_to_sm_handle.next().await {
self.core.handle_replicate_to_sm_result(replicate_to_sm_result)?;
}
}
// Apply this entry to the state machine and return its data response.
let res = self
.core
53 changes: 52 additions & 1 deletion async-raft/src/core/mod.rs
@@ -23,7 +23,7 @@ use crate::config::{Config, SnapshotPolicy};
use crate::core::client::ClientRequestEntry;
use crate::error::{ChangeConfigError, ClientReadError, ClientWriteError, InitializeError, RaftError, RaftResult};
use crate::metrics::RaftMetrics;
use crate::raft::{ChangeMembershipTx, ClientReadResponseTx, ClientWriteRequest, ClientWriteResponseTx, MembershipConfig, RaftMsg};
use crate::raft::{ChangeMembershipTx, ClientReadResponseTx, ClientWriteRequest, ClientWriteResponseTx, Entry, MembershipConfig, RaftMsg};
use crate::replication::{RaftEvent, ReplicaEvent, ReplicationStream};
use crate::storage::HardState;
use crate::{AppData, AppDataResponse, NodeId, RaftNetwork, RaftStorage};
@@ -85,6 +85,25 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// This is primarily used in making a determination on when a compaction job needs to be triggered.
snapshot_index: u64,

/// A cache of entries which are waiting to be replicated to the state machine.
///
/// It is important to note that this cache must only be populated from the AppendEntries RPC
/// handler, as these values must only ever represent the entries which have been sent from
/// the current cluster leader.
///
/// Whenever there is a leadership change, this value will be cleared.
entries_cache: BTreeMap<u64, Entry<D>>,
/// The stream of join handles from state machine replication tasks. There will only ever be
/// a maximum of 1 element at a time.
///
/// This abstraction is needed to ensure that replicating to the state machine does not block
/// the AppendEntries RPC flow, and to ensure that we have a smooth transition to becoming
/// leader without concern over duplicate application of entries to the state machine.
replicate_to_sm_handle: FuturesOrdered<JoinHandle<anyhow::Result<Option<u64>>>>,
/// A bool indicating if this system has performed its initial replication of
/// outstanding entries to the state machine.
has_completed_initial_replication_to_sm: bool,

/// The last time a heartbeat was received.
last_heartbeat: Option<Instant>,
/// The duration until the next election timeout.
@@ -121,6 +140,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_log_term: 0,
snapshot_state: None,
snapshot_index: 0,
entries_cache: Default::default(),
replicate_to_sm_handle: FuturesOrdered::new(),
has_completed_initial_replication_to_sm: false,
last_heartbeat: None,
next_election_timeout: None,
tx_compaction,
@@ -250,6 +272,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update the value of the `current_leader` property.
#[tracing::instrument(level = "trace", skip(self))]
fn update_current_leader(&mut self, update: UpdateCurrentLeader) {
self.entries_cache.clear();
match update {
UpdateCurrentLeader::ThisNode => {
self.current_leader = Some(self.id);
@@ -367,6 +390,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
);
}

/// Handle the output of an async task replicating entries to the state machine.
#[tracing::instrument(level = "trace", skip(self, res))]
pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result<Option<u64>>) -> RaftResult<()> {
let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?;
if let Some(last_applied) = last_applied_opt {
self.last_applied = last_applied;
}
self.report_metrics();
self.trigger_log_compaction_if_needed();
Ok(())
}

/// Reject an init config request due to the Raft node being in a state which prohibits the request.
#[tracing::instrument(level = "trace", skip(self, tx))]
fn reject_init_with_config(&self, tx: oneshot::Sender<Result<(), InitializeError>>) {
@@ -626,6 +661,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
Some(event) = self.replicationrx.next() => self.handle_replica_event(event).await,
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@@ -777,6 +816,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@@ -837,6 +880,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@@ -892,6 +939,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
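
Each of the state loops above gains a select! arm that polls replicate_to_sm_handle. As a rough, self-contained sketch of that pattern (assuming the tokio, futures, and anyhow crates; the Event type, channel, and error handling are invented for illustration and are not the crate's actual control loop):

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Hypothetical event type, standing in for RPCs, client requests, and so on.
enum Event {
    Heartbeat,
}

async fn control_loop(
    mut events: mpsc::Receiver<Event>,
    mut replicate_to_sm_handle: FuturesOrdered<JoinHandle<anyhow::Result<Option<u64>>>>,
) {
    loop {
        tokio::select! {
            Some(event) = events.recv() => {
                // Handle the event without awaiting storage on this task.
                match event {
                    Event::Heartbeat => {}
                }
            }
            // Harvest the result of the single in-flight application task, if any.
            Some(join_result) = replicate_to_sm_handle.next() => {
                match join_result {
                    Ok(Ok(Some(last_applied))) => println!("applied through index {last_applied}"),
                    Ok(Ok(None)) => {}
                    // Storage errors and panics are fatal and shut the node down.
                    Ok(Err(err)) => { eprintln!("fatal storage error: {err}"); break; }
                    Err(err) => { eprintln!("replication task failed: {err}"); break; }
                }
            }
            else => break,
        }
    }
}

Because the handle stream holds at most one task at a time, the loop never races two writers against the state machine, which is the invariant called out in the commit message.
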
2 changes: 1 addition & 1 deletion async-raft/src/core/replication.rs
@@ -239,7 +239,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// will always be found and this block will never even be executed.
//
// If this block is executed, and a snapshot is needed, the repl stream will submit another
// request here shortly, and will hit the above logic where it will await the snapshot complection.
// request here shortly, and will hit the above logic where it will await the snapshot completion.
self.core.trigger_log_compaction_if_needed();
Ok(())
}
