Skip to content

Commit

Permalink
change: remove ConflictOpt, which is a wrapper of log_id; add matched…
Browse files Browse the repository at this point in the history
… log id in AppendEntriesResponse
  • Loading branch information
drmingdrmer committed Dec 23, 2021
1 parent 6155117 commit c61b4c4
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 103 deletions.
21 changes: 14 additions & 7 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::core::UpdateCurrentLeader;
use crate::error::RaftResult;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ConflictOpt;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::ActiveMembership;
Expand Down Expand Up @@ -36,8 +35,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
conflict_opt: None,
matched: None,
conflict: None,
});
}

Expand Down Expand Up @@ -247,11 +246,19 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
conflict_opt: Some(ConflictOpt { log_id: *prev_log_id }),
matched: None,
conflict: Some(*prev_log_id),
});
}

// If prev_log_id matches local entry, then every inconsistent entries will be removed.
// Thus the last known matching log id has to be the last entry id.
let matched = Some(if entries.is_empty() {
*prev_log_id
} else {
entries[entries.len() - 1].log_id
});

// The entries left are all inconsistent log or absent
let (n_matching, entries) = self.skip_matching_entries(entries).await?;

Expand Down Expand Up @@ -279,8 +286,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
conflict_opt: None,
matched,
conflict: None,
})
}

Expand Down
32 changes: 15 additions & 17 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,26 +520,23 @@ impl<D: AppData> MessageSummary for AppendEntriesRequest<D> {
pub struct AppendEntriesResponse {
/// The responding node's current term, for leader to update itself.
pub term: u64,
/// Will be true if follower contained entry matching `prev_log_index` and `prev_log_term`.
pub success: bool,
/// A value used to implement the _conflicting term_ optimization outlined in §5.3.

/// The last matching log id on follower.
///
/// It is a successful append-entry iff `matched` is `Some()`.
pub matched: Option<LogId>,

/// The log id that is different from the leader on follower.
///
/// This value will only be present, and should only be considered, when `success` is `false`.
pub conflict_opt: Option<ConflictOpt>,
/// `conflict` is None if `matched` is `Some()`, because if there is a matching entry, all following inconsistent
/// entries will be deleted.
pub conflict: Option<LogId>,
}

/// A struct used to implement the _conflicting term_ optimization outlined in §5.3 for log replication.
///
/// This value will only be present, and should only be considered, when an `AppendEntriesResponse`
/// object has a `success` value of `false`.
///
/// This implementation of Raft uses this value to more quickly synchronize a leader with its
/// followers which may be some distance behind in replication, may have conflicting entries, or
/// which may be new to the cluster.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ConflictOpt {
/// The most recent entry which does not conflict with the received request.
pub log_id: LogId,
impl AppendEntriesResponse {
pub fn success(&self) -> bool {
self.matched.is_some()
}
}

/// A Raft log entry.
Expand Down Expand Up @@ -683,6 +680,7 @@ impl MembershipConfig {
pub struct VoteRequest {
/// The candidate's current term.
pub term: u64,

/// The candidate's ID.
pub candidate_id: u64,

Expand Down
23 changes: 7 additions & 16 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
break (prev_log_id, logs);
};

let last_log_id = if logs.is_empty() {
prev_log_id
} else {
logs[logs.len() - 1].log_id
};

// Build the heartbeat frame to be sent to the follower.
let payload = AppendEntriesRequest {
term: self.term,
Expand Down Expand Up @@ -350,13 +344,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}
};

tracing::debug!(%last_log_id, "append_entries resp: {:?}", append_resp);
tracing::debug!("append_entries resp: {:?}", append_resp);

// Handle success conditions.
if append_resp.success {
self.matched = last_log_id;
if append_resp.success() {
self.matched = append_resp.matched.unwrap();
// TODO(xp): if matched does not change, do not bother the core.
self.update_max_possible_matched_index(last_log_id.index);
self.update_max_possible_matched_index(self.matched.index);
self.update_matched();

return Ok(());
Expand All @@ -375,21 +369,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}

// Replication was not successful, handle conflict optimization record, else decrement `next_index`.
let conflict = append_resp.conflict_opt.unwrap();
let conflict = append_resp.conflict.unwrap();

tracing::debug!(
?conflict,
append_resp.term,
"append entries failed, handling conflict opt"
);

assert_eq!(
conflict.log_id, prev_log_id,
"if conflict, it is always the prev_log_id"
);
assert_eq!(conflict, prev_log_id, "if conflict, it is always the prev_log_id");

// Continue to find the matching log id on follower.
self.max_possible_matched_index = conflict.log_id.index - 1;
self.max_possible_matched_index = conflict.index - 1;
Ok(())
}

Expand Down
56 changes: 20 additions & 36 deletions async-raft/tests/append_conflicts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use anyhow::Result;
use async_raft::raft::AppendEntriesRequest;
use async_raft::raft::ConflictOpt;
use async_raft::raft::Entry;
use async_raft::Config;
use async_raft::LogId;
Expand Down Expand Up @@ -55,8 +54,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req.clone()).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

check_logs(&sto0, vec![0]).await?;

Expand All @@ -72,16 +71,16 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req.clone()).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?;

tracing::info!("--- case 0: prev_log_id.index == 0, last_log_id mismatch");

let resp = r0.append_entries(req.clone()).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?;

Expand All @@ -97,8 +96,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?;

Expand All @@ -114,8 +113,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

check_logs(&sto0, vec![0, 1, 1, 2]).await?;

Expand All @@ -129,13 +128,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(!resp.success);
assert_eq!(
Some(ConflictOpt {
log_id: LogId { term: 1, index: 2000 }
}),
resp.conflict_opt
);
assert!(!resp.success());
assert_eq!(Some(LogId::new(1, 2000)), resp.conflict);

check_logs(&sto0, vec![0, 1, 1, 2]).await?;

Expand All @@ -150,14 +144,9 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(!resp.success);
assert!(!resp.success());
// returns the id just before prev_log_id.index
assert_eq!(
Some(ConflictOpt {
log_id: LogId { term: 3, index: 3 }
}),
resp.conflict_opt
);
assert_eq!(Some(LogId::new(3, 3)), resp.conflict);

check_logs(&sto0, vec![0, 1, 1]).await?;

Expand All @@ -172,8 +161,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

// check prepared store
check_logs(&sto0, vec![0, 1, 1, 2, 2, 2]).await?;
Expand All @@ -188,8 +177,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

check_logs(&sto0, vec![0, 1, 1, 2, 3]).await?;

Expand All @@ -205,13 +194,8 @@ async fn append_conflicts() -> Result<()> {
};

let resp = r0.append_entries(req).await?;
assert!(!resp.success);
assert_eq!(
Some(ConflictOpt {
log_id: LogId { term: 1, index: 200 }
}),
resp.conflict_opt
);
assert!(!resp.success());
assert_eq!(Some(LogId::new(1, 200)), resp.conflict);

Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions async-raft/tests/append_updates_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ async fn append_updates_membership() -> Result<()> {
};

let resp = r0.append_entries(req.clone()).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

r0.wait(timeout()).members(btreeset! {1,2,3,4}, "append-entries update membership").await?;
}
Expand All @@ -88,8 +88,8 @@ async fn append_updates_membership() -> Result<()> {
};

let resp = r0.append_entries(req.clone()).await?;
assert!(resp.success);
assert_eq!(None, resp.conflict_opt);
assert!(resp.success());
assert_eq!(None, resp.conflict);

r0.wait(timeout()).members(btreeset! {1,2}, "deleting inconsistent lgos updates membership").await?;
}
Expand Down
4 changes: 2 additions & 2 deletions async-raft/tests/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ async fn compaction() -> Result<()> {
})
.await?;

assert!(res.success);
assert_eq!(None, res.conflict_opt);
assert!(res.success());
assert_eq!(None, res.conflict);
}

Ok(())
Expand Down
31 changes: 10 additions & 21 deletions async-raft/tests/conflict_with_empty_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use anyhow::Result;
use async_raft::raft::AppendEntriesRequest;
use async_raft::raft::ConflictOpt;
use async_raft::raft::Entry;
use async_raft::raft::EntryPayload;
use async_raft::Config;
Expand Down Expand Up @@ -57,15 +56,10 @@ async fn conflict_with_empty_entries() -> Result<()> {
};

let resp = router.send_append_entries(0, rpc).await?;
assert!(!resp.success);
assert!(resp.conflict_opt.is_some());
let c = resp.conflict_opt.unwrap();
assert_eq!(
ConflictOpt {
log_id: LogId { term: 1, index: 5 }
},
c
);
assert!(!resp.success());
assert!(resp.conflict.is_some());
let c = resp.conflict.unwrap();
assert_eq!(LogId { term: 1, index: 5 }, c);

// Feed 2 logs

Expand All @@ -91,8 +85,8 @@ async fn conflict_with_empty_entries() -> Result<()> {
};

let resp = router.send_append_entries(0, rpc).await?;
assert!(resp.success);
assert!(resp.conflict_opt.is_none());
assert!(resp.success());
assert!(resp.conflict.is_none());

// Expect a conflict with prev_log_index == 3

Expand All @@ -105,15 +99,10 @@ async fn conflict_with_empty_entries() -> Result<()> {
};

let resp = router.send_append_entries(0, rpc).await?;
assert!(!resp.success);
assert!(resp.conflict_opt.is_some());
let c = resp.conflict_opt.unwrap();
assert_eq!(
ConflictOpt {
log_id: LogId { term: 1, index: 3 }
},
c
);
assert!(!resp.success());
assert!(resp.conflict.is_some());
let c = resp.conflict.unwrap();
assert_eq!(LogId::new(1, 3), c);

Ok(())
}

0 comments on commit c61b4c4

Please sign in to comment.