change: change-membership should be log driven, not channel driven
A membership change involves two steps: the joint config phase and the
final config phase. Each phase has a corresponding log entry.

Previously, the raft core set up several channels to organize this workflow,
which made the logic hard to understand and introduced complexity when
restarting or when leadership was transferred: the channels and tasks had to
be re-established.

According to the gist of raft, every workflow should be log driven.
Thus the new approach:
- Write two logs (the joint config and the final config) as soon as a
  change-membership request is received.
- Every subsequent step is driven solely by which log is committed.

This simplifies the workflow and makes it more reliable and intuitive to
understand.
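
In code, the essence of the new approach looks roughly like this. This is a
self-contained toy sketch, not the crate's API: the real flow lives in
async-raft/src/core/admin.rs (diffed below), and the `Leader` struct with a
plain `log` vector is a stand-in for the raft core and its log storage.

use std::collections::BTreeSet;

type NodeId = u64;

#[derive(Clone, Debug, Default)]
struct MembershipConfig {
    members: BTreeSet<NodeId>,
    members_after_consensus: Option<BTreeSet<NodeId>>,
}

#[derive(Default)]
struct Leader {
    membership: MembershipConfig,
    log: Vec<MembershipConfig>,
}

impl Leader {
    /// On a single change-membership request, append BOTH config logs.
    /// Everything afterwards is driven by which entry gets committed.
    fn change_membership(&mut self, members: BTreeSet<NodeId>) {
        // Phase 1: joint config = current members plus the new member set.
        self.membership.members_after_consensus = Some(members.clone());
        let joint_config = self.membership.clone();

        // Phase 2: final config = the new member set only.
        let final_config = MembershipConfig {
            members,
            members_after_consensus: None,
        };

        // Append both entries at once; there are no channels or spawned
        // tasks to re-establish after a restart or leadership transfer.
        self.log.push(joint_config);
        self.log.push(final_config);
    }
}

Because both entries exist in the log immediately, a restarted or newly
elected leader can resume a half-done change by inspecting the log alone.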

Related changes:

- When `change_membership` is called, append 2 logs at once.

- Introduce a universal response channel type, `ResponseTx`, used to send
  back a message when some internal task is done, and a universal response
  error type, `ResponseError` (a plausible shape is sketched after this
  list).

- The internal response channel is now an `Option<ResponseTx>`, since the
  first step of a membership change does not need to respond to the
  caller.

- When a new leader is established, if the **last** log is a joint config
  log, append a final config log so that a partial change-membership can
  complete.

  A test for this is added.

- Removed membership-related channels.

- Refactor: converted several functions from async to sync.
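
For reference, the response plumbing named in the second bullet could
plausibly look like the following sketch. This is an assumption for
illustration: the commit's actual definitions live outside the admin.rs
hunks shown below, and only the error variants are taken from the diff.

use tokio::sync::oneshot;

// Variants taken from the diff below; the enum itself is a stand-in.
#[derive(Debug)]
enum ChangeConfigError {
    Noop,
    InoperableConfig,
    ConfigChangeInProgress,
}

// Universal response error. The `.into()` calls in the diff, e.g.
// `Err(ChangeConfigError::Noop.into())`, imply `From` impls that fold
// concrete internal errors into this single type.
#[derive(Debug)]
enum ResponseError {
    ChangeConfig(ChangeConfigError),
}

impl From<ChangeConfigError> for ResponseError {
    fn from(e: ChangeConfigError) -> Self {
        ResponseError::ChangeConfig(e)
    }
}

// Universal one-shot reply channel: an internal task sends exactly one
// Result when it finishes. It is wrapped in an Option where no caller is
// waiting, as with the joint log in `append_membership_log`.
type ResponseTx = oneshot::Sender<Result<(), ResponseError>>;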
drmingdrmer committed Aug 18, 2021
1 parent beb0302 commit 6350514
Showing 8 changed files with 351 additions and 153 deletions.
118 changes: 56 additions & 62 deletions async-raft/src/core/admin.rs
@@ -1,10 +1,6 @@
 use std::collections::BTreeSet;
 use std::collections::HashSet;
 
-use futures::future::FutureExt;
-use futures::future::TryFutureExt;
-use tokio::sync::oneshot;
-
 use crate::core::client::ClientRequestEntry;
 use crate::core::ConsensusState;
 use crate::core::LeaderState;
@@ -14,14 +10,14 @@ use crate::core::State;
 use crate::core::UpdateCurrentLeader;
 use crate::error::ChangeConfigError;
 use crate::error::InitializeError;
-use crate::error::RaftError;
-use crate::raft::ChangeMembershipTx;
 use crate::raft::ClientWriteRequest;
 use crate::raft::MembershipConfig;
+use crate::raft::ResponseTx;
 use crate::replication::RaftEvent;
 use crate::AppData;
 use crate::AppDataResponse;
 use crate::NodeId;
+use crate::RaftError;
 use crate::RaftNetwork;
 use crate::RaftStorage;
 
@@ -69,7 +65,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     /// Add a new node to the cluster as a non-voter, bringing it up-to-speed, and then responding
     /// on the given channel.
     #[tracing::instrument(level = "trace", skip(self, tx))]
-    pub(super) fn add_member(&mut self, target: NodeId, tx: oneshot::Sender<Result<(), ChangeConfigError>>) {
+    pub(super) fn add_member(&mut self, target: NodeId, tx: ResponseTx) {
         // Ensure the node doesn't already exist in the current config, in the set of new nodes
         // alreading being synced, or in the nodes being removed.
         if self.core.membership.members.contains(&target)
@@ -83,7 +79,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             || self.non_voters.contains_key(&target)
         {
             tracing::debug!("target node is already a cluster member or is being synced");
-            let _ = tx.send(Err(ChangeConfigError::Noop));
+            let _ = tx.send(Err(ChangeConfigError::Noop.into()));
             return;
         }
 
@@ -98,18 +94,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     }
 
     #[tracing::instrument(level = "trace", skip(self, tx))]
-    pub(super) async fn change_membership(&mut self, members: BTreeSet<NodeId>, tx: ChangeMembershipTx) {
+    pub(super) async fn change_membership(&mut self, members: BTreeSet<NodeId>, tx: ResponseTx) {
         // Ensure cluster will have at least one node.
         if members.is_empty() {
-            let _ = tx.send(Err(ChangeConfigError::InoperableConfig));
+            let _ = tx.send(Err(ChangeConfigError::InoperableConfig.into()));
             return;
         }
 
         // Only allow config updates when currently in a uniform consensus state.
         match &self.consensus_state {
             ConsensusState::Uniform => (),
             ConsensusState::NonVoterSync { .. } | ConsensusState::Joint { .. } => {
-                let _ = tx.send(Err(ChangeConfigError::ConfigChangeInProgress));
+                let _ = tx.send(Err(ChangeConfigError::ConfigChangeInProgress.into()));
                 return;
             }
         }
@@ -153,54 +149,67 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             self.is_stepping_down = true;
         }
         self.consensus_state = ConsensusState::Joint { is_committed: false };
-        self.core.membership.members_after_consensus = Some(members);
+        self.core.membership.members_after_consensus = Some(members.clone());
 
-        // Propagate the command as any other client request.
-        let payload = ClientWriteRequest::<D>::new_config(self.core.membership.clone());
-        let (tx_joint, rx_join) = oneshot::channel();
-        let entry = match self.append_payload_to_log(payload.entry).await {
+        // Create final_config first, the joint config may be committed at once if the cluster has only 1 node
+        // and changes core.membership.
+        let final_config = MembershipConfig {
+            members: members.clone(),
+            members_after_consensus: None,
+        };
+
+        let joint_config = self.core.membership.clone();
+
+        let res = self.append_membership_log(joint_config, None).await;
+        if let Err(e) = res {
+            tracing::error!("append joint log error: {:?}", e);
+        }
+
+        let res = self.append_membership_log(final_config, Some(tx)).await;
+        if let Err(e) = res {
+            tracing::error!("append final log error: {:?}", e);
+        }
+    }
+
+    #[tracing::instrument(level = "trace", skip(self, resp_tx), fields(id=self.core.id))]
+    pub async fn append_membership_log(
+        &mut self,
+        mem: MembershipConfig,
+        resp_tx: Option<ResponseTx>,
+    ) -> Result<(), RaftError> {
+        let payload = ClientWriteRequest::<D>::new_config(mem);
+        let res = self.append_payload_to_log(payload.entry).await;
+        let entry = match res {
             Ok(entry) => entry,
             Err(err) => {
-                let _ = tx.send(Err(err.into()));
-                return;
+                let err_str = err.to_string();
+                if let Some(tx) = resp_tx {
+                    let send_res = tx.send(Err(err.into()));
+                    if let Err(e) = send_res {
+                        tracing::error!("send response res error: {:?}", e);
+                    }
+                }
+                return Err(RaftError::RaftStorage(anyhow::anyhow!(err_str)));
             }
         };
-        let cr_entry = ClientRequestEntry::from_entry(entry, tx_joint);
+
+        let cr_entry = ClientRequestEntry::from_entry(entry, resp_tx);
         self.replicate_client_request(cr_entry).await;
         self.leader_report_metrics();
 
-        // Setup channels for eventual response to the 2-phase config change.
-        let (tx_cfg_change, rx_cfg_change) = oneshot::channel();
-        self.propose_config_change_cb = Some(tx_cfg_change); // Once the entire process is done, this is our response channel.
-        self.joint_consensus_cb.push(rx_join); // Receiver for when the joint consensus is committed.
-        tokio::spawn(async move {
-            let res = rx_cfg_change
-                .map_err(|_| RaftError::ShuttingDown)
-                .into_future()
-                .then(|res| {
-                    futures::future::ready(match res {
-                        Ok(Ok(_)) => Ok(()),
-                        Ok(Err(err)) => Err(ChangeConfigError::from(err)),
-                        Err(err) => Err(ChangeConfigError::from(err)),
-                    })
-                })
-                .await;
-            let _ = tx.send(res);
-        });
+        Ok(())
     }
 
     /// Handle the commitment of a joint consensus cluster configuration.
     #[tracing::instrument(level = "trace", skip(self))]
-    pub(super) async fn handle_joint_consensus_committed(&mut self) -> Result<(), RaftError> {
+    pub(super) fn handle_joint_consensus_committed(&mut self) {
         if let ConsensusState::Joint { is_committed, .. } = &mut self.consensus_state {
             *is_committed = true; // Mark as committed.
         }
         // Only proceed to finalize this joint consensus if there are no remaining nodes being synced.
         if self.consensus_state.is_joint_consensus_safe_to_finalize() {
-            self.update_replication_state().await?;
-            self.finalize_joint_consensus().await?;
+            self.update_replication_state();
+            self.finalize_joint_consensus();
         }
-        Ok(())
     }
 
     /// When the joint membership is committed(not the uniform membership),
@@ -211,7 +220,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     /// - When a leader is established it adds all node_id found in `membership` to `nodes`.
     /// - When membership change is committed, i.e., a joint membership or a uniform membership.
     #[tracing::instrument(level = "trace", skip(self))]
-    pub(super) async fn update_replication_state(&mut self) -> Result<(), RaftError> {
+    pub(super) fn update_replication_state(&mut self) {
         tracing::debug!("update_replication_state");
 
         let new_node_ids = self
@@ -248,17 +257,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             let non_voter_state = self.non_voters.remove(node_id).unwrap();
             self.nodes.insert(*node_id, non_voter_state.state);
         }
-
-        Ok(())
     }
 
     /// Finalize the committed joint consensus.
     #[tracing::instrument(level = "trace", skip(self))]
-    pub(super) async fn finalize_joint_consensus(&mut self) -> Result<(), RaftError> {
+    pub(super) fn finalize_joint_consensus(&mut self) {
         // Only proceed if it is safe to do so.
         if !self.consensus_state.is_joint_consensus_safe_to_finalize() {
             tracing::error!("attempted to finalize joint consensus when it was not safe to do so");
-            return Ok(());
+            return;
         }
 
         // Cut the cluster config over to the new membership config.
@@ -276,29 +283,17 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         // and the old nodes may not revert to non-voter state using the above mechanism. That is fine.
         // The Raft spec accounts for this using the 3rd safety measure of cluster configuration changes
         // described at the very end of §6. This measure is already implemented and in place.
-
-        // Propagate the next command as any other client request.
-        let payload = ClientWriteRequest::<D>::new_config(self.core.membership.clone());
-        let (tx_uniform, rx_uniform) = oneshot::channel();
-        let entry = self.append_payload_to_log(payload.entry).await?;
-        let cr_entry = ClientRequestEntry::from_entry(entry, tx_uniform);
-        self.replicate_client_request(cr_entry).await;
-        self.leader_report_metrics();
-
-        // Setup channel for eventual commitment of the uniform consensus config.
-        self.uniform_consensus_cb.push(rx_uniform); // Receiver for when the uniform consensus is committed.
-        Ok(())
     }
 
     /// Handle the commitment of a uniform consensus cluster configuration.
     #[tracing::instrument(level = "trace", skip(self))]
-    pub(super) async fn handle_uniform_consensus_committed(&mut self, index: u64) -> Result<(), RaftError> {
+    pub(super) fn handle_uniform_consensus_committed(&mut self, index: u64) {
         // Step down if needed.
         if self.is_stepping_down {
             tracing::debug!("raft node is stepping down");
             self.core.set_target_state(State::NonVoter);
             self.core.update_current_leader(UpdateCurrentLeader::Unknown);
-            return Ok(());
+            return;
         }
 
         // Remove any replication streams which have replicated this config & which are no longer
@@ -336,6 +331,5 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             }
         }
         self.leader_report_metrics();
-        Ok(())
     }
 }