refactor: rename vars to underscore style

Use simpler tracing macro syntax to reduce false syntax errors in CLion.

Refine rustfmt config: use a longer line width (120 chars).
drmingdrmer committed Jun 28, 2021
1 parent e3a6a75 commit 4d1a03c
Showing 36 changed files with 438 additions and 1,198 deletions.
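
Most of the reformatting below is mechanical. The rustfmt.toml itself is not part of this excerpt, but the diffs are consistent with raising rustfmt's max_width from its default of 100 to the 120 characters named in the commit message, which lets previously wrapped method chains collapse onto one line. A minimal sketch of the effect:

    // Before: at max_width = 100 the chain is wrapped across lines.
    let election_timeout_min = self
        .election_timeout_min
        .unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);

    // After: at max_width = 120 the same statement fits on one line.
    let election_timeout_min = self.election_timeout_min.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);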
2 changes: 1 addition & 1 deletion async-raft/Cargo.toml
@@ -23,7 +23,7 @@ rand = "0.8"
 serde = { version="1", features=["derive"] }
 thiserror = "1.0.20"
 tokio = { version="1.7", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
-tracing = "0.1"
+tracing = "0.1.26"
 tracing-futures = "0.2.4"
 
 [dev-dependencies]
29 changes: 7 additions & 22 deletions async-raft/src/config.rs
@@ -201,35 +201,23 @@ impl ConfigBuilder {
     /// Validate the state of this builder and produce a new `Config` instance if valid.
     pub fn validate(self) -> Result<Config, ConfigError> {
         // Roll a random election time out based on the configured min & max or their respective defaults.
-        let election_timeout_min = self
-            .election_timeout_min
-            .unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);
-        let election_timeout_max = self
-            .election_timeout_max
-            .unwrap_or(DEFAULT_ELECTION_TIMEOUT_MAX);
+        let election_timeout_min = self.election_timeout_min.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MIN);
+        let election_timeout_max = self.election_timeout_max.unwrap_or(DEFAULT_ELECTION_TIMEOUT_MAX);
         if election_timeout_min >= election_timeout_max {
             return Err(ConfigError::InvalidElectionTimeoutMinMax);
         }
         // Get other values or their defaults.
-        let heartbeat_interval = self
-            .heartbeat_interval
-            .unwrap_or(DEFAULT_HEARTBEAT_INTERVAL);
+        let heartbeat_interval = self.heartbeat_interval.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL);
         if election_timeout_min <= heartbeat_interval {
             return Err(ConfigError::InvalidElectionTimeoutMinMax);
         }
-        let max_payload_entries = self
-            .max_payload_entries
-            .unwrap_or(DEFAULT_MAX_PAYLOAD_ENTRIES);
+        let max_payload_entries = self.max_payload_entries.unwrap_or(DEFAULT_MAX_PAYLOAD_ENTRIES);
         if max_payload_entries == 0 {
             return Err(ConfigError::MaxPayloadEntriesTooSmall);
         }
-        let replication_lag_threshold = self
-            .replication_lag_threshold
-            .unwrap_or(DEFAULT_REPLICATION_LAG_THRESHOLD);
+        let replication_lag_threshold = self.replication_lag_threshold.unwrap_or(DEFAULT_REPLICATION_LAG_THRESHOLD);
         let snapshot_policy = self.snapshot_policy.unwrap_or_else(SnapshotPolicy::default);
-        let snapshot_max_chunk_size = self
-            .snapshot_max_chunk_size
-            .unwrap_or(DEFAULT_SNAPSHOT_CHUNKSIZE);
+        let snapshot_max_chunk_size = self.snapshot_max_chunk_size.unwrap_or(DEFAULT_SNAPSHOT_CHUNKSIZE);
         Ok(Config {
             cluster_name: self.cluster_name,
             election_timeout_min,
@@ -287,10 +275,7 @@ mod tests {
 
     #[test]
     fn test_invalid_election_timeout_config_produces_expected_error() {
-        let res = Config::build("cluster0".into())
-            .election_timeout_min(1000)
-            .election_timeout_max(700)
-            .validate();
+        let res = Config::build("cluster0".into()).election_timeout_min(1000).election_timeout_max(700).validate();
         assert!(res.is_err());
         let err = res.unwrap_err();
         assert_eq!(err, ConfigError::InvalidElectionTimeoutMinMax);
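
As the hunks above show, validate() enforces heartbeat_interval < election_timeout_min < election_timeout_max and a non-zero max_payload_entries. A usage sketch of a configuration that passes these checks (the heartbeat_interval builder method is assumed by analogy with the election-timeout setters exercised in the test):

    // 50 < 150 < 300 satisfies every ordering constraint in validate().
    let config = Config::build("cluster0".into())
        .heartbeat_interval(50)
        .election_timeout_min(150)
        .election_timeout_max(300)
        .validate()
        .expect("config should be valid");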
45 changes: 10 additions & 35 deletions async-raft/src/core/admin.rs
@@ -24,9 +24,7 @@ use crate::NodeId;
 use crate::RaftNetwork;
 use crate::RaftStorage;
 
-impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>>
-    NonVoterState<'a, D, R, N, S>
-{
+impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> NonVoterState<'a, D, R, N, S> {
     /// Handle the admin `init_with_config` command.
     #[tracing::instrument(level = "trace", skip(self))]
     pub(super) async fn handle_init_with_config(
@@ -66,17 +64,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     }
 }
 
-impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>>
-    LeaderState<'a, D, R, N, S>
-{
+impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
     /// Add a new node to the cluster as a non-voter, bringing it up-to-speed, and then responding
     /// on the given channel.
     #[tracing::instrument(level = "trace", skip(self, tx))]
-    pub(super) fn add_member(
-        &mut self,
-        target: NodeId,
-        tx: oneshot::Sender<Result<(), ChangeConfigError>>,
-    ) {
+    pub(super) fn add_member(&mut self, target: NodeId, tx: oneshot::Sender<Result<(), ChangeConfigError>>) {
         // Ensure the node doesn't already exist in the current config, in the set of new nodes
         // already being synced, or in the nodes being removed.
         if self.core.membership.members.contains(&target)
@@ -105,11 +97,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     }
 
     #[tracing::instrument(level = "trace", skip(self, tx))]
-    pub(super) async fn change_membership(
-        &mut self,
-        members: HashSet<NodeId>,
-        tx: ChangeMembershipTx,
-    ) {
+    pub(super) async fn change_membership(&mut self, members: HashSet<NodeId>, tx: ChangeMembershipTx) {
         // Ensure cluster will have at least one node.
         if members.is_empty() {
             let _ = tx.send(Err(ChangeConfigError::InoperableConfig));
@@ -155,21 +143,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         // If there are new nodes which need to sync, then we need to wait until they are synced.
         // Once they've finished, this routine will be called again to progress further.
         if !awaiting.is_empty() {
-            self.consensus_state = ConsensusState::NonVoterSync {
-                awaiting,
-                members,
-                tx,
-            };
+            self.consensus_state = ConsensusState::NonVoterSync { awaiting, members, tx };
             return;
         }
 
         // Enter into joint consensus if we are not awaiting any new nodes.
         if !members.contains(&self.core.id) {
             self.is_stepping_down = true;
         }
-        self.consensus_state = ConsensusState::Joint {
-            is_committed: false,
-        };
+        self.consensus_state = ConsensusState::Joint { is_committed: false };
         self.core.membership.members_after_consensus = Some(members);
 
         // Propagate the command as any other client request.
@@ -309,16 +291,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
 
     /// Handle the commitment of a uniform consensus cluster configuration.
     #[tracing::instrument(level = "trace", skip(self))]
-    pub(super) async fn handle_uniform_consensus_committed(
-        &mut self,
-        index: u64,
-    ) -> Result<(), RaftError> {
+    pub(super) async fn handle_uniform_consensus_committed(&mut self, index: u64) -> Result<(), RaftError> {
         // Step down if needed.
         if self.is_stepping_down {
             tracing::debug!("raft node is stepping down");
             self.core.set_target_state(State::NonVoter);
-            self.core
-                .update_current_leader(UpdateCurrentLeader::Unknown);
+            self.core.update_current_leader(UpdateCurrentLeader::Unknown);
             return Ok(());
         }
 
@@ -348,12 +326,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         tracing::debug!("nodes_to_remove: {:?}", nodes_to_remove);
 
         for node in nodes_to_remove {
-            tracing::debug!(
-                { target = node },
-                "removing target node from replication pool"
-            );
+            tracing::debug!({ target = node }, "removing target node from replication pool");
             if let Some(node) = self.nodes.remove(&node) {
-                let _ = node.replstream.repltx.send(RaftEvent::Terminate);
+                let _ = node.replstream.repl_tx.send(RaftEvent::Terminate);
             }
         }
         self.core.report_metrics();
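
The "simpler tracing macro syntax" from the commit message concerns calls like the debug! above. tracing 0.1 accepts fields both as a braced block ahead of the message (as in this file) and as a flat list; the braced form is what reportedly trips CLion's parser into false syntax errors. A sketch of the two equivalent spellings (the node_id field name is chosen for illustration):

    // Braced field-set form: valid tracing 0.1 syntax, but harder on IDE parsers.
    tracing::debug!({ node_id = node }, "removing target node from replication pool");

    // Flat form: the same event written as a plain field list.
    tracing::debug!(node_id = node, "removing target node from replication pool");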
24 changes: 8 additions & 16 deletions async-raft/src/core/append_entries.rs
@@ -60,8 +60,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         // If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
         // log info, then replication is g2g.
         let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
-        let msg_index_and_term_match = (msg.prev_log_index == self.last_log_index)
-            && (msg.prev_log_term == self.last_log_term);
+        let msg_index_and_term_match =
+            (msg.prev_log_index == self.last_log_index) && (msg.prev_log_term == self.last_log_term);
         if msg_prev_index_is_min || msg_index_and_term_match {
             self.append_log_entries(&msg.entries).await?;
             self.replicate_to_state_machine_if_needed(msg.entries).await;
@@ -79,7 +79,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         //// Begin Log Consistency Check ////
         tracing::trace!("begin log consistency check");
 
-        // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its result.
+        // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its
+        // result.
         let entries = self
             .storage
             .get_log_entries(msg.prev_log_index, msg.prev_log_index + 1)
@@ -113,11 +114,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
                 .delete_logs_from(target_entry.index + 1, None)
                 .await
                 .map_err(|err| self.map_fatal_storage_error(err))?;
-            let membership = self
-                .storage
-                .get_membership_config()
-                .await
-                .map_err(|err| self.map_fatal_storage_error(err))?;
+            let membership =
+                self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
             self.update_membership(membership)?;
         }
     }
@@ -134,10 +132,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             .get_log_entries(start, msg.prev_log_index)
             .await
             .map_err(|err| self.map_fatal_storage_error(err))?;
-        let opt = match old_entries
-            .iter()
-            .find(|entry| entry.term == msg.prev_log_term)
-        {
+        let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log_term) {
             Some(entry) => Some(ConflictOpt {
                 term: entry.term,
                 index: entry.index,
@@ -193,10 +188,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         };
 
         // Replicate entries to log (same as append, but in follower mode).
-        self.storage
-            .replicate_to_log(entries)
-            .await
-            .map_err(|err| self.map_fatal_storage_error(err))?;
+        self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
         if let Some(entry) = entries.last() {
             self.last_log_index = entry.index;
             self.last_log_term = entry.term;
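
The ConflictOpt literal built in the hunk above implies a simple shape; the sketch below is inferred from this diff, and the real definition elsewhere in async-raft may carry extra derives or documentation:

    /// Conflict hint a follower returns when the log consistency check fails,
    /// pointing the leader at the last (term, index) pair the follower can accept.
    struct ConflictOpt {
        term: u64,
        index: u64,
    }

Returning a term/index pair rather than rejecting one entry at a time lets the leader rewind its replication cursor for this follower in large steps, the standard Raft conflict optimization.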
(The remaining 32 of the 36 changed files are not shown in this excerpt.)