diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 440b4c4af..47ce0f511 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -46,6 +46,13 @@ jobs: with: command: test args: -p async-raft --test client_writes + + - name: Integration Test | Receive Conflict when appending empty entries. + uses: actions-rs/cargo@v1 + with: + command: test + args: -p async-raft --test conflict_with_empty_entries + - name: Integration Test | Singlenode uses: actions-rs/cargo@v1 with: diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index ef00e0f0b..45a6a2d06 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -45,19 +45,6 @@ impl, S: RaftStorage> Ra self.set_target_state(State::Follower); } - // If this is just a heartbeat, then respond. - if msg.entries.is_empty() { - self.replicate_to_state_machine_if_needed(msg.entries).await; - if report_metrics { - self.report_metrics(); - } - return Ok(AppendEntriesResponse { - term: self.current_term, - success: true, - conflict_opt: None, - }); - } - // If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local // log info, then replication is g2g. let msg_prev_index_is_min = msg.prev_log_index == u64::min_value(); diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index 57d068b46..c0f1e230a 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -377,7 +377,7 @@ pub struct AppendEntriesResponse { /// This implementation of Raft uses this value to more quickly synchronize a leader with its /// followers which may be some distance behind in replication, may have conflicting entries, or /// which may be new to the cluster. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct ConflictOpt { /// The term of the most recent entry which does not conflict with the received request. pub term: u64, diff --git a/async-raft/tests/conflict_with_empty_entries.rs b/async-raft/tests/conflict_with_empty_entries.rs new file mode 100644 index 000000000..a41812eef --- /dev/null +++ b/async-raft/tests/conflict_with_empty_entries.rs @@ -0,0 +1,119 @@ +use std::sync::Arc; + +use anyhow::Result; + +use async_raft::{Config, RaftNetwork}; +use async_raft::raft::{AppendEntriesRequest, ConflictOpt, Entry, EntryNormal, EntryPayload}; +use fixtures::RaftRouter; +use memstore::ClientRequest; + +mod fixtures; + +/// Cluster conflict_with_empty_entries test. +/// +/// `append_entries` should get a reponse with non-none ConflictOpt even if the entries in message +/// is empty. +/// Otherwise if no conflict is found the leader will never be able to sync logs to a new added +/// NonVoter, until a next log is proposed on leader. +/// +/// What does this test do? +/// +/// - brings a 1 NonVoter node online. +/// +/// - send `append_logs` message to it with empty `entries` and some non-zero `prev_log_index`. +/// - asserts that a reponse with ConflictOpt set. +/// +/// - feed several logs to it. +/// +/// - send `append_logs` message with conflicting prev_log_index and empty `entries`. +/// - asserts that a reponse with ConflictOpt set. +/// +/// RUST_LOG=async_raft,memstore,conflict_with_empty_entries=trace cargo test -p async-raft --test conflict_with_empty_entries +#[tokio::test(flavor = "multi_thread", worker_threads = 6)] +async fn conflict_with_empty_entries() -> Result<()> { + fixtures::init_tracing(); + + // Setup test dependencies. + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); + let router = Arc::new(RaftRouter::new(config.clone())); + + router.new_raft_node(0).await; + + + // Expect conflict even if the message contains no entries. + + let rpc = AppendEntriesRequest:: { + term: 1, + leader_id: 1, + prev_log_index: 5, + prev_log_term: 1, + entries: vec![], + leader_commit: 5, + }; + + let resp = router.append_entries(0, rpc).await?; + assert_eq!(false, resp.success); + assert!(resp.conflict_opt.is_some()); + let c = resp.conflict_opt.unwrap(); + assert_eq!(ConflictOpt { + term: 0, + index: 0, + }, c); + + + // Feed 2 logs + + let rpc = AppendEntriesRequest:: { + term: 1, + leader_id: 1, + prev_log_index: 0, + prev_log_term: 1, + entries: vec![ + Entry { + term: 1, + index: 1, + payload: EntryPayload::Blank, + }, + Entry { + term: 1, + index: 2, + payload: EntryPayload::Normal(EntryNormal { + data: ClientRequest { + client: "foo".to_string(), + serial: 1, + status: "bar".to_string(), + }, + }), + }, + ], + leader_commit: 5, + }; + + let resp = router.append_entries(0, rpc).await?; + assert_eq!(true, resp.success); + assert!(resp.conflict_opt.is_none()); + + + // Expect a conflict with prev_log_index == 3 + + let rpc = AppendEntriesRequest:: { + term: 1, + leader_id: 1, + prev_log_index: 3, + prev_log_term: 1, + entries: vec![], + leader_commit: 5, + }; + + let resp = router.append_entries(0, rpc).await?; + assert_eq!(false, resp.success); + assert!(resp.conflict_opt.is_some()); + let c = resp.conflict_opt.unwrap(); + assert_eq!(ConflictOpt { + term: 1, + index: 2, + }, c); + + Ok(()) +} +