Skip to content

Commit

Permalink
fix: consider joint config when starting up and committing.
Browse files Browse the repository at this point in the history
- Change: MembershipConfig now supports more than 2 configs

- Make fields in MembershipConfig private.
  Provide methods to manipulate membership instead.

- Fix: commit without replication only when the membership contains
  exactly one node. Previously only the first config was checked, which
  could cause data loss if the cluster was in a joint config.

- Fix: when starting up, count all nodes — not only the nodes in the
  first config — to decide whether this is a single-node cluster.
  • Loading branch information
drmingdrmer committed Dec 25, 2021
1 parent 2320b8a commit 6c0ccaf
Show file tree
Hide file tree
Showing 26 changed files with 280 additions and 308 deletions.
2 changes: 1 addition & 1 deletion async-raft/Cargo.toml
Expand Up @@ -19,6 +19,7 @@ byte-unit = "4.0.12"
bytes = "1.0"
derive_more = { version="0.99.9" }
futures = "0.3"
maplit = "1.0.2"
rand = "0.8"
serde = { version="1", features=["derive"] }
structopt = "0.3"
Expand All @@ -29,7 +30,6 @@ tracing-futures = "0.2.4"

[dev-dependencies]
lazy_static = "1.4.0"
maplit = "1.0.2"
memstore = { version="0.2.0", path="../memstore" }
pretty_assertions = "1.0.0"
tracing-appender = "0.2.0"
Expand Down
23 changes: 8 additions & 15 deletions async-raft/src/core/admin.rs
Expand Up @@ -45,16 +45,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// membership config in memory only.
self.core.membership = EffectiveMembership {
log_id: LogId { term: 1, index: 1 },
membership: MembershipConfig {
members,
members_after_consensus: None,
},
membership: MembershipConfig::new_single(members),
};

// Become a candidate and start campaigning for leadership. If this node is the only node
// in the cluster, then become leader without holding an election. If members len == 1, we
// know it is our ID due to the above code where we ensure our own ID is present.
if self.core.membership.membership.members.len() == 1 {
if self.core.membership.membership.all_nodes().len() == 1 {
self.core.current_term += 1;
self.core.voted_for = Some(self.core.id);
self.core.set_target_state(State::Leader);
Expand Down Expand Up @@ -137,7 +134,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

let curr = &self.core.membership.membership;

if let Some(ref next_membership) = curr.members_after_consensus {
if let Some(next_membership) = curr.get_ith_config(1) {
// When it is in joint state, it is only allowed to change to the `members_after_consensus`
if &members != next_membership {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
Expand All @@ -148,19 +145,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
)));
return;
} else {
new_config = MembershipConfig {
members: next_membership.clone(),
members_after_consensus: None,
};
new_config = MembershipConfig::new_single(next_membership.clone());
}
} else {
// currently it is uniform config, enter joint state
new_config = MembershipConfig {
members: curr.members.clone(),
members_after_consensus: Some(members.clone()),
};
new_config = MembershipConfig::new_multi(vec![curr.get_ith_config(0).unwrap().clone(), members.clone()]);
}

tracing::debug!(?new_config, "new_config");

// Check the proposed config for any new nodes. If ALL new nodes already have replication
// streams AND are ready to join, then we can immediately proceed with entering joint
// consensus. Else, new nodes need to first be brought up-to-speed.
Expand All @@ -172,7 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// TODO(xp): 111 test adding a node that is not non-voter.
// TODO(xp): 111 test adding a node that is lagging.
for new_node in members.difference(&self.core.membership.membership.members) {
for new_node in members.difference(&self.core.membership.membership.get_ith_config(0).unwrap()) {
match self.nodes.get(&new_node) {
// Node is ready to join.
Some(node) => {
Expand Down
46 changes: 21 additions & 25 deletions async-raft/src/core/client.rs
Expand Up @@ -94,32 +94,30 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
pub(super) async fn handle_client_read_request(&mut self, tx: RaftRespTx<(), ClientReadError>) {
// Setup sentinel values to track when we've received majority confirmation of leadership.
let mut c0_confirmed = 0usize;

let mems = &self.core.membership.membership;

// Will never be zero, as we don't allow it when proposing config changes.
let len_members = self.core.membership.membership.members.len();
let len_members = mems.get_ith_config(0).unwrap().len();

let c0_needed = quorum::majority_of(len_members);

let mut c1_confirmed = 0usize;
let mut c1_needed = 0usize;
if let Some(joint_members) = &self.core.membership.membership.members_after_consensus {

let second = mems.get_ith_config(1);

if let Some(joint_members) = second {
let len = joint_members.len(); // Will never be zero, as we don't allow it when proposing config changes.
c1_needed = quorum::majority_of(len);

if joint_members.contains(&self.core.id) {
c1_confirmed += 1;
}
}

// Increment confirmations for self, including post-joint-consensus config if applicable.
c0_confirmed += 1;
let is_in_post_join_consensus_config = self
.core
.membership
.membership
.members_after_consensus
.as_ref()
.map(|members| members.contains(&self.core.id))
.unwrap_or(false);

if is_in_post_join_consensus_config {
c1_confirmed += 1;
}

// If we already have all needed confirmations — which would be the case for single node
// clusters — then respond.
Expand Down Expand Up @@ -181,20 +179,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

// If the term is the same, then it means we are still the leader.
if self.core.membership.membership.members.contains(&target) {
if self.core.membership.membership.get_ith_config(0).unwrap().contains(&target) {
c0_confirmed += 1;
}
if self
.core
.membership
.membership
.members_after_consensus
.as_ref()
.map(|members| members.contains(&target))
.unwrap_or(false)
{
c1_confirmed += 1;

let second = self.core.membership.membership.get_ith_config(1);

if let Some(joint) = second {
if joint.contains(&target) {
c1_confirmed += 1;
}
}

if c0_confirmed >= c0_needed && c1_confirmed >= c1_needed {
let _ = tx.send(Ok(()));
return;
Expand Down
29 changes: 17 additions & 12 deletions async-raft/src/core/mod.rs
Expand Up @@ -9,6 +9,9 @@ pub(crate) mod replication;
mod replication_state_test;
mod vote;

#[cfg(test)]
mod startup_test;

use std::collections::BTreeMap;
use std::sync::Arc;

Expand Down Expand Up @@ -223,7 +226,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

let has_log = self.last_log_id.index != u64::MIN;
let single = self.membership.membership.members.len() == 1;
let single = self.membership.membership.all_nodes().len() == 1;
let is_voter = self.membership.membership.contains(&self.id);

self.target_state = match (has_log, single, is_voter) {
Expand Down Expand Up @@ -408,12 +411,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// transition to the non-voter state as a signal for when it is safe to shutdown a node
// being removed.
self.membership = cfg;
if !self.membership.membership.contains(&self.id) {
if self.membership.membership.contains(&self.id) {
if self.target_state == State::NonVoter {
// The node is a NonVoter and the new config has it configured as a normal member.
// Transition to follower.
self.set_target_state(State::Follower);
}
} else {
self.set_target_state(State::NonVoter);
} else if self.target_state == State::NonVoter && self.membership.membership.members.contains(&self.id) {
// The node is a NonVoter and the new config has it configured as a normal member.
// Transition to follower.
self.set_target_state(State::Follower);
}
Ok(())
}
Expand Down Expand Up @@ -695,13 +700,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.membership
.membership
.all_nodes()
.into_iter()
.filter(|elem| elem != &self.core.id)
.iter()
.filter(|elem| *elem != &self.core.id)
.collect::<Vec<_>>();

for target in targets {
let state = self.spawn_replication_stream(target, None);
self.nodes.insert(target, state);
let state = self.spawn_replication_stream(*target, None);
self.nodes.insert(*target, state);
}

// Setup state as leader.
Expand Down Expand Up @@ -859,8 +864,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// Setup initial state per term.
self.votes_granted_old = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_old = ((self.core.membership.membership.members.len() / 2) + 1) as u64; // Just need a majority.
if let Some(nodes) = &self.core.membership.membership.members_after_consensus {
self.votes_needed_old = ((self.core.membership.membership.get_ith_config(0).unwrap().len() / 2) + 1) as u64; // Just need a majority.
if let Some(nodes) = self.core.membership.membership.get_ith_config(1) {
self.votes_granted_new = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_new = ((nodes.len() / 2) + 1) as u64; // Just need a majority.
}
Expand Down
4 changes: 2 additions & 2 deletions async-raft/src/core/replication.rs
Expand Up @@ -167,12 +167,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

#[tracing::instrument(level = "trace", skip(self))]
fn calc_commit_index(&self) -> u64 {
let c0_index = self.calc_members_commit_index(&self.core.membership.membership.members, "c0");
let c0_index = self.calc_members_commit_index(self.core.membership.membership.get_ith_config(0).unwrap(), "c0");

// If we are in joint consensus, then calculate the new commit index of the new membership config nodes.
let mut c1_index = c0_index; // Defaults to just matching C0.

if let Some(members) = &self.core.membership.membership.members_after_consensus {
if let Some(members) = &self.core.membership.membership.get_ith_config(1) {
c1_index = self.calc_members_commit_index(members, "c1");
}

Expand Down
5 changes: 5 additions & 0 deletions async-raft/src/core/startup_test.rs
@@ -0,0 +1,5 @@
/// Placeholder for coverage of `RaftCore`'s startup state selection.
#[test]
fn test_raft_core_initial_state() -> anyhow::Result<()> {
    // TODO(xp): exercise the (has_log, single, is_voter) decision matrix
    // that picks the initial target state on startup.
    Ok(())
}
14 changes: 3 additions & 11 deletions async-raft/src/core/vote.rs
Expand Up @@ -143,19 +143,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// If peer granted vote, then update campaign state.
if res.vote_granted {
// Handle vote responses from the C0 config group.
if self.core.membership.membership.members.contains(&target) {
if self.core.membership.membership.is_in_ith_config(0, &target) {
self.votes_granted_old += 1;
}
// Handle vote responses from members of C1 config group.
if self
.core
.membership
.membership
.members_after_consensus
.as_ref()
.map(|members| members.contains(&target))
.unwrap_or(false)
{
if self.core.membership.membership.is_in_ith_config(1, &target) {
self.votes_granted_new += 1;
}
// If we've received enough votes from both config groups, then transition to leader state`.
Expand All @@ -173,7 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// Spawn parallel vote requests to all cluster members.
#[tracing::instrument(level = "trace", skip(self))]
pub(super) fn spawn_parallel_vote_requests(&self) -> mpsc::Receiver<(VoteResponse, NodeId)> {
let all_members = self.core.membership.membership.all_nodes();
let all_members = self.core.membership.membership.all_nodes().clone();
let (tx, rx) = mpsc::channel(all_members.len());
for member in all_members.into_iter().filter(|member| member != &self.core.id) {
let rpc = VoteRequest::new(
Expand Down
3 changes: 3 additions & 0 deletions async-raft/src/lib.rs
Expand Up @@ -16,6 +16,9 @@ pub mod storage;
mod storage_error;
mod summary;

#[cfg(test)]
mod membership_test;

pub use async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand Down
87 changes: 87 additions & 0 deletions async-raft/src/membership_test.rs
@@ -0,0 +1,87 @@
use maplit::btreeset;

use crate::raft::MembershipConfig;

#[test]
fn test_membership() -> anyhow::Result<()> {
    // Three shapes under test: a single-node config, a uniform three-node
    // config, and a joint (two-entry) config whose halves share node 3.
    let single = MembershipConfig::new_multi(vec![btreeset! {1}]);
    let uniform = MembershipConfig::new_multi(vec![btreeset! {1,2,3}]);
    let joint = MembershipConfig::new_multi(vec![btreeset! {1,2,3}, btreeset! {3,4,5}]);

    // get_ith_config(0): the first config entry is always present.
    assert_eq!(single.get_ith_config(0).cloned(), Some(btreeset! {1}));
    assert_eq!(uniform.get_ith_config(0).cloned(), Some(btreeset! {1,2,3}));
    assert_eq!(joint.get_ith_config(0).cloned(), Some(btreeset! {1,2,3}));

    // get_ith_config(1): only a joint config has a second entry.
    assert_eq!(single.get_ith_config(1).cloned(), None);
    assert_eq!(uniform.get_ith_config(1).cloned(), None);
    assert_eq!(joint.get_ith_config(1).cloned(), Some(btreeset! {3,4,5}));

    // is_in_ith_config: membership is checked per config entry.
    assert!(single.is_in_ith_config(0, &1));
    assert!(!single.is_in_ith_config(0, &2));
    assert!(!single.is_in_ith_config(1, &1));
    assert!(!single.is_in_ith_config(1, &2));

    assert!(uniform.is_in_ith_config(0, &1));
    assert!(uniform.is_in_ith_config(0, &2));
    assert!(!uniform.is_in_ith_config(1, &1));
    assert!(!uniform.is_in_ith_config(1, &2));

    assert!(joint.is_in_ith_config(0, &1));
    assert!(joint.is_in_ith_config(0, &2));
    assert!(!joint.is_in_ith_config(1, &1));
    assert!(joint.is_in_ith_config(1, &4));

    // ith_config: the i-th entry as a Vec.
    assert_eq!(single.ith_config(0), vec![1]);
    assert_eq!(uniform.ith_config(0), vec![1, 2, 3]);
    assert_eq!(joint.ith_config(0), vec![1, 2, 3]);
    assert_eq!(joint.ith_config(1), vec![3, 4, 5]);

    // all_nodes: the union of every config entry.
    assert_eq!(single.all_nodes(), &btreeset! {1});
    assert_eq!(uniform.all_nodes(), &btreeset! {1,2,3});
    assert_eq!(joint.all_nodes(), &btreeset! {1,2,3,4,5});

    // contains: membership in any entry counts.
    assert!(!single.contains(&0));
    assert!(single.contains(&1));
    assert!(joint.contains(&4));
    assert!(!joint.contains(&6));

    // Joint consensus is exactly "more than one config entry".
    assert!(!uniform.is_in_joint_consensus());
    assert!(joint.is_in_joint_consensus());

    // Finalizing a joint config keeps only the last entry.
    assert_eq!(
        joint.to_final_config(),
        MembershipConfig::new_single(btreeset! {3,4,5})
    );

    Ok(())
}

#[test]
fn test_membership_update() -> anyhow::Result<()> {
    let mut m = MembershipConfig::new_single(btreeset! {1,2,3});

    // replace: swap in an entirely new list of config entries.
    m.replace(vec![btreeset! {2,3}, btreeset! {3,4}]);

    assert_eq!(m.all_nodes(), &btreeset! {2,3,4});
    assert_eq!(m.get_configs(), &vec![btreeset! {2,3}, btreeset! {3,4}]);

    // push: append a third entry; the union grows accordingly.
    m.push(btreeset! {3,5});

    assert_eq!(m.all_nodes(), &btreeset! {2,3,4,5});
    assert_eq!(
        m.get_configs(),
        &vec![btreeset! {2,3}, btreeset! {3,4}, btreeset! {3,5}]
    );

    // to_final_config: collapse to the last entry only.
    let finalized = m.to_final_config();

    assert_eq!(finalized.all_nodes(), &btreeset! {3,5});
    assert_eq!(finalized.get_configs(), &vec![btreeset! {3,5}]);

    Ok(())
}
10 changes: 3 additions & 7 deletions async-raft/src/metrics.rs
Expand Up @@ -190,7 +190,7 @@ impl Wait {
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn members(&self, want_members: BTreeSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
self.metrics(
|x| x.membership_config.membership.members == want_members,
|x| x.membership_config.membership.get_ith_config(0).cloned().unwrap() == want_members,
&format!("{} .membership_config.members -> {:?}", msg.to_string(), want_members),
)
.await
Expand All @@ -204,12 +204,8 @@ impl Wait {
msg: impl ToString,
) -> Result<RaftMetrics, WaitError> {
self.metrics(
|x| x.membership_config.membership.members_after_consensus == want_members,
&format!(
"{} .membership_config.members_after_consensus -> {:?}",
msg.to_string(),
want_members
),
|x| x.membership_config.membership.get_ith_config(1) == want_members.as_ref(),
&format!("{} .membership_config.next -> {:?}", msg.to_string(), want_members),
)
.await
}
Expand Down

0 comments on commit 6c0ccaf

Please sign in to comment.