Skip to content

Commit

Permalink
Change: rename State to ServerState
Browse files Browse the repository at this point in the history
- fix: #286
  • Loading branch information
drmingdrmer committed Apr 17, 2022
1 parent aa67db3 commit 30b485b
Show file tree
Hide file tree
Showing 42 changed files with 184 additions and 167 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::warn;

use crate::core::LeaderState;
use crate::core::LearnerState;
use crate::core::State;
use crate::core::ServerState;
use crate::entry::EntryRef;
use crate::error::AddLearnerError;
use crate::error::ChangeMembershipError;
Expand Down Expand Up @@ -317,7 +317,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
tracing::debug!("raft node is stepping down");

// TODO(xp): transfer leadership
self.core.set_target_state(State::Learner);
self.core.set_target_state(ServerState::Learner);
return;
}

Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::core::apply_to_state_machine;
use crate::core::RaftCore;
use crate::core::State;
use crate::core::ServerState;
use crate::error::AppendEntriesError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
Expand Down Expand Up @@ -42,8 +42,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.save_vote().await?;

// If not follower, become follower.
if !self.engine.state.target_state.is_follower() && !self.engine.state.target_state.is_learner() {
self.set_target_state(State::Follower); // State update will emit metrics.
if !self.engine.state.server_state.is_follower() && !self.engine.state.server_state.is_learner() {
self.set_target_state(ServerState::Follower); // State update will emit metrics.
}

self.engine.metrics_flags.set_cluster_changed();
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::Instrument;

use crate::core::apply_to_state_machine;
use crate::core::LeaderState;
use crate::core::State;
use crate::core::ServerState;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::QuorumNotEnough;
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
// TODO(xp): deal with storage error
self.core.save_vote().await.unwrap();
// TODO(xp): if receives error about a higher term, it should stop at once?
self.core.set_target_state(State::Follower);
self.core.set_target_state(ServerState::Follower);
}

granted.insert(target);
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use tokio::io::AsyncWriteExt;

use crate::core::purge_applied_logs;
use crate::core::RaftCore;
use crate::core::ServerState;
use crate::core::SnapshotState;
use crate::core::State;
use crate::error::InstallSnapshotError;
use crate::error::SnapshotMismatch;
use crate::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -49,8 +49,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.save_vote().await?;

// If not follower, become follower.
if !self.engine.state.target_state.is_follower() && !self.engine.state.target_state.is_learner() {
self.set_target_state(State::Follower); // State update will emit metrics.
if !self.engine.state.server_state.is_follower() && !self.engine.state.server_state.is_learner() {
self.set_target_state(ServerState::Follower); // State update will emit metrics.
}

self.engine.metrics_flags.set_data_changed();
Expand Down
92 changes: 47 additions & 45 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,29 +298,29 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
const IS_VOTER: bool = true;
const IS_LEARNER: bool = false;

self.engine.state.target_state = match (has_log, single, is_voter) {
self.engine.state.server_state = match (has_log, single, is_voter) {
// A restarted raft that already received some logs but was not yet added to a cluster.
// It should remain in Learner state, not Follower.
(HAS_LOG, SINGLE, IS_LEARNER) => State::Learner,
(HAS_LOG, MULTI, IS_LEARNER) => State::Learner,
(HAS_LOG, SINGLE, IS_LEARNER) => ServerState::Learner,
(HAS_LOG, MULTI, IS_LEARNER) => ServerState::Learner,

(NO_LOG, SINGLE, IS_LEARNER) => State::Learner, // impossible: no logs but there are other members.
(NO_LOG, MULTI, IS_LEARNER) => State::Learner, // impossible: no logs but there are other members.
(NO_LOG, SINGLE, IS_LEARNER) => ServerState::Learner, // impossible: no logs but there are other members.
(NO_LOG, MULTI, IS_LEARNER) => ServerState::Learner, // impossible: no logs but there are other members.

// If this is the only configured member and there is live state, then this is
// a single-node cluster. Become leader.
(HAS_LOG, SINGLE, IS_VOTER) => State::Leader,
(HAS_LOG, SINGLE, IS_VOTER) => ServerState::Leader,

// The initial state when a raft is created from empty store.
(NO_LOG, SINGLE, IS_VOTER) => State::Learner,
(NO_LOG, SINGLE, IS_VOTER) => ServerState::Learner,

// Otherwise it is Follower.
(HAS_LOG, MULTI, IS_VOTER) => State::Follower,
(HAS_LOG, MULTI, IS_VOTER) => ServerState::Follower,

(NO_LOG, MULTI, IS_VOTER) => State::Follower, // impossible: no logs but there are other members.
(NO_LOG, MULTI, IS_VOTER) => ServerState::Follower, // impossible: no logs but there are other members.
};

if self.engine.state.target_state == State::Follower {
if self.engine.state.server_state == ServerState::Follower {
// Here we use a 30 second overhead on the initial next_election_timeout. This is because we need
// to ensure that restarted nodes don't disrupt a stable cluster by timing out and driving up their
// term before network communication is established.
Expand All @@ -329,19 +329,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.next_election_timeout = Some(inst);
}

tracing::debug!("id={} target_state: {:?}", self.id, self.engine.state.target_state);
tracing::debug!("id={} target_state: {:?}", self.id, self.engine.state.server_state);

// This is central loop of the system. The Raft core assumes a few different roles based
// on cluster state. The Raft core will delegate control to the different state
// controllers and simply awaits the delegated loop to return, which will only take place
// if some error has been encountered, or if a state change is required.
loop {
match &self.engine.state.target_state {
State::Leader => LeaderState::new(self).run().await?,
State::Candidate => CandidateState::new(self).run().await?,
State::Follower => FollowerState::new(self).run().await?,
State::Learner => LearnerState::new(self).run().await?,
State::Shutdown => {
match &self.engine.state.server_state {
ServerState::Leader => LeaderState::new(self).run().await?,
ServerState::Candidate => CandidateState::new(self).run().await?,
ServerState::Follower => FollowerState::new(self).run().await?,
ServerState::Learner => LearnerState::new(self).run().await?,
ServerState::Shutdown => {
tracing::info!("node has shutdown");
return Ok(());
}
Expand Down Expand Up @@ -383,7 +383,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
snapshot: self.snapshot_last_log_id,

// --- cluster ---
state: self.engine.state.target_state,
state: self.engine.state.server_state,
current_leader: self.current_leader(),
membership_config: self.engine.state.effective_membership.clone(),

Expand Down Expand Up @@ -415,13 +415,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

/// Update core's target state, ensuring all invariants are upheld.
#[tracing::instrument(level = "trace", skip(self), fields(id=display(self.id)))]
fn set_target_state(&mut self, target_state: State) {
fn set_target_state(&mut self, target_state: ServerState) {
tracing::debug!(id = display(self.id), ?target_state, "set_target_state");

if target_state == State::Follower && !self.engine.state.effective_membership.membership.is_member(&self.id) {
self.engine.state.target_state = State::Learner;
if target_state == ServerState::Follower
&& !self.engine.state.effective_membership.membership.is_member(&self.id)
{
self.engine.state.server_state = ServerState::Learner;
} else {
self.engine.state.target_state = target_state;
self.engine.state.server_state = target_state;
}
}

Expand Down Expand Up @@ -468,13 +470,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// being removed.
self.engine.state.effective_membership = Arc::new(cfg);
if self.engine.state.effective_membership.membership.is_member(&self.id) {
if self.engine.state.target_state == State::Learner {
if self.engine.state.server_state == ServerState::Learner {
// The node is a Learner and the new config has it configured as a normal member.
// Transition to follower.
self.set_target_state(State::Follower);
self.set_target_state(ServerState::Follower);
}
} else {
self.set_target_state(State::Learner);
self.set_target_state(ServerState::Learner);
}
}

Expand Down Expand Up @@ -585,7 +587,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let id = self.engine.state.vote.node_id;

if id == self.id {
if self.engine.state.target_state == State::Leader {
if self.engine.state.server_state == ServerState::Leader {
Some(id)
} else {
None
Expand Down Expand Up @@ -698,7 +700,7 @@ pub(self) enum SnapshotUpdate<C: RaftTypeConfig> {

/// All possible states of a Raft node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum State {
pub enum ServerState {
/// The node is completely passive; replicating entries, but neither voting nor timing out.
Learner,
/// The node is replicating logs from the leader.
Expand All @@ -711,13 +713,13 @@ pub enum State {
Shutdown,
}

impl Default for State {
impl Default for ServerState {
fn default() -> Self {
Self::Follower
}
}

impl State {
impl ServerState {
/// Check if currently in learner state.
pub fn is_learner(&self) -> bool {
matches!(self, Self::Learner)
Expand Down Expand Up @@ -822,11 +824,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.core.report_metrics(Update::Update(Some(self.replication_metrics.clone())));

loop {
if !self.core.engine.state.target_state.is_leader() {
if !self.core.engine.state.server_state.is_leader() {
tracing::info!(
"id={} state becomes: {:?}",
self.core.id,
self.core.engine.state.target_state
self.core.engine.state.server_state
);

// implicit drop replication_rx
Expand Down Expand Up @@ -854,7 +856,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

Ok(_) = &mut self.core.rx_shutdown => {
tracing::info!("leader recv from rx_shudown");
self.core.set_target_state(State::Shutdown);
self.core.set_target_state(ServerState::Shutdown);
}
}
}
Expand Down Expand Up @@ -896,7 +898,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
tx,
} => self.change_membership(members, blocking, turn_to_learner, tx).await?,
RaftMsg::ExternalRequest { req } => {
req(State::Leader, &mut self.core.storage, &mut self.core.network);
req(ServerState::Leader, &mut self.core.storage, &mut self.core.network);
}
};

Expand Down Expand Up @@ -978,7 +980,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida
self.core.report_metrics(Update::Update(None));

loop {
if !self.core.engine.state.target_state.is_candidate() {
if !self.core.engine.state.server_state.is_candidate() {
return Ok(());
}

Expand All @@ -1002,7 +1004,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida
self.core.id,
)
.await?;
if !self.core.engine.state.target_state.is_candidate() {
if !self.core.engine.state.server_state.is_candidate() {
return Ok(());
}

Expand All @@ -1011,7 +1013,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida

// Inner processing loop for this Raft state.
loop {
if !self.core.engine.state.target_state.is_candidate() {
if !self.core.engine.state.server_state.is_candidate() {
return Ok(());
}

Expand All @@ -1033,7 +1035,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida

Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),

Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(ServerState::Shutdown),
}
}
}
Expand Down Expand Up @@ -1068,7 +1070,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::ExternalRequest { req } => {
req(State::Candidate, &mut self.core.storage, &mut self.core.network);
req(ServerState::Candidate, &mut self.core.storage, &mut self.core.network);
}
};
Ok(())
Expand Down Expand Up @@ -1106,7 +1108,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Followe
self.core.report_metrics(Update::Update(None));

loop {
if !self.core.engine.state.target_state.is_follower() {
if !self.core.engine.state.server_state.is_follower() {
return Ok(());
}

Expand All @@ -1119,7 +1121,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Followe
// If an election timeout is hit, then we need to transition to candidate.
_ = election_timeout => {
tracing::debug!("timeout to recv a event, change to CandidateState");
self.core.set_target_state(State::Candidate)
self.core.set_target_state(ServerState::Candidate)
},

Some((msg,span)) = self.core.rx_api.recv() => {
Expand All @@ -1128,7 +1130,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Followe

Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),

Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(ServerState::Shutdown),
}
}
}
Expand Down Expand Up @@ -1163,7 +1165,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Followe
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::ExternalRequest { req } => {
req(State::Follower, &mut self.core.storage, &mut self.core.network);
req(ServerState::Follower, &mut self.core.storage, &mut self.core.network);
}
};
Ok(())
Expand Down Expand Up @@ -1200,7 +1202,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
self.core.report_metrics(Update::Update(None));

loop {
if !self.core.engine.state.target_state.is_learner() {
if !self.core.engine.state.server_state.is_learner() {
return Ok(());
}

Expand All @@ -1219,7 +1221,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
self.core.update_snapshot_state(update);
},

Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(ServerState::Shutdown),
}
}
}
Expand Down Expand Up @@ -1255,7 +1257,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
self.core.reject_with_forward_to_leader(tx);
}
RaftMsg::ExternalRequest { req } => {
req(State::Learner, &mut self.core.storage, &mut self.core.network);
req(ServerState::Learner, &mut self.core.storage, &mut self.core.network);
}
};
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use tracing_futures::Instrument;
use crate::config::SnapshotPolicy;
use crate::core::LeaderState;
use crate::core::ReplicationState;
use crate::core::ServerState;
use crate::core::SnapshotState;
use crate::core::State;
use crate::error::AddLearnerError;
use crate::metrics::UpdateMatchedLogId;
use crate::raft::AddLearnerResponse;
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.handle_needs_snapshot(must_include, tx).await?;
}
ReplicaEvent::Shutdown => {
self.core.set_target_state(State::Shutdown);
self.core.set_target_state(ServerState::Shutdown);
}
};

Expand All @@ -93,7 +93,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
if vote > self.core.engine.state.vote {
self.core.engine.state.vote = vote;
self.core.save_vote().await?;
self.core.set_target_state(State::Follower);
self.core.set_target_state(ServerState::Follower);
}
Ok(())
}
Expand Down

0 comments on commit 30b485b

Please sign in to comment.