fix: race condition of concurrent snapshot-install and apply.
Problem:

Concurrent snapshot-install and apply can mess up `last_applied`.

`finalize_snapshot_installation` runs in the `RaftCore` thread.
`apply_to_state_machine` runs in a separate tokio task (thread).

Thus there is a chance that `last_applied` gets reset to a previous value:

- `apply_to_state_machine` is called and finishes in a separate task.

- `finalize_snapshot_installation` is called in the `RaftCore` thread and
  finishes, updating `last_applied`.

- The `RaftCore` thread finishes waiting for `apply_to_state_machine`, and
  updates `last_applied` back to a previous value.

```
RaftCore: -.    install-snapshot,         .-> replicate_to_sm_handle.next(),
           |    update last_applied=5     |   update last_applied=2
           |                              |
           v                              |
task:      apply 2------------------------'
--------------------------------------------------------------------> time
```

Solution:

Rule: all changes to the state machine must be serialized.

A simple temporary solution for now is to call all methods that modify the
state machine in the `RaftCore` thread.
The drawback is that this blocks the `RaftCore` thread.

A better way is to move all tasks that modify the state machine to a
standalone thread, and send update requests back to `RaftCore` so it can
update fields such as `last_applied`.
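
Below is a minimal sketch of the interim fix described above, assuming
simplified `RaftCore` fields and omitting the error mapping done in the real
code; it is not the exact code of this commit:

```
// Hypothetical sketch: apply committed entries inline in the RaftCore task.
// `storage`, `last_applied` and `commit_index` mirror the fields used in the
// diff below; error mapping is omitted for brevity.
async fn apply_committed_entries(&mut self) -> anyhow::Result<()> {
    let entries = self
        .storage
        .get_log_entries(self.last_applied.index + 1..=self.commit_index)
        .await?;

    if entries.is_empty() {
        return Ok(());
    }

    let last_log_id = entries.last().unwrap().log_id;

    let entries_refs: Vec<_> = entries.iter().collect();
    self.storage.apply_to_state_machine(&entries_refs).await?;

    // Updated by the same task that performed the apply, so it can never be
    // reset to a stale value by a concurrently finishing task.
    self.last_applied = last_log_id;
    Ok(())
}
```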
drmingdrmer committed Sep 1, 2021
1 parent 4d58a51 commit eed681d
Showing 9 changed files with 94 additions and 131 deletions.
94 changes: 30 additions & 64 deletions async-raft/src/core/append_entries.rs
@@ -1,5 +1,3 @@
use tracing::Instrument;

use crate::core::RaftCore;
use crate::core::State;
use crate::core::UpdateCurrentLeader;
@@ -27,7 +25,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
&mut self,
msg: AppendEntriesRequest<D>,
) -> RaftResult<AppendEntriesResponse> {
tracing::debug!(%self.last_log_id);
tracing::debug!(%self.last_log_id, %self.last_applied);

let mut msg_entries = msg.entries.as_slice();
let mut prev_log_id = msg.prev_log_id;
@@ -323,21 +321,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
///
/// Very importantly, this routine must not block the main control loop main task, else it
/// may cause the Raft leader to timeout the requests to this node.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn replicate_to_state_machine_if_needed(&mut self) -> Result<(), RaftError> {
tracing::debug!("replicate_to_sm_if_needed: last_applied: {}", self.last_applied,);

// Perform initial replication to state machine if needed.
if !self.has_completed_initial_replication_to_sm {
// Optimistic update, as failures will cause shutdown.
self.has_completed_initial_replication_to_sm = true;
self.initial_replicate_to_state_machine().await;
return Ok(());
}

// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
tracing::debug!("replicate_to_sm_handle is not empty, return");
self.initial_replicate_to_state_machine().await?;
return Ok(());
}

@@ -353,44 +345,27 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Drain entries from the beginning of the cache up to commit index.

// TODO(xp): logs in storage must be consecutive.
let entries = self
.storage
.get_log_entries(self.last_applied.index + 1..=self.commit_index)
.await
.map_err(|e| self.map_fatal_storage_error(e))?;

let last_log_id = entries.last().map(|x| x.log_id);
let last_log_id = entries.last().map(|x| x.log_id).unwrap();

tracing::debug!("entries: {:?}", entries.iter().map(|x| x.log_id).collect::<Vec<_>>());
tracing::debug!("entries: {}", entries.as_slice().summary());
tracing::debug!(?last_log_id);

// If we have no data entries to apply, then do nothing.
if entries.is_empty() {
if let Some(log_id) = last_log_id {
self.last_applied = log_id;
self.report_metrics(Update::Ignore);
}
tracing::debug!("entries is empty, return");
return Ok(());
}
let entries_refs: Vec<_> = entries.iter().collect();
self.storage
.apply_to_state_machine(&entries_refs)
.await
.map_err(|e| self.map_fatal_storage_error(e))?;

// Spawn task to replicate these entries to the state machine.
// Linearizability is guaranteed by `replicate_to_sm_handle`, which is the mechanism used
// to ensure that only a single task can replicate data to the state machine, and that is
// owned by a single task, not shared between multiple threads/tasks.
let storage = self.storage.clone();
let handle = tokio::spawn(
async move {
// Create a new vector of references to the entries data ... might have to change this
// interface a bit before 1.0.
let entries_refs: Vec<_> = entries.iter().collect();
storage.apply_to_state_machine(&entries_refs).await?;
Ok(last_log_id)
}
.instrument(tracing::debug_span!("spawn")),
);
self.replicate_to_sm_handle.push(handle);
self.last_applied = last_log_id;

self.report_metrics(Update::Ignore);
self.trigger_log_compaction_if_needed(false);

Ok(())
}
@@ -399,42 +374,33 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
///
/// This will only be executed once, and only in response to its first payload of entries
/// from the AppendEntries RPC handler.
#[tracing::instrument(level = "trace", skip(self))]
async fn initial_replicate_to_state_machine(&mut self) {
#[tracing::instrument(level = "debug", skip(self))]
async fn initial_replicate_to_state_machine(&mut self) -> Result<(), RaftError> {
let stop = std::cmp::min(self.commit_index, self.last_log_id.index) + 1;
let start = self.last_applied.index + 1;
let storage = self.storage.clone();

// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
return;
}

tracing::debug!(start, stop, self.commit_index, %self.last_log_id, "start stop");

// when self.commit_index is not initialized, e.g. the first heartbeat from leader always has a commit_index to
// be 0, because the leader needs one round of heartbeat to find out the commit index.
if start >= stop {
return;
return Ok(());
}

// Fetch the series of entries which must be applied to the state machine, then apply them.
let handle = tokio::spawn(
async move {
let mut new_last_applied: Option<LogId> = None;
let entries = storage.get_log_entries(start..stop).await?;
if let Some(entry) = entries.last() {
new_last_applied = Some(entry.log_id);
}
let data_entries: Vec<_> = entries.iter().collect();
if data_entries.is_empty() {
return Ok(new_last_applied);
}
storage.apply_to_state_machine(&data_entries).await?;
Ok(new_last_applied)
}
.instrument(tracing::debug_span!("spawn-init-replicate-to-sm")),
);
self.replicate_to_sm_handle.push(handle);

let entries = storage.get_log_entries(start..stop).await.map_err(|e| self.map_fatal_storage_error(e))?;

let new_last_applied = entries.last().unwrap();

let data_entries: Vec<_> = entries.iter().collect();
storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_fatal_storage_error(e))?;

self.last_applied = new_last_applied.log_id;
self.report_metrics(Update::Ignore);
self.trigger_log_compaction_if_needed(false);

Ok(())
}
}
12 changes: 2 additions & 10 deletions async-raft/src/core/client.rs
@@ -339,7 +339,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

/// Handle the post-commit logic for a client request.
#[tracing::instrument(level = "trace", skip(self, req))]
#[tracing::instrument(level = "debug", skip(self, req))]
pub(super) async fn client_request_post_commit(&mut self, req: ClientRequestEntry<D, R>) {
let entry = &req.entry;

@@ -419,7 +419,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

/// Apply the given log entry to the state machine.
#[tracing::instrument(level = "trace", skip(self, entry))]
#[tracing::instrument(level = "debug", skip(self, entry))]
pub(super) async fn apply_entry_to_state_machine(&mut self, entry: &Entry<D>) -> RaftResult<R> {
// First, we just ensure that we apply any outstanding up to, but not including, the index
// of the given entry. We need to be able to return the data response from applying this
@@ -453,14 +453,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}

// Before we can safely apply this entry to the state machine, we need to ensure there is
// no pending task to replicate entries to the state machine. This is edge case, and would only
// happen once very early in a new leader's term.
if !self.core.replicate_to_sm_handle.is_empty() {
if let Some(Ok(replicate_to_sm_result)) = self.core.replicate_to_sm_handle.next().await {
self.core.handle_replicate_to_sm_result(replicate_to_sm_result)?;
}
}
// Apply this entry to the state machine and return its data response.
let res = self.core.storage.apply_to_state_machine(&[entry]).await.map_err(|err| {
if err.downcast_ref::<S::ShutdownError>().is_some() {
53 changes: 44 additions & 9 deletions async-raft/src/core/install_snapshot.rs
@@ -181,17 +181,52 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
) -> RaftResult<()> {
snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?;

self.storage
// Caveat: All changes to state machine must be serialized
//
// If `finalize_snapshot_installation` is run in RaftCore thread,
// there is chance the last_applied being reset to a previous value:
//
// ```
// RaftCore: -. install-snapc, .-> replicate_to_sm_handle.next(),
// | update last_applied=5 | update last_applied=2
// | |
// v |
// task: apply 2------------------------'
// --------------------------------------------------------------------> time
// ```

let changes = self
.storage
.finalize_snapshot_installation(&req.meta, snapshot)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;

let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
self.update_membership(membership)?;
self.last_log_id = req.meta.last_log_id;
self.last_applied = req.meta.last_log_id;
self.snapshot_last_log_id = req.meta.last_log_id;
self.report_metrics(Update::Ignore);
.map_err(|e| self.map_fatal_storage_error(e))?;

tracing::debug!("update after apply or install-snapshot: {:?}", changes);

// After installing snapshot, no inconsistent log is removed.
// This does not affect raft consistency.
// If you have any question about this, let me know: drdr.xp at gmail.com

if let Some(last_applied) = changes.last_applied {
// snapshot is installed
self.last_applied = last_applied;

if self.last_log_id < self.last_applied {
self.last_log_id = self.last_applied;
}

// There could be unknown membership in the snapshot.
let membership =
self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;

self.update_membership(membership)?;

self.snapshot_last_log_id = self.last_applied;
self.report_metrics(Update::Ignore);
} else {
// snapshot not installed
}

Ok(())
}
}
44 changes: 0 additions & 44 deletions async-raft/src/core/mod.rs
@@ -14,8 +14,6 @@ use std::sync::Arc;

use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::stream::FuturesOrdered;
use futures::stream::StreamExt;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::broadcast;
@@ -120,13 +118,6 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// This is primarily used in making a determination on when a compaction job needs to be triggered.
snapshot_last_log_id: LogId,

/// The stream of join handles from state machine replication tasks. There will only ever be
/// a maximum of 1 element at a time.
///
/// This abstraction is needed to ensure that replicating to the state machine does not block
/// the AppendEntries RPC flow, and to ensure that we have a smooth transition to becoming
/// leader without concern over duplicate application of entries to the state machine.
replicate_to_sm_handle: FuturesOrdered<JoinHandle<anyhow::Result<Option<LogId>>>>,
/// A bool indicating if this system has performed its initial replication of
/// outstanding entries to the state machine.
has_completed_initial_replication_to_sm: bool,
@@ -171,7 +162,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_log_id: LogId { term: 0, index: 0 },
snapshot_state: None,
snapshot_last_log_id: LogId { term: 0, index: 0 },
replicate_to_sm_handle: FuturesOrdered::new(),
has_completed_initial_replication_to_sm: false,
last_heartbeat: None,
next_election_timeout: None,
@@ -464,22 +454,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
);
}

/// Handle the output of an async task replicating entries to the state machine.
#[tracing::instrument(level = "trace", skip(self, res))]
pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result<Option<LogId>>) -> RaftResult<()> {
let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?;

tracing::debug!("last_applied:{:?}", last_applied_opt);

if let Some(last_applied) = last_applied_opt {
self.last_applied = last_applied;
}

self.report_metrics(Update::Ignore);
self.trigger_log_compaction_if_needed(false);
Ok(())
}

/// Reject an init config request due to the Raft node being in a state which prohibits the request.
#[tracing::instrument(level = "trace", skip(self, tx))]
fn reject_init_with_config(&self, tx: oneshot::Sender<Result<(), InitializeError>>) {
@@ -731,12 +705,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let _ent = span.enter();
self.handle_replica_event(event).await;
}
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
tracing::info!("leader recv from replicate_to_sm_handle: {:?}", repl_sm_result);

// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => {
tracing::info!("leader recv from rx_shudown");
self.core.set_target_state(State::Shutdown);
@@ -909,10 +877,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@@ -980,10 +944,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@@ -1047,10 +1007,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
1 change: 1 addition & 0 deletions async-raft/src/lib.rs
@@ -33,6 +33,7 @@ pub use crate::raft::Raft;
pub use crate::raft_types::LogId;
pub use crate::raft_types::SnapshotId;
pub use crate::raft_types::SnapshotSegmentId;
pub use crate::raft_types::StateMachineChanges;
pub use crate::raft_types::Update;
pub use crate::replication::ReplicationMetrics;
pub use crate::storage::RaftStorage;
8 changes: 8 additions & 0 deletions async-raft/src/raft_types.rs
@@ -55,3 +55,11 @@ pub enum Update<T> {
Update(T),
Ignore,
}

/// The changes of a state machine.
/// E.g. when applying a log to state machine, or installing a state machine from snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateMachineChanges {
pub last_applied: Option<LogId>,
pub is_snapshot: bool,
}
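
For reference, a hedged sketch of how these fields are meant to be consumed,
mirroring the `install_snapshot.rs` hunk above (`core`, `req` and `snapshot`
are assumed to be in scope; this is not part of the commit itself):

```
// Hypothetical consumer: the storage layer reports what actually changed,
// and RaftCore updates its own view from that report, in its own thread.
let changes: StateMachineChanges = core
    .storage
    .finalize_snapshot_installation(&req.meta, snapshot)
    .await?;

if let Some(last_applied) = changes.last_applied {
    // The snapshot was installed; advance the in-memory cursors.
    core.last_applied = last_applied;
    if core.last_log_id < core.last_applied {
        core.last_log_id = core.last_applied;
    }
}
```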
3 changes: 2 additions & 1 deletion async-raft/src/storage.rs
@@ -15,6 +15,7 @@ use tokio::io::AsyncWrite;
use crate::raft::Entry;
use crate::raft::MembershipConfig;
use crate::raft_types::SnapshotId;
use crate::raft_types::StateMachineChanges;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
@@ -269,7 +270,7 @@
&self,
meta: &SnapshotMeta,
snapshot: Box<Self::SnapshotData>,
) -> Result<()>;
) -> Result<StateMachineChanges>;

/// Get a readable handle to the current snapshot, along with its metadata.
///
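
For context, a minimal sketch of how a `RaftStorage` implementation might
satisfy the changed signature (the `replace_state_machine` helper and the
surrounding error handling are assumptions, not part of this commit):

```
// Hypothetical RaftStorage implementation: after swapping in the snapshot's
// state machine, report the new last_applied back to RaftCore instead of
// letting RaftCore guess it.
async fn finalize_snapshot_installation(
    &self,
    meta: &SnapshotMeta,
    snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges> {
    // `replace_state_machine` is an assumed helper that rebuilds the state
    // machine from the snapshot bytes.
    self.replace_state_machine(snapshot).await?;

    Ok(StateMachineChanges {
        last_applied: Some(meta.last_log_id),
        is_snapshot: true,
    })
}
```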
2 changes: 1 addition & 1 deletion async-raft/tests/snapshot_overrides_membership.rs
@@ -29,7 +29,7 @@
///
/// export RUST_LOG=async_raft,memstore,snapshot_overrides_membership=trace
/// cargo test -p async-raft --test snapshot_overrides_membership
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn snapshot_overrides_membership() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();