Skip to content

Commit

Permalink
fix: a conflict is expected even when appending empty enties
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jun 1, 2021
1 parent 856a081 commit 6202138
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 14 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ jobs:
with:
command: test
args: -p async-raft --test client_writes

- name: Integration Test | Receive Conflict when appending empty entries.
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test conflict_with_empty_entries

- name: Integration Test | Singlenode
uses: actions-rs/cargo@v1
with:
Expand Down
13 changes: 0 additions & 13 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.set_target_state(State::Follower);
}

// If this is just a heartbeat, then respond.
if msg.entries.is_empty() {
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
return Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
conflict_opt: None,
});
}

// If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
// log info, then replication is g2g.
let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
Expand Down
2 changes: 1 addition & 1 deletion async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ pub struct AppendEntriesResponse {
/// This implementation of Raft uses this value to more quickly synchronize a leader with its
/// followers which may be some distance behind in replication, may have conflicting entries, or
/// which may be new to the cluster.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ConflictOpt {
/// The term of the most recent entry which does not conflict with the received request.
pub term: u64,
Expand Down
119 changes: 119 additions & 0 deletions async-raft/tests/conflict_with_empty_entries.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::sync::Arc;

use anyhow::Result;

use async_raft::{Config, RaftNetwork};
use async_raft::raft::{AppendEntriesRequest, ConflictOpt, Entry, EntryNormal, EntryPayload};
use fixtures::RaftRouter;
use memstore::ClientRequest;

mod fixtures;

/// Cluster conflict_with_empty_entries test.
///
/// `append_entries` should get a reponse with non-none ConflictOpt even if the entries in message
/// is empty.
/// Otherwise if no conflict is found the leader will never be able to sync logs to a new added
/// NonVoter, until a next log is proposed on leader.
///
/// What does this test do?
///
/// - brings a 1 NonVoter node online.
///
/// - send `append_logs` message to it with empty `entries` and some non-zero `prev_log_index`.
/// - asserts that a reponse with ConflictOpt set.
///
/// - feed several logs to it.
///
/// - send `append_logs` message with conflicting prev_log_index and empty `entries`.
/// - asserts that a reponse with ConflictOpt set.
///
/// RUST_LOG=async_raft,memstore,conflict_with_empty_entries=trace cargo test -p async-raft --test conflict_with_empty_entries
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn conflict_with_empty_entries() -> Result<()> {
fixtures::init_tracing();

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));

router.new_raft_node(0).await;


// Expect conflict even if the message contains no entries.

let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
term: 1,
leader_id: 1,
prev_log_index: 5,
prev_log_term: 1,
entries: vec![],
leader_commit: 5,
};

let resp = router.append_entries(0, rpc).await?;
assert_eq!(false, resp.success);
assert!(resp.conflict_opt.is_some());
let c = resp.conflict_opt.unwrap();
assert_eq!(ConflictOpt {
term: 0,
index: 0,
}, c);


// Feed 2 logs

let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
term: 1,
leader_id: 1,
prev_log_index: 0,
prev_log_term: 1,
entries: vec![
Entry {
term: 1,
index: 1,
payload: EntryPayload::Blank,
},
Entry {
term: 1,
index: 2,
payload: EntryPayload::Normal(EntryNormal {
data: ClientRequest {
client: "foo".to_string(),
serial: 1,
status: "bar".to_string(),
},
}),
},
],
leader_commit: 5,
};

let resp = router.append_entries(0, rpc).await?;
assert_eq!(true, resp.success);
assert!(resp.conflict_opt.is_none());


// Expect a conflict with prev_log_index == 3

let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
term: 1,
leader_id: 1,
prev_log_index: 3,
prev_log_term: 1,
entries: vec![],
leader_commit: 5,
};

let resp = router.append_entries(0, rpc).await?;
assert_eq!(false, resp.success);
assert!(resp.conflict_opt.is_some());
let c = resp.conflict_opt.unwrap();
assert_eq!(ConflictOpt {
term: 1,
index: 2,
}, c);

Ok(())
}

0 comments on commit 6202138

Please sign in to comment.