Skip to content

Commit

Permalink
change: rename MembershipConfig to Membership
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Dec 29, 2021
1 parent cc26134 commit 3511e43
Show file tree
Hide file tree
Showing 25 changed files with 147 additions and 175 deletions.
30 changes: 15 additions & 15 deletions async-raft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::error::InitializeError;
use crate::raft::AddNonVoterResponse;
use crate::raft::ClientWriteRequest;
use crate::raft::ClientWriteResponse;
use crate::raft::MembershipConfig;
use crate::raft::Membership;
use crate::raft::RaftRespTx;
use crate::AppData;
use crate::AppDataResponse;
Expand Down Expand Up @@ -43,15 +43,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// Build a new membership config from given init data & assign it as the new cluster
// membership config in memory only.
self.core.membership = EffectiveMembership {
self.core.effective_membership = EffectiveMembership {
log_id: LogId { term: 1, index: 1 },
membership: MembershipConfig::new_single(members),
membership: Membership::new_single(members),
};

// Become a candidate and start campaigning for leadership. If this node is the only node
// in the cluster, then become leader without holding an election. If members len == 1, we
// know it is our ID due to the above code where we ensure our own ID is present.
if self.core.membership.membership.all_nodes().len() == 1 {
if self.core.effective_membership.membership.all_nodes().len() == 1 {
self.core.current_term += 1;
self.core.voted_for = Some(self.core.id);
self.core.set_target_state(State::Leader);
Expand Down Expand Up @@ -121,18 +121,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// The last membership config is not committed yet.
// Can not process the next one.
if self.core.commit_index < self.core.membership.log_id.index {
if self.core.commit_index < self.core.effective_membership.log_id.index {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::InProgress {
membership_log_id: self.core.membership.log_id,
membership_log_id: self.core.effective_membership.log_id,
},
)));
return;
}

let new_config;

let curr = &self.core.membership.membership;
let curr = &self.core.effective_membership.membership;

if let Some(next_membership) = curr.get_ith_config(1) {
// When it is in joint state, it is only allowed to change to the `members_after_consensus`
Expand All @@ -145,11 +145,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
)));
return;
} else {
new_config = MembershipConfig::new_single(next_membership.clone());
new_config = Membership::new_single(next_membership.clone());
}
} else {
// currently it is uniform config, enter joint state
new_config = MembershipConfig::new_multi(vec![curr.get_ith_config(0).unwrap().clone(), members.clone()]);
new_config = Membership::new_multi(vec![curr.get_ith_config(0).unwrap().clone(), members.clone()]);
}

tracing::debug!(?new_config, "new_config");
Expand All @@ -165,7 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// TODO(xp): 111 test adding a node that is not non-voter.
// TODO(xp): 111 test adding a node that is lagging.
for new_node in members.difference(&self.core.membership.membership.get_ith_config(0).unwrap()) {
for new_node in members.difference(&self.core.effective_membership.membership.get_ith_config(0).unwrap()) {
match self.nodes.get(&new_node) {
// Node is ready to join.
Some(node) => {
Expand Down Expand Up @@ -207,14 +207,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "debug", skip(self, resp_tx), fields(id=self.core.id))]
pub async fn append_membership_log(
&mut self,
mem: MembershipConfig,
mem: Membership,
resp_tx: Option<RaftRespTx<ClientWriteResponse<R>, ClientWriteError>>,
) -> Result<(), RaftError> {
let payload = ClientWriteRequest::<D>::new_config(mem.clone());
let res = self.append_payload_to_log(payload.entry).await;

// Caveat: membership must be updated before commit check is done with the new config.
self.core.membership = EffectiveMembership {
self.core.effective_membership = EffectiveMembership {
log_id: self.core.last_log_id,
membership: mem,
};
Expand Down Expand Up @@ -253,7 +253,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let index = log_id.index;

// Step down if needed.
if !self.core.membership.membership.contains(&self.core.id) {
if !self.core.effective_membership.membership.contains(&self.core.id) {
tracing::debug!("raft node is stepping down");

// TODO(xp): transfer leadership
Expand All @@ -262,7 +262,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return;
}

let membership = &self.core.membership.membership;
let membership = &self.core.effective_membership.membership;

let all = membership.all_nodes();
for (id, state) in self.nodes.iter_mut() {
Expand All @@ -274,7 +274,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
"set remove_after_commit for {} = {}, membership: {:?}",
id,
index,
self.core.membership
self.core.effective_membership
);

state.remove_since = Some(index)
Expand Down
12 changes: 6 additions & 6 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let last_index = self.core.last_log_id.index;

let req: ClientWriteRequest<D> = if last_index == 0 {
ClientWriteRequest::new_config(self.core.membership.membership.clone())
ClientWriteRequest::new_config(self.core.effective_membership.membership.clone())
} else {
ClientWriteRequest::new_blank_payload()
};
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Setup sentinel values to track when we've received majority confirmation of leadership.
let mut c0_confirmed = 0usize;

let mems = &self.core.membership.membership;
let mems = &self.core.effective_membership.membership;

// Will never be zero, as we don't allow it when proposing config changes.
let len_members = mems.get_ith_config(0).unwrap().len();
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// Spawn parallel requests, all with the standard timeout for heartbeats.
let mut pending = FuturesUnordered::new();
let all_members = self.core.membership.membership.all_nodes();
let all_members = self.core.effective_membership.membership.all_nodes();
for (id, node) in self.nodes.iter() {
if !all_members.contains(id) {
continue;
Expand Down Expand Up @@ -179,11 +179,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

// If the term is the same, then it means we are still the leader.
if self.core.membership.membership.get_ith_config(0).unwrap().contains(&target) {
if self.core.effective_membership.membership.get_ith_config(0).unwrap().contains(&target) {
c0_confirmed += 1;
}

let second = self.core.membership.membership.get_ith_config(1);
let second = self.core.effective_membership.membership.get_ith_config(1);

if let Some(joint) = second {
if joint.contains(&target) {
Expand Down Expand Up @@ -260,7 +260,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// TODO(xp): calculate nodes set that need to replicate to, when updating membership
// TODO(xp): Or add to-non-voter replication into self.nodes.

let all_members = self.core.membership.membership.all_nodes();
let all_members = self.core.effective_membership.membership.all_nodes();

let nodes = self.nodes.keys().collect::<Vec<_>>();
tracing::debug!(?nodes, ?all_members, "replicate_client_request");
Expand Down
49 changes: 23 additions & 26 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::raft::ClientWriteRequest;
use crate::raft::ClientWriteResponse;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::MembershipConfig;
use crate::raft::Membership;
use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
use crate::replication::RaftEvent;
Expand Down Expand Up @@ -80,18 +80,19 @@ pub struct EffectiveMembership {
/// The id of the log that applies this membership config
pub log_id: LogId,

pub membership: MembershipConfig,
pub membership: Membership,
}

/// The core type implementing the Raft protocol.
pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
/// This node's ID.
id: NodeId,

/// This node's runtime config.
config: Arc<Config>,

/// The cluster's current membership configuration.
membership: EffectiveMembership,
effective_membership: EffectiveMembership,

/// The `RaftNetwork` implementation.
network: Arc<N>,
Expand All @@ -102,23 +103,14 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// The target state of the system.
target_state: State,

/// The index of the highest log entry known to be committed cluster-wide.
///
/// The definition of a committed log is that the leader which has created the log has
/// successfully replicated the log to a majority of the cluster. This value is updated via
/// AppendEntries RPC from the leader, or if a node is the leader, it will update this value
/// as new entries have been successfully replicated to a majority of the cluster.
/// The index of the last known committed entry.
///
/// Is initialized to 0, and increases monotonically. This is always based on the leader's
/// commit index which is communicated to other members via the AppendEntries protocol.
/// I.e.:
/// - a log that is replicated to a quorum of the cluster and it is of the term of the leader.
/// - A quorum could be a joint quorum.
commit_index: u64,

/// The log id of the highest log entry which has been applied to the local state machine.
///
/// Is initialized to 0,0 for a pristine node; else, for nodes with existing state it is
/// is initialized to the value returned from the `RaftStorage::get_initial_state` on startup.
/// This value increases following the `commit_index` as logs are applied to the state
/// machine (via the storage interface).
last_applied: LogId,

/// The current term.
Expand All @@ -127,8 +119,10 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// the leader's term which is communicated to other members via the AppendEntries protocol,
/// but this may also be incremented when a follower becomes a candidate.
current_term: u64,

/// The ID of the current leader of the Raft cluster.
current_leader: Option<NodeId>,

/// The ID of the candidate which received this node's vote for the current term.
///
/// Each server will vote for at most one candidate in a given term, on a
Expand All @@ -152,14 +146,17 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt

/// The last time a heartbeat was received.
last_heartbeat: Option<Instant>,

/// The duration until the next election timeout.
next_election_timeout: Option<Instant>,

tx_compaction: mpsc::Sender<SnapshotUpdate>,
rx_compaction: mpsc::Receiver<SnapshotUpdate>,

rx_api: mpsc::UnboundedReceiver<(RaftMsg<D, R>, Span)>,

tx_metrics: watch::Sender<RaftMetrics>,

rx_shutdown: oneshot::Receiver<()>,
}

Expand All @@ -173,12 +170,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tx_metrics: watch::Sender<RaftMetrics>,
rx_shutdown: oneshot::Receiver<()>,
) -> JoinHandle<RaftResult<()>> {
let membership = MembershipConfig::new_initial(id); // This is updated from storage in the main loop.
let membership = Membership::new_initial(id); // This is updated from storage in the main loop.
let (tx_compaction, rx_compaction) = mpsc::channel(1);
let this = Self {
id,
config,
membership: EffectiveMembership {
effective_membership: EffectiveMembership {
log_id: LogId::default(),
membership,
},
Expand Down Expand Up @@ -214,7 +211,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.last_log_id = state.last_log_id;
self.current_term = state.hard_state.current_term;
self.voted_for = state.hard_state.voted_for;
self.membership = state.last_membership.clone();
self.effective_membership = state.last_membership.clone();
self.last_applied = state.last_applied;
// NOTE: this is repeated here for clarity. It is unsafe to initialize the node's commit
// index to any other value. The commit index must be determined by a leader after
Expand All @@ -228,8 +225,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

let has_log = self.last_log_id.index != u64::MIN;
let single = self.membership.membership.all_nodes().len() == 1;
let is_voter = self.membership.membership.contains(&self.id);
let single = self.effective_membership.membership.all_nodes().len() == 1;
let is_voter = self.effective_membership.membership.contains(&self.id);

self.target_state = match (has_log, single, is_voter) {
// A restarted raft that already received some logs but was not yet added to a cluster.
Expand Down Expand Up @@ -298,7 +295,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_applied: self.last_applied.index,
current_leader: self.current_leader,
// TODO(xp): 111 metrics should also track the membership log id
membership_config: self.membership.clone(),
membership_config: self.effective_membership.clone(),
snapshot: self.snapshot_last_log_id,
leader_metrics,
});
Expand All @@ -321,7 +318,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update core's target state, ensuring all invariants are upheld.
#[tracing::instrument(level = "debug", skip(self), fields(id=self.id))]
fn set_target_state(&mut self, target_state: State) {
if target_state == State::Follower && !self.membership.membership.contains(&self.id) {
if target_state == State::Follower && !self.effective_membership.membership.contains(&self.id) {
self.target_state = State::NonVoter;
} else {
self.target_state = target_state;
Expand Down Expand Up @@ -412,8 +409,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// - the node has been removed from the cluster. The parent application can observe the
// transition to the non-voter state as a signal for when it is safe to shutdown a node
// being removed.
self.membership = cfg;
if self.membership.membership.contains(&self.id) {
self.effective_membership = cfg;
if self.effective_membership.membership.contains(&self.id) {
if self.target_state == State::NonVoter {
// The node is a NonVoter and the new config has it configured as a normal member.
// Transition to follower.
Expand Down Expand Up @@ -699,7 +696,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Spawn replication streams.
let targets = self
.core
.membership
.effective_membership
.membership
.all_nodes()
.iter()
Expand Down

0 comments on commit 3511e43

Please sign in to comment.