Skip to content

Commit

Permalink
fix: when calc quorum, the non-voter should be count
Browse files Browse the repository at this point in the history
Counting only the follower(nodes) as quorum for new config(c1) results
in unexpected log commit.
E.g.: change from 012 to 234, when 3 and 4 are unreachable, the first
log of joint should not be committed.
  • Loading branch information
drmingdrmer committed Jun 16, 2021
1 parent cbf9f23 commit d882e74
Showing 1 changed file with 43 additions and 18 deletions.
61 changes: 43 additions & 18 deletions async-raft/src/core/replication.rs
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use tokio::sync::oneshot;

use crate::config::SnapshotPolicy;
Expand Down Expand Up @@ -173,35 +175,25 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

// Determine the new commit index of the current membership config nodes.
let mut indices_c0 = self
.nodes
.iter()
.filter(|(id, _)| self.core.membership.members.contains(id))
.map(|(_, node)| (node.match_index, node.match_term))
.collect::<Vec<_>>();
if !self.is_stepping_down {
indices_c0.push((self.core.last_log_index, self.core.last_log_term));
}
let indices_c0 = self.get_match_indexes(&self.core.membership.members);
tracing::debug!("indices_c0: {:?}", indices_c0);

let commit_index_c0 =
calculate_new_commit_index(indices_c0, self.core.commit_index, self.core.current_term);

tracing::debug!("commit_index_c0: {}", commit_index_c0);

tracing::debug!("c1: {:?}", self.core.membership.members_after_consensus);
tracing::debug!("nodes: {:?}", self.nodes.keys().collect::<Vec<_>>());
tracing::debug!(
"follower nodes: {:?}",
self.nodes.keys().collect::<Vec<_>>()
);

// If we are in joint consensus, then calculate the new commit index of the new membership config nodes.
let mut commit_index_c1 = commit_index_c0; // Defaults to just matching C0.
if let Some(members) = &self.core.membership.members_after_consensus {
let indices_c1 = self
.nodes
.iter()
.filter(|(id, _)| members.contains(id))
.map(|(_, node)| (node.match_index, node.match_term))
.collect();

let indices_c1 = self.get_match_indexes(members);
tracing::debug!("indices_c1: {:?}", indices_c1);

commit_index_c1 = calculate_new_commit_index(
indices_c1,
self.core.commit_index,
Expand Down Expand Up @@ -252,6 +244,39 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
Ok(())
}

/// Extract the matching index/term of the replication state of specified nodes.
fn get_match_indexes(&self, node_ids: &HashSet<NodeId>) -> Vec<(u64, u64)> {
tracing::debug!("to get match indexes of nodes: {:?}", node_ids);

let mut rst = Vec::with_capacity(node_ids.len());
for id in node_ids.iter() {
// this node is me, the leader
if *id == self.core.id {
// TODO: can it be sure that self.core.last_log_term is the term of this leader?
rst.push((self.core.last_log_index, self.core.last_log_term));
continue;
}

// this node is a follower
let repl_state = self.nodes.get(id);
if let Some(x) = repl_state {
rst.push((x.match_index, x.match_term));
continue;
}

// this node is a non-voter
let repl_state = self.non_voters.get(id);
if let Some(x) = repl_state {
rst.push((x.state.match_index, x.state.match_term));
continue;
}
panic!("node {} not found in nodes or non-voters", id);
}

tracing::debug!("match indexes of nodes: {:?}: {:?}", node_ids, rst);
rst
}

/// Handle events from replication streams requesting for snapshot info.
#[tracing::instrument(level = "trace", skip(self, tx))]
async fn handle_needs_snapshot(
Expand Down

0 comments on commit d882e74

Please sign in to comment.