Skip to content

Commit

Permalink
fix: too many(50) inconsistent log should not live lock append-entries
Browse files Browse the repository at this point in the history
- Reproduce the bug that when append-entries, if there are more than 50
  inconsistent logs,  the responded `conflict` is always set to
  `self.last_log`, which blocks replication for ever.
  Because the next time append-entries use the same `prev_log_id`, it
  actually does not search backward for the first consistent log entry.

  https://github.com/drmingdrmer/async-raft/blob/79a39970855d80e1d3b761fadbce140ecf1da59e/async-raft/src/core/append_entries.rs#L131-L154

The test to reproduce it fakes a cluster of node 0,1,2:
R0 has 100 uncommitted log at term 2.
R2 has 100 uncommitted log at term 3.

```
R0 ... 2,99 2,100
R1
R2 ... 3,99, 3,00
```

Before this fix, brings up the cluster, R2 becomes leader and will never sync any log to R0.

The fix is also quite simple:

- Search backward instead of searching forward, to find the last log
  entry that matches `prev_log_id.term`, and responds this log id to the
  leader to let it send next `append_entries` RPC since this log id.

- If no such matching term is found, use the first log id it sees, e.g.,
  the entry at index `prev_log_id.index - 50` for next `append_entries`.
  • Loading branch information
drmingdrmer committed Aug 30, 2021
1 parent ab6689d commit 6d53aa1
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 14 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::RaftStorage;

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> NonVoterState<'a, D, R, N, S> {
/// Handle the admin `init_with_config` command.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_init_with_config(
&mut self,
mut members: BTreeSet<NodeId>,
Expand Down
23 changes: 16 additions & 7 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
&mut self,
msg: AppendEntriesRequest<D>,
) -> RaftResult<AppendEntriesResponse> {
tracing::debug!(%self.last_log_id);

// If message's term is less than most recent term, then we do not honor the request.
if msg.term < self.current_term {
tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
Expand Down Expand Up @@ -151,21 +153,25 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// The target entry does not have the same term. Fetch the last 50 logs, and use the last
// entry of that payload which is still in the target term for conflict optimization.
else {
let start = if msg.prev_log_id.index >= 50 {
let mut start = if msg.prev_log_id.index >= 50 {
msg.prev_log_id.index - 50
} else {
0
};
if start == 0 {
start = 1
}
let old_entries = self
.storage
.get_log_entries(start..msg.prev_log_id.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let opt = match old_entries.iter().find(|entry| entry.log_id.term == msg.prev_log_id.term) {

let first = old_entries.first().map(|x| x.log_id).unwrap_or_default();

let opt = match old_entries.iter().rev().find(|entry| entry.log_id.term <= msg.prev_log_id.term) {
Some(entry) => Some(ConflictOpt { log_id: entry.log_id }),
None => Some(ConflictOpt {
log_id: self.last_log_id,
}),
None => Some(ConflictOpt { log_id: first }),
};
if report_metrics {
self.report_metrics(Update::Ignore);
Expand Down Expand Up @@ -312,8 +318,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return;
}

assert!(start <= stop);
if start == stop {
tracing::debug!(start, stop, self.commit_index, %self.last_log_id, "start stop");

// when self.commit_index is not initialized, e.g. the first heartbeat from leader always has a commit_index to
// be 0, because the leader needs one round of heartbeat to find out the commit index.
if start >= stop {
return;
}

Expand Down
8 changes: 4 additions & 4 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

/// The main loop of the Raft protocol.
#[tracing::instrument(level="trace", skip(self), fields(id=self.id, cluster=%self.config.cluster_name))]
#[tracing::instrument(level="debug", skip(self), fields(id=self.id, cluster=%self.config.cluster_name))]
async fn main(mut self) -> RaftResult<()> {
tracing::debug!("raft node is initializing");

Expand All @@ -210,9 +210,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

let has_log = self.last_log_id.index != u64::MIN;
let single = self.membership.members.len() == 1;
let is_candidate = self.membership.contains(&self.id);
let is_voter = self.membership.contains(&self.id);

self.target_state = match (has_log, single, is_candidate) {
self.target_state = match (has_log, single, is_voter) {
// A restarted raft that already received some logs but was not yet added to a cluster.
// It should remain in NonVoter state, not Follower.
(true, true, false) => State::NonVoter,
Expand All @@ -239,7 +239,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// to ensure that restarted nodes don't disrupt a stable cluster by timing out and driving up their
// term before network communication is established.
let inst = Instant::now()
+ Duration::from_secs(30)
+ Duration::from_secs(2)
+ Duration::from_millis(self.config.new_rand_election_timeout());
self.next_election_timeout = Some(inst);
}
Expand Down
4 changes: 3 additions & 1 deletion async-raft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// An RPC invoked by candidates to gather votes (§5.2).
///
/// See `receiver implementation: RequestVote RPC` in raft-essentials.md in this repo.
#[tracing::instrument(level = "trace", skip(self, msg))]
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_vote_request(&mut self, msg: VoteRequest) -> RaftResult<VoteResponse> {
tracing::debug!({candidate=msg.candidate_id, self.current_term, rpc_term=msg.term}, "start handle_vote_request");

// If candidate's current term is less than this nodes current term, reject.
if msg.term < self.current_term {
tracing::debug!({candidate=msg.candidate_id, self.current_term, rpc_term=msg.term}, "RequestVote RPC term is less than current term");
Expand Down
132 changes: 132 additions & 0 deletions async-raft/tests/append_inconsistent_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::Entry;
use async_raft::raft::EntryPayload;
use async_raft::storage::HardState;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::btreeset;

#[macro_use]
mod fixtures;

/// Too many inconsistent log should not block replication.
///
/// Reproduce the bug that when append-entries, if there are more than 50 inconsistent logs the conflict is always set
/// to `last_log`, which blocks replication for ever.
/// https://github.com/drmingdrmer/async-raft/blob/79a39970855d80e1d3b761fadbce140ecf1da59e/async-raft/src/core/append_entries.rs#L131-L154
///
/// - fake a cluster of node 0,1,2. R0 has ~100 uncommitted log at term 2. R2 has ~100 uncommitted log at term 3.
///
/// ```
/// R0 ... 2,99 2,100
/// R1
/// R2 ... 3,99, 3,00
/// ```
///
/// - Start the cluster and node 2 start to replicate logs.
/// - test the log should be replicated to node 0.
///
/// RUST_LOG=async_raft,memstore,append_inconsistent_log=trace cargo test -p async-raft --test append_inconsistent_log
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn append_inconsistent_log() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

let mut n_logs = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {}).await?;

tracing::info!("--- remove all nodes and fake the logs");

let (r0, sto0) = router.remove_node(0).await.unwrap();
let (r1, sto1) = router.remove_node(1).await.unwrap();
let (r2, sto2) = router.remove_node(2).await.unwrap();

r0.shutdown().await?;
r1.shutdown().await?;
r2.shutdown().await?;

for i in n_logs + 1..=100 {
sto0.append_to_log(&[&Entry {
log_id: LogId { term: 2, index: i },
payload: EntryPayload::Blank,
}])
.await?;

sto2.append_to_log(&[&Entry {
log_id: LogId { term: 3, index: i },
payload: EntryPayload::Blank,
}])
.await?;
}

sto0.save_hard_state(&HardState {
current_term: 2,
voted_for: Some(0),
})
.await?;

sto2.save_hard_state(&HardState {
current_term: 3,
voted_for: Some(2),
})
.await?;

n_logs = 100;

tracing::info!("--- restart node 1 and isolate. To let node-2 to become leader, node-1 should not vote for node-0");
{
router.new_raft_node_with_sto(1, sto1.clone()).await;
router.isolate_node(1).await;
}

tracing::info!("--- restart node 0 and 2");
{
router.new_raft_node_with_sto(0, sto0.clone()).await;
router.new_raft_node_with_sto(2, sto2.clone()).await;
}

// leader appends a blank log.
n_logs += 1;

tracing::info!("--- wait for node states");
{
router
.wait_for_state(
&btreeset! {0},
State::Follower,
Some(Duration::from_millis(2000)),
"node 0 become follower",
)
.await?;

router
.wait_for_state(
&btreeset! {2},
State::Leader,
Some(Duration::from_millis(5000)),
"node 2 become leader",
)
.await?;
}

router
.wait(&0, Some(Duration::from_millis(2000)))
.await?
.metrics(|x| x.last_log_index == n_logs, "sync log to node 0")
.await?;

let logs = sto0.get_log_entries(60..=60).await?;
assert_eq!(3, logs.first().unwrap().log_id.term, "log is overridden by leader logs");

Ok(())
}
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}
}

#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn save_hard_state(&self, hs: &HardState) -> Result<()> {
self.defensive_incremental_hard_state(hs).await?;

Expand Down

0 comments on commit 6d53aa1

Please sign in to comment.